diff --git a/framework/param.go b/framework/param.go new file mode 100644 index 0000000..6a11243 --- /dev/null +++ b/framework/param.go @@ -0,0 +1,18 @@ +package framework + +// CmdParam is the interface definition for command parameter. +type CmdParam interface { + ParseArgs(args []string) error + Desc() (string, string) +} + +// ParamBase implmenet CmdParam when empty args parser. +type ParamBase struct{} + +func (pb ParamBase) ParseArgs(args []string) error { + return nil +} + +func (pb ParamBase) Desc() (string, string) { + return "", "" +} diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go index c6f34ff..9a18b33 100644 --- a/states/backup_mock_connect.go +++ b/states/backup_mock_connect.go @@ -15,6 +15,8 @@ import ( "github.com/milvus-io/birdwatcher/configs" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd" + "github.com/milvus-io/birdwatcher/states/etcd/remove" + "github.com/milvus-io/birdwatcher/states/etcd/show" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" @@ -27,6 +29,8 @@ const ( type embedEtcdMockState struct { cmdState + *show.ComponentShow + *remove.ComponentRemove client *clientv3.Client server *embed.Etcd instanceName string @@ -89,6 +93,9 @@ func (s *embedEtcdMockState) SetupCommands() { func (s *embedEtcdMockState) SetInstance(instanceName string) { s.cmdState.label = fmt.Sprintf("Backup(%s)", instanceName) s.instanceName = instanceName + rootPath := path.Join(instanceName, metaPath) + s.ComponentShow = show.NewComponent(s.client, s.config, rootPath) + s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath) s.SetupCommands() } @@ -168,16 +175,20 @@ func (s *embedEtcdMockState) readWorkspaceMeta(path string) { func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName string, config *configs.Config) State { + basePath := path.Join(instanceName, metaPath) + state := &embedEtcdMockState{ cmdState: cmdState{ label: fmt.Sprintf("Backup(%s)", instanceName), }, - instanceName: instanceName, - server: server, - client: cli, - metrics: make(map[string][]byte), - defaultMetrics: make(map[string][]byte), - config: config, + ComponentShow: show.NewComponent(cli, config, basePath), + ComponentRemove: remove.NewComponent(cli, config, basePath), + instanceName: instanceName, + server: server, + client: cli, + metrics: make(map[string][]byte), + defaultMetrics: make(map[string][]byte), + config: config, } state.SetupCommands() @@ -186,7 +197,6 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName } func getEmbedEtcdInstanceV2(server *embed.Etcd, config *configs.Config) *embedEtcdMockState { - client := v3client.New(server.Server) state := &embedEtcdMockState{ cmdState: cmdState{}, diff --git a/states/configuration.go b/states/configuration.go index 052c6e2..3350975 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/milvus-io/birdwatcher/framework" datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb" querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" @@ -18,8 +19,8 @@ import ( ) type GetConfigurationParam struct { - ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"` - Format string `name:"format" default:"line" desc:"output format"` + framework.ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"` + Format string `name:"format" default:"line" desc:"output format"` } func (s *instanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { diff --git a/states/current_version.go b/states/current_version.go index f197532..53e59aa 100644 --- a/states/current_version.go +++ b/states/current_version.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/cockroachdb/errors" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" @@ -22,8 +23,8 @@ func CurrentVersionCommand() *cobra.Command { } type setCurrentVersionParam struct { - ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"` - newVersion string + framework.ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"` + newVersion string } func (p *setCurrentVersionParam) ParseArgs(args []string) error { diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 1c71f58..cbdab04 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -22,38 +22,10 @@ func ShowCommand(cli clientv3.KV, basePath string) *cobra.Command { } showCmd.AddCommand( - // show database - show.DatabaseCommand(cli, basePath), - // show collection - show.CollectionCommand(cli, basePath), - // show collection-history - show.CollectionHistoryCommand(cli, basePath), - // show sessions - show.SessionCommand(cli, basePath), - // show segments - show.SegmentCommand(cli, basePath), + // v2.1 legacy commands // show segment-loaded show.SegmentLoadedCommand(cli, basePath), - // show index - show.IndexCommand(cli, basePath), - // show segment-index - show.SegmentIndexCommand(cli, basePath), - // show partition - show.PartitionCommand(cli, basePath), - - // show replica - show.ReplicaCommand(cli, basePath), - // show checkpoint - show.CheckpointCommand(cli, basePath), - // show channel-watched - show.ChannelWatchedCommand(cli, basePath), - - // show collection-loaded - show.CollectionLoadedCommand(cli, basePath), - show.ConfigEtcdCommand(cli, basePath), - - // v2.1 legacy commands // show querycoord-tasks show.QueryCoordTasks(cli, basePath), // show querycoord-channels diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index 340ae86..6d0de95 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -128,7 +128,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str // with database, dbID unknown here prefix = path.Join(basePath, DBCollectionMetaPrefix) - resp, err = cli.Get(ctx, prefix, clientv3.WithPrefix()) + resp, _ = cli.Get(ctx, prefix, clientv3.WithPrefix()) suffix := strconv.FormatInt(collID, 10) for _, kv := range resp.Kvs { if strings.HasSuffix(string(kv.Key), suffix) { @@ -137,7 +137,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str } if len(result) != 1 { - return nil, fmt.Errorf("collection %d not found in etcd", collID) + return nil, fmt.Errorf("collection %d not found in etcd %w", collID, ErrCollectionNotFound) } kv := result[0] diff --git a/states/etcd/common/index.go b/states/etcd/common/index.go index 0ba6c55..3add0b0 100644 --- a/states/etcd/common/index.go +++ b/states/etcd/common/index.go @@ -11,11 +11,8 @@ import ( ) // ListIndex list all index with all filter satified. -func ListIndex(cli clientv3.KV, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() +func ListIndex(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) { prefix := path.Join(basePath, "indexes") + "/" - result, _, err := ListProtoObjects(ctx, cli, prefix, filters...) return result, err } diff --git a/states/etcd/common/replica.go b/states/etcd/common/replica.go index c50470b..8042ba1 100644 --- a/states/etcd/common/replica.go +++ b/states/etcd/common/replica.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path" - "time" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/milvuspb" @@ -13,15 +12,15 @@ import ( ) // ListReplica list current replica info -func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*models.Replica, error) { - v1Results, err := listReplicas(cli, basePath, func(replica *milvuspb.ReplicaInfo) bool { +func ListReplica(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64) ([]*models.Replica, error) { + v1Results, err := listReplicas(ctx, cli, basePath, func(replica *milvuspb.ReplicaInfo) bool { return collectionID == 0 || replica.GetCollectionID() == collectionID }) if err != nil { fmt.Println(err.Error()) } - v2Results, err := listQCReplicas(cli, basePath, func(replica *querypb.Replica) bool { + v2Results, err := listQCReplicas(ctx, cli, basePath, func(replica *querypb.Replica) bool { return collectionID == 0 || replica.GetCollectionID() == collectionID }) if err != nil { @@ -45,6 +44,7 @@ func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*model NodeIDs: r.GetNodeIds(), ResourceGroup: "n/a", Version: "<=2.1.4", + ShardReplicas: srs, }) } @@ -60,9 +60,7 @@ func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*model return results, nil } -func listReplicas(cli clientv3.KV, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() +func listReplicas(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) { prefix := path.Join(basePath, "queryCoord-ReplicaMeta") replicas, _, err := ListProtoObjects(ctx, cli, prefix, filters...) @@ -74,10 +72,7 @@ func listReplicas(cli clientv3.KV, basePath string, filters ...func(*milvuspb.Re return replicas, nil } -func listQCReplicas(cli clientv3.KV, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - +func listQCReplicas(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) { prefix := path.Join(basePath, "querycoord-replica") replicas, _, err := ListProtoObjects(ctx, cli, prefix, filters...) diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index afdc4c3..9faa005 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -275,3 +275,60 @@ func RemoveSegment(cli clientv3.KV, basePath string, info *datapb.SegmentInfo) e return err } + +func RemoveSegmentByID(ctx context.Context, cli clientv3.KV, basePath string, collectionID, partitionID, segmentID int64) error { + segmentPath := path.Join(basePath, "datacoord-meta/s", fmt.Sprintf("%d/%d/%d", collectionID, partitionID, segmentID)) + _, err := cli.Delete(ctx, segmentPath) + if err != nil { + return err + } + + // delete binlog entries + binlogPrefix := path.Join(basePath, "datacoord-meta/binlog", fmt.Sprintf("%d/%d/%d", collectionID, partitionID, segmentID)) + _, err = cli.Delete(ctx, binlogPrefix, clientv3.WithPrefix()) + if err != nil { + fmt.Printf("failed to delete binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + + // delete deltalog entries + deltalogPrefix := path.Join(basePath, "datacoord-meta/deltalog", fmt.Sprintf("%d/%d/%d", collectionID, partitionID, segmentID)) + _, err = cli.Delete(ctx, deltalogPrefix, clientv3.WithPrefix()) + if err != nil { + fmt.Printf("failed to delete deltalogs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + + // delete statslog entries + statslogPrefix := path.Join(basePath, "datacoord-meta/statslog", fmt.Sprintf("%d/%d/%d", collectionID, partitionID, segmentID)) + _, err = cli.Delete(ctx, statslogPrefix, clientv3.WithPrefix()) + if err != nil { + fmt.Printf("failed to delete statslogs from etcd for segment %d, err: %s\n", segmentID, err.Error()) + } + + return err +} + +func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error { + + prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/" + segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix) + if err != nil { + return err + } + + for idx, info := range segments { + info := info + seg := &info + fn(seg) + bs, err := proto.Marshal(seg) + if err != nil { + return err + } + + _, err = cli.Put(ctx, keys[idx], string(bs)) + if err != nil { + return err + } + + } + return nil +} diff --git a/states/etcd/remove/component.go b/states/etcd/remove/component.go new file mode 100644 index 0000000..7c55e06 --- /dev/null +++ b/states/etcd/remove/component.go @@ -0,0 +1,20 @@ +package remove + +import ( + "github.com/milvus-io/birdwatcher/configs" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type ComponentRemove struct { + client clientv3.KV + config *configs.Config + basePath string +} + +func NewComponent(cli clientv3.KV, config *configs.Config, basePath string) *ComponentRemove { + return &ComponentRemove{ + client: cli, + config: config, + basePath: basePath, + } +} diff --git a/states/etcd/remove/segment_orphan.go b/states/etcd/remove/segment_orphan.go new file mode 100644 index 0000000..6ec208d --- /dev/null +++ b/states/etcd/remove/segment_orphan.go @@ -0,0 +1,53 @@ +package remove + +import ( + "context" + "errors" + "fmt" + + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" + "github.com/samber/lo" +) + +type SegmentOrphan struct { + framework.ParamBase `use:"remove segment-orphan" desc:"remove orphan segments that collection meta already gone"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` + Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"` +} + +// SegmentOrphanCommand returns command to remove +func (c *ComponentRemove) SegmentOrphanCommand(ctx context.Context, p *SegmentOrphan) error { + segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { + return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID) + }) + if err != nil { + return err + } + + groups := lo.GroupBy(segments, func(segment *models.Segment) int64 { + return segment.CollectionID + }) + + for collectionID, segments := range groups { + _, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), collectionID) + if errors.Is(err, common.ErrCollectionNotFound) { + // print segments + fmt.Printf("Collection %d missing, orphan segments: %v\n", collectionID, lo.Map(segments, func(segment *models.Segment, idx int) int64 { + return segment.ID + })) + + if p.Run { + for _, segment := range segments { + err := common.RemoveSegmentByID(ctx, c.client, c.basePath, segment.CollectionID, segment.PartitionID, segment.ID) + if err != nil { + fmt.Printf("failed to remove segment %d, err: %s\n", segment.ID, err.Error()) + } + } + } + } + } + return nil +} diff --git a/states/etcd/repair/segment.go b/states/etcd/repair/segment.go index ac11028..2b15ec4 100644 --- a/states/etcd/repair/segment.go +++ b/states/etcd/repair/segment.go @@ -63,7 +63,7 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - indexBuildInfo, err := common.ListIndex(cli, basePath) + indexBuildInfo, err := common.ListIndex(context.Background(), cli, basePath) if err != nil { fmt.Println(err.Error()) return diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index 14aa13a..2ec02f6 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -5,46 +5,33 @@ import ( "fmt" "time" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// ChannelWatchedCommand return show channel-watched commands. -func ChannelWatchedCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "channel-watch", - Short: "display channel watching info from data coord meta store", - Aliases: []string{"channel-watched"}, - Run: func(cmd *cobra.Command, args []string) { - - collID, err := cmd.Flags().GetInt64("collection") - if err != nil { - return - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - infos, err := common.ListChannelWatch(ctx, cli, basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { - return collID == 0 || channel.Vchan.CollectionID == collID - }) - if err != nil { - fmt.Println("failed to list channel watch info", err.Error()) - return - } +type ChannelWatchedParam struct { + framework.ParamBase `use:"show channel-watch" desc:"display channel watching info from data coord meta store" alias:"channel-watched"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` +} - for _, info := range infos { - printChannelWatchInfo(info) - } +// ChannelWatchedCommand return show channel-watched commands. +func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) { + infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { + return p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID + }) + if err != nil { + fmt.Println("failed to list channel watch info", err.Error()) + return + } - fmt.Printf("--- Total Channels: %d\n", len(infos)) - }, + for _, info := range infos { + printChannelWatchInfo(info) } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - return cmd + + fmt.Printf("--- Total Channels: %d\n", len(infos)) } func printChannelWatchInfo(info *models.ChannelWatch) { diff --git a/states/etcd/show/checkpoint.go b/states/etcd/show/checkpoint.go index dc39b4b..13836d1 100644 --- a/states/etcd/show/checkpoint.go +++ b/states/etcd/show/checkpoint.go @@ -5,67 +5,58 @@ import ( "fmt" "path" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) +type CheckpointParam struct { + framework.ParamBase `use:"show checkpoint" desc:"list checkpoint collection vchannels" alias:"checkpoints,cp"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` +} + // CheckpointCommand returns show checkpoint command. -func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "checkpoint", - Short: "list checkpoint collection vchannels", - Aliases: []string{"checkpoints", "cp"}, - Run: func(cmd *cobra.Command, args []string) { +func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) { + coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + fmt.Println("failed to get collection", err.Error()) + return + } - collID, err := cmd.Flags().GetInt64("collection") - if err != nil { - fmt.Println(err.Error()) - return - } + for _, channel := range coll.Channels { + var cp *internalpb.MsgPosition + var segmentID int64 + var err error + cp, err = c.getChannelCheckpoint(ctx, channel.VirtualName) - coll, err := common.GetCollectionByIDVersion(context.Background(), cli, basePath, etcdversion.GetVersion(), collID) + if err != nil { + cp, segmentID, err = c.getCheckpointFromSegments(ctx, p.CollectionID, channel.VirtualName) if err != nil { - fmt.Println("failed to get collection", err.Error()) - return + fmt.Println("failed to get checkpoint from segments", err.Error()) } + } - for _, channel := range coll.Channels { - var cp *internalpb.MsgPosition - var segmentID int64 - var err error - cp, err = getChannelCheckpoint(cli, basePath, channel.VirtualName) - - if err != nil { - cp, segmentID, err = getCheckpointFromSegments(cli, basePath, collID, channel.VirtualName) - } - - if cp == nil { - fmt.Printf("vchannel %s position nil\n", channel.VirtualName) - } else { - t, _ := utils.ParseTS(cp.GetTimestamp()) - fmt.Printf("vchannel %s seek to %v, cp channel: %s", channel.VirtualName, t, cp.ChannelName) - if segmentID > 0 { - fmt.Printf(", for segment ID:%d\n", segmentID) - } else { - fmt.Printf(", from channel checkpoint\n") - } - } + if cp == nil { + fmt.Printf("vchannel %s position nil\n", channel.VirtualName) + } else { + t, _ := utils.ParseTS(cp.GetTimestamp()) + fmt.Printf("vchannel %s seek to %v, cp channel: %s", channel.VirtualName, t, cp.ChannelName) + if segmentID > 0 { + fmt.Printf(", for segment ID:%d\n", segmentID) + } else { + fmt.Printf(", from channel checkpoint\n") } - }, + } } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - return cmd } -func getChannelCheckpoint(cli clientv3.KV, basePath string, channelName string) (*internalpb.MsgPosition, error) { - prefix := path.Join(basePath, "datacoord-meta", "channel-cp", channelName) - results, _, err := common.ListProtoObjects[internalpb.MsgPosition](context.Background(), cli, prefix) +func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*internalpb.MsgPosition, error) { + prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName) + results, _, err := common.ListProtoObjects[internalpb.MsgPosition](context.Background(), c.client, prefix) if err != nil { return nil, err } @@ -77,8 +68,8 @@ func getChannelCheckpoint(cli clientv3.KV, basePath string, channelName string) return &results[0], nil } -func getCheckpointFromSegments(cli clientv3.KV, basePath string, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) { - segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { +func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) { + segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool { return info.CollectionID == collID && info.InsertChannel == vchannel }) if err != nil { diff --git a/states/etcd/show/collection.go b/states/etcd/show/collection.go index 8876fed..764da82 100644 --- a/states/etcd/show/collection.go +++ b/states/etcd/show/collection.go @@ -5,9 +5,7 @@ import ( "fmt" "sort" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" - + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" @@ -16,76 +14,54 @@ import ( // CollectionCommand returns sub command for showCmd. // show collection [options...] -func CollectionCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "collections", - Short: "list current available collection from RootCoord", - Aliases: []string{"collection"}, - Run: func(cmd *cobra.Command, args []string) { - collectionID, err := cmd.Flags().GetInt64("id") - if err != nil { - fmt.Println(err.Error()) - return - } - collectionName, err := cmd.Flags().GetString("name") - if err != nil { - fmt.Println(err.Error()) - return - } - dbID, err := cmd.Flags().GetInt64("dbid") - if err != nil { - fmt.Println(err.Error()) - return - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var collections []*models.Collection - var total int64 - // perform get by id to accelerate - if collectionID > 0 { - var collection *models.Collection - collection, err = common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) - if err == nil { - collections = append(collections, collection) - } - } else { - collections, err = common.ListCollectionsVersion(ctx, cli, basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool { - if collectionName != "" && coll.Schema.Name != collectionName { - return false - } - if dbID > -1 && coll.DBID != dbID { - return false - } - total++ - return true - }) - } +type CollectionParam struct { + framework.ParamBase `use:"show collections" desc:"list current available collection from RootCoord"` + CollectionID int64 `name:"id" default:"0" desc:"collection id to display"` + CollectionName string `name:"name" default:"" desc:"collection name to display"` + DatabaseID int64 `name:"dbid" default:"-1" desc:"database id to filter"` +} - if err != nil { - fmt.Println(err.Error()) - return +func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionParam) error { + var collections []*models.Collection + var total int64 + var err error + // perform get by id to accelerate + if p.CollectionID > 0 { + var collection *models.Collection + collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + if err == nil { + collections = append(collections, collection) + } + } else { + collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool { + if p.CollectionName != "" && coll.Schema.Name != p.CollectionName { + return false } - channels := 0 - healthy := 0 - for _, collection := range collections { - printCollection(collection) - if collection.State == models.CollectionStateCollectionCreated { - channels += len(collection.Channels) - healthy++ - } + if p.DatabaseID > -1 && coll.DBID != p.DatabaseID { + return false } - fmt.Println("================================================================================") - fmt.Printf("--- Total collections: %d\t Matched collections: %d\n", total, len(collections)) - fmt.Printf("--- Total channel: %d\t Healthy collections: %d\n", channels, healthy) - }, + total++ + return true + }) + } + + if err != nil { + return err } + channels := 0 + healthy := 0 + for _, collection := range collections { + printCollection(collection) + if collection.State == models.CollectionStateCollectionCreated { + channels += len(collection.Channels) + healthy++ + } + } + fmt.Println("================================================================================") + fmt.Printf("--- Total collections: %d\t Matched collections: %d\n", total, len(collections)) + fmt.Printf("--- Total channel: %d\t Healthy collections: %d\n", channels, healthy) - cmd.Flags().Int64("id", 0, "collection id to display") - cmd.Flags().String("name", "", "collection name to display") - cmd.Flags().Int64("dbid", -1, "database id") - return cmd + return nil } func printCollection(collection *models.Collection) { diff --git a/states/etcd/show/collection_history.go b/states/etcd/show/collection_history.go index d77a576..b174925 100644 --- a/states/etcd/show/collection_history.go +++ b/states/etcd/show/collection_history.go @@ -4,69 +4,55 @@ import ( "context" "errors" "fmt" - "time" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) +type CollectionHistoryParam struct { + framework.ParamBase `use:"show collection-history" desc:"display collection change history"` + CollectionID int64 `name:"id" default:"0" desc:"collection id to display"` +} + // CollectionHistoryCommand returns sub command for showCmd. // show collection-history [options...] -func CollectionHistoryCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "collection-history", - Short: "display collection change history", - Run: func(cmd *cobra.Command, args []string) { - collectionID, err := cmd.Flags().GetInt64("id") - if err != nil { - fmt.Println(err.Error()) - return - } - if collectionID == 0 { - fmt.Println("collection id not provided") - return - } - - // fetch current for now - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - collection, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) - if err != nil { - switch { - case errors.Is(err, common.ErrCollectionDropped): - fmt.Printf("[Current] collection id %d already marked with Tombstone\n", collectionID) - case errors.Is(err, common.ErrCollectionNotFound): - fmt.Printf("[Current] collection id %d not found\n", collectionID) - return - default: - fmt.Println("failed to get current collection state:", err.Error()) - return - } - } - printCollection(collection) - // fetch history - items, err := common.ListCollectionHistory(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) - if err != nil { - fmt.Println("failed to list history", err.Error()) - return - } +func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *CollectionHistoryParam) { + if p.CollectionID == 0 { + fmt.Println("collection id not provided") + return + } - for _, item := range items { - t, _ := utils.ParseTS(item.Ts) - fmt.Println("Snapshot at", t.Format("2006-01-02 15:04:05")) - if item.Dropped { - fmt.Println("Collection Dropped") - continue - } - printCollection(&item.Collection) - } - }, + // fetch current for now + collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + switch { + case errors.Is(err, common.ErrCollectionDropped): + fmt.Printf("[Current] collection id %d already marked with Tombstone\n", p.CollectionID) + case errors.Is(err, common.ErrCollectionNotFound): + fmt.Printf("[Current] collection id %d not found\n", p.CollectionID) + return + default: + fmt.Println("failed to get current collection state:", err.Error()) + return + } + } + printCollection(collection) + // fetch history + items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + fmt.Println("failed to list history", err.Error()) + return } - cmd.Flags().Int64("id", 0, "collection id to display") - return cmd + for _, item := range items { + t, _ := utils.ParseTS(item.Ts) + fmt.Println("Snapshot at", t.Format("2006-01-02 15:04:05")) + if item.Dropped { + fmt.Println("Collection Dropped") + continue + } + printCollection(&item.Collection) + } } diff --git a/states/etcd/show/collection_loaded.go b/states/etcd/show/collection_loaded.go index 0a0cb45..68c6f50 100644 --- a/states/etcd/show/collection_loaded.go +++ b/states/etcd/show/collection_loaded.go @@ -3,31 +3,17 @@ package show import ( "context" "fmt" - "sort" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" - "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) const ( ReplicaMetaPrefix = "queryCoord-ReplicaMeta" ) -func printLoadedCollections(infos []*querypb.CollectionInfo) { - sort.Slice(infos, func(i, j int) bool { - return infos[i].GetCollectionID() < infos[j].GetCollectionID() - }) - - for _, info := range infos { - // TODO beautify output - fmt.Println(info.String()) - } -} - func printCollectionLoaded(info *models.CollectionLoaded) { fmt.Printf("Version: [%s]\tCollectionID: %d\n", info.Version, info.CollectionID) fmt.Printf("ReplicaNumber: %d", info.ReplicaNumber) @@ -39,30 +25,25 @@ func printCollectionLoaded(info *models.CollectionLoaded) { } } +type CollectionLoadedParam struct { + framework.ParamBase `use:"show collection-loaded" desc:"display information of loaded collection from querycoord" alias:"collection-load"` + //CollectionID int64 `name:""` +} + // CollectionLoadedCommand return show collection-loaded command. -func CollectionLoadedCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "collection-loaded", - Short: "display information of loaded collection from querycoord", - Aliases: []string{"collection-load"}, - Run: func(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - var total int - infos, err := common.ListCollectionLoadedInfo(ctx, cli, basePath, etcdversion.GetVersion(), func(_ any) bool { - total++ - return true - }) - if err != nil { - fmt.Println("failed to list collection load info:", err.Error()) - return - } +func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) { + var total int + infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(_ any) bool { + total++ + return true + }) + if err != nil { + fmt.Println("failed to list collection load info:", err.Error()) + return + } - for _, info := range infos { - printCollectionLoaded(info) - } - fmt.Printf("--- Collections Loaded: %d\n", len(infos)) - }, + for _, info := range infos { + printCollectionLoaded(info) } - return cmd + fmt.Printf("--- Collections Loaded: %d\n", len(infos)) } diff --git a/states/etcd/show/component.go b/states/etcd/show/component.go new file mode 100644 index 0000000..ff6e3ae --- /dev/null +++ b/states/etcd/show/component.go @@ -0,0 +1,20 @@ +package show + +import ( + "github.com/milvus-io/birdwatcher/configs" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type ComponentShow struct { + client clientv3.KV + config *configs.Config + basePath string +} + +func NewComponent(cli clientv3.KV, config *configs.Config, basePath string) *ComponentShow { + return &ComponentShow{ + client: cli, + config: config, + basePath: basePath, + } +} diff --git a/states/etcd/show/config_etcd.go b/states/etcd/show/config_etcd.go index cc0e553..1e0891e 100644 --- a/states/etcd/show/config_etcd.go +++ b/states/etcd/show/config_etcd.go @@ -4,31 +4,23 @@ import ( "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// ConfigEtcdCommand return show config-etcd command. -func ConfigEtcdCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "config-etcd", - Short: "list configuations set by etcd source", - Run: func(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - keys, values, err := common.ListEtcdConfigs(ctx, cli, basePath) - if err != nil { - fmt.Println("failed to list configurations from etcd", err.Error()) - return - } +type ConfigEtcdParam struct { + framework.ParamBase `use:"show config-etcd" desc:"list configuations set by etcd source"` +} - for i, key := range keys { - fmt.Printf("Key: %s, Value: %s\n", key, values[i]) - } - }, +// ConfigEtcdCommand return show config-etcd command. +func (c *ComponentShow) ConfigEtcdCommand(ctx context.Context, p *ConfigEtcdParam) { + keys, values, err := common.ListEtcdConfigs(ctx, c.client, c.basePath) + if err != nil { + fmt.Println("failed to list configurations from etcd", err.Error()) + return } - return cmd + for i, key := range keys { + fmt.Printf("Key: %s, Value: %s\n", key, values[i]) + } } diff --git a/states/etcd/show/database.go b/states/etcd/show/database.go index 0f312c0..014916d 100644 --- a/states/etcd/show/database.go +++ b/states/etcd/show/database.go @@ -4,34 +4,29 @@ import ( "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// DatabaseCommand returns show database comand. -func DatabaseCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "database", - Short: "display Database info from rootcoord meta", - Run: func(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dbs, err := common.ListDatabase(ctx, cli, basePath) - if err != nil { - fmt.Println("failed to list database info", err.Error()) - return - } +type DatabaseParam struct { + framework.ParamBase `use:"show database" desc:"display Database info from rootcoord meta"` + DatabaseName string `name:"name" default:"" desc:"database name to filter with"` +} - for _, db := range dbs { - printDatabaseInfo(db) - } +// DatabaseCommand returns show database comand. +func (c *ComponentShow) DatabaseCommand(ctx context.Context, p *DatabaseParam) { + dbs, err := common.ListDatabase(ctx, c.client, c.basePath) + if err != nil { + fmt.Println("failed to list database info", err.Error()) + return + } - fmt.Printf("--- Total Database(s): %d\n", len(dbs)) - }, + for _, db := range dbs { + printDatabaseInfo(db) } - return cmd + + fmt.Printf("--- Total Database(s): %d\n", len(dbs)) } func printDatabaseInfo(db *models.Database) { diff --git a/states/etcd/show/index.go b/states/etcd/show/index.go index 1966918..b59c780 100644 --- a/states/etcd/show/index.go +++ b/states/etcd/show/index.go @@ -4,49 +4,43 @@ import ( "context" "fmt" "path" - "time" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb" "github.com/milvus-io/birdwatcher/states/etcd/common" "github.com/milvus-io/birdwatcher/utils" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// IndexCommand returns show index command. -func IndexCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "index", - Aliases: []string{"indexes"}, - Run: func(cmd *cobra.Command, args []string) { +type IndexParam struct { + framework.ParamBase `use:"show index" desc:"" alias:"indexes"` +} - fmt.Println("*************2.1.x***************") - // v2.0+ - meta, err := listIndexMeta(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } +// IndexCommand returns show index command. +func (c *ComponentShow) IndexCommand(ctx context.Context, p *IndexParam) { + fmt.Println("*************2.1.x***************") + // v2.0+ + meta, err := c.listIndexMeta(ctx) + if err != nil { + fmt.Println(err.Error()) + return + } - for _, m := range meta { - printIndex(m) - } + for _, m := range meta { + printIndex(m) + } - fmt.Println("*************2.2.x***************") - // v2.2+ - fieldIndexes, err := listIndexMetaV2(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } + fmt.Println("*************2.2.x***************") + // v2.2+ + fieldIndexes, err := c.listIndexMetaV2(ctx) + if err != nil { + fmt.Println(err.Error()) + return + } - for _, index := range fieldIndexes { - printIndexV2(index) - } - }, + for _, index := range fieldIndexes { + printIndexV2(index) } - return cmd } type IndexInfoV1 struct { @@ -54,12 +48,9 @@ type IndexInfoV1 struct { collectionID int64 } -func listIndexMeta(cli clientv3.KV, basePath string) ([]IndexInfoV1, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - prefix := path.Join(basePath, "root-coord/index") - indexes, keys, err := common.ListProtoObjects[etcdpb.IndexInfo](ctx, cli, prefix) +func (c *ComponentShow) listIndexMeta(ctx context.Context) ([]IndexInfoV1, error) { + prefix := path.Join(c.basePath, "root-coord/index") + indexes, keys, err := common.ListProtoObjects[etcdpb.IndexInfo](ctx, c.client, prefix) result := make([]IndexInfoV1, 0, len(indexes)) for idx, info := range indexes { collectionID, err := common.PathPartInt64(keys[idx], -2) @@ -75,10 +66,8 @@ func listIndexMeta(cli clientv3.KV, basePath string) ([]IndexInfoV1, error) { return result, err } -func listIndexMetaV2(cli clientv3.KV, basePath string) ([]indexpbv2.FieldIndex, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, cli, path.Join(basePath, "field-index")) +func (c *ComponentShow) listIndexMetaV2(ctx context.Context) ([]indexpbv2.FieldIndex, error) { + indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index")) return indexes, err } diff --git a/states/etcd/show/partition.go b/states/etcd/show/partition.go index d2a1a6b..3b8c212 100644 --- a/states/etcd/show/partition.go +++ b/states/etcd/show/partition.go @@ -4,44 +4,32 @@ import ( "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -func PartitionCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "partition", - Short: "list partitions of provided collection", - Run: func(cmd *cobra.Command, args []string) { - collectionID, err := cmd.Flags().GetInt64("collection") - if err != nil { - fmt.Println(err.Error()) - return - } - - if collectionID == 0 { - fmt.Println("please provided collection id") - return - } +type PartitionParam struct { + framework.ParamBase `use:"partition" desc:"list partitions of provided collection"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to list"` +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - partitions, err := common.ListCollectionPartitions(ctx, cli, basePath, collectionID) - if err != nil { - fmt.Println("failed to list partition info", err.Error()) - } +// PartitionCommand returns command to list partition info for provided collection. +func (c *ComponentShow) PartitionCommand(ctx context.Context, p *PartitionParam) { + if p.CollectionID == 0 { + fmt.Println("please provided collection id") + return + } - if len(partitions) == 0 { - fmt.Printf("no partition found for collection %d\n", collectionID) - } + partitions, err := common.ListCollectionPartitions(ctx, c.client, c.basePath, p.CollectionID) + if err != nil { + fmt.Println("failed to list partition info", err.Error()) + } - for _, partition := range partitions { - fmt.Printf("Parition ID: %d\tName: %s\tState: %s\n", partition.ID, partition.Name, partition.State.String()) - } - }, + if len(partitions) == 0 { + fmt.Printf("no partition found for collection %d\n", p.CollectionID) } - cmd.Flags().Int64("collection", 0, "collection id to list") - return cmd + for _, partition := range partitions { + fmt.Printf("Parition ID: %d\tName: %s\tState: %s\n", partition.ID, partition.Name, partition.State.String()) + } } diff --git a/states/etcd/show/replica.go b/states/etcd/show/replica.go index 82ef745..4463c8d 100644 --- a/states/etcd/show/replica.go +++ b/states/etcd/show/replica.go @@ -1,40 +1,30 @@ package show import ( + "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// ReplicaCommand returns command for show querycoord replicas. -func ReplicaCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "replica", - Short: "list current replica information from QueryCoord", - Aliases: []string{"replicas"}, - Run: func(cmd *cobra.Command, args []string) { - collID, err := cmd.Flags().GetInt64("collection") - if err != nil { - fmt.Println(err.Error()) - return - } - replicas, err := common.ListReplica(cli, basePath, collID) - if err != nil { - fmt.Println("failed to list replicas", err.Error()) - return - } +type ReplicaParam struct { + framework.ParamBase `use:"show replica" desc:"list current replica information from QueryCoord" alias:"replicas"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` +} - for _, replica := range replicas { - printReplica(replica) - } - }, +// ReplicaCommand returns command for show querycoord replicas. +func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) { + replicas, err := common.ListReplica(ctx, c.client, c.basePath, p.CollectionID) + if err != nil { + fmt.Println("failed to list replicas", err.Error()) + return } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - return cmd + for _, replica := range replicas { + printReplica(replica) + } } func printReplica(replica *models.Replica) { diff --git a/states/etcd/show/segment.go b/states/etcd/show/segment.go index 259cffb..9b218e6 100644 --- a/states/etcd/show/segment.go +++ b/states/etcd/show/segment.go @@ -6,118 +6,90 @@ import ( "sort" "time" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" - - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// SegmentCommand returns show segments command. -func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "segment", - Short: "display segment information from data coord meta store", - Aliases: []string{"segments"}, - RunE: func(cmd *cobra.Command, args []string) error { - - collID, err := cmd.Flags().GetInt64("collection") - if err != nil { - return err - } - segmentID, err := cmd.Flags().GetInt64("segment") - if err != nil { - return err - } - format, err := cmd.Flags().GetString("format") - if err != nil { - return err - } - detail, err := cmd.Flags().GetBool("detail") - if err != nil { - return err - } - state, err := cmd.Flags().GetString("state") - if err != nil { - return err - } +type SegmentParam struct { + framework.ParamBase `use:"show segment" desc:"display segment information from data coord meta store" alias:"segments"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` + PartitionID int64 `name:"partition" default:"0" desc:"partition id to filter with"` + SegmentID int64 `name:"segment" default:"0" desc:"segment id to display"` + Format string `name:"format" default:"line" desc:"segment display format"` + Detail bool `name:"detail" default:"false" desc:"flags indicating whether printing detail binlog info"` + State string `name:"state" default:"" desc:"target segment state"` +} - segments, err := common.ListSegmentsVersion(context.Background(), cli, basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { - return (collID == 0 || segment.CollectionID == collID) && - (segmentID == 0 || segment.ID == segmentID) && - (state == "" || segment.State.String() == state) - }) - if err != nil { - fmt.Println("failed to list segments", err.Error()) - return nil - } +// SegmentCommand returns show segments command. +func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error { + segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { + return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID) && + (p.SegmentID == 0 || segment.ID == p.SegmentID) && + (p.State == "" || segment.State.String() == p.State) + }) + if err != nil { + fmt.Println("failed to list segments", err.Error()) + return nil + } - totalRC := int64(0) - healthy := 0 - var statslogSize int64 - var growing, sealed, flushed int - fieldSize := make(map[int64]int64) - for _, info := range segments { + totalRC := int64(0) + healthy := 0 + var statslogSize int64 + var growing, sealed, flushed int + fieldSize := make(map[int64]int64) + for _, info := range segments { - if info.State != models.SegmentStateDropped { - totalRC += info.NumOfRows - healthy++ - } - switch info.State { - case models.SegmentStateGrowing: - growing++ - case models.SegmentStateSealed: - sealed++ - case models.SegmentStateFlushing, models.SegmentStateFlushed: - flushed++ - } + if info.State != models.SegmentStateDropped { + totalRC += info.NumOfRows + healthy++ + } + switch info.State { + case models.SegmentStateGrowing: + growing++ + case models.SegmentStateSealed: + sealed++ + case models.SegmentStateFlushing, models.SegmentStateFlushed: + flushed++ + } - switch format { - case "table": - PrintSegmentInfo(info, detail) - case "line": - fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows) - case "statistics": - if info.State != models.SegmentStateDropped { - for _, binlog := range info.GetBinlogs() { - for _, log := range binlog.Binlogs { - fieldSize[binlog.FieldID] += log.LogSize - } - } - for _, statslog := range info.GetStatslogs() { - for _, binlog := range statslog.Binlogs { - statslogSize += binlog.LogSize - } - } + switch p.Format { + case "table": + PrintSegmentInfo(info, p.Detail) + case "line": + fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows) + case "statistics": + if info.State != models.SegmentStateDropped { + for _, binlog := range info.GetBinlogs() { + for _, log := range binlog.Binlogs { + fieldSize[binlog.FieldID] += log.LogSize } - } - - } - if format == "statistics" { - var totalBinlogSize int64 - for fieldID, size := range fieldSize { - fmt.Printf("\t field binlog size[%d]: %s\n", fieldID, hrSize(size)) - totalBinlogSize += size + for _, statslog := range info.GetStatslogs() { + for _, binlog := range statslog.Binlogs { + statslogSize += binlog.LogSize + } } - fmt.Printf("--- Total binlog size: %s\n", hrSize(totalBinlogSize)) - fmt.Printf("--- Total statslog size: %s\n", hrSize(statslogSize)) } - fmt.Printf("--- Growing: %d, Sealed: %d, Flushed: %d\n", growing, sealed, flushed) - fmt.Printf("--- Total Segments: %d, row count: %d\n", healthy, totalRC) - return nil - }, + } + } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - cmd.Flags().Int64("partition", 0, "partition id to filter with") - cmd.Flags().String("format", "line", "segment display format") - cmd.Flags().Bool("detail", false, "flags indicating whether pring detail binlog info") - cmd.Flags().Int64("segment", 0, "segment id to filter with") - cmd.Flags().String("state", "", "target segment state") - return cmd + if p.Format == "statistics" { + var totalBinlogSize int64 + for fieldID, size := range fieldSize { + fmt.Printf("\t field binlog size[%d]: %s\n", fieldID, hrSize(size)) + totalBinlogSize += size + } + fmt.Printf("--- Total binlog size: %s\n", hrSize(totalBinlogSize)) + fmt.Printf("--- Total statslog size: %s\n", hrSize(statslogSize)) + } + + fmt.Printf("--- Growing: %d, Sealed: %d, Flushed: %d\n", growing, sealed, flushed) + fmt.Printf("--- Total Segments: %d, row count: %d\n", healthy, totalRC) + return nil } func hrSize(size int64) string { diff --git a/states/etcd/show/segment_index.go b/states/etcd/show/segment_index.go index 6fe4da7..35c0bbd 100644 --- a/states/etcd/show/segment_index.go +++ b/states/etcd/show/segment_index.go @@ -4,156 +4,131 @@ import ( "context" "fmt" "path" - "time" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" "github.com/milvus-io/birdwatcher/proto/v2.0/indexpb" indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -// SegmentIndexCommand returns show segment-index command. -func SegmentIndexCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "segment-index", - Aliases: []string{"segments-index", "segment-indexes", "segments-indexes"}, - Short: "display segment index information", - Run: func(cmd *cobra.Command, args []string) { - collID, err := cmd.Flags().GetInt64("collection") - if err != nil { - fmt.Println(err.Error()) - return - } - segmentID, err := cmd.Flags().GetInt64("segment") - if err != nil { - fmt.Println(err.Error()) - return - } +type SegmentIndexParam struct { + framework.ParamBase `use:"show segment-index" desc:"display segment index information" alias:"segments-index,segment-indexes,segments-indexes"` + CollectionID int64 `name:"collection" default:"0"` + SegmentID int64 `name:"segment" default:"0"` +} - segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { - return (collID == 0 || info.CollectionID == collID) && - (segmentID == 0 || info.ID == segmentID) - }) - if err != nil { - fmt.Println(err.Error()) - return - } +// SegmentIndexCommand returns show segment-index command. +func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndexParam) error { + segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool { + return (p.CollectionID == 0 || info.CollectionID == p.CollectionID) && + (p.SegmentID == 0 || info.ID == p.SegmentID) + }) + if err != nil { + return err + } - segmentIndexes, err := common.ListSegmentIndex(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } - segmentIndexesV2, err := listSegmentIndexV2(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } + segmentIndexes, err := common.ListSegmentIndex(c.client, c.basePath) + if err != nil { + return err + } + segmentIndexesV2, err := c.listSegmentIndexV2(ctx) + if err != nil { + return err + } - indexBuildInfo, err := common.ListIndex(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } + indexBuildInfo, err := common.ListIndex(ctx, c.client, c.basePath) + if err != nil { + return err + } - indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](context.Background(), cli, path.Join(basePath, "field-index")) - if err != nil { - fmt.Println(err.Error()) - return - } - idIdx := make(map[int64]indexpbv2.FieldIndex) - for _, idx := range indexes { - idIdx[idx.IndexInfo.IndexID] = idx - } + indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index")) + if err != nil { + return err + } + idIdx := make(map[int64]indexpbv2.FieldIndex) + for _, idx := range indexes { + idIdx[idx.IndexInfo.IndexID] = idx + } - seg2Idx := make(map[int64][]etcdpb.SegmentIndexInfo) - seg2Idxv2 := make(map[int64][]indexpbv2.SegmentIndex) - for _, segIdx := range segmentIndexes { - idxs, ok := seg2Idx[segIdx.SegmentID] - if !ok { - idxs = []etcdpb.SegmentIndexInfo{} - } + seg2Idx := make(map[int64][]etcdpb.SegmentIndexInfo) + seg2Idxv2 := make(map[int64][]indexpbv2.SegmentIndex) + for _, segIdx := range segmentIndexes { + idxs, ok := seg2Idx[segIdx.SegmentID] + if !ok { + idxs = []etcdpb.SegmentIndexInfo{} + } - idxs = append(idxs, segIdx) + idxs = append(idxs, segIdx) - seg2Idx[segIdx.GetSegmentID()] = idxs - } - for _, segIdx := range segmentIndexesV2 { - idxs, ok := seg2Idxv2[segIdx.SegmentID] - if !ok { - idxs = []indexpbv2.SegmentIndex{} - } + seg2Idx[segIdx.GetSegmentID()] = idxs + } + for _, segIdx := range segmentIndexesV2 { + idxs, ok := seg2Idxv2[segIdx.SegmentID] + if !ok { + idxs = []indexpbv2.SegmentIndex{} + } - idxs = append(idxs, segIdx) + idxs = append(idxs, segIdx) - seg2Idxv2[segIdx.GetSegmentID()] = idxs - } + seg2Idxv2[segIdx.GetSegmentID()] = idxs + } - buildID2Info := make(map[int64]indexpb.IndexMeta) - for _, info := range indexBuildInfo { - buildID2Info[info.IndexBuildID] = info + buildID2Info := make(map[int64]indexpb.IndexMeta) + for _, info := range indexBuildInfo { + buildID2Info[info.IndexBuildID] = info + } + count := make(map[string]int) + + for _, segment := range segments { + if segment.State != commonpb.SegmentState_Flushed { + continue + } + fmt.Printf("SegmentID: %d\t State: %s", segment.GetID(), segment.GetState().String()) + segIdxs, ok := seg2Idx[segment.GetID()] + if !ok { + // try v2 index information + segIdxv2, ok := seg2Idxv2[segment.GetID()] + if !ok { + fmt.Println("\tno segment index info") + continue } - count := make(map[string]int) - - for _, segment := range segments { - if segment.State != commonpb.SegmentState_Flushed { - continue - } - fmt.Printf("SegmentID: %d\t State: %s", segment.GetID(), segment.GetState().String()) - segIdxs, ok := seg2Idx[segment.GetID()] - if !ok { - // try v2 index information - segIdxv2, ok := seg2Idxv2[segment.GetID()] - if !ok { - fmt.Println("\tno segment index info") - continue - } - for _, segIdx := range segIdxv2 { - fmt.Printf("\n\tIndexV2 build ID: %d, states %s", segIdx.GetBuildID(), segIdx.GetState().String()) - count[segIdx.GetState().String()]++ - idx, ok := idIdx[segIdx.GetIndexID()] - if ok { - fmt.Printf("\t Index Type:%v on Field ID: %d", common.GetKVPair(idx.GetIndexInfo().GetIndexParams(), "index_type"), idx.GetIndexInfo().GetFieldID()) - } - fmt.Printf("\tSerialized Size: %d\n", segIdx.GetSerializeSize()) - } - fmt.Println() + for _, segIdx := range segIdxv2 { + fmt.Printf("\n\tIndexV2 build ID: %d, states %s", segIdx.GetBuildID(), segIdx.GetState().String()) + count[segIdx.GetState().String()]++ + idx, ok := idIdx[segIdx.GetIndexID()] + if ok { + fmt.Printf("\t Index Type:%v on Field ID: %d", common.GetKVPair(idx.GetIndexInfo().GetIndexParams(), "index_type"), idx.GetIndexInfo().GetFieldID()) } - - for _, segIdx := range segIdxs { - info, ok := buildID2Info[segIdx.BuildID] - if !ok { - fmt.Printf("\tno build info found for id: %d\n", segIdx.BuildID) - fmt.Println(segIdx.String()) - } - fmt.Printf("\n\tIndex build ID: %d, state: %s", info.IndexBuildID, info.State.String()) - fmt.Printf("\t Index Type:%v on Field ID: %d", common.GetKVPair(info.GetReq().GetIndexParams(), "index_type"), segIdx.GetFieldID()) - fmt.Printf("\t info.SerializeSize: %d\n", info.GetSerializeSize()) - } - fmt.Println() - } - // Print count statistics - for idxSta, cnt := range count { - fmt.Printf("[%s]: %d\t", idxSta, cnt) + fmt.Printf("\tSerialized Size: %d\n", segIdx.GetSerializeSize()) } fmt.Println() - }, - } + } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - cmd.Flags().Int64("segment", 0, "segment id to filter with") - return cmd + for _, segIdx := range segIdxs { + info, ok := buildID2Info[segIdx.BuildID] + if !ok { + fmt.Printf("\tno build info found for id: %d\n", segIdx.BuildID) + fmt.Println(segIdx.String()) + } + fmt.Printf("\n\tIndex build ID: %d, state: %s", info.IndexBuildID, info.State.String()) + fmt.Printf("\t Index Type:%v on Field ID: %d", common.GetKVPair(info.GetReq().GetIndexParams(), "index_type"), segIdx.GetFieldID()) + fmt.Printf("\t info.SerializeSize: %d\n", info.GetSerializeSize()) + } + fmt.Println() + } + // Print count statistics + for idxSta, cnt := range count { + fmt.Printf("[%s]: %d\t", idxSta, cnt) + } + fmt.Println() + return nil } -func listSegmentIndexV2(cli clientv3.KV, basePath string) ([]indexpbv2.SegmentIndex, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - prefix := path.Join(basePath, "segment-index") + "/" - result, _, err := common.ListProtoObjects[indexpbv2.SegmentIndex](ctx, cli, prefix) +func (c *ComponentShow) listSegmentIndexV2(ctx context.Context) ([]indexpbv2.SegmentIndex, error) { + prefix := path.Join(c.basePath, "segment-index") + "/" + result, _, err := common.ListProtoObjects[indexpbv2.SegmentIndex](ctx, c.client, prefix) return result, err } diff --git a/states/etcd/show/session.go b/states/etcd/show/session.go index 7bcda7d..6ea0303 100644 --- a/states/etcd/show/session.go +++ b/states/etcd/show/session.go @@ -1,15 +1,31 @@ package show import ( + "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) +type SessionParam struct { + framework.ParamBase `use:"show session" desc:"list online milvus components" alias:"sessions"` +} + // SessionCommand returns show session command. // usage: show session +func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) error { + sessions, err := common.ListSessions(c.client, c.basePath) + if err != nil { + return err + } + for _, session := range sessions { + fmt.Println(session.String()) + } + return nil +} + +/* func SessionCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "session", @@ -27,4 +43,4 @@ func SessionCommand(cli clientv3.KV, basePath string) *cobra.Command { }, } return cmd -} +}*/ diff --git a/states/etcd_connect.go b/states/etcd_connect.go index 6819ee3..c4d90e3 100644 --- a/states/etcd_connect.go +++ b/states/etcd_connect.go @@ -9,6 +9,7 @@ import ( "time" "github.com/milvus-io/birdwatcher/configs" + "github.com/milvus-io/birdwatcher/framework" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -37,12 +38,12 @@ func pingEtcd(ctx context.Context, cli clientv3.KV, rootPath string, metaPath st } type ConnectParams struct { - ParamBase `use:"connect" desc:"Connect to etcd"` - EtcdAddr string `name:"etcd" default:"127.0.0.1:2379" desc:"the etcd endpoint to connect"` - RootPath string `name:"rootPath" default:"by-dev" desc:"meta root paht milvus is using"` - MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` - Force bool `name:"force" default:"false" desc:"force connect ignoring ping Etcd & rootPath check"` - Dry bool `name:"dry" default:"false" desc:"dry connect without specifying milvus instance"` + framework.ParamBase `use:"connect" desc:"Connect to etcd"` + EtcdAddr string `name:"etcd" default:"127.0.0.1:2379" desc:"the etcd endpoint to connect"` + RootPath string `name:"rootPath" default:"by-dev" desc:"meta root paht milvus is using"` + MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` + Force bool `name:"force" default:"false" desc:"force connect ignoring ping Etcd & rootPath check"` + Dry bool `name:"dry" default:"false" desc:"dry connect without specifying milvus instance"` } func (s *disconnectState) ConnectCommand(ctx context.Context, cp *ConnectParams) error { @@ -173,14 +174,14 @@ func getEtcdConnectedState(cli *clientv3.Client, addr string, config *configs.Co return state } -func (s *etcdConnectedState) DisconnectCommand(ctx context.Context, p *disconnectParam) error { +func (s *etcdConnectedState) DisconnectCommand(ctx context.Context, p *DisconnectParam) error { s.SetNext(Start(s.config)) s.Close() return nil } type FindMilvusParam struct { - ParamBase `use:"find-milvus" desc:"search etcd kvs to find milvus instance"` + framework.ParamBase `use:"find-milvus" desc:"search etcd kvs to find milvus instance"` } func (s *etcdConnectedState) FindMilvusCommand(ctx context.Context, p *FindMilvusParam) error { @@ -197,10 +198,10 @@ func (s *etcdConnectedState) FindMilvusCommand(ctx context.Context, p *FindMilvu } type UseParam struct { - ParamBase `use:"use [instance-name]" desc:"use specified milvus instance"` - instanceName string - Force bool `name:"force" default:"false" desc:"force connect ignoring ping result"` - MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` + framework.ParamBase `use:"use [instance-name]" desc:"use specified milvus instance"` + instanceName string + Force bool `name:"force" default:"false" desc:"force connect ignoring ping result"` + MetaPath string `name:"metaPath" default:"meta" desc:"meta path prefix"` } func (p *UseParam) ParseArgs(args []string) error { diff --git a/states/exit.go b/states/exit.go index 9f56ccb..6efccf6 100644 --- a/states/exit.go +++ b/states/exit.go @@ -3,6 +3,7 @@ package states import ( "context" + "github.com/milvus-io/birdwatcher/framework" "github.com/spf13/cobra" ) @@ -44,11 +45,11 @@ func (s *exitState) SetupCommands() {} // IsEnding returns true for exit State func (s *exitState) IsEnding() bool { return true } -type disconnectParam struct { - ParamBase `use:"disconnect" desc:"disconnect from current etcd instance"` +type DisconnectParam struct { + framework.ParamBase `use:"disconnect" desc:"disconnect from current etcd instance"` } -func (s *instanceState) DisconnectCommand(ctx context.Context, _ *disconnectParam) { +func (s *instanceState) DisconnectCommand(ctx context.Context, _ *DisconnectParam) { s.SetNext(Start(s.config)) s.Close() } diff --git a/states/instance.go b/states/instance.go index 1aac07d..75d8dd3 100644 --- a/states/instance.go +++ b/states/instance.go @@ -8,6 +8,8 @@ import ( "github.com/milvus-io/birdwatcher/configs" "github.com/milvus-io/birdwatcher/states/etcd" "github.com/milvus-io/birdwatcher/states/etcd/audit" + "github.com/milvus-io/birdwatcher/states/etcd/remove" + "github.com/milvus-io/birdwatcher/states/etcd/show" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -15,6 +17,8 @@ import ( // instanceState provides command for single milvus instance. type instanceState struct { cmdState + *show.ComponentShow + *remove.ComponentRemove instanceName string client clientv3.KV auditFile *os.File @@ -46,8 +50,6 @@ func (s *instanceState) SetupCommands() { CurrentVersionCommand(), // show segment-loaded-grpc GetDistributionCommand(cli, basePath), - // show configurations - // GetConfigurationCommand(cli, basePath), ) cmd.AddCommand( @@ -62,7 +64,7 @@ func (s *instanceState) SetupCommands() { // set [subcommand] options... etcd.SetCommand(cli, instanceName, metaPath), // restore [subcommand] options... - etcd.RestoreCommand(cli, basePath), + // etcd.RestoreCommand(cli, basePath), // backup [component] getBackupEtcdCmd(cli, basePath), @@ -141,9 +143,11 @@ func getInstanceState(cli clientv3.KV, instanceName, metaPath string, etcdState cmdState: cmdState{ label: fmt.Sprintf("Milvus(%s)", instanceName), }, - instanceName: instanceName, - client: kv, - auditFile: file, + ComponentShow: show.NewComponent(cli, config, basePath), + ComponentRemove: remove.NewComponent(cli, config, basePath), + instanceName: instanceName, + client: kv, + auditFile: file, etcdState: etcdState, config: config, diff --git a/states/load_backup.go b/states/load_backup.go index 72930cd..860adc2 100644 --- a/states/load_backup.go +++ b/states/load_backup.go @@ -11,15 +11,16 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/configs" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/mitchellh/go-homedir" ) type LoadBackupParam struct { - ParamBase `use:"load-backup [file]" desc:"load etcd backup file"` - backupFile string - UseWorkspace bool `name:"use-workspace" default:"false"` - WorkspaceName string `name:"workspace-name" default:""` + framework.ParamBase `use:"load-backup [file]" desc:"load etcd backup file"` + backupFile string + UseWorkspace bool `name:"use-workspace" default:"false"` + WorkspaceName string `name:"workspace-name" default:""` } func (p *LoadBackupParam) ParseArgs(args []string) error { diff --git a/states/open.go b/states/open.go index 8d4eb62..534cbb6 100644 --- a/states/open.go +++ b/states/open.go @@ -7,15 +7,16 @@ import ( "path" "github.com/cockroachdb/errors" + "github.com/milvus-io/birdwatcher/framework" ) -type openParam struct { - ParamBase `use:"open-workspace [workspace-name]" desc:"Open workspace"` - workspaceName string +type OpenParam struct { + framework.ParamBase `use:"open-workspace [workspace-name]" desc:"Open workspace"` + workspaceName string } // ParseArgs parse args -func (p *openParam) ParseArgs(args []string) error { +func (p *OpenParam) ParseArgs(args []string) error { if len(args) == 0 { return errors.New("no backup file provided") } @@ -27,7 +28,7 @@ func (p *openParam) ParseArgs(args []string) error { } // OpenCommand implements open workspace command -func (s *disconnectState) OpenCommand(ctx context.Context, p *openParam) error { +func (s *disconnectState) OpenCommand(ctx context.Context, p *OpenParam) error { workspaceName := p.workspaceName workPath := path.Join(s.config.WorkspacePath, workspaceName) info, err := os.Stat(workPath) diff --git a/states/parse_file.go b/states/parse_file.go index 5781eaf..038e01f 100644 --- a/states/parse_file.go +++ b/states/parse_file.go @@ -15,12 +15,13 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/storage" ) type ParseIndexParam struct { - ParamBase `use:"parse-indexparam [file]" desc:"parse index params"` - filePath string + framework.ParamBase `use:"parse-indexparam [file]" desc:"parse index params"` + filePath string } func (p *ParseIndexParam) ParseArgs(args []string) error { @@ -72,8 +73,8 @@ func (s *disconnectState) ParseIndexParamCommand(ctx context.Context, p *ParseIn } type ValidateIndexParam struct { - ParamBase `use:"validate-indexfiles [directory]" desc:"validate index file size"` - directory string + framework.ParamBase `use:"validate-indexfiles [directory]" desc:"validate index file size"` + directory string } func (p *ValidateIndexParam) ParseArgs(args []string) error { @@ -175,8 +176,8 @@ func validateIndexFolder(fp string, params map[string]string) { } type AssembleIndexFilesParam struct { - ParamBase `use:"assemble-indexfiles [directory]" desc:""` - directory string + framework.ParamBase `use:"assemble-indexfiles [directory]" desc:""` + directory string } func (p *AssembleIndexFilesParam) ParseArgs(args []string) error { diff --git a/states/pulsarctl_connect.go b/states/pulsarctl_connect.go index ff9a68d..a74c1fb 100644 --- a/states/pulsarctl_connect.go +++ b/states/pulsarctl_connect.go @@ -4,20 +4,21 @@ import ( "context" "fmt" + "github.com/milvus-io/birdwatcher/framework" "github.com/spf13/cobra" pulsarctl "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/streamnative/pulsarctl/pkg/pulsar/common" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" ) -type pulsarctlParam struct { - ParamBase `use:"pulsarctl" desc:"connect to pulsar admin with pulsarctl"` - Address string `name:"addr" default:"http://localhost:18080" desc:"pulsar admin address"` - AuthPlugin string `name:"authPlugin" default:"" desc:"pulsar admin auth plugin"` - AuthParam string `name:"authParam" default:"" desc:"pulsar admin auth parameters"` +type PulsarctlParam struct { + framework.ParamBase `use:"pulsarctl" desc:"connect to pulsar admin with pulsarctl"` + Address string `name:"addr" default:"http://localhost:18080" desc:"pulsar admin address"` + AuthPlugin string `name:"authPlugin" default:"" desc:"pulsar admin auth plugin"` + AuthParam string `name:"authParam" default:"" desc:"pulsar admin auth parameters"` } -func (s *disconnectState) PulsarctlCommand(ctx context.Context, p *pulsarctlParam) error { +func (s *disconnectState) PulsarctlCommand(ctx context.Context, p *PulsarctlParam) error { config := common.Config{ WebServiceURL: p.Address, diff --git a/states/states.go b/states/states.go index ad71334..7fc96fe 100644 --- a/states/states.go +++ b/states/states.go @@ -11,6 +11,7 @@ import ( "strings" "syscall" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/states/autocomplete" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -131,7 +132,7 @@ func (s *cmdState) Close() {} func (s *cmdState) IsEnding() bool { return false } type exitParam struct { - ParamBase `use:"exit" desc:"Close this CLI tool"` + framework.ParamBase `use:"exit" desc:"Close this CLI tool"` } // ExitCommand returns exit command @@ -157,92 +158,103 @@ func parseFunctionCommands(state State) []commandItem { continue } - t := mt.Type - var use string - var short string - var paramType reflect.Type - - if t.NumIn() == 0 { - // shall not be reached + cmd, uses, ok := parseMethod(state, mt) + if !ok { continue } - if t.NumIn() > 1 { - // should be context.Context - in := t.In(1) - if !in.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { - continue - } - } - if t.NumIn() > 2 { - // should be CmdParam - in := t.In(2) - if !in.Implements(reflect.TypeOf((*CmdParam)(nil)).Elem()) { - continue - } - cp, ok := reflect.New(in.Elem()).Interface().(CmdParam) - if !ok { - fmt.Println("conversion failed", in.Name()) - } else { - paramType = in - use, short = cp.Desc() - } - } - cp := reflect.New(paramType.Elem()).Interface().(CmdParam) - fUse, fDesc := getCmdFromFlag(cp) - if len(use) == 0 { - use = fUse + commands = append(commands, commandItem{ + kws: uses[:len(uses)-1], + cmd: cmd, + }) + } + + return commands +} + +func parseMethod(state State, mt reflect.Method) (*cobra.Command, []string, bool) { + v := reflect.ValueOf(state) + t := mt.Type + var use string + var short string + var paramType reflect.Type + + if t.NumIn() == 0 { + // shall not be reached + return nil, nil, false + } + if t.NumIn() > 1 { + // should be context.Context + in := t.In(1) + if !in.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { + return nil, nil, false } - if len(short) == 0 { - short = fDesc + } + if t.NumIn() > 2 { + // should be CmdParam + in := t.In(2) + if !in.Implements(reflect.TypeOf((*framework.CmdParam)(nil)).Elem()) { + return nil, nil, false } - if len(use) == 0 { - fnName := mt.Name - use = strings.ToLower(fnName[:len(fnName)-8]) + cp, ok := reflect.New(in.Elem()).Interface().(framework.CmdParam) + if !ok { + fmt.Println("conversion failed", in.Name()) + } else { + paramType = in + use, short = cp.Desc() } - uses := parseUseSegments(use) - lastKw := uses[len(uses)-1] + } + + //fmt.Println(mt.Name) + cp := reflect.New(paramType.Elem()).Interface().(framework.CmdParam) + fUse, fDesc := getCmdFromFlag(cp) + if len(use) == 0 { + use = fUse + } + if len(short) == 0 { + short = fDesc + } + if len(use) == 0 { + fnName := mt.Name + use = strings.ToLower(fnName[:len(fnName)-8]) + } + uses := parseUseSegments(use) + lastKw := uses[len(uses)-1] - cmd := &cobra.Command{ - Use: lastKw, + cmd := &cobra.Command{ + Use: lastKw, + } + setupFlags(cp, cmd.Flags()) + cmd.Short = short + cmd.Run = func(cmd *cobra.Command, args []string) { + cp := reflect.New(paramType.Elem()).Interface().(framework.CmdParam) + + cp.ParseArgs(args) + if err := parseFlags(cp, cmd.Flags()); err != nil { + fmt.Println(err.Error()) + return } - setupFlags(cp, cmd.Flags()) - cmd.Short = short - cmd.Run = func(cmd *cobra.Command, args []string) { - cp := reflect.New(paramType.Elem()).Interface().(CmdParam) - - cp.ParseArgs(args) - if err := parseFlags(cp, cmd.Flags()); err != nil { - fmt.Println(err.Error()) - return - } - ctx, cancel := state.Ctx() //context.WithCancel(context.Background()) - defer cancel() - - m := v.MethodByName(mt.Name) - results := m.Call([]reflect.Value{ - reflect.ValueOf(ctx), - reflect.ValueOf(cp), - }) - if len(results) > 0 { - if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) { - if !results[0].IsNil() { - err := results[0].Interface().(error) - fmt.Println(err.Error()) - } + ctx, cancel := state.Ctx() + defer cancel() + + m := v.MethodByName(mt.Name) + results := m.Call([]reflect.Value{ + reflect.ValueOf(ctx), + reflect.ValueOf(cp), + }) + if len(results) > 0 { + if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) { + if !results[0].IsNil() { + err := results[0].Interface().(error) + fmt.Println(err.Error()) } } } - commands = append(commands, commandItem{ - kws: uses[:len(uses)-1], - cmd: cmd, - }) } - - return commands + return cmd, uses, true } -func getCmdFromFlag(p CmdParam) (string, string) { +func getCmdFromFlag(p framework.CmdParam) (string, string) { v := reflect.ValueOf(p) if v.Kind() != reflect.Pointer { fmt.Println("param is not pointer") @@ -287,7 +299,7 @@ func parseUseSegments(use string) []string { return result } -func setupFlags(p CmdParam, flags *pflag.FlagSet) { +func setupFlags(p framework.CmdParam, flags *pflag.FlagSet) { v := reflect.ValueOf(p) if v.Kind() != reflect.Pointer { fmt.Println("param is not pointer") @@ -330,7 +342,7 @@ func setupFlags(p CmdParam, flags *pflag.FlagSet) { } } -func parseFlags(p CmdParam, flags *pflag.FlagSet) error { +func parseFlags(p framework.CmdParam, flags *pflag.FlagSet) error { v := reflect.ValueOf(p) if v.Kind() != reflect.Pointer { @@ -374,20 +386,3 @@ func parseFlags(p CmdParam, flags *pflag.FlagSet) error { return nil } - -// CmdParam is the interface definition for command parameter. -type CmdParam interface { - ParseArgs(args []string) error - Desc() (string, string) -} - -// ParamBase implmenet CmdParam when empty args parser. -type ParamBase struct{} - -func (pb ParamBase) ParseArgs(args []string) error { - return nil -} - -func (pb ParamBase) Desc() (string, string) { - return "", "" -} diff --git a/states/util.go b/states/util.go index 00c70d1..6663a06 100644 --- a/states/util.go +++ b/states/util.go @@ -27,6 +27,7 @@ import ( "time" "github.com/milvus-io/birdwatcher/common" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" clientv3 "go.etcd.io/etcd/client/v3" @@ -37,17 +38,17 @@ const ( logicalBitsMask = (1 << logicalBits) - 1 ) -type parseTSParam struct { - ParamBase `use:"parse-ts" desc:"parse hybrid timestamp"` - args []string +type ParseTSParam struct { + framework.ParamBase `use:"parse-ts" desc:"parse hybrid timestamp"` + args []string } -func (p *parseTSParam) ParseArgs(args []string) error { +func (p *ParseTSParam) ParseArgs(args []string) error { p.args = args return nil } -func (s *cmdState) ParseTSCommand(ctx context.Context, p *parseTSParam) { +func (s *cmdState) ParseTSCommand(ctx context.Context, p *ParseTSParam) { if len(p.args) == 0 { fmt.Println("no ts provided") } @@ -64,11 +65,11 @@ func (s *cmdState) ParseTSCommand(ctx context.Context, p *parseTSParam) { } } -type printVerParam struct { - ParamBase `use:"version" desc:"print version"` +type PrintVerParam struct { + framework.ParamBase `use:"version" desc:"print version"` } -func (s *cmdState) PrintVersionCommand(ctx context.Context, _ *printVerParam) { +func (s *cmdState) PrintVersionCommand(ctx context.Context, _ *PrintVerParam) { fmt.Println("Birdwatcher Version", common.Version) }