From 7fa12fe75919c38881d1241a5a565e6774402b95 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 4 Jul 2023 18:52:20 +0800 Subject: [PATCH] Refactor command registration logic (#161) Signed-off-by: Congqi Xia --- states/backup_mock_connect.go | 9 +- states/configuration.go | 71 ++++++++ states/current_version.go | 30 ++++ states/download_segment.go | 24 +++ states/etcd_connect.go | 258 +++++++++++++-------------- states/exit.go | 22 +-- states/instance.go | 20 ++- states/load_backup.go | 186 ++++++++++---------- states/open.go | 88 +++++----- states/parse_file.go | 313 +++++++++++++++++---------------- states/pulsarctl_connect.go | 27 +++ states/start.go | 35 +--- states/states.go | 317 +++++++++++++++++++++++++++++++++- states/util.go | 74 ++++---- states/visit_datacoord.go | 4 +- states/visit_datanode.go | 3 +- states/visit_indexcoord.go | 3 +- states/visit_indexnode.go | 2 +- states/visit_querycoord.go | 34 +++- states/visit_querynode.go | 3 +- states/visit_rootcoord.go | 2 +- 21 files changed, 980 insertions(+), 545 deletions(-) diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go index c997b33..c6f34ff 100644 --- a/states/backup_mock_connect.go +++ b/states/backup_mock_connect.go @@ -71,22 +71,17 @@ func (s *embedEtcdMockState) SetupCommands() { // force-release getForceReleaseCmd(s.client, rootPath), - // disconnect - getDisconnectCmd(s, s.config), - // for testing etcd.RepairCommand(s.client, rootPath), getPrintMetricsCmd(s), getListMetricsNodeCmd(s), - - // exit - getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) cmd.AddCommand(etcd.RawCommands(s.client)...) + s.mergeFunctionCommands(cmd, s) + s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands } diff --git a/states/configuration.go b/states/configuration.go index c7fe875..052c6e2 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -17,6 +17,77 @@ import ( "google.golang.org/grpc" ) +type GetConfigurationParam struct { + ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"` + Format string `name:"format" default:"line" desc:"output format"` +} + +func (s *instanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { + sessions, err := common.ListSessions(s.client, s.basePath) + if err != nil { + return err + } + + results := make(map[string]map[string]string) + + for _, session := range sessions { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithTimeout(2 * time.Second), + } + + conn, err := grpc.DialContext(ctx, session.Address, opts...) + if err != nil { + fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) + continue + } + var client configurationSource + switch strings.ToLower(session.ServerName) { + case "rootcoord": + client = rootcoordpbv2.NewRootCoordClient(conn) + case "datacoord": + client = datapbv2.NewDataCoordClient(conn) + case "indexcoord": + client = indexpbv2.NewIndexCoordClient(conn) + case "querycoord": + client = querypbv2.NewQueryCoordClient(conn) + case "datanode": + client = datapbv2.NewDataNodeClient(conn) + case "querynode": + client = querypbv2.NewQueryNodeClient(conn) + case "indexnode": + client = indexpbv2.NewIndexNodeClient(conn) + } + if client == nil { + continue + } + + configurations, err := getConfiguration(context.Background(), client, session.ServerID) + if err != nil { + continue + } + + results[fmt.Sprintf("%s-%d", session.ServerName, session.ServerID)] = common.KVListMap(configurations) + } + + switch strings.ToLower(p.Format) { + case "json": + bs, _ := json.MarshalIndent(results, "", "\t") + fmt.Println(string(bs)) + case "line": + fallthrough + default: + for comp, configs := range results { + fmt.Println("Component", comp) + for key, value := range configs { + fmt.Printf("%s: %s\n", key, value) + } + } + } + return nil +} + // GetConfigurationCommand returns command to iterate all online components and fetch configurations. func GetConfigurationCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ diff --git a/states/current_version.go b/states/current_version.go index dd9d9a4..f197532 100644 --- a/states/current_version.go +++ b/states/current_version.go @@ -1,8 +1,10 @@ package states import ( + "context" "fmt" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/models" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" @@ -19,6 +21,34 @@ func CurrentVersionCommand() *cobra.Command { return cmd } +type setCurrentVersionParam struct { + ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"` + newVersion string +} + +func (p *setCurrentVersionParam) ParseArgs(args []string) error { + if len(args) != 1 { + return errors.New("invalid parameter number") + } + p.newVersion = args[0] + return nil +} + +func (s *instanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { + switch param.newVersion { + case models.LTEVersion2_1: + fallthrough + case "LTEVersion2_1": + etcdversion.SetVersion(models.LTEVersion2_1) + case models.GTEVersion2_2: + fallthrough + case "GTEVersion2_2": + etcdversion.SetVersion(models.GTEVersion2_2) + default: + fmt.Println("Invalid version string:", param.newVersion) + } +} + // SetCurrentVersionCommand returns command for set current-version. func SetCurrentVersionCommand() *cobra.Command { cmd := &cobra.Command{ diff --git a/states/download_segment.go b/states/download_segment.go index 2a7c238..92832c1 100644 --- a/states/download_segment.go +++ b/states/download_segment.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/birdwatcher/proto/v2.0/indexpb" "github.com/milvus-io/birdwatcher/states/etcd/common" "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -66,6 +67,29 @@ func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command { return cmd } +func getMinioWithInfo(addr string, ak, sk string, bucketName string) (*minio.Client, string, error) { + cred := credentials.NewStaticV4(ak, sk, "") + minioClient, err := minio.New(addr, &minio.Options{ + Creds: cred, + Secure: false, + }) + if err != nil { + return nil, "", err + } + exists, err := minioClient.BucketExists(context.Background(), bucketName) + if !exists { + fmt.Printf("bucket %s not exists\n", bucketName) + return nil, "", err + } + + if !exists { + fmt.Printf("Bucket not exist\n") + return nil, "", errors.New("bucket not exists") + } + + return minioClient, bucketName, nil +} + func getMinioAccess() (*minio.Client, string, error) { p := promptui.Prompt{ Label: "BucketName", diff --git a/states/etcd_connect.go b/states/etcd_connect.go index 9aabdf1..6819ee3 100644 --- a/states/etcd_connect.go +++ b/states/etcd_connect.go @@ -36,135 +36,60 @@ func pingEtcd(ctx context.Context, cli clientv3.KV, rootPath string, metaPath st return nil } -// getConnectCommand returns the command for connect etcd. -// usage: connect --etcd [address] --rootPath [rootPath] -func getConnectCommand(state State, config *configs.Config) *cobra.Command { - cmd := &cobra.Command{ - Use: "connect [options]", - Short: "Connect to etcd instance", - RunE: func(cmd *cobra.Command, args []string) error { - etcdAddr, err := cmd.Flags().GetString("etcd") - if err != nil { - return err - } - rootPath, err := cmd.Flags().GetString("rootPath") - if err != nil { - return err - } - metaPath, err := cmd.Flags().GetString("metaPath") - if err != nil { - return err - } - force, err := cmd.Flags().GetBool("force") - if err != nil { - return err - } - dry, err := cmd.Flags().GetBool("dry") - if err != nil { - return err - } - - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: time.Second * 10, - - // disable grpc logging - Logger: zap.NewNop(), - }) - if err != nil { - return err - } - - etcdState := getEtcdConnectedState(etcdCli, etcdAddr, config) - if !dry { - // ping etcd - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - err = pingEtcd(ctx, etcdCli, rootPath, metaPath) - if err != nil { - if errors.Is(err, ErrNotMilvsuRootPath) { - if !force { - etcdCli.Close() - fmt.Printf("Connection established, but %s, please check your config or use Dry mode\n", err.Error()) - return nil - } - } else { - fmt.Println("cannot connect to etcd with addr:", etcdAddr, err.Error()) - return nil - } - } +type ConnectParams struct { + ParamBase `use:"connect" desc:"Connect to etcd"` + EtcdAddr string `name:"etcd" default:"127.0.0.1:2379" desc:"the etcd endpoint to connect"` + RootPath string `name:"rootPath" default:"by-dev" desc:"meta root paht milvus is using"` + MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` + Force bool `name:"force" default:"false" desc:"force connect ignoring ping Etcd & rootPath check"` + Dry bool `name:"dry" default:"false" desc:"dry connect without specifying milvus instance"` +} - fmt.Println("Using meta path:", fmt.Sprintf("%s/%s/", rootPath, metaPath)) +func (s *disconnectState) ConnectCommand(ctx context.Context, cp *ConnectParams) error { + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{cp.EtcdAddr}, + DialTimeout: time.Second * 10, - // use rootPath as instanceName - state.SetNext(getInstanceState(etcdCli, rootPath, etcdState, config)) - } else { - fmt.Println("using dry mode, ignore rootPath and metaPath") - // rootPath empty fall back to etcd connected state - state.SetNext(etcdState) - } - return nil - }, + // disable grpc logging + Logger: zap.NewNop(), + }) + if err != nil { + fmt.Println(err.Error()) + return err } - cmd.Flags().String("etcd", "127.0.0.1:2379", "the etcd endpoint to connect") - cmd.Flags().String("rootPath", "by-dev", "meta root path milvus is using") - cmd.Flags().String("metaPath", metaPath, "meta path prefix") - cmd.Flags().Bool("force", false, "force connect ignoring ping Etcd rootPath check") - cmd.Flags().Bool("dry", false, "dry connect without specify milvus instance") - return cmd -} -// findMilvusInstance iterate all possible rootPath -func findMilvusInstance(cli clientv3.KV) ([]string, error) { - var apps []string - current := "" - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + etcdState := getEtcdConnectedState(etcdCli, cp.EtcdAddr, s.config) + if !cp.Dry { + // ping etcd + ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() - resp, err := cli.Get(ctx, current, clientv3.WithKeysOnly(), clientv3.WithLimit(1), clientv3.WithFromKey()) - + err = pingEtcd(ctx, etcdCli, cp.RootPath, cp.MetaPath) if err != nil { - return nil, err - } - for _, kv := range resp.Kvs { - key := string(kv.Key) - parts := strings.Split(key, "/") - if parts[0] != "" { - apps = append(apps, parts[0]) + if errors.Is(err, ErrNotMilvsuRootPath) { + if !cp.Force { + etcdCli.Close() + fmt.Printf("Connection established, but %s, please check your config or use Dry mode\n", err.Error()) + return err + } + } else { + fmt.Println("cannot connect to etcd with addr:", cp.EtcdAddr, err.Error()) + return err } - // next key, since '0' is the next ascii char of '/' - current = parts[0] + "0" - } - - if !resp.More { - break } - } - return apps, nil -} + fmt.Println("Using meta path:", fmt.Sprintf("%s/%s/", cp.RootPath, metaPath)) -func getFindMilvusCmd(cli clientv3.KV, state *etcdConnectedState) *cobra.Command { - cmd := &cobra.Command{ - Use: "find-milvus", - Short: "search etcd kvs to find milvus instance", - Run: func(cmd *cobra.Command, args []string) { - apps, err := findMilvusInstance(cli) - if err != nil { - fmt.Println("failed to find milvus instance:", err.Error()) - return - } - fmt.Printf("%d candidates found:\n", len(apps)) - for _, app := range apps { - fmt.Println(app) - } - state.candidates = apps - }, + // use rootPath as instanceName + s.SetNext(getInstanceState(etcdCli, cp.RootPath, cp.MetaPath, etcdState, s.config)) + } else { + fmt.Println("using dry mode, ignore rootPath and metaPath") + // rootPath empty fall back to etcd connected state + s.SetNext(etcdState) } - - return cmd + return nil } +/* func getUseCmd(cli clientv3.KV, state State, config *configs.Config) *cobra.Command { cmd := &cobra.Command{ Use: "use [instance name]", @@ -210,7 +135,7 @@ func getUseCmd(cli clientv3.KV, state State, config *configs.Config) *cobra.Comm cmd.Flags().Bool("force", false, "force connect ignoring ping Etcd rootPath check") cmd.Flags().String("metaPath", metaPath, "meta path prefix") return cmd -} +}*/ type etcdConnectedState struct { cmdState @@ -225,22 +150,13 @@ type etcdConnectedState struct { func (s *etcdConnectedState) SetupCommands() { cmd := &cobra.Command{} - cmd.AddCommand( - // find-milvus - getFindMilvusCmd(s.client, s), - // use - getUseCmd(s.client, s, s.config), - // disconnect - getDisconnectCmd(s, s.config), - // exit - getExitCmd(s), - ) + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands } -// TBD for testing only +// getEtcdConnectedState returns etcdConnectedState for unknown instance func getEtcdConnectedState(cli *clientv3.Client, addr string, config *configs.Config) State { state := &etcdConnectedState{ @@ -257,6 +173,92 @@ func getEtcdConnectedState(cli *clientv3.Client, addr string, config *configs.Co return state } +func (s *etcdConnectedState) DisconnectCommand(ctx context.Context, p *disconnectParam) error { + s.SetNext(Start(s.config)) + s.Close() + return nil +} + +type FindMilvusParam struct { + ParamBase `use:"find-milvus" desc:"search etcd kvs to find milvus instance"` +} + +func (s *etcdConnectedState) FindMilvusCommand(ctx context.Context, p *FindMilvusParam) error { + apps, err := findMilvusInstance(ctx, s.client) + if err != nil { + return err + } + fmt.Printf("%d candidates found:\n", len(apps)) + for _, app := range apps { + fmt.Println(app) + } + s.candidates = apps + return nil +} + +type UseParam struct { + ParamBase `use:"use [instance-name]" desc:"use specified milvus instance"` + instanceName string + Force bool `name:"force" default:"false" desc:"force connect ignoring ping result"` + MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` +} + +func (p *UseParam) ParseArgs(args []string) error { + if len(args) != 1 { + return errors.New("instance shall be provided") + } + p.instanceName = args[0] + return nil +} + +func (s *etcdConnectedState) UseCommand(ctx context.Context, p *UseParam) error { + err := pingEtcd(ctx, s.client, p.instanceName, p.MetaPath) + if err != nil { + if errors.Is(err, ErrNotMilvsuRootPath) { + if !p.Force { + fmt.Printf("Connection established, but %s, please check your config or use Dry mode\n", err.Error()) + return err + } + } else { + fmt.Println("failed to ping etcd", err.Error()) + return err + } + } + + fmt.Printf("Using meta path: %s/%s/\n", p.instanceName, p.MetaPath) + + s.SetNext(getInstanceState(s.client, p.instanceName, p.MetaPath, s, s.config)) + return nil +} + +// findMilvusInstance iterate all possible rootPath +func findMilvusInstance(ctx context.Context, cli clientv3.KV) ([]string, error) { + var apps []string + current := "" + for { + resp, err := cli.Get(ctx, current, clientv3.WithKeysOnly(), clientv3.WithLimit(1), clientv3.WithFromKey()) + + if err != nil { + return nil, err + } + for _, kv := range resp.Kvs { + key := string(kv.Key) + parts := strings.Split(key, "/") + if parts[0] != "" { + apps = append(apps, parts[0]) + } + // next key, since '0' is the next ascii char of '/' + current = parts[0] + "0" + } + + if !resp.More { + break + } + } + + return apps, nil +} + func (s *etcdConnectedState) Close() { s.client.Close() } diff --git a/states/exit.go b/states/exit.go index 7e641e0..9f56ccb 100644 --- a/states/exit.go +++ b/states/exit.go @@ -1,7 +1,8 @@ package states import ( - "github.com/milvus-io/birdwatcher/configs" + "context" + "github.com/spf13/cobra" ) @@ -43,16 +44,11 @@ func (s *exitState) SetupCommands() {} // IsEnding returns true for exit State func (s *exitState) IsEnding() bool { return true } -// getDisconnectCmd disconnect from current state. -// will call close method for current state. -func getDisconnectCmd(state State, config *configs.Config) *cobra.Command { - cmd := &cobra.Command{ - Use: "disconnect", - Short: "disconnect from current state", - Run: func(*cobra.Command, []string) { - state.SetNext(Start(config)) - state.Close() - }, - } - return cmd +type disconnectParam struct { + ParamBase `use:"disconnect" desc:"disconnect from current etcd instance"` +} + +func (s *instanceState) DisconnectCommand(ctx context.Context, _ *disconnectParam) { + s.SetNext(Start(s.config)) + s.Close() } diff --git a/states/instance.go b/states/instance.go index 210abfe..1aac07d 100644 --- a/states/instance.go +++ b/states/instance.go @@ -21,6 +21,7 @@ type instanceState struct { etcdState State config *configs.Config + basePath string } func (s *instanceState) Close() { @@ -37,7 +38,7 @@ func (s *instanceState) SetupCommands() { cli := s.client instanceName := s.instanceName - basePath := path.Join(instanceName, metaPath) + basePath := s.basePath showCmd := etcd.ShowCommand(cli, basePath) showCmd.AddCommand( @@ -46,7 +47,7 @@ func (s *instanceState) SetupCommands() { // show segment-loaded-grpc GetDistributionCommand(cli, basePath), // show configurations - GetConfigurationCommand(cli, basePath), + // GetConfigurationCommand(cli, basePath), ) cmd.AddCommand( @@ -60,6 +61,8 @@ func (s *instanceState) SetupCommands() { etcd.RemoveCommand(cli, basePath), // set [subcommand] options... etcd.SetCommand(cli, instanceName, metaPath), + // restore [subcommand] options... + etcd.RestoreCommand(cli, basePath), // backup [component] getBackupEtcdCmd(cli, basePath), @@ -101,12 +104,10 @@ func (s *instanceState) SetupCommands() { getFetchMetricsCmd(cli, basePath), // dry-mode getDryModeCmd(cli, s, s.etcdState), - // disconnect - getDisconnectCmd(s, s.config), - // exit - getExitCmd(s), ) + //cmd.AddCommand(etcd.RawCommands(cli)...) + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands } @@ -123,8 +124,7 @@ func getDryModeCmd(cli clientv3.KV, state *instanceState, etcdState State) *cobr return cmd } -func getInstanceState(cli clientv3.KV, instanceName string, etcdState State, config *configs.Config) State { - +func getInstanceState(cli clientv3.KV, instanceName, metaPath string, etcdState State, config *configs.Config) State { var kv clientv3.KV file, err := os.OpenFile("audit.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, os.ModeAppend) if err != nil { @@ -133,6 +133,9 @@ func getInstanceState(cli clientv3.KV, instanceName string, etcdState State, con } else { kv = audit.NewFileAuditKV(cli, file) } + + basePath := path.Join(instanceName, metaPath) + // use audit kv state := &instanceState{ cmdState: cmdState{ @@ -144,6 +147,7 @@ func getInstanceState(cli clientv3.KV, instanceName string, etcdState State, con etcdState: etcdState, config: config, + basePath: basePath, } state.SetupCommands() diff --git a/states/load_backup.go b/states/load_backup.go index 4b37c4b..72930cd 100644 --- a/states/load_backup.go +++ b/states/load_backup.go @@ -3,6 +3,7 @@ package states import ( "bufio" "compress/gzip" + "context" "fmt" "os" "path" @@ -12,106 +13,95 @@ import ( "github.com/milvus-io/birdwatcher/configs" "github.com/milvus-io/birdwatcher/models" "github.com/mitchellh/go-homedir" - "github.com/spf13/cobra" ) -func getLoadBackupCmd(state State, config *configs.Config) *cobra.Command { - cmd := &cobra.Command{ - Use: "load-backup [file]", - Short: "load etcd backup file as env", - Run: func(cmd *cobra.Command, args []string) { - useWorkspace, err := cmd.Flags().GetBool("use-workspace") - if err != nil { - fmt.Println(err.Error()) - return - } - workspaceName, err := cmd.Flags().GetString("workspace-name") - if err != nil { - fmt.Println(err.Error()) - return - } - if len(args) == 0 { - fmt.Println("No backup file provided.") - return - } - if len(args) > 1 { - fmt.Println("only one backup file is allowed") - return - } - - arg := args[0] - f, err := openBackupFile(arg) - if err != nil { - return - } - defer f.Close() - - r, err := gzip.NewReader(f) - if err != nil { - fmt.Println("failed to open gzip reader, err:", err.Error()) - return - } - defer r.Close() - - rd := bufio.NewReader(r) - var header models.BackupHeader - err = readFixLengthHeader(rd, &header) - if err != nil { - fmt.Println("failed to load backup header", err.Error()) - return - } - - if useWorkspace { - if workspaceName == "" { - fileName := path.Base(arg) - workspaceName = fileName - } - workspaceName = createWorkspaceFolder(config, workspaceName) - } - - server, err := startEmbedEtcdServer(workspaceName, useWorkspace) - if err != nil { - fmt.Println("failed to start embed etcd server:", err.Error()) - return - } - fmt.Println("using data dir:", server.Config().Dir) - // TODO - nextState := getEmbedEtcdInstanceV2(server, config) - switch header.Version { - case 1: - fmt.Printf("Found backup version: %d, instance name :%s\n", header.Version, header.Instance) - err = restoreFromV1File(nextState.client, rd, &header) - if err != nil { - fmt.Println("failed to restore v1 backup file", err.Error()) - nextState.Close() - return - } - nextState.SetInstance(header.Instance) - case 2: - err = restoreV2File(rd, nextState) - if err != nil { - fmt.Println("failed to restore v2 backup file", err.Error()) - nextState.Close() - return - } - default: - fmt.Printf("backup version %d not supported\n", header.Version) - nextState.Close() - return - } - err = nextState.setupWorkDir(server.Config().Dir) - if err != nil { - fmt.Println("failed to setup workspace for backup file", err.Error()) - return - } - - state.SetNext(nextState) - }, - } - - cmd.Flags().Bool("use-workspace", false, "load backup into workspace") - cmd.Flags().String("workspace-name", "", "workspace name if used") - return cmd +type LoadBackupParam struct { + ParamBase `use:"load-backup [file]" desc:"load etcd backup file"` + backupFile string + UseWorkspace bool `name:"use-workspace" default:"false"` + WorkspaceName string `name:"workspace-name" default:""` +} + +func (p *LoadBackupParam) ParseArgs(args []string) error { + if len(args) == 0 { + return errors.New("no backup file provided") + } + if len(args) > 1 { + return errors.New("only one backup file is allowed") + } + + p.backupFile = args[0] + return nil +} + +func (s *disconnectState) LoadBackupCommand(ctx context.Context, p *LoadBackupParam) error { + f, err := openBackupFile(p.backupFile) + if err != nil { + return err + } + defer f.Close() + + r, err := gzip.NewReader(f) + if err != nil { + fmt.Println("failed to open gzip reader, err:", err.Error()) + return err + } + defer r.Close() + + rd := bufio.NewReader(r) + var header models.BackupHeader + err = readFixLengthHeader(rd, &header) + if err != nil { + fmt.Println("failed to load backup header", err.Error()) + return err + } + + if p.UseWorkspace { + if p.WorkspaceName == "" { + fileName := path.Base(p.backupFile) + p.WorkspaceName = fileName + } + p.WorkspaceName = createWorkspaceFolder(s.config, p.WorkspaceName) + } + + server, err := startEmbedEtcdServer(p.WorkspaceName, p.UseWorkspace) + if err != nil { + fmt.Println("failed to start embed etcd server:", err.Error()) + return err + } + fmt.Println("using data dir:", server.Config().Dir) + // TODO + nextState := getEmbedEtcdInstanceV2(server, s.config) + switch header.Version { + case 1: + fmt.Printf("Found backup version: %d, instance name :%s\n", header.Version, header.Instance) + err = restoreFromV1File(nextState.client, rd, &header) + if err != nil { + fmt.Println("failed to restore v1 backup file", err.Error()) + nextState.Close() + return err + } + nextState.SetInstance(header.Instance) + case 2: + err = restoreV2File(rd, nextState) + if err != nil { + fmt.Println("failed to restore v2 backup file", err.Error()) + nextState.Close() + return err + } + default: + fmt.Printf("backup version %d not supported\n", header.Version) + nextState.Close() + return err + } + err = nextState.setupWorkDir(server.Config().Dir) + if err != nil { + fmt.Println("failed to setup workspace for backup file", err.Error()) + return err + } + + s.SetNext(nextState) + return nil } func openBackupFile(arg string) (*os.File, error) { diff --git a/states/open.go b/states/open.go index 3d91caf..8d4eb62 100644 --- a/states/open.go +++ b/states/open.go @@ -1,57 +1,55 @@ package states import ( + "context" "fmt" "os" "path" - "github.com/milvus-io/birdwatcher/configs" - "github.com/spf13/cobra" + "github.com/cockroachdb/errors" ) -// getOpenWorkspaceCmd returns open-workspace command. -func getOpenWorkspaceCmd(state State, config *configs.Config) *cobra.Command { - cmd := &cobra.Command{ - //TODO add workspace auto complete - Use: "open-workspace [workspace name]", - Short: "Open workspace", - Run: func(cmd *cobra.Command, args []string) { - if len(args) == 0 { - fmt.Println("No backup file provided.") - return - } - if len(args) > 1 { - fmt.Println("only one backup file is allowed") - return - } - - workspaceName := args[0] - workPath := path.Join(config.WorkspacePath, workspaceName) - info, err := os.Stat(workPath) - if os.IsNotExist(err) { - fmt.Printf("workspace %s not exist\n", workspaceName) - return - } - if !info.IsDir() { - fmt.Printf("workspace %s is not a directory\n", workspaceName) - return - } - - server, err := startEmbedEtcdServer(workPath, true) - if err != nil { - fmt.Printf("failed to start embed etcd server in workspace %s, err: %s\n", workspaceName, err.Error()) - return - } - - nextState := getEmbedEtcdInstanceV2(server, config) - err = nextState.setupWorkDir(workPath) - if err != nil { - fmt.Printf("failed to setup workspace for %s, err: %s\n", workspaceName, err.Error()) - } - - state.SetNext(nextState) - }, +type openParam struct { + ParamBase `use:"open-workspace [workspace-name]" desc:"Open workspace"` + workspaceName string +} + +// ParseArgs parse args +func (p *openParam) ParseArgs(args []string) error { + if len(args) == 0 { + return errors.New("no backup file provided") + } + if len(args) > 1 { + return errors.New("only one backup file is allowed") + } + p.workspaceName = args[0] + return nil +} + +// OpenCommand implements open workspace command +func (s *disconnectState) OpenCommand(ctx context.Context, p *openParam) error { + workspaceName := p.workspaceName + workPath := path.Join(s.config.WorkspacePath, workspaceName) + info, err := os.Stat(workPath) + if os.IsNotExist(err) { + fmt.Printf("workspace %s not exist\n", workspaceName) + return err + } + if !info.IsDir() { + return fmt.Errorf("workspace %s is not a directory", workspaceName) + } + + server, err := startEmbedEtcdServer(workPath, true) + if err != nil { + return fmt.Errorf("failed to start embed etcd server in workspace %s, err: %s", workspaceName, err.Error()) + } + + nextState := getEmbedEtcdInstanceV2(server, s.config) + err = nextState.setupWorkDir(workPath) + if err != nil { + return fmt.Errorf("failed to setup workspace for %s, err: %s", workspaceName, err.Error()) } - return cmd + s.SetNext(nextState) + return nil } diff --git a/states/parse_file.go b/states/parse_file.go index 1f5c2c2..5781eaf 100644 --- a/states/parse_file.go +++ b/states/parse_file.go @@ -2,6 +2,7 @@ package states import ( "bytes" + "context" "encoding/binary" "encoding/json" "fmt" @@ -15,105 +16,106 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/storage" - "github.com/spf13/cobra" ) -func GetParseIndexParamCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "parse-indexparam [file]", - Short: "parse index params", - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Println("should provide only one file path") - return - } - f, err := openBackupFile(args[0]) - if err != nil { - fmt.Println(err.Error()) - return - } - defer f.Close() +type ParseIndexParam struct { + ParamBase `use:"parse-indexparam [file]" desc:"parse index params"` + filePath string +} - r, evt, err := storage.NewIndexReader(f) - if err != nil { - fmt.Println(err.Error()) - return - } - extra := make(map[string]any) - json.Unmarshal(evt.ExtraBytes, &extra) - key := extra["key"].(string) - if key != "indexParams" && key != "SLICE_META" { - fmt.Println("index data file found", extra) - return - } - data, err := r.NextEventReader(f, evt.PayloadDataType) - if err != nil { - fmt.Println(err.Error()) - return - } +func (p *ParseIndexParam) ParseArgs(args []string) error { + if len(args) != 1 { + return errors.New("should provide only one file path") + } + p.filePath = args[0] + return nil +} - if len(data) != 1 { - fmt.Println("event data length is not 1") - return - } +// ParseIndexParamCommand parses index params from file. +func (s *disconnectState) ParseIndexParamCommand(ctx context.Context, p *ParseIndexParam) error { + f, err := openBackupFile(p.filePath) + if err != nil { + return err + } + defer f.Close() - switch key { - case "indexParams": - params := make(map[string]string) - json.Unmarshal(data[0], ¶ms) - fmt.Println(params) - case "SLICE_META": - fmt.Println(string(data[0])) - } + r, evt, err := storage.NewIndexReader(f) + if err != nil { + return err + } + extra := make(map[string]any) + json.Unmarshal(evt.ExtraBytes, &extra) + key := extra["key"].(string) + if key != "indexParams" && key != "SLICE_META" { + fmt.Println("index data file found", extra) + return nil + } + data, err := r.NextEventReader(f, evt.PayloadDataType) + if err != nil { + return err + } + + if len(data) != 1 { + fmt.Println("event data length is not 1") + return nil + } - }, + switch key { + case "indexParams": + params := make(map[string]string) + json.Unmarshal(data[0], ¶ms) + fmt.Println(params) + case "SLICE_META": + fmt.Println(string(data[0])) } - return cmd + return nil } -func GetValidateIndexFilesCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "validate-indexfiles [directory]", - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Println("should provide only one file path") - return - } +type ValidateIndexParam struct { + ParamBase `use:"validate-indexfiles [directory]" desc:"validate index file size"` + directory string +} - folder := args[0] - if err := testFolder(folder); err != nil { - fmt.Println(err.Error()) - return - } +func (p *ValidateIndexParam) ParseArgs(args []string) error { + if len(args) != 1 { + return errors.New("should provide only one folder path") + } + p.directory = args[0] + return nil +} + +func (s *disconnectState) ValidateIndexFilesCommand(ctx context.Context, p *ValidateIndexParam) error { + folder := p.directory + if err := testFolder(folder); err != nil { + return err + } - filepath.WalkDir(folder, func(fp string, d fs.DirEntry, err error) error { - if d.IsDir() { - idxParam := path.Join(fp, "indexParams") - if info, err := os.Stat(idxParam); err == nil { - if !info.IsDir() { - bs, err := readIndexFile(idxParam, func(key string) bool { - return key == "indexParams" - }) - if err != nil { - return nil - } - params := make(map[string]string) - json.Unmarshal(bs, ¶ms) - indexType := params["index_type"] - fmt.Printf("Path:[%s] IndexParam file found, index type is %s\n", fp, indexType) - validateIndexFolder(fp, params) - } - } else if errors.Is(err, os.ErrNotExist) { - // file not exist - } else { - fmt.Println(err.Error()) + filepath.WalkDir(folder, func(fp string, d fs.DirEntry, err error) error { + if d.IsDir() { + idxParam := path.Join(fp, "indexParams") + if info, err := os.Stat(idxParam); err == nil { + if !info.IsDir() { + bs, err := readIndexFile(idxParam, func(key string) bool { + return key == "indexParams" + }) + if err != nil { + return nil } + params := make(map[string]string) + json.Unmarshal(bs, ¶ms) + indexType := params["index_type"] + fmt.Printf("Path:[%s] IndexParam file found, index type is %s\n", fp, indexType) + validateIndexFolder(fp, params) } - return nil - }) - }, - } - return cmd + } else if errors.Is(err, os.ErrNotExist) { + // file not exist + } else { + fmt.Println(err.Error()) + } + } + return nil + }) + return nil } func validateIndexFolder(fp string, params map[string]string) { @@ -126,7 +128,6 @@ func validateIndexFolder(fp string, params map[string]string) { case "": fallthrough case "STL_SORT": - //fmt.Println(file, d.Name()) switch d.Name() { case "index_length": bs, err := readIndexFile(file, func(key string) bool { return key == "index_length" }) @@ -171,89 +172,87 @@ func validateIndexFolder(fp string, params map[string]string) { case "STL_SORT": fmt.Printf("indexSize: %d, dataSize:%d, multipler: %f\n", indexSize, dataSize, float64(dataSize)/float64(indexSize)) } +} +type AssembleIndexFilesParam struct { + ParamBase `use:"assemble-indexfiles [directory]" desc:""` + directory string } -func GetAssembleIndexFilesCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "assemble-indexfiles [directory]", - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Println("should provide only one file path") - return - } +func (p *AssembleIndexFilesParam) ParseArgs(args []string) error { + if len(args) != 1 { + return errors.New("should provide only one folder path") + } + p.directory = args[0] + return nil +} - folder := args[0] - if err := testFolder(folder); err != nil { - fmt.Println(err.Error()) - return - } +func (s *disconnectState) AssembleIndexFilesCommand(ctx context.Context, p *AssembleIndexFilesParam) error { + folder := p.directory + if err := testFolder(folder); err != nil { + return err + } - sliceMetaFile := path.Join(folder, "SLICE_META") - prefix, num, err := tryParseSliceMeta(sliceMetaFile) - if err != nil { - fmt.Println("failed to parse SLICE_META", err.Error()) - return - } + sliceMetaFile := path.Join(folder, "SLICE_META") + prefix, num, err := tryParseSliceMeta(sliceMetaFile) + if err != nil { + fmt.Println("failed to parse SLICE_META", err.Error()) + return err + } - fmt.Printf("original file name: %s, slice num: %d\n", prefix, num) + fmt.Printf("original file name: %s, slice num: %d\n", prefix, num) - m := make(map[int64]struct{}) + m := make(map[int64]struct{}) - filepath.Walk(folder, func(file string, info os.FileInfo, err error) error { - file = path.Base(file) - if !strings.HasPrefix(file, prefix+"_") { - fmt.Println("skip file", file) - return nil - } + filepath.Walk(folder, func(file string, info os.FileInfo, _ error) error { + file = path.Base(file) + if !strings.HasPrefix(file, prefix+"_") { + fmt.Println("skip file", file) + return nil + } - suffix := file[len(prefix)+1:] - idx, err := strconv.ParseInt(suffix, 10, 64) - if err != nil { - fmt.Println(err.Error()) - return nil - } + suffix := file[len(prefix)+1:] + idx, err := strconv.ParseInt(suffix, 10, 64) + if err != nil { + fmt.Println(err.Error()) + return nil + } - m[idx] = struct{}{} - return nil - }) - if len(m) != num { - fmt.Println("slice files not complete", m) - return - } + m[idx] = struct{}{} + return nil + }) + if len(m) != num { + fmt.Println("slice files not complete", m) + return nil + } - outputPath := fmt.Sprintf("%s_%s", prefix, time.Now().Format("060102150406")) - output, err := os.OpenFile(outputPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) - if err != nil { - fmt.Println(err.Error()) - return - } - defer output.Close() - totalLen := int64(0) - - for i := 0; i < num; i++ { - key := fmt.Sprintf("%s_%d", prefix, i) - fmt.Print("processing file:", key) - data, err := readIndexFile(path.Join(folder, key), func(metaKey string) bool { - return metaKey == key - }) - if err != nil { - fmt.Println(err.Error()) - return - } - fmt.Println(" read data size:", len(data), hrSize(int64(len(data)))) + outputPath := fmt.Sprintf("%s_%s", prefix, time.Now().Format("060102150406")) + output, err := os.OpenFile(outputPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + return err + } + defer output.Close() + totalLen := int64(0) + + for i := 0; i < num; i++ { + key := fmt.Sprintf("%s_%d", prefix, i) + fmt.Print("processing file:", key) + data, err := readIndexFile(path.Join(folder, key), func(metaKey string) bool { + return metaKey == key + }) + if err != nil { + return err + } + fmt.Println("read data size:", len(data), hrSize(int64(len(data)))) - _, err = output.Write(data) - if err != nil { - fmt.Println(err.Error()) - return - } - totalLen += int64(len(data)) - } - fmt.Printf("index file write to %s success, total len %d\n", outputPath, totalLen) - }, + _, err = output.Write(data) + if err != nil { + return err + } + totalLen += int64(len(data)) } - return cmd + fmt.Printf("index file write to %s success, total len %d\n", outputPath, totalLen) + return nil } func hrSize(size int64) string { diff --git a/states/pulsarctl_connect.go b/states/pulsarctl_connect.go index bcf20a9..ff9a68d 100644 --- a/states/pulsarctl_connect.go +++ b/states/pulsarctl_connect.go @@ -1,6 +1,7 @@ package states import ( + "context" "fmt" "github.com/spf13/cobra" @@ -9,6 +10,32 @@ import ( "github.com/streamnative/pulsarctl/pkg/pulsar/utils" ) +type pulsarctlParam struct { + ParamBase `use:"pulsarctl" desc:"connect to pulsar admin with pulsarctl"` + Address string `name:"addr" default:"http://localhost:18080" desc:"pulsar admin address"` + AuthPlugin string `name:"authPlugin" default:"" desc:"pulsar admin auth plugin"` + AuthParam string `name:"authParam" default:"" desc:"pulsar admin auth parameters"` +} + +func (s *disconnectState) PulsarctlCommand(ctx context.Context, p *pulsarctlParam) error { + + config := common.Config{ + WebServiceURL: p.Address, + AuthPlugin: p.AuthPlugin, + AuthParams: p.AuthParam, + PulsarAPIVersion: common.V2, + } + admin, err := pulsarctl.New(&config) + if err != nil { + fmt.Println("failed to build pulsar admin client, error:", err.Error()) + return err + } + + adminState := getPulsarAdminState(admin, p.Address) + s.SetNext(adminState) + return nil +} + func getPulsarctlCmd(state State) *cobra.Command { cmd := &cobra.Command{ Use: "pulsarctl", diff --git a/states/start.go b/states/start.go index 7c5fedf..bf9468d 100644 --- a/states/start.go +++ b/states/start.go @@ -4,48 +4,17 @@ import ( "github.com/milvus-io/birdwatcher/configs" "github.com/milvus-io/birdwatcher/models" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" - "github.com/spf13/cobra" ) // Start returns the first state - offline. func Start(config *configs.Config) State { - root := &cobra.Command{ - Use: "", - Short: "", - } - - state := &cmdState{ - label: "Offline", - rootCmd: root, - } app := &ApplicationState{ - State: state, + State: getDisconnectedState(config), + config: config, } - app.config = config - etcdversion.SetVersion(models.GTEVersion2_2) - root.AddCommand( - // connect - getConnectCommand(state, app.config), - // load-backup - getLoadBackupCmd(state, app.config), - // open-workspace - getOpenWorkspaceCmd(state, app.config), - // pulsarctl - getPulsarctlCmd(state), - // parse-indexparams - GetParseIndexParamCmd(), - // assemble-indexfiles - GetAssembleIndexFilesCmd(), - // validate-indexfiles - GetValidateIndexFilesCmd(), - // exit - getExitCmd(state)) - - root.AddCommand(getGlobalUtilCommands()...) - return app } diff --git a/states/states.go b/states/states.go index 7652c51..ad71334 100644 --- a/states/states.go +++ b/states/states.go @@ -1,16 +1,24 @@ package states import ( + "context" "errors" "fmt" + "os" + "os/signal" + "reflect" + "strconv" "strings" + "syscall" "github.com/milvus-io/birdwatcher/states/autocomplete" "github.com/spf13/cobra" + "github.com/spf13/pflag" ) // State is the interface for application state. type State interface { + Ctx() (context.Context, context.CancelFunc) Label() string Process(cmd string) (State, error) Close() @@ -25,16 +33,49 @@ type cmdState struct { label string rootCmd *cobra.Command nextState State + signal <-chan os.Signal setupFn func() } +// Ctx returns context which bind to sigint handler. +func (s *cmdState) Ctx() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer cancel() + select { + case <-s.signal: + case <-ctx.Done(): + } + }() + return ctx, cancel +} + +// SetupCommands perform command setup & reset. func (s *cmdState) SetupCommands() { if s.setupFn != nil { s.setupFn() } } +// mergeFunctionCommands parses all member methods for provided state and add it into cmd. +func (s *cmdState) mergeFunctionCommands(cmd *cobra.Command, state State) { + items := parseFunctionCommands(state) + for _, item := range items { + target := cmd + for _, kw := range item.kws { + node, _, err := cmd.Find([]string{kw}) + if err != nil { + newNode := &cobra.Command{Use: kw} + target.AddCommand(newNode) + node = newNode + } + target = node + } + target.AddCommand(item.cmd) + } +} + // Label returns the display label for current cli. func (s *cmdState) Label() string { return s.label @@ -44,13 +85,6 @@ func (s *cmdState) Suggestions(input string) map[string]string { return autocomplete.SuggestInputCommands(input, s.rootCmd.Commands()) } -func printCommands(cmds []*cobra.Command) { - for _, cmd := range cmds { - fmt.Printf("\"%s\"", cmd.Use) - } - fmt.Println() -} - // Process is the main entry for processing command. func (s *cmdState) Process(cmd string) (State, error) { args := strings.Split(cmd, " ") @@ -59,8 +93,15 @@ func (s *cmdState) Process(cmd string) (State, error) { if err == nil && target != nil { defer target.SetArgs(nil) } + + signal.Reset(syscall.SIGINT) + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT) + s.signal = c + s.rootCmd.SetArgs(args) err = s.rootCmd.Execute() + signal.Reset(syscall.SIGINT) if errors.Is(err, ExitErr) { return s.nextState, ExitErr } @@ -88,3 +129,265 @@ func (s *cmdState) Close() {} // Check state is ending state. func (s *cmdState) IsEnding() bool { return false } + +type exitParam struct { + ParamBase `use:"exit" desc:"Close this CLI tool"` +} + +// ExitCommand returns exit command +func (s *cmdState) ExitCommand(ctx context.Context, _ *exitParam) { + s.SetNext(&exitState{}) +} + +type commandItem struct { + kws []string + cmd *cobra.Command +} + +func parseFunctionCommands(state State) []commandItem { + v := reflect.ValueOf(state) + tp := v.Type() + + var commands []commandItem + for i := 0; i < v.NumMethod(); i++ { + mt := tp.Method(i) + + // parse method like with pattern %Command + if !strings.HasSuffix(mt.Name, "Command") { + continue + } + + t := mt.Type + var use string + var short string + var paramType reflect.Type + + if t.NumIn() == 0 { + // shall not be reached + continue + } + if t.NumIn() > 1 { + // should be context.Context + in := t.In(1) + if !in.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { + continue + } + } + if t.NumIn() > 2 { + // should be CmdParam + in := t.In(2) + if !in.Implements(reflect.TypeOf((*CmdParam)(nil)).Elem()) { + continue + } + cp, ok := reflect.New(in.Elem()).Interface().(CmdParam) + if !ok { + fmt.Println("conversion failed", in.Name()) + } else { + paramType = in + use, short = cp.Desc() + } + } + + cp := reflect.New(paramType.Elem()).Interface().(CmdParam) + fUse, fDesc := getCmdFromFlag(cp) + if len(use) == 0 { + use = fUse + } + if len(short) == 0 { + short = fDesc + } + if len(use) == 0 { + fnName := mt.Name + use = strings.ToLower(fnName[:len(fnName)-8]) + } + uses := parseUseSegments(use) + lastKw := uses[len(uses)-1] + + cmd := &cobra.Command{ + Use: lastKw, + } + setupFlags(cp, cmd.Flags()) + cmd.Short = short + cmd.Run = func(cmd *cobra.Command, args []string) { + cp := reflect.New(paramType.Elem()).Interface().(CmdParam) + + cp.ParseArgs(args) + if err := parseFlags(cp, cmd.Flags()); err != nil { + fmt.Println(err.Error()) + return + } + ctx, cancel := state.Ctx() //context.WithCancel(context.Background()) + defer cancel() + + m := v.MethodByName(mt.Name) + results := m.Call([]reflect.Value{ + reflect.ValueOf(ctx), + reflect.ValueOf(cp), + }) + if len(results) > 0 { + if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) { + if !results[0].IsNil() { + err := results[0].Interface().(error) + fmt.Println(err.Error()) + } + } + } + } + commands = append(commands, commandItem{ + kws: uses[:len(uses)-1], + cmd: cmd, + }) + } + + return commands +} + +func getCmdFromFlag(p CmdParam) (string, string) { + v := reflect.ValueOf(p) + if v.Kind() != reflect.Pointer { + fmt.Println("param is not pointer") + return "", "" + } + + for v.Kind() != reflect.Struct { + v = v.Elem() + } + tp := v.Type() + + f, has := tp.FieldByName("ParamBase") + if !has { + return "", "" + } + + if f.Type.Kind() != reflect.Struct { + return "", "" + } + + tag := f.Tag + return tag.Get("use"), tag.Get("desc") +} + +func parseUseSegments(use string) []string { + parts := strings.Split(use, " ") + last := "" + result := make([]string, 0, len(parts)) + for _, part := range parts { + if strings.HasPrefix(part, "[") && strings.HasSuffix(part, "]") { + last = fmt.Sprintf("%s %s", last, part) + continue + } + if len(last) > 0 { + result = append(result, last) + } + last = part + } + if len(last) > 0 { + result = append(result, last) + } + return result +} + +func setupFlags(p CmdParam, flags *pflag.FlagSet) { + v := reflect.ValueOf(p) + if v.Kind() != reflect.Pointer { + fmt.Println("param is not pointer") + return + } + + for v.Kind() != reflect.Struct { + v = v.Elem() + } + tp := v.Type() + + for i := 0; i < v.NumField(); i++ { + f := tp.Field(i) + if !f.IsExported() { + continue + } + name := f.Tag.Get("name") + defaultStr := f.Tag.Get("default") + desc := f.Tag.Get("desc") + switch f.Type.Kind() { + case reflect.Int64: + var dv int64 + if v, err := strconv.ParseInt(defaultStr, 10, 64); err == nil { + dv = v + } + flags.Int64(name, dv, desc) + case reflect.String: + flags.String(name, defaultStr, desc) + case reflect.Bool: + var dv bool + if v, err := strconv.ParseBool(defaultStr); err == nil { + dv = v + } + flags.Bool(name, dv, desc) + case reflect.Struct: + continue + default: + fmt.Printf("field %s with kind %s not supported yet\n", f.Name, f.Type.Kind()) + } + } +} + +func parseFlags(p CmdParam, flags *pflag.FlagSet) error { + + v := reflect.ValueOf(p) + if v.Kind() != reflect.Pointer { + return errors.New("param is not pointer") + } + + v = v.Elem() + tp := v.Type() + + for i := 0; i < v.NumField(); i++ { + f := tp.Field(i) + if !f.IsExported() { + continue + } + name := f.Tag.Get("name") + switch f.Type.Kind() { + case reflect.Int64: + p, err := flags.GetInt64(name) + if err != nil { + return err + } + v.FieldByName(f.Name).SetInt(p) + case reflect.String: + p, err := flags.GetString(name) + if err != nil { + return err + } + v.FieldByName(f.Name).SetString(p) + case reflect.Bool: + p, err := flags.GetBool(name) + if err != nil { + return err + } + v.FieldByName(f.Name).SetBool(p) + case reflect.Struct: + continue + default: + fmt.Printf("field %s with kind %s not supported yet\n", f.Name, f.Type.Kind()) + } + } + + return nil +} + +// CmdParam is the interface definition for command parameter. +type CmdParam interface { + ParseArgs(args []string) error + Desc() (string, string) +} + +// ParamBase implmenet CmdParam when empty args parser. +type ParamBase struct{} + +func (pb ParamBase) ParseArgs(args []string) error { + return nil +} + +func (pb ParamBase) Desc() (string, string) { + return "", "" +} diff --git a/states/util.go b/states/util.go index ab8e50b..00c70d1 100644 --- a/states/util.go +++ b/states/util.go @@ -29,7 +29,6 @@ import ( "github.com/milvus-io/birdwatcher/common" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" - "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -38,13 +37,41 @@ const ( logicalBitsMask = (1 << logicalBits) - 1 ) -func getGlobalUtilCommands() []*cobra.Command { - return []*cobra.Command{ - getParseTSCmd(), - getPrintVersionCommand(), +type parseTSParam struct { + ParamBase `use:"parse-ts" desc:"parse hybrid timestamp"` + args []string +} + +func (p *parseTSParam) ParseArgs(args []string) error { + p.args = args + return nil +} + +func (s *cmdState) ParseTSCommand(ctx context.Context, p *parseTSParam) { + if len(p.args) == 0 { + fmt.Println("no ts provided") + } + + for _, arg := range p.args { + ts, err := strconv.ParseUint(arg, 10, 64) + if err != nil { + fmt.Printf("failed to parse ts from %s, err: %s\n", arg, err.Error()) + continue + } + + t, _ := ParseTS(ts) + fmt.Printf("Parse ts result, ts:%d, time: %v\n", ts, t) } } +type printVerParam struct { + ParamBase `use:"version" desc:"print version"` +} + +func (s *cmdState) PrintVersionCommand(ctx context.Context, _ *printVerParam) { + fmt.Println("Birdwatcher Version", common.Version) +} + func ParseTS(ts uint64) (time.Time, uint64) { logical := ts & logicalBitsMask physical := ts >> logicalBits @@ -74,43 +101,6 @@ func listSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, er return sessions, nil } -// getParseTSCmd returns command for parse timestamp -func getParseTSCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "parse-ts", - Short: "parse hybrid timestamp", - Run: func(cmd *cobra.Command, args []string) { - if len(args) == 0 { - fmt.Println("no ts provided") - return - } - - for _, arg := range args { - ts, err := strconv.ParseUint(arg, 10, 64) - if err != nil { - fmt.Printf("failed to parse ts from %s, err: %s\n", arg, err.Error()) - continue - } - - t, _ := ParseTS(ts) - fmt.Printf("Parse ts result, ts:%d, time: %v\n", ts, t) - } - }, - } - return cmd -} - -func getPrintVersionCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "version", - Short: "print version", - Run: func(_ *cobra.Command, args []string) { - fmt.Println("Birdwatcher Version", common.Version) - }, - } - return cmd -} - // reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 func reviseVChannelInfo(vChannel *datapb.VchannelInfo) { removeDuplicateSegmentIDFn := func(ids []int64) []int64 { diff --git a/states/visit_datacoord.go b/states/visit_datacoord.go index 9b70d11..758c8a4 100644 --- a/states/visit_datacoord.go +++ b/states/visit_datacoord.go @@ -30,10 +30,12 @@ func (s *dataCoordState) SetupCommands() { getConfigurationCmd(s.clientv2, s.session.ServerID), //back getBackCmd(s, s.prevState), + // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands diff --git a/states/visit_datanode.go b/states/visit_datanode.go index 81396e5..9e52f99 100644 --- a/states/visit_datanode.go +++ b/states/visit_datanode.go @@ -33,7 +33,8 @@ func (s *dataNodeState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands diff --git a/states/visit_indexcoord.go b/states/visit_indexcoord.go index 0dafc89..6e55083 100644 --- a/states/visit_indexcoord.go +++ b/states/visit_indexcoord.go @@ -33,7 +33,8 @@ func (s *indexCoordState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands diff --git a/states/visit_indexnode.go b/states/visit_indexnode.go index 4745f1d..4b5b40a 100644 --- a/states/visit_indexnode.go +++ b/states/visit_indexnode.go @@ -33,7 +33,7 @@ func (s *indexNodeState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands diff --git a/states/visit_querycoord.go b/states/visit_querycoord.go index 57c0fc3..a4454c1 100644 --- a/states/visit_querycoord.go +++ b/states/visit_querycoord.go @@ -33,12 +33,44 @@ func (s *queryCoordState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands } +/* +func (s *queryCoordState) ShowCollectionCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "show-collection", + Run: func(cmd *cobra.Command, args []string) { + collection, err := cmd.Flags().GetInt64("collection") + if err != nil { + cmd.Usage() + return + } + + req := &querypbv2.ShowCollectionsRequest{ + Base: &commonpbv2.MsgBase{ + TargetID: s.session.ServerID, + }, + CollectionIDs: []int64{collection}, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := s.clientv2.ShowCollections(ctx, req) + if err != nil { + fmt.Println(err.Error()) + } + + fmt.Printf("%s, %s", resp.GetStatus().GetErrorCode().String(), resp.GetStatus().GetReason()) + }, + } + + cmd.Flags().Int64("collection", 0, "collection to show") + return cmd +}*/ + func getQueryCoordState(client querypb.QueryCoordClient, conn *grpc.ClientConn, prev State, session *models.Session) State { state := &queryCoordState{ diff --git a/states/visit_querynode.go b/states/visit_querynode.go index 52f2c8b..343b658 100644 --- a/states/visit_querynode.go +++ b/states/visit_querynode.go @@ -44,7 +44,8 @@ func (s *queryNodeState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands diff --git a/states/visit_rootcoord.go b/states/visit_rootcoord.go index 0cd6097..5f18217 100644 --- a/states/visit_rootcoord.go +++ b/states/visit_rootcoord.go @@ -33,7 +33,7 @@ func (s *rootCoordState) SetupCommands() { // exit getExitCmd(s), ) - cmd.AddCommand(getGlobalUtilCommands()...) + s.mergeFunctionCommands(cmd, s) s.cmdState.rootCmd = cmd s.setupFn = s.SetupCommands