Skip to content

Commit

Permalink
v1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
abraham-leal authored Nov 29, 2020
2 parents 4bbdd10 + cb5baaa commit edd6f37
Show file tree
Hide file tree
Showing 20 changed files with 475 additions and 168 deletions.
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ However, for stable images tag a release.

## Run
- `./ccloud-schema-exporter -batchExport` : Running the app with this flag will perform a batch export.
Starting v1.1, `-batchExport` can be declared with `-syncDeletes` to perform an export of soft deleted schemas.
- `./ccloud-schema-exporter -sync` : Running the app with this flag will start a continuous sync
between the source and destination schema registries.
- `./ccloud-schema-exporter -getLocalCopy` : Running the app with this flag will get a snapshot of your Schema Registry
Expand Down Expand Up @@ -148,10 +149,10 @@ NOTE: Lists aren't respected with the utility `-deleteAllFromDestination`

#### A note on syncing hard deletions

As of v1.1, `ccloud-schema-exporter` provides an efficient way of syncing hard deletions.
Starting v1.1, `ccloud-schema-exporter` provides an efficient way of syncing hard deletions.
In previous versions, this was done through inefficient lookups.

Support for syncing hard deletions is only when the source and destination are both a Confluent Cloud Schema Registry.
Support for syncing hard deletions applies when the source and destination are both a Confluent Cloud Schema Registries.

#### Non-Interactive Run

Expand All @@ -172,17 +173,16 @@ If you'd like more info on how to change the Schema Registry mode to enable non-

#### Extendability: Custom Sources and Destinations

`ccloud-schema-exporter` supports custom implementations of source registries and destination registries.
If you'd like to leverage the already built back-end, all you have to do is an implementation the `CustomSource` or `CustomDestination` interface.
`ccloud-schema-exporter` supports custom implementations of sources and destinations.
If you'd like to leverage the already built back-end, all you have to do is an implementation of the `CustomSource` or `CustomDestination` interfaces.
A copy of the interface definitions is below for convenience:

````
type CustomSource interface {
// Perform any set-up behavior before start of sync/batch export
SetUp() error
// An implementation should handle the retrieval of a schema from the source.
// The id should be a unique identifier for the schema.
GetSchema(SchemaSourceID int64) (subject string, version int64, id int64, stype string, schema string, err error)
GetSchema(subject string, version int64) (id int64, stype string, schema string, err error)
// An implementation should be able to send exactly one map describing the state of the source
// This map should be minimal. Describing only the Subject and Versions that exist.
GetSourceState() (map[string][]int64, error)
Expand Down Expand Up @@ -227,7 +227,6 @@ var customSrcFactory = map[string]client.CustomSource{

You will see that these maps already have one entry, that is because `ccloud-schema-exporter` comes with sample
implementations of the interface under `cmd/internals/customDestination.go` and `cmd/internals/customSource.go`, check them out!
Make sure to add your implementation to this map.

For the custom source example, there is an implementation to allow sourcing schemas from Apicurio into Schema Registry.
It defaults to looking for Apicurio in `http://localhost:8081`, but you can override it by providing a mapping
Expand All @@ -238,7 +237,7 @@ Note: The schemas get exported using record names (all treated as `-value`), so
Once added, all you have to do is indicate you will want to run with a custom source/destination with the `-customSource | -customDestination` flag.
The value of this flag must be the name you gave it in the factory mapping.

The following options are respected for custom destinations as well:
The following options are respected for custom sources / destinations as well:

````
-allowList value
Expand Down
103 changes: 56 additions & 47 deletions cmd/ccloud-schema-exporter/ccloud-schema-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

//
// ccloud-schema-exporter.go
// Author: Abraham Leal
// Copyright 2020 Abraham Leal
//

import (
Expand Down Expand Up @@ -33,6 +33,9 @@ func main() {
if client.CustomSourceName != "" {

destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst")
if !client.NoPrompt {
preflightWriteChecks(destClient)
}

if client.ThisRun == client.BATCH {
client.RunCustomSourceBatch(destClient, customSrcFactory[client.CustomSourceName])
Expand All @@ -53,6 +56,9 @@ func main() {
}

destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst")
if !client.NoPrompt {
preflightWriteChecks(destClient)
}

client.WriteFromFS(destClient, client.PathToWrite, workingDir)

Expand Down Expand Up @@ -95,23 +101,64 @@ func main() {
}

destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst")
if !client.NoPrompt {
preflightWriteChecks(destClient)
}

if (!strings.HasSuffix(srcClient.SRUrl, "confluent.cloud") ||
!strings.HasSuffix(destClient.SRUrl, "confluent.cloud")) &&
client.ThisRun == client.SYNC && client.SyncHardDeletes && !client.NoPrompt {

fmt.Println("It looks like you are trying to sync hard deletions between non-Confluent Cloud Schema Registries")
fmt.Println("Starting v1.1, ccloud-schema-exporter only supports hard deletion sync between Confluent Cloud Schema Registries")
fmt.Println("------------------------------------------------------")
fmt.Println("Do you wish to continue? (Y/n)")

var text string

_, err := fmt.Scanln(&text)
if err != nil {
log.Fatal(err)
}

if !strings.EqualFold(text, "Y") {
os.Exit(0)
}
}

if client.ThisRun == client.SYNC {
client.Sync(srcClient, destClient)
}
if client.ThisRun == client.BATCH {
client.BatchExport(srcClient, destClient)
}

log.Println("-----------------------------------------------")

if client.ThisRun == client.BATCH {
log.Println("Resetting target to READWRITE")
destClient.SetMode(client.READWRITE)
}

log.Println("All Done! Thanks for using ccloud-schema-exporter!")

}

func preflightWriteChecks (destClient *client.SchemaRegistryClient) {

if !destClient.IsReachable() {
log.Println("Could not reach destination registry. Possible bad credentials?")
os.Exit(0)
}

destChan := make(chan map[string][]int64)
go destClient.GetSubjectsWithVersions(destChan)
destSubjects := <-destChan
close(destChan)

if len(destSubjects) != 0 && client.ThisRun != client.SYNC && !client.NoPrompt {
destSubjects := client.GetCurrentSubjectState(destClient)
if len(destSubjects) != 0 && client.ThisRun != client.SYNC {
log.Println("You have existing subjects registered in the destination registry, exporter cannot write schemas when " +
"previous schemas exist in batch mode.")
os.Exit(0)
}

if !destClient.IsImportModeReady() && !client.NoPrompt {
if !destClient.IsImportModeReady() {

fmt.Println("Destination Schema Registry is not set to IMPORT mode!")
fmt.Println("------------------------------------------------------")
Expand All @@ -136,7 +183,7 @@ func main() {
}
}

if !destClient.IsCompatReady() && !client.NoPrompt {
if !destClient.IsCompatReady() {

fmt.Println("Destination Schema Registry is not set to NONE global compatibility level!")
fmt.Println("We assume the source to be maintaining correct compatibility between registrations, per subject compatibility changes are not supported.")
Expand All @@ -159,42 +206,4 @@ func main() {
log.Println("Continuing without NONE Global Compatibility Level. Note this might arise some failures in registration of some schemas.")
}
}

if (!strings.HasSuffix(srcClient.SRUrl, "confluent.cloud") ||
!strings.HasSuffix(destClient.SRUrl, "confluent.cloud")) &&
client.ThisRun == client.SYNC && client.SyncHardDeletes && !client.NoPrompt {

fmt.Println("It looks like you are trying to sync hard deletions between non-Confluent Cloud Schema Registries")
fmt.Println("Starting v1.1, ccloud-schema-exporter only supports hard deletion sync between Confluent Cloud Schema Registries")
fmt.Println("------------------------------------------------------")
fmt.Println("Do you wish to continue? (Y/n)")

var text string

_, err := fmt.Scanln(&text)
if err != nil {
log.Fatal(err)
}

if !strings.EqualFold(text, "Y") {
os.Exit(0)
}
}

if client.ThisRun == client.SYNC {
client.Sync(srcClient, destClient)
}
if client.ThisRun == client.BATCH {
client.BatchExport(srcClient, destClient)
}

log.Println("-----------------------------------------------")

if client.ThisRun == client.BATCH {
log.Println("Resetting target to READWRITE")
destClient.SetMode(client.READWRITE)
}

log.Println("All Done! Thanks for using ccloud-schema-exporter!")

}
6 changes: 6 additions & 0 deletions cmd/integrationTests/exporter-integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package integration

//
// exporter-integration_test.go
// Copyright 2020 Abraham Leal
//


import (
client "github.com/abraham-leal/ccloud-schema-exporter/cmd/internals"
"github.com/stretchr/testify/assert"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package integration_deletion

//
// hardDeleteCloud_test.go
// Copyright 2020 Abraham Leal
//


import (
client "github.com/abraham-leal/ccloud-schema-exporter/cmd/internals"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -98,7 +104,7 @@ func printSubjectTestResult(srcSubjects map[string][]int64, destSubjects map[str
log.Printf("Destination subject-version mapping contents: %v", destSubjects)
}

func printIDTestResult(srcIDs map[int64]map[string]int64, dstIDs map[int64]map[string]int64) {
func printIDTestResult(srcIDs map[int64]map[string][]int64, dstIDs map[int64]map[string][]int64) {
log.Printf("Source IDs contents: %v", srcIDs)
log.Printf("Destination IDs contents: %v", dstIDs)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/internals/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

//
// context.go
// Author: Abraham Leal
// Copyright 2020 Abraham Leal
//

import (
Expand Down
11 changes: 8 additions & 3 deletions cmd/internals/customDestination.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package client

//
// customDestination.go
// Copyright 2020 Abraham Leal
//

import (
"log"
"reflect"
Expand Down Expand Up @@ -72,7 +77,7 @@ func RunCustomDestinationBatch(srcClient *SchemaRegistryClient, customDest Custo
return
}
for _, v := range srcVersions {
schema := srcClient.GetSchema(srcSubject, v)
schema := srcClient.GetSchema(srcSubject, v, false)
log.Printf("Registering schema: %s with version: %d and ID: %d and Type: %s",
schema.Subject, schema.Version, schema.Id, schema.SType)
err := customDest.RegisterSchema(schema)
Expand All @@ -86,7 +91,7 @@ func customDestSync(diff map[string][]int64, srcClient *SchemaRegistryClient, cu
log.Println("Source registry has values that Destination does not, syncing...")
for subject, versions := range diff {
for _, v := range versions {
schema := srcClient.GetSchema(subject, v)
schema := srcClient.GetSchema(subject, v, false)
log.Println("Registering new schema: " + schema.Subject +
" with version: " + strconv.FormatInt(schema.Version, 10) +
" and ID: " + strconv.FormatInt(schema.Id, 10) +
Expand All @@ -104,7 +109,7 @@ func customDestSyncDeletes(destSubjects map[string][]int64, srcSubjects map[stri
log.Println("Source registry has deletes that Destination does not, syncing...")
for subject, versions := range diff {
for _, v := range versions {
schema := srcClient.GetSchema(subject, v)
schema := srcClient.GetSchema(subject, v, false)
err := customDest.DeleteSchema(schema)
checkCouldNotRegister(err)
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/internals/customDestination_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package client

//
// customDestination_test.go
// Copyright 2020 Abraham Leal
//

import (
"github.com/stretchr/testify/assert"
"log"
Expand Down
5 changes: 5 additions & 0 deletions cmd/internals/customSource.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package client

//
// customSource.go
// Copyright 2020 Abraham Leal
//

import (
"encoding/json"
"fmt"
Expand Down
6 changes: 6 additions & 0 deletions cmd/internals/customSource_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package client

//
// customSource_test.go
// Copyright 2020 Abraham Leal
//


import (
"github.com/stretchr/testify/assert"
"log"
Expand Down
16 changes: 6 additions & 10 deletions cmd/internals/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

//
// definitions.go
// Author: Abraham Leal
// Copyright 2020 Abraham Leal
//

import (
Expand All @@ -18,8 +18,6 @@ type SchemaRegistryClient struct {
SRUrl string
SRApiKey string
SRApiSecret string
InMemSchemas map[string][]int64
srcInMemDeletedIDs map[int64]map[string]int64
}

/*
Expand Down Expand Up @@ -56,6 +54,7 @@ type CustomSource interface {
TearDown() error
}

// Holding struct that describes a schema record
type SchemaRecord struct {
Subject string `json:"subject"`
Schema string `json:"schema"`
Expand All @@ -73,13 +72,15 @@ func (srs SchemaRecord) setTypeIfEmpty() SchemaRecord {
return srs
}

// Holding struct for registering a schema in an SR compliant way
type SchemaToRegister struct {
Schema string `json:"schema"`
Id int64 `json:"id,omitempty"`
Version int64 `json:"version,omitempty"`
SType string `json:"schemaType"`
}

// Holding struct for retrieving a schema
type SchemaExtraction struct {
Schema string `json:"schema"`
Id int64 `json:"id"`
Expand Down Expand Up @@ -113,9 +114,9 @@ func (i *StringArrayFlag) String() string {

func (i *StringArrayFlag) Set(value string) error {
currentPath, _ := os.Getwd()
path := CheckPath(value, currentPath)

if strings.LastIndexAny(value, "/.") != -1 {
path := CheckPath(value, currentPath)
if fileExists(path) {
f, err := ioutil.ReadFile(path)
if err != nil {
panic(err)
Expand Down Expand Up @@ -144,8 +145,3 @@ func (i *StringArrayFlag) removeSpaces(str string) string {
return r
}, str)
}

type idSubjectVersion struct {
Id int64 `json:"Id"`
SubjectAndVersion map[string]int64 `json:"SubjectAndVersion"`
}
Loading

0 comments on commit edd6f37

Please sign in to comment.