From 13fe46a68c38645e264490377cf505a462cb8f43 Mon Sep 17 00:00:00 2001 From: Romaric P Date: Sat, 30 Nov 2024 17:19:17 -0800 Subject: [PATCH] initial commit --- .gitignore | 185 +++++++++++++++++++++++ LICENSE.md | 19 +++ cmd/root.go | 242 +++++++++++++++++++++++++++++++ cmd/validate.go | 208 ++++++++++++++++++++++++++ cmd/version.go | 22 +++ go.mod | 28 ++++ go.sum | 77 ++++++++++ internal/config/config.go | 32 ++++ internal/config/logger.go | 62 ++++++++ internal/migration/manager.go | 81 +++++++++++ internal/migration/mongodb.go | 62 ++++++++ internal/migration/mysql.go | 124 ++++++++++++++++ internal/migration/postgres.go | 90 ++++++++++++ internal/migration/types.go | 56 +++++++ internal/migration/verifier.go | 258 +++++++++++++++++++++++++++++++++ main.go | 9 ++ 16 files changed, 1555 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE.md create mode 100644 cmd/root.go create mode 100644 cmd/validate.go create mode 100644 cmd/version.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/config/config.go create mode 100644 internal/config/logger.go create mode 100644 internal/migration/manager.go create mode 100644 internal/migration/mongodb.go create mode 100644 internal/migration/mysql.go create mode 100644 internal/migration/postgres.go create mode 100644 internal/migration/types.go create mode 100644 internal/migration/verifier.go create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a05ca25 --- /dev/null +++ b/.gitignore @@ -0,0 +1,185 @@ +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files 
+.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Go template +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work +go.work.sum + +# env file +.env + +### GoLand template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: 
https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..0d45045 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,19 @@ +# MIT License + +Copyright (c) 2024 Qovery + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, 
distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..f753487 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,242 @@ +package cmd + +import ( + "context" + "data-migration/internal/config" + "data-migration/internal/migration" + "errors" + "fmt" + "github.com/spf13/cobra" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + "time" +) + +var ( + cfg = &config.Config{} + + // rootCmd represents the base command + rootCmd = &cobra.Command{ + Use: "migrationdb", + Short: "Database migration tool", + Long: `A database migration tool that supports streaming data between different database systems. +Source and target databases must be of the same type (e.g., postgres to postgres, mysql to mysql). 
+ +Supported connection string formats: + PostgreSQL: postgresql://user:pass@host:5432/dbname + MySQL: mysql://user:pass@host:3306/dbname + MongoDB: mongodb://user:pass@host:27017/dbname`, + Example: ` # PostgreSQL migration + migrationdb --source postgresql://user:pass@source:5432/db --target postgresql://user:pass@target:5432/db + + # MySQL migration + migrationdb --source mysql://user:pass@source:3306/db --target mysql://user:pass@target:3306/db + + # Stream to stdout + migrationdb --source postgresql://user:pass@source:5432/db --stdout > dump.sql + + # Stream to stdout and compress + migrationdb --source postgresql://user:pass@source:5432/db --stdout | gzip > dump.sql.gz + + # Skip verification + migrationdb --source postgresql://user:pass@source:5432/db --target postgresql://user:pass@target:5432/db --skip-verify`, + RunE: runMigration, + // Silence usage on error + SilenceUsage: true, + // Silence error printing as we handle it in Execute() + SilenceErrors: true, + } +) + +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func init() { + // Persistent flags for root command + rootCmd.PersistentFlags().StringVar(&cfg.SourceConn, "source", "", "Source connection string") + rootCmd.PersistentFlags().StringVar(&cfg.TargetConn, "target", "", "Target connection string") + rootCmd.PersistentFlags().BoolVar(&cfg.StdoutMode, "stdout", false, "Stream to stdout instead of target database") + rootCmd.PersistentFlags().StringVar(&cfg.LogLevel, "log-level", "info", "Log level (debug, info, warn, error)") + rootCmd.PersistentFlags().IntVar(&cfg.BufferSize, "buffer-size", 10*1024*1024, "Buffer size in bytes for streaming") + rootCmd.PersistentFlags().DurationVar(&cfg.Timeout, "timeout", 24*time.Hour, "Migration timeout duration") + + // Add verification flags + rootCmd.PersistentFlags().BoolVar(&cfg.SkipVerification, "skip-verify", false, "Skip verification after migration") + 
rootCmd.PersistentFlags().IntVar(&cfg.VerifyChunkSize, "verify-chunk-size", 10*1024*1024, "Chunk size in bytes for verification streaming") + rootCmd.PersistentFlags().BoolVar(&cfg.SkipTLSVerify, "skip-tls-verify", false, "Skip TLS certificate verification when testing connections") + + // Mark required flags + _ = rootCmd.MarkPersistentFlagRequired("source") + + // Add sub-commands + rootCmd.AddCommand(versionCmd) + rootCmd.AddCommand(validateCmd) +} + +// inferDatabaseType determines the database type from a connection string +func inferDatabaseType(connString string) (string, error) { + if connString == "" { + return "", fmt.Errorf("connection string is empty") + } + + // Handle MongoDB connection strings + if strings.HasPrefix(connString, "mongodb://") || strings.HasPrefix(connString, "mongodb+srv://") { + return "mongodb", nil + } + + // Parse URL for other database types + u, err := url.Parse(connString) + if err != nil { + return "", fmt.Errorf("invalid connection string: %v", err) + } + + switch u.Scheme { + case "postgres", "postgresql": + return "postgres", nil + case "mysql": + return "mysql", nil + default: + return "", fmt.Errorf("unsupported database type in connection string: %s", u.Scheme) + } +} + +func setupSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *config.Logger) { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + select { + case sig := <-signalChan: + logger.Infof("Received signal: %v", sig) + cancel() + case <-ctx.Done(): + return + } + }() +} + +func runMigration(cmd *cobra.Command, args []string) error { + // Validate source and target connections + dbType, err := ValidateConnections(cfg.SourceConn, cfg.TargetConn, cfg.StdoutMode) + if err != nil { + return err + } + + // Update config with inferred types + cfg.SourceType = dbType + if !cfg.StdoutMode { + cfg.TargetType = dbType + } + + // Configure logger based on stdout mode + loggerOpts := 
config.LoggerOptions{ + ForceStderr: cfg.StdoutMode, // Force stderr logging when using stdout mode + } + logger := config.NewLogger(cfg.LogLevel, loggerOpts) + + // Test source connection + logger.Info("Testing source connection...") + if err := TestConnection(dbType, cfg.SourceConn, cfg.SkipTLSVerify); err != nil { + return fmt.Errorf("source connection (%s) test failed: %w", MaskConnectionString(cfg.SourceConn), err) + } + + logger.Info("Source connection test successful!") + + // Test target connection + logger.Info("Testing target connection...") + if err := TestConnection(dbType, cfg.TargetConn, cfg.SkipTLSVerify); err != nil { + return fmt.Errorf("target connection (%s) test failed: %w", MaskConnectionString(cfg.TargetConn), err) + } + + // Log startup information to stderr + logger.Infof("Starting migration - Database type: %s", dbType) + if cfg.StdoutMode { + logger.Info("Running in stdout mode - all logs will be written to stderr") + } + + // Create migration manager + manager := migration.NewManager(cfg, logger) + + // Setup context with cancellation + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer cancel() + + // Setup signal handling + setupSignalHandler(ctx, cancel, logger) + + // Start migration + if err := manager.Migrate(ctx); err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("migration timed out after %v", cfg.Timeout) + } + if errors.Is(ctx.Err(), context.Canceled) { + return fmt.Errorf("migration canceled by user") + } + return fmt.Errorf("migration failed: %w", err) + } + + logger.Info("Migration completed successfully!") + + // Skip verification if requested or in stdout mode + if cfg.SkipVerification || cfg.StdoutMode { + if cfg.SkipVerification { + logger.Info("Skipping verification as requested") + } + return nil + } + + // Create verifier + logger.Info("Starting verification...") + + sourceDumper, err := migration.CreateDumper(cfg.SourceType, cfg.SourceConn, 
cfg.StdoutMode) + targetDumper, err := migration.CreateDumper(cfg.TargetType, cfg.TargetConn, cfg.StdoutMode) + + if err != nil { + return fmt.Errorf("failed to create dumper: %w", err) + } + + verifier, err := migration.NewDatabaseVerifier( + sourceDumper, + targetDumper, + migration.WithChunkSize(cfg.VerifyChunkSize), + ) + if err != nil { + return fmt.Errorf("failed to create verifier: %w", err) + } + + // Create a new context for verification + verifyCtx, verifyCancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer verifyCancel() + + // Setup signal handling for verification + setupSignalHandler(verifyCtx, verifyCancel, logger) + + // Start verification + if err := verifier.VerifyContent(verifyCtx); err != nil { + if errors.Is(verifyCtx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("verification timed out after %v", cfg.Timeout) + } + if errors.Is(verifyCtx.Err(), context.Canceled) { + return fmt.Errorf("verification canceled by user") + } + return fmt.Errorf("verification failed: %w", err) + } + + // Calculate checksum + checksum, err := verifier.GetChecksum(verifyCtx) + if err != nil { + logger.Warnf("Failed to calculate checksum: %v", err) + } else { + logger.Infof("Database checksum: %s", checksum) + } + + logger.Info("Verification completed successfully!") + return nil +} diff --git a/cmd/validate.go b/cmd/validate.go new file mode 100644 index 0000000..b1df63d --- /dev/null +++ b/cmd/validate.go @@ -0,0 +1,208 @@ +package cmd + +import ( + "context" + "database/sql" + "fmt" + _ "github.com/go-sql-driver/mysql" + _ "github.com/lib/pq" + "github.com/spf13/cobra" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "net/url" + "strings" + "time" +) + +var ( + skipTLSVerify bool +) + +var validateCmd = &cobra.Command{ + Use: "validate", + Short: "Validate the connection strings and configurations", + RunE: runValidate, +} + +func init() { + validateCmd.Flags().BoolVar(&skipTLSVerify, 
"skip-tls-verify", false, "Skip TLS certificate verification when testing connections") +} + +func runValidate(cmd *cobra.Command, args []string) error { + // First validate the configuration + if err := cfg.Validate(); err != nil { + return fmt.Errorf("configuration validation failed: %v", err) + } + + // Validate and get database types + sourceType, err := ValidateConnections(cfg.SourceConn, cfg.TargetConn, cfg.StdoutMode) + if err != nil { + return fmt.Errorf("connection validation failed: %v", err) + } + + // Test source connection + fmt.Println("Testing source connection...") + if err := TestConnection(sourceType, cfg.SourceConn, skipTLSVerify); err != nil { + return fmt.Errorf("source connection test failed: %v", err) + } + fmt.Println("Source connection test successful!") + + // Test target connection if not in stdout mode + if !cfg.StdoutMode { + fmt.Println("\nTesting target connection...") + if err := TestConnection(sourceType, cfg.TargetConn, skipTLSVerify); err != nil { + return fmt.Errorf("target connection test failed: %v", err) + } + fmt.Println("Target connection test successful!") + } + + // Print configuration summary + fmt.Println("\nConfiguration Summary:") + fmt.Println("Source configuration:") + fmt.Printf(" Type: %s\n", sourceType) + fmt.Printf(" Connection: %s\n", MaskConnectionString(cfg.SourceConn)) + + if !cfg.StdoutMode { + fmt.Println("\nTarget configuration:") + fmt.Printf(" Type: %s\n", sourceType) + fmt.Printf(" Connection: %s\n", MaskConnectionString(cfg.TargetConn)) + } + + return nil +} + +// TestConnection attempts to establish a connection to the database +func TestConnection(dbType, connString string, skipTLSVerify bool) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + switch dbType { + case "postgres", "mysql": + finalConnString := connString + if skipTLSVerify { + // Add SSL/TLS disable parameters if skipTLSVerify is true + if dbType == "postgres" && 
!strings.Contains(connString, "sslmode=") { + if strings.Contains(connString, "?") { + finalConnString += "&sslmode=disable" + } else { + finalConnString += "?sslmode=disable" + } + } + + if dbType == "mysql" && !strings.Contains(connString, "tls=") { + if strings.Contains(connString, "?") { + finalConnString += "&tls=false" + } else { + finalConnString += "?tls=false" + } + } + } + + db, err := sql.Open(dbType, finalConnString) + if err != nil { + return fmt.Errorf("failed to create database connection: %v", err) + } + defer db.Close() + + // Test the connection + err = db.PingContext(ctx) + if err != nil { + return fmt.Errorf("failed to connect to database: %v", err) + } + + case "mongodb": + clientOptions := options.Client().ApplyURI(connString) + + if skipTLSVerify { + clientOptions.SetTLSConfig(nil). + SetCompressors([]string{"none"}). + SetDirect(true) + } + + client, err := mongo.Connect(ctx, clientOptions) + if err != nil { + return fmt.Errorf("failed to create MongoDB client: %v", err) + } + defer client.Disconnect(ctx) + + // Test the connection + err = client.Ping(ctx, nil) + if err != nil { + return fmt.Errorf("failed to connect to MongoDB: %v", err) + } + + default: + return fmt.Errorf("unsupported database type: %s", dbType) + } + + return nil +} + +// MaskConnectionString masks sensitive information in connection strings +func MaskConnectionString(conn string) string { + if conn == "" { + return "" + } + + // Handle MongoDB connection strings + if strings.HasPrefix(conn, "mongodb://") || strings.HasPrefix(conn, "mongodb+srv://") { + // Replace password in MongoDB connection string + parts := strings.Split(conn, "@") + if len(parts) != 2 { + return conn // Return original if we can't parse it + } + auth := strings.Split(parts[0], "://") + if len(auth) != 2 { + return conn + } + userPass := strings.Split(auth[1], ":") + if len(userPass) != 2 { + return conn + } + return fmt.Sprintf("%s://%s:****@%s", auth[0], userPass[0], parts[1]) + } + + // Handle 
// MaskConnectionString masks the password portion of a connection string so
// it can be safely logged or printed. Strings without credentials are
// returned unchanged; strings that cannot be parsed are returned as-is.
func MaskConnectionString(conn string) string {
	if conn == "" {
		return ""
	}

	// MongoDB URIs are handled textually: split credentials on the LAST '@'
	// so that passwords containing ':' or '@' are still fully masked.
	// FIX: the previous strings.Split length checks returned such strings
	// unmodified, leaking the password from a masking function.
	for _, scheme := range []string{"mongodb://", "mongodb+srv://"} {
		if !strings.HasPrefix(conn, scheme) {
			continue
		}
		at := strings.LastIndex(conn, "@")
		if at < 0 {
			return conn // no credentials present
		}
		cred := conn[len(scheme):at]
		user, _, hasPassword := strings.Cut(cred, ":")
		if !hasPassword {
			return conn // username only, nothing to mask
		}
		return scheme + user + ":****@" + conn[at+1:]
	}

	// Everything else (postgres/mysql) is URL-shaped.
	u, err := url.Parse(conn)
	if err != nil {
		return conn // return original if we can't parse it
	}

	if u.User != nil {
		if _, hasPassword := u.User.Password(); hasPassword {
			u.User = url.UserPassword(u.User.Username(), "****")
			// url.String escapes '*' in userinfo as %2A; undo that so the
			// mask reads as literal asterisks.
			return strings.ReplaceAll(u.String(), "%2A", "*")
		}
	}

	return u.String()
}
github.com/lib/pq v1.10.9 + github.com/spf13/cobra v1.8.1 + go.mongodb.org/mongo-driver v1.17.1 + go.uber.org/zap v1.27.0 +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fc42b35 --- /dev/null +++ b/go.sum @@ -0,0 +1,77 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= 
+github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 
h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 
h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod 
// Config holds all runtime settings for a migration run, populated from
// the command-line flags declared in cmd/root.go.
type Config struct {
	SourceConn       string        // source connection string
	TargetConn       string        // target connection string (unused in stdout mode)
	SourceType       string        // inferred source database type ("postgres", "mysql", "mongodb")
	TargetType       string        // inferred target database type
	StdoutMode       bool          // stream the dump to stdout instead of restoring into a target
	LogLevel         string        // log level: debug, info, warn, error
	BufferSize       int           // streaming buffer size in bytes
	Timeout          time.Duration // overall migration timeout
	SkipVerification bool          // skip the post-migration verification pass
	VerifyChunkSize  int           // chunk size in bytes used by the verifier
	SkipTLSVerify    bool          // skip TLS certificate verification on connection tests
}

// Validate reports an error when mandatory settings are missing: the source
// must always be fully specified, and the target as well unless the dump is
// streamed to stdout.
func (c *Config) Validate() error {
	switch {
	case c.SourceType == "" || c.SourceConn == "":
		return fmt.Errorf("source type and connection string are required")
	case !c.StdoutMode && (c.TargetType == "" || c.TargetConn == ""):
		return fmt.Errorf("target type and connection string are required when not using stdout")
	default:
		return nil
	}
}
+ } + + // Create logger + logger := zap.New(core) + return &Logger{ + logger: logger.Sugar(), + } +} + +// Logger wraps zap.SugaredLogger +type Logger struct { + logger *zap.SugaredLogger +} + +func (l *Logger) Info(args ...interface{}) { l.logger.Info(args...) } +func (l *Logger) Infof(template string, args ...interface{}) { l.logger.Infof(template, args...) } +func (l *Logger) Warn(args ...interface{}) { l.logger.Warn(args...) } +func (l *Logger) Warnf(template string, args ...interface{}) { l.logger.Warnf(template, args...) } +func (l *Logger) Error(args ...interface{}) { l.logger.Error(args...) } +func (l *Logger) Errorf(template string, args ...interface{}) { l.logger.Errorf(template, args...) } diff --git a/internal/migration/manager.go b/internal/migration/manager.go new file mode 100644 index 0000000..900278c --- /dev/null +++ b/internal/migration/manager.go @@ -0,0 +1,81 @@ +package migration + +import ( + "context" + "data-migration/internal/config" + "fmt" + "io" + "os" +) + +type Manager struct { + config *config.Config + logger *config.Logger +} + +func NewManager(cfg *config.Config, logger *config.Logger) *Manager { + return &Manager{ + config: cfg, + logger: logger, + } +} + +func (m *Manager) Migrate(ctx context.Context) error { + // Create source dumper + dumper, err := CreateDumper(m.config.SourceType, m.config.SourceConn, m.config.StdoutMode) + if err != nil { + return fmt.Errorf("failed to create dumper: %w", err) + } + + // Handle stdout mode + if m.config.StdoutMode { + m.logger.Info("Streaming database dump to stdout...") + return dumper.Dump(ctx, os.Stdout) + } + + // Create target restorer + restorer, err := CreateRestorer(m.config.TargetType, m.config.TargetConn) + if err != nil { + return fmt.Errorf("failed to create restorer: %w", err) + } + + // Create pipe for streaming + reader, writer := io.Pipe() + + // Setup error channels + errChan := make(chan error, 2) + + // Start dump process + go func() { + defer writer.Close() + 
m.logger.Info("Starting database dump...") + if err := dumper.Dump(ctx, writer); err != nil { + m.logger.Errorf("Dump error: %v", err) + errChan <- fmt.Errorf("dump failed: %w", err) + } else { + errChan <- nil + m.logger.Info("Database dump completed") + } + }() + + // Start restore process + go func() { + m.logger.Info("Starting database restore...") + if err := restorer.Restore(ctx, reader); err != nil { + m.logger.Errorf("Restore error: %v", err) + errChan <- fmt.Errorf("restore failed: %w", err) + } else { + errChan <- nil + m.logger.Info("Database restore completed") + } + }() + + // Wait for both processes to complete + for i := 0; i < 2; i++ { + if err := <-errChan; err != nil { + return err + } + } + + return nil +} diff --git a/internal/migration/mongodb.go b/internal/migration/mongodb.go new file mode 100644 index 0000000..f336a04 --- /dev/null +++ b/internal/migration/mongodb.go @@ -0,0 +1,62 @@ +package migration + +import ( + "context" + "fmt" + "io" + "os/exec" +) + +type MongoDBDumper struct { + connString string +} + +func NewMongoDBDumper(connString string) *MongoDBDumper { + return &MongoDBDumper{connString: connString} +} + +func (d *MongoDBDumper) GetType() DatabaseType { + return MongoDB +} + +func (d *MongoDBDumper) Dump(ctx context.Context, w io.Writer) error { + cmd := exec.CommandContext(ctx, "mongodump", + "--uri="+d.connString, + "--archive", + ) + cmd.Stdout = w + cmd.Stderr = io.Discard + + if err := cmd.Run(); err != nil { + return fmt.Errorf("mongodump failed: %w", err) + } + + return nil +} + +type MongoDBRestorer struct { + connString string +} + +func NewMongoDBRestorer(connString string) *MongoDBRestorer { + return &MongoDBRestorer{connString: connString} +} + +func (r *MongoDBRestorer) GetType() DatabaseType { + return MongoDB +} + +func (r *MongoDBRestorer) Restore(ctx context.Context, reader io.Reader) error { + cmd := exec.CommandContext(ctx, "mongorestore", + "--uri="+r.connString, + "--archive", + ) + cmd.Stdin = reader + 
cmd.Stderr = io.Discard + + if err := cmd.Run(); err != nil { + return fmt.Errorf("mongorestore failed: %w", err) + } + + return nil +} diff --git a/internal/migration/mysql.go b/internal/migration/mysql.go new file mode 100644 index 0000000..99f3758 --- /dev/null +++ b/internal/migration/mysql.go @@ -0,0 +1,124 @@ +package migration + +import ( + "context" + "fmt" + "io" + "log" + "os" + "os/exec" + "strings" +) + +type MySQLDumper struct { + connString string +} + +func NewMySQLDumper(connString string) *MySQLDumper { + return &MySQLDumper{connString: connString} +} + +func (d *MySQLDumper) GetType() DatabaseType { + return MySQL +} + +func (d *MySQLDumper) Dump(ctx context.Context, w io.Writer) error { + cmd := exec.CommandContext(ctx, "mysqldump", + "--defaults-extra-file="+createMySQLConfigFile(d.connString), + "--single-transaction", + "--quick", + "--compress", + ) + cmd.Stdout = w + cmd.Stderr = io.Discard + + if err := cmd.Run(); err != nil { + return fmt.Errorf("mysqldump failed: %w", err) + } + + return nil +} + +type MySQLRestorer struct { + connString string +} + +func NewMySQLRestorer(connString string) *MySQLRestorer { + return &MySQLRestorer{connString: connString} +} + +func (r *MySQLRestorer) GetType() DatabaseType { + return MySQL +} + +func (r *MySQLRestorer) Restore(ctx context.Context, reader io.Reader) error { + cmd := exec.CommandContext(ctx, "mysql", + "--defaults-extra-file="+createMySQLConfigFile(r.connString), + ) + cmd.Stdin = reader + cmd.Stderr = io.Discard + + if err := cmd.Run(); err != nil { + return fmt.Errorf("mysql restore failed: %w", err) + } + + return nil +} + +// Helper function to create MySQL config file from connection string +func createMySQLConfigFile(connString string) string { + // Parse MySQL connection string + // Expected format: user:pass@tcp(host:port)/dbname + + // Create a temporary file + tempFile, err := os.CreateTemp("", "mysql-config-*.cnf") + if err != nil { + log.Printf("Failed to create temp file: %v", 
err) + return "" + } + + // Extract credentials and connection info from connection string + user := "" + password := "" + host := "" + port := "3306" // default MySQL port + + // Basic parsing of MySQL connection string + if parts := strings.Split(connString, "@"); len(parts) == 2 { + // Extract credentials + credentials := strings.Split(parts[0], ":") + if len(credentials) == 2 { + user = credentials[0] + password = credentials[1] + } + + // Extract host and port + hostPart := parts[1] + if strings.HasPrefix(hostPart, "tcp(") { + hostPart = strings.TrimPrefix(hostPart, "tcp(") + hostPart = strings.Split(hostPart, ")/")[0] + hostDetails := strings.Split(hostPart, ":") + if len(hostDetails) == 2 { + host = hostDetails[0] + port = hostDetails[1] + } else { + host = hostPart + } + } + } + + // Write MySQL configuration + configContent := fmt.Sprintf(`[client] +user=%s +password=%s +host=%s +port=%s +`, user, password, host, port) + + if err := os.WriteFile(tempFile.Name(), []byte(configContent), 0600); err != nil { + log.Printf("Failed to write config file: %v", err) + return "" + } + + return tempFile.Name() +} diff --git a/internal/migration/postgres.go b/internal/migration/postgres.go new file mode 100644 index 0000000..a9fc9b6 --- /dev/null +++ b/internal/migration/postgres.go @@ -0,0 +1,90 @@ +package migration + +import ( + "bytes" + "context" + "fmt" + "io" + "os/exec" +) + +type PostgresDumper struct { + connString string + useCustomFormat bool +} + +func NewPostgresDumper(connString string, useCustomFormat bool) *PostgresDumper { + return &PostgresDumper{ + connString: connString, + useCustomFormat: useCustomFormat, + } +} + +func (d *PostgresDumper) GetType() DatabaseType { + return Postgres +} + +func (d *PostgresDumper) Dump(ctx context.Context, w io.Writer) error { + var stderr bytes.Buffer + + args := []string{ + "--verbose", + "--no-owner", // Add this to avoid permission issues + "--no-privileges", // Add this to avoid permission issues + } + + if 
d.useCustomFormat { + args = append(args, "--format=custom") + } else { + args = append(args, "--format=plain") + } + + args = append(args, d.connString) + + cmd := exec.CommandContext(ctx, "pg_dump", args...) + + cmd.Stdout = w + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("pg_dump failed: %v, stderr: %s", err, stderr.String()) + } + + return nil +} + +type PostgresRestorer struct { + connString string +} + +func NewPostgresRestorer(connString string) *PostgresRestorer { + return &PostgresRestorer{connString: connString} +} + +func (r *PostgresRestorer) GetType() DatabaseType { + return Postgres +} + +func (r *PostgresRestorer) Restore(ctx context.Context, reader io.Reader) error { + var stderr bytes.Buffer + + cmd := exec.CommandContext(ctx, "pg_restore", + "--verbose", + "--no-owner", // Add this to avoid permission issues + "--no-privileges", // Add this to avoid permission issues + "--clean", + "--if-exists", + "--no-comments", // Optional: skip restoring comments + "--no-security-labels", // Optional: skip security labels + fmt.Sprintf("--dbname=%s", r.connString), + ) + + cmd.Stdin = reader + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("pg_restore failed: %v, stderr: %s", err, stderr.String()) + } + + return nil +} diff --git a/internal/migration/types.go b/internal/migration/types.go new file mode 100644 index 0000000..a7e4fc0 --- /dev/null +++ b/internal/migration/types.go @@ -0,0 +1,56 @@ +package migration + +import ( + "context" + "fmt" + "io" +) + +type DatabaseType string + +const ( + Postgres DatabaseType = "postgres" + MySQL DatabaseType = "mysql" + MongoDB DatabaseType = "mongodb" +) + +// Dumper defines the interface for database dump operations +type Dumper interface { + Dump(ctx context.Context, w io.Writer) error + GetType() DatabaseType +} + +// Restorer defines the interface for database restore operations +type Restorer interface { + Restore(ctx context.Context, r 
io.Reader) error + GetType() DatabaseType +} + +func CreateDumper(dbType string, connStr string, useStdOut bool) (Dumper, error) { + switch DatabaseType(dbType) { + case Postgres: + // Use custom format when not in stdout mode -- better for large databases + // Use plain format when in stdout mode -- better for readability + useCustomFormat := !useStdOut + return NewPostgresDumper(connStr, useCustomFormat), nil + case MySQL: + return NewMySQLDumper(connStr), nil + case MongoDB: + return NewMongoDBDumper(connStr), nil + default: + return nil, fmt.Errorf("unsupported source database type: %s", dbType) + } +} + +func CreateRestorer(dbType string, connStr string) (Restorer, error) { + switch DatabaseType(dbType) { + case Postgres: + return NewPostgresRestorer(connStr), nil + case MySQL: + return NewMySQLRestorer(connStr), nil + case MongoDB: + return NewMongoDBRestorer(connStr), nil + default: + return nil, fmt.Errorf("unsupported target database type: %s", dbType) + } +} diff --git a/internal/migration/verifier.go b/internal/migration/verifier.go new file mode 100644 index 0000000..98048ad --- /dev/null +++ b/internal/migration/verifier.go @@ -0,0 +1,258 @@ +package migration + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "io" +) + +// Verifier defines the interface for database verification operations +type Verifier interface { + VerifyContent(ctx context.Context) error + GetChecksum(ctx context.Context) (string, error) +} + +const defaultChunkSize = 10 * 1024 * 1024 // 10MB chunks by default + +// DatabaseVerifier implements verification for any database type +type DatabaseVerifier struct { + sourceDumper Dumper + targetDumper Dumper + chunkSize int +} + +// VerifierOption defines function type for verifier options +type VerifierOption func(*DatabaseVerifier) + +// WithChunkSize sets a custom chunk size for comparison +func WithChunkSize(size int) VerifierOption { + return func(v *DatabaseVerifier) { + if size > 0 { + 
v.chunkSize = size + } + } +} + +// NewDatabaseVerifier creates a new verifier instance +func NewDatabaseVerifier(sourceDumper, targetDumper Dumper, opts ...VerifierOption) (*DatabaseVerifier, error) { + if sourceDumper == nil || targetDumper == nil { + return nil, fmt.Errorf("source and target dumpers must not be nil") + } + + if sourceDumper.GetType() != targetDumper.GetType() { + return nil, fmt.Errorf("source and target databases must be of the same type") + } + + v := &DatabaseVerifier{ + sourceDumper: sourceDumper, + targetDumper: targetDumper, + chunkSize: defaultChunkSize, + } + + // Apply options + for _, opt := range opts { + opt(v) + } + + return v, nil +} + +// compareReaders compares two io.Reader streams chunk by chunk +func (v *DatabaseVerifier) compareReaders(ctx context.Context, source, target io.Reader) (bool, error) { + sourceChunk := make([]byte, v.chunkSize) + targetChunk := make([]byte, v.chunkSize) + + for { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + // Read chunks from both sources + sourceN, sourceErr := io.ReadFull(source, sourceChunk) + targetN, targetErr := io.ReadFull(target, targetChunk) + + // Handle read results + if sourceErr != nil && sourceErr != io.EOF && !errors.Is(sourceErr, io.ErrUnexpectedEOF) { + return false, fmt.Errorf("error reading source: %w", sourceErr) + } + if targetErr != nil && targetErr != io.EOF && !errors.Is(targetErr, io.ErrUnexpectedEOF) { + return false, fmt.Errorf("error reading target: %w", targetErr) + } + + // Compare chunks based on database type + switch v.sourceDumper.GetType() { + case Postgres: + if !bytes.Equal(sourceChunk[:sourceN], targetChunk[:targetN]) { + return false, nil + } + case MySQL: + normalizedSource, err := normalizeMySQLDump(sourceChunk[:sourceN]) + if err != nil { + return false, err + } + normalizedTarget, err := normalizeMySQLDump(targetChunk[:targetN]) + if err != nil { + return false, err + } + if !bytes.Equal(normalizedSource, normalizedTarget) { + 
return false, nil + } + case MongoDB: + normalizedSource, err := normalizeMongoDBDump(sourceChunk[:sourceN]) + if err != nil { + return false, err + } + normalizedTarget, err := normalizeMongoDBDump(targetChunk[:targetN]) + if err != nil { + return false, err + } + if !bytes.Equal(normalizedSource, normalizedTarget) { + return false, nil + } + } + + // Check if we've reached the end of both streams + if sourceErr == io.EOF || errors.Is(sourceErr, io.ErrUnexpectedEOF) { + if targetErr == io.EOF || errors.Is(targetErr, io.ErrUnexpectedEOF) { + // Both streams ended + return sourceN == targetN, nil + } + // Source ended but target didn't + return false, nil + } + if targetErr == io.EOF || errors.Is(targetErr, io.ErrUnexpectedEOF) { + // Target ended but source didn't + return false, nil + } + } + } +} + +// VerifyContent performs verification of the migration by comparing dumps +func (v *DatabaseVerifier) VerifyContent(ctx context.Context) error { + // Create pipes for streaming the dumps + sourceReader, sourceWriter := io.Pipe() + targetReader, targetWriter := io.Pipe() + + // Create error channels for the dump operations + sourceDumpErr := make(chan error, 1) + targetDumpErr := make(chan error, 1) + compareErr := make(chan error, 1) + + // Start dumping source database + go func() { + defer sourceWriter.Close() + err := v.sourceDumper.Dump(ctx, sourceWriter) + sourceDumpErr <- err + }() + + // Start dumping target database + go func() { + defer targetWriter.Close() + err := v.targetDumper.Dump(ctx, targetWriter) + targetDumpErr <- err + }() + + // Start comparison in a goroutine + go func() { + equal, err := v.compareReaders(ctx, sourceReader, targetReader) + if err != nil { + compareErr <- err + return + } + if !equal { + compareErr <- fmt.Errorf("content verification failed: source and target databases do not match") + return + } + compareErr <- nil + }() + + // Wait for all operations to complete or context to cancel + select { + case <-ctx.Done(): + return 
ctx.Err() + case err := <-sourceDumpErr: + if err != nil { + return fmt.Errorf("failed to dump source database: %w", err) + } + case err := <-targetDumpErr: + if err != nil { + return fmt.Errorf("failed to dump target database: %w", err) + } + case err := <-compareErr: + if err != nil { + return err + } + } + + return nil +} + +// GetChecksum generates a checksum of the database content using streaming +func (v *DatabaseVerifier) GetChecksum(ctx context.Context) (string, error) { + reader, writer := io.Pipe() + hashErr := make(chan error, 1) + hash := sha256.New() + + // Start calculating hash in a goroutine + go func() { + defer reader.Close() + _, err := io.Copy(hash, reader) + hashErr <- err + }() + + // Dump the database + if err := v.targetDumper.Dump(ctx, writer); err != nil { + writer.Close() + return "", fmt.Errorf("failed to dump database for checksum: %w", err) + } + writer.Close() + + // Wait for hash calculation to complete + if err := <-hashErr; err != nil { + return "", fmt.Errorf("failed to calculate checksum: %w", err) + } + + return hex.EncodeToString(hash.Sum(nil)), nil +} + +// normalizeMySQLDump normalizes a MySQL dump chunk for comparison +func normalizeMySQLDump(chunk []byte) ([]byte, error) { + // Remove timestamps, variable content, and other non-deterministic elements + lines := bytes.Split(chunk, []byte("\n")) + var normalized [][]byte + + for _, line := range lines { + // Skip lines that contain non-deterministic content + if bytes.HasPrefix(line, []byte("-- Dump completed on")) || + bytes.HasPrefix(line, []byte("-- MySQL dump")) || + bytes.HasPrefix(line, []byte("-- Server version")) { + continue + } + normalized = append(normalized, line) + } + + return bytes.Join(normalized, []byte("\n")), nil +} + +// normalizeMongoDBDump normalizes a MongoDB dump chunk for comparison +func normalizeMongoDBDump(chunk []byte) ([]byte, error) { + // MongoDB dumps might contain ObjectIDs and timestamps that need normalization + lines := 
bytes.Split(chunk, []byte("\n")) + var normalized [][]byte + + for _, line := range lines { + // Skip or normalize lines containing non-deterministic content + if bytes.Contains(line, []byte("\"$timestamp\"")) || + bytes.Contains(line, []byte("\"$date\"")) { + continue + } + normalized = append(normalized, line) + } + + return bytes.Join(normalized, []byte("\n")), nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..42eb63c --- /dev/null +++ b/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "data-migration/cmd" +) + +func main() { + cmd.Execute() +}