diff --git a/bapps/webserver.go b/bapps/webserver.go new file mode 100644 index 0000000..c5adb33 --- /dev/null +++ b/bapps/webserver.go @@ -0,0 +1,272 @@ +package bapps + +import ( + "context" + "errors" + "fmt" + "net/http" + "reflect" + "strconv" + "strings" + + "github.com/gin-gonic/gin" + "github.com/milvus-io/birdwatcher/common" + "github.com/milvus-io/birdwatcher/configs" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/states" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" +) + +type WebServerApp struct { + port int + config *configs.Config +} + +type InstanceInfo struct { + EtcdAddr string `form:"etcd"` + RootPath string `form:"rootPath"` +} + +func (app *WebServerApp) Run(states.State) { + r := gin.Default() + etcdversion.SetVersion(models.GTEVersion2_2) + + r.GET("/version", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"version": common.Version}) + }) + + app.ParseRouter(r, &states.InstanceState{}) + + r.Run(fmt.Sprintf(":%d", app.port)) +} + +func (app *WebServerApp) ParseRouter(r *gin.Engine, s states.State) { + v := reflect.ValueOf(s) + tp := v.Type() + + for i := 0; i < v.NumMethod(); i++ { + mt := tp.Method(i) + + // parse method like with pattern %Command + if !strings.HasSuffix(mt.Name, "Command") { + continue + } + + // fmt.Println("parsing method", mt.Name) + app.parseMethod(r, mt, mt.Name) + } +} + +func (app *WebServerApp) parseMethod(r *gin.Engine, mt reflect.Method, name string) { + // v := reflect.ValueOf(s) + t := mt.Type + var use string + var paramType reflect.Type + + if t.NumIn() == 0 { + // shall not be reached + return + } + if t.NumIn() > 1 { + // should be context.Context + in := t.In(1) + if !in.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { + return + } + } + if t.NumIn() > 2 { + // should be CmdParam + in := t.In(2) + if !in.Implements(reflect.TypeOf((*framework.CmdParam)(nil)).Elem()) { + return + } + cp, ok := reflect.New(in.Elem()).Interface().(framework.CmdParam) + if !ok { + fmt.Println("conversion failed", in.Name()) + } else { + paramType = in + use, _ = cp.Desc() + } + } + if t.NumOut() == 0 { + fmt.Printf("%s not output\n", name) + return + } + + if t.NumOut() > 0 { + // should be ResultSet + out := t.Out(0) + if !out.Implements(reflect.TypeOf((*framework.ResultSet)(nil)).Elem()) { + fmt.Printf("%s output not ResultSet\n", name) + return + } + } + + //fmt.Println(mt.Name) + cp := reflect.New(paramType.Elem()).Interface().(framework.CmdParam) + fUse, _ := states.GetCmdFromFlag(cp) + if len(use) == 0 { + use = fUse + } + + if len(use) == 0 { + fnName := mt.Name + use = strings.ToLower(fnName[:len(fnName)-8]) + } + uses := states.ParseUseSegments(use) + lastKw := uses[len(uses)-1] + // hard code, show xxx command only + if uses[0] != "show" { + return + } + + // fmt.Printf("path: /%s\n", lastKw) + + r.GET(fmt.Sprintf("/%s", lastKw), func(c *gin.Context) { + + info := &InstanceInfo{} + c.ShouldBind(info) + + start := states.Start(app.config) + s, err := start.Process(fmt.Sprintf("connect --etcd=%s --rootPath=%s", info.EtcdAddr, info.RootPath)) + + if err != nil { + c.Error(err) + return + } + + v := reflect.ValueOf(s) + cp := reflect.New(paramType.Elem()).Interface().(framework.CmdParam) + setupDefaultValue(cp) + if err := app.BindCmdParam(c, cp); err != nil { + c.Error(err) + return + } + + m := v.MethodByName(mt.Name) + results := m.Call([]reflect.Value{ + reflect.ValueOf(c), + reflect.ValueOf(cp), + }) + + // reverse order, check error first + for i := 0; i < len(results); i++ { + result := results[len(results)-i-1] + switch { + case result.Type().Implements(reflect.TypeOf((*error)(nil)).Elem()): + // error nil, skip + if result.IsNil() { + continue + } + err := result.Interface().(error) + c.Error(err) + return + case result.Type().Implements(reflect.TypeOf((*framework.ResultSet)(nil)).Elem()): + if result.IsNil() { + continue + } + rs := result.Interface().(framework.ResultSet) + c.JSON(http.StatusOK, rs.Entities()) + return + } + } + + c.Error(errors.New("unexpected branch reached, no result set found")) + }) +} + +func (app *WebServerApp) BindCmdParam(c *gin.Context, cp framework.CmdParam) error { + v := reflect.ValueOf(cp) + if v.Kind() != reflect.Pointer { + return errors.New("param is not pointer") + } + + for v.Kind() != reflect.Struct { + v = v.Elem() + } + tp := v.Type() + + for i := 0; i < v.NumField(); i++ { + f := tp.Field(i) + if !f.IsExported() { + continue + } + name := f.Tag.Get("name") + rawStr, ok := c.GetQuery(name) + if !ok { + continue + } + switch f.Type.Kind() { + case reflect.Int64: + var dv int64 + if v, err := strconv.ParseInt(rawStr, 10, 64); err == nil { + dv = v + } + v.Field(i).SetInt(dv) + fmt.Println("set default", f.Name, dv) + case reflect.String: + v.Field(i).SetString(rawStr) + case reflect.Bool: + var dv bool + if v, err := strconv.ParseBool(rawStr); err == nil { + dv = v + } + v.Field(i).SetBool(dv) + case reflect.Struct: + continue + default: + return fmt.Errorf("field %s with kind %s not supported yet", f.Name, f.Type.Kind()) + } + } + return nil +} + +func setupDefaultValue(p framework.CmdParam) { + v := reflect.ValueOf(p) + if v.Kind() != reflect.Pointer { + fmt.Println("param is not pointer") + return + } + + for v.Kind() != reflect.Struct { + v = v.Elem() + } + tp := v.Type() + + for i := 0; i < v.NumField(); i++ { + f := tp.Field(i) + if !f.IsExported() { + continue + } + defaultStr := f.Tag.Get("default") + switch f.Type.Kind() { + case reflect.Int64: + var dv int64 + if v, err := strconv.ParseInt(defaultStr, 10, 64); err == nil { + dv = v + } + v.Field(i).SetInt(dv) + fmt.Println("set default", f.Name, dv) + case reflect.String: + v.Field(i).SetString(defaultStr) + case reflect.Bool: + var dv bool + if v, err := strconv.ParseBool(defaultStr); err == nil { + dv = v + } + v.Field(i).SetBool(dv) + case reflect.Struct: + continue + default: + fmt.Printf("field %s with kind %s not supported yet\n", f.Name, f.Type.Kind()) + } + } +} + +func NewWebServerApp(port int, config *configs.Config) *WebServerApp { + return &WebServerApp{ + port: port, + config: config, + } +} diff --git a/framework/resultset.go b/framework/resultset.go new file mode 100644 index 0000000..eaecc2d --- /dev/null +++ b/framework/resultset.go @@ -0,0 +1,69 @@ +package framework + +type Format int32 + +const ( + FormatDefault Format = iota + 1 + FormatPlain + FormatJSON + FormatTable +) + +var ( + name2Format = map[string]Format{ + "default": FormatDefault, + "plain": FormatPlain, + "json": FormatJSON, + "table": FormatTable, + } +) + +// ResultSet is the interface for command result set. +type ResultSet interface { + PrintAs(Format) string + Entities() any +} + +// PresetResultSet implements Stringer and "memorize" output format. +type PresetResultSet struct { + ResultSet + format Format +} + +func (rs *PresetResultSet) String() string { + if rs.format < FormatDefault { + return rs.PrintAs(FormatDefault) + } + return rs.PrintAs(rs.format) +} + +// NameFormat name to format mapping tool function. +func NameFormat(name string) Format { + f, ok := name2Format[name] + if !ok { + return FormatDefault + } + return f +} + +type ListResultSet[T any] struct { + Data []T +} + +func (rs *ListResultSet[T]) Entities() any { + return rs.Data +} + +func (rs *ListResultSet[T]) SetData(data []T) { + rs.Data = data +} + +func NewListResult[LRS any, P interface { + *LRS + SetData([]E) +}, E any](data []E) *LRS { + var t LRS + var p P = &t + p.SetData(data) + return &t +} diff --git a/main.go b/main.go index 8c9843b..a4755b2 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,8 @@ import ( var ( oneLineCommand = flag.String("olc", "", "one line command execution mode") simple = flag.Bool("simple", false, "use simple ui without suggestion and history") + restServer = flag.Bool("rest", false, "rest server address") + webPort = flag.Int("port", 8002, "listening port for web server") printVersion = flag.Bool("version", false, "print version") ) @@ -34,6 +36,8 @@ func main() { appFactory = func(*configs.Config) bapps.BApp { return bapps.NewSimpleApp() } case len(*oneLineCommand) > 0: appFactory = func(*configs.Config) bapps.BApp { return bapps.NewOlcApp(*oneLineCommand) } + case *restServer: + appFactory = func(config *configs.Config) bapps.BApp { return bapps.NewWebServerApp(*webPort, config) } default: defer handleExit() // open file and create if non-existent diff --git a/models/channel_watch.go b/models/channel_watch.go index 10fb2e5..5de7d51 100644 --- a/models/channel_watch.go +++ b/models/channel_watch.go @@ -59,6 +59,6 @@ func getVChannelInfo[info interface { UnflushedSegmentIds: vchan.GetUnflushedSegmentIds(), FlushedSegmentIds: vchan.GetFlushedSegmentIds(), DroppedSegmentIds: vchan.GetDroppedSegmentIds(), - SeekPosition: newMsgPosition(vchan.GetSeekPosition()), + SeekPosition: NewMsgPosition(vchan.GetSeekPosition()), } } diff --git a/models/data_type.go b/models/data_type.go index b472c90..2f4c6e8 100644 --- a/models/data_type.go +++ b/models/data_type.go @@ -13,6 +13,8 @@ const ( DataTypeDouble DataType = 11 DataTypeString DataType = 20 DataTypeVarChar DataType = 21 + DataTypeArray DataType = 22 + DataTypeJSON DataType = 23 DataTypeBinaryVector DataType = 100 DataTypeFloatVector DataType = 101 ) @@ -28,6 +30,8 @@ var DataTypename = map[int32]string{ 11: "Double", 20: "String", 21: "VarChar", + 22: "Array", + 23: "JSON", 100: "BinaryVector", 101: "FloatVector", } @@ -43,6 +47,8 @@ var DataTypevalue = map[string]int32{ "Double": 11, "String": 20, "VarChar": 21, + "Array": 22, + "JSON": 23, "BinaryVector": 100, "FloatVector": 101, } diff --git a/models/segment.go b/models/segment.go index 2f4275a..8eb0d81 100644 --- a/models/segment.go +++ b/models/segment.go @@ -72,8 +72,8 @@ func NewSegmentFromV2_1(info *datapb.SegmentInfo, key string) *Segment { s := newSegmentFromBase(info) s.key = key s.State = SegmentState(info.GetState()) - s.StartPosition = newMsgPosition(info.GetStartPosition()) - s.DmlPosition = newMsgPosition(info.GetDmlPosition()) + s.StartPosition = NewMsgPosition(info.GetStartPosition()) + s.DmlPosition = NewMsgPosition(info.GetDmlPosition()) mFunc := func(fbl *datapb.FieldBinlog, _ int) *FieldBinlog { r := &FieldBinlog{ @@ -97,8 +97,8 @@ func NewSegmentFromV2_2(info *datapbv2.SegmentInfo, key string, s := newSegmentFromBase(info) s.key = key s.State = SegmentState(info.GetState()) - s.StartPosition = newMsgPosition(info.GetStartPosition()) - s.DmlPosition = newMsgPosition(info.GetDmlPosition()) + s.StartPosition = NewMsgPosition(info.GetStartPosition()) + s.DmlPosition = NewMsgPosition(info.GetDmlPosition()) s.lazyLoad = func(s *Segment) { mFunc := func(fbl datapbv2.FieldBinlog, _ int) *FieldBinlog { @@ -179,7 +179,7 @@ type msgPosBase interface { GetTimestamp() uint64 } -func newMsgPosition[T msgPosBase](pos T) *MsgPosition { +func NewMsgPosition[T msgPosBase](pos T) *MsgPosition { return &MsgPosition{ ChannelName: pos.GetChannelName(), MsgID: pos.GetMsgID(), diff --git a/states/configuration.go b/states/configuration.go index 3350975..8fcd43f 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -23,7 +23,7 @@ type GetConfigurationParam struct { Format string `name:"format" default:"line" desc:"output format"` } -func (s *instanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { +func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { sessions, err := common.ListSessions(s.client, s.basePath) if err != nil { return err diff --git a/states/current_version.go b/states/current_version.go index 53e59aa..f655112 100644 --- a/states/current_version.go +++ b/states/current_version.go @@ -35,7 +35,7 @@ func (p *setCurrentVersionParam) ParseArgs(args []string) error { return nil } -func (s *instanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { +func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { switch param.newVersion { case models.LTEVersion2_1: fallthrough diff --git a/states/etcd/show/alias.go b/states/etcd/show/alias.go index 68c12db..b774d5f 100644 --- a/states/etcd/show/alias.go +++ b/states/etcd/show/alias.go @@ -3,7 +3,9 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" @@ -17,26 +19,43 @@ type AliasParam struct { } // AliasCommand implements `show alias` command. -func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) error { +func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) (*Aliases, error) { aliases, err := common.ListAliasVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(a *models.Alias) bool { return p.DBID == -1 || p.DBID == a.DBID }) if err != nil { - return err + return nil, errors.Wrap(err, "failed to list alias info") } - for dbid, aliases := range lo.GroupBy(aliases, func(a *models.Alias) int64 { return a.DBID }) { - fmt.Println("==========================") - fmt.Println("Database ID: ", dbid) - for _, alias := range aliases { - c.PrintAlias(alias) + return framework.NewListResult[Aliases](aliases), nil +} + +type Aliases struct { + framework.ListResultSet[*models.Alias] +} + +func (rs *Aliases) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for dbid, aliases := range lo.GroupBy(rs.Data, func(a *models.Alias) int64 { return a.DBID }) { + fmt.Fprintln(sb, "==========================") + fmt.Println(sb, "Database ID: ", dbid) + for _, alias := range aliases { + rs.PrintAlias(sb, alias) + } } + return sb.String() + default: } + return "" +} - return nil +func (rs *Aliases) Entities() any { + return rs.Data } -func (c *ComponentShow) PrintAlias(a *models.Alias) { +func (rs *Aliases) PrintAlias(sb *strings.Builder, a *models.Alias) { fmt.Printf("Collection ID: %d\tAlias Name: %s\tState: %s\n", a.CollectionID, a.Name, a.State.String()) } diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index 2ec02f6..55d0b1b 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -3,8 +3,10 @@ package show import ( "context" "fmt" + "strings" "time" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" @@ -18,39 +20,53 @@ type ChannelWatchedParam struct { } // ChannelWatchedCommand return show channel-watched commands. -func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) { +func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*ChannelsWatched, error) { infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { return p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID }) if err != nil { - fmt.Println("failed to list channel watch info", err.Error()) - return + return nil, errors.Wrap(err, "failed to list channel watch info") } - for _, info := range infos { - printChannelWatchInfo(info) - } + return framework.NewListResult[ChannelsWatched](infos), nil +} + +type ChannelsWatched struct { + framework.ListResultSet[*models.ChannelWatch] +} + +func (rs *ChannelsWatched) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, info := range rs.Data { + rs.printChannelWatchInfo(sb, info) + } - fmt.Printf("--- Total Channels: %d\n", len(infos)) + fmt.Fprintf(sb, "--- Total Channels: %d\n", len(rs.Data)) + return sb.String() + default: + } + return "" } -func printChannelWatchInfo(info *models.ChannelWatch) { - fmt.Println("=============================") - fmt.Printf("key: %s\n", info.Key()) - fmt.Printf("Channel Name:%s \t WatchState: %s\n", info.Vchan.ChannelName, info.State.String()) +func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *models.ChannelWatch) { + fmt.Fprintln(sb, "=============================") + fmt.Fprintf(sb, "key: %s\n", info.Key()) + fmt.Fprintf(sb, "Channel Name:%s \t WatchState: %s\n", info.Vchan.ChannelName, info.State.String()) //t, _ := ParseTS(uint64(info.GetStartTs())) //to, _ := ParseTS(uint64(info.GetTimeoutTs())) t := time.Unix(info.StartTs, 0) to := time.Unix(0, info.TimeoutTs) - fmt.Printf("Channel Watch start from: %s, timeout at: %s\n", t.Format(tsPrintFormat), to.Format(tsPrintFormat)) + fmt.Fprintf(sb, "Channel Watch start from: %s, timeout at: %s\n", t.Format(tsPrintFormat), to.Format(tsPrintFormat)) pos := info.Vchan.SeekPosition if pos != nil { startTime, _ := utils.ParseTS(pos.Timestamp) - fmt.Printf("Start Position ID: %v, time: %s\n", pos.MsgID, startTime.Format(tsPrintFormat)) + fmt.Fprintf(sb, "Start Position ID: %v, time: %s\n", pos.MsgID, startTime.Format(tsPrintFormat)) } - fmt.Printf("Unflushed segments: %v\n", info.Vchan.UnflushedSegmentIds) - fmt.Printf("Flushed segments: %v\n", info.Vchan.FlushedSegmentIds) - fmt.Printf("Dropped segments: %v\n", info.Vchan.DroppedSegmentIds) + fmt.Fprintf(sb, "Unflushed segments: %v\n", info.Vchan.UnflushedSegmentIds) + fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds) + fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds) } diff --git a/states/etcd/show/checkpoint.go b/states/etcd/show/checkpoint.go index 13836d1..67422fb 100644 --- a/states/etcd/show/checkpoint.go +++ b/states/etcd/show/checkpoint.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "path" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" @@ -20,38 +23,74 @@ type CheckpointParam struct { } // CheckpointCommand returns show checkpoint command. -func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) { +func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) (*Checkpoints, error) { coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) if err != nil { - fmt.Println("failed to get collection", err.Error()) - return + return nil, errors.Wrap(err, "failed to get collection") } + checkpoints := make([]*Checkpoint, 0, len(coll.Channels)) for _, channel := range coll.Channels { + var checkpoint = &Checkpoint{ + Channel: &models.Channel{ + PhysicalName: channel.PhysicalName, + VirtualName: channel.VirtualName, + }, + } var cp *internalpb.MsgPosition var segmentID int64 var err error + cp, err = c.getChannelCheckpoint(ctx, channel.VirtualName) + if err == nil { + checkpoint.Source = "Channel Checkpoint" + checkpoint.Checkpoint = models.NewMsgPosition(cp) + checkpoints = append(checkpoints, checkpoint) + continue + } - if err != nil { - cp, segmentID, err = c.getCheckpointFromSegments(ctx, p.CollectionID, channel.VirtualName) - if err != nil { - fmt.Println("failed to get checkpoint from segments", err.Error()) - } + cp, segmentID, err = c.getCheckpointFromSegments(ctx, p.CollectionID, channel.VirtualName) + if err == nil { + checkpoint.Source = fmt.Sprintf("from segment id %d", segmentID) + checkpoint.Checkpoint = models.NewMsgPosition(cp) + checkpoints = append(checkpoints, checkpoint) + continue } - if cp == nil { - fmt.Printf("vchannel %s position nil\n", channel.VirtualName) - } else { - t, _ := utils.ParseTS(cp.GetTimestamp()) - fmt.Printf("vchannel %s seek to %v, cp channel: %s", channel.VirtualName, t, cp.ChannelName) - if segmentID > 0 { - fmt.Printf(", for segment ID:%d\n", segmentID) - } else { - fmt.Printf(", from channel checkpoint\n") + checkpoints = append(checkpoints, checkpoint) + } + + return framework.NewListResult[Checkpoints](checkpoints), nil +} + +type Checkpoint struct { + Channel *models.Channel + Source string + Checkpoint *models.MsgPosition +} + +type Checkpoints struct { + framework.ListResultSet[*Checkpoint] +} + +func (rs *Checkpoints) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, checkpoint := range rs.Data { + if checkpoint.Checkpoint == nil { + fmt.Fprintf(sb, "Vchannel %s checkpoint not found, fallback to collection start pos\n", checkpoint.Channel.VirtualName) + continue } + t, _ := utils.ParseTS(checkpoint.Checkpoint.GetTimestamp()) + fmt.Fprintf(sb, "vchannel %s seek to %v, cp channel: %s, Source: %s\n", + checkpoint.Channel.VirtualName, t, checkpoint.Checkpoint.ChannelName, + checkpoint.Source) } + return sb.String() + default: } + return "" } func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*internalpb.MsgPosition, error) { diff --git a/states/etcd/show/collection.go b/states/etcd/show/collection.go index 9dde779..8e185ef 100644 --- a/states/etcd/show/collection.go +++ b/states/etcd/show/collection.go @@ -23,7 +23,7 @@ type CollectionParam struct { State string `name:"state" default:"" desc:"collection state to filter"` } -func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionParam) error { +func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionParam) (*Collections, error) { var collections []*models.Collection var total int64 var err error @@ -51,25 +51,52 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara } if err != nil { - return err + return nil, err } channels := 0 healthy := 0 for _, collection := range collections { - printCollection(collection) if collection.State == models.CollectionStateCollectionCreated { channels += len(collection.Channels) healthy++ } } - fmt.Println("================================================================================") - fmt.Printf("--- Total collections: %d\t Matched collections: %d\n", total, len(collections)) - fmt.Printf("--- Total channel: %d\t Healthy collections: %d\n", channels, healthy) - return nil + return &Collections{ + collections: collections, + total: total, + channels: channels, + healthy: healthy, + }, nil +} + +type Collections struct { + collections []*models.Collection + total int64 + channels int + healthy int +} + +func (rs *Collections) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, coll := range rs.collections { + printCollection(sb, coll) + } + fmt.Fprintln(sb, "================================================================================") + fmt.Printf("--- Total collections: %d\t Matched collections: %d\n", rs.total, len(rs.collections)) + fmt.Printf("--- Total channel: %d\t Healthy collections: %d\n", rs.channels, rs.healthy) + return sb.String() + } + return "" +} + +func (rs *Collections) Entities() any { + return rs.collections } -func printCollection(collection *models.Collection) { +func printCollection(sb *strings.Builder, collection *models.Collection) { fmt.Println("================================================================================") fmt.Printf("DBID: %d\n", collection.DBID) fmt.Printf("Collection ID: %d\tCollection Name: %s\n", collection.ID, collection.Schema.Name) diff --git a/states/etcd/show/collection_history.go b/states/etcd/show/collection_history.go index b174925..992b771 100644 --- a/states/etcd/show/collection_history.go +++ b/states/etcd/show/collection_history.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "strings" "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" @@ -13,15 +15,14 @@ import ( type CollectionHistoryParam struct { framework.ParamBase `use:"show collection-history" desc:"display collection change history"` - CollectionID int64 `name:"id" default:"0" desc:"collection id to display"` + CollectionID int64 `name:"id" default:"0" desc:"collection id to display" form:"id"` } // CollectionHistoryCommand returns sub command for showCmd. // show collection-history [options...] -func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *CollectionHistoryParam) { +func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *CollectionHistoryParam) (*CollectionHistory, error) { if p.CollectionID == 0 { - fmt.Println("collection id not provided") - return + return nil, errors.New("collection id not provided") } // fetch current for now @@ -29,30 +30,51 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect if err != nil { switch { case errors.Is(err, common.ErrCollectionDropped): - fmt.Printf("[Current] collection id %d already marked with Tombstone\n", p.CollectionID) + return nil, fmt.Errorf("[Current] collection id %d already marked with Tombstone", p.CollectionID) case errors.Is(err, common.ErrCollectionNotFound): - fmt.Printf("[Current] collection id %d not found\n", p.CollectionID) - return + return nil, fmt.Errorf("[Current] collection id %d not found", p.CollectionID) default: - fmt.Println("failed to get current collection state:", err.Error()) - return + return nil, err } } - printCollection(collection) + + result := &CollectionHistory{ + Collection: collection, + } // fetch history items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) if err != nil { - fmt.Println("failed to list history", err.Error()) - return + return nil, err } - for _, item := range items { - t, _ := utils.ParseTS(item.Ts) - fmt.Println("Snapshot at", t.Format("2006-01-02 15:04:05")) - if item.Dropped { - fmt.Println("Collection Dropped") - continue + result.HistoryItems = items + return result, nil +} + +type CollectionHistory struct { + Collection *models.Collection + HistoryItems []*models.CollectionHistory +} + +func (rs *CollectionHistory) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + printCollection(sb, rs.Collection) + for _, item := range rs.HistoryItems { + t, _ := utils.ParseTS(item.Ts) + fmt.Fprintln(sb, "Snapshot at", t.Format("2006-01-02 15:04:05")) + if item.Dropped { + fmt.Fprintln(sb, "Collection Dropped") + continue + } + printCollection(sb, &item.Collection) } - printCollection(&item.Collection) + default: } + return "" +} + +func (rs *CollectionHistory) Entities() any { + return rs } diff --git a/states/etcd/show/collection_loaded.go b/states/etcd/show/collection_loaded.go index 68c6f50..fab614d 100644 --- a/states/etcd/show/collection_loaded.go +++ b/states/etcd/show/collection_loaded.go @@ -3,7 +3,9 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" @@ -14,36 +16,50 @@ const ( ReplicaMetaPrefix = "queryCoord-ReplicaMeta" ) -func printCollectionLoaded(info *models.CollectionLoaded) { - fmt.Printf("Version: [%s]\tCollectionID: %d\n", info.Version, info.CollectionID) - fmt.Printf("ReplicaNumber: %d", info.ReplicaNumber) - switch info.Version { - case models.LTEVersion2_1: - fmt.Printf("\tInMemoryPercent: %d\n", info.InMemoryPercentage) - case models.GTEVersion2_2: - fmt.Printf("\tLoadStatus: %s\n", info.Status.String()) - } -} - type CollectionLoadedParam struct { framework.ParamBase `use:"show collection-loaded" desc:"display information of loaded collection from querycoord" alias:"collection-load"` //CollectionID int64 `name:""` } // CollectionLoadedCommand return show collection-loaded command. -func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) { +func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) { var total int infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(_ any) bool { total++ return true }) if err != nil { - fmt.Println("failed to list collection load info:", err.Error()) - return + return nil, errors.Wrap(err, "failed to list collection load info") + } + + return framework.NewListResult[CollectionsLoaded](infos), nil +} + +type CollectionsLoaded struct { + framework.ListResultSet[*models.CollectionLoaded] +} + +func (rs *CollectionsLoaded) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, info := range rs.Data { + rs.printCollectionLoaded(sb, info) + } + fmt.Fprintf(sb, "--- Collections Loaded: %d\n", len(rs.Data)) + return sb.String() + default: } + return "" +} - for _, info := range infos { - printCollectionLoaded(info) +func (rs *CollectionsLoaded) printCollectionLoaded(sb *strings.Builder, info *models.CollectionLoaded) { + fmt.Fprintf(sb, "Version: [%s]\tCollectionID: %d\n", info.Version, info.CollectionID) + fmt.Fprintf(sb, "ReplicaNumber: %d", info.ReplicaNumber) + switch info.Version { + case models.LTEVersion2_1: + fmt.Fprintf(sb, "\tInMemoryPercent: %d\n", info.InMemoryPercentage) + case models.GTEVersion2_2: + fmt.Fprintf(sb, "\tLoadStatus: %s\n", info.Status.String()) } - fmt.Printf("--- Collections Loaded: %d\n", len(infos)) } diff --git a/states/etcd/show/database.go b/states/etcd/show/database.go index 014916d..49bbf86 100644 --- a/states/etcd/show/database.go +++ b/states/etcd/show/database.go @@ -3,7 +3,9 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" @@ -15,22 +17,36 @@ type DatabaseParam struct { } // DatabaseCommand returns show database comand. -func (c *ComponentShow) DatabaseCommand(ctx context.Context, p *DatabaseParam) { +func (c *ComponentShow) DatabaseCommand(ctx context.Context, p *DatabaseParam) (*Databases, error) { dbs, err := common.ListDatabase(ctx, c.client, c.basePath) if err != nil { fmt.Println("failed to list database info", err.Error()) - return + return nil, errors.Wrap(err, "failed to list database info") } - for _, db := range dbs { - printDatabaseInfo(db) - } + return framework.NewListResult[Databases](dbs), nil +} - fmt.Printf("--- Total Database(s): %d\n", len(dbs)) +type Databases struct { + framework.ListResultSet[*models.Database] +} + +func (rs *Databases) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, database := range rs.Data { + rs.printDatabaseInfo(sb, database) + } + fmt.Fprintf(sb, "--- Total Database(s): %d\n", len(rs.Data)) + return sb.String() + default: + } + return "" } -func printDatabaseInfo(db *models.Database) { - fmt.Println("=============================") - fmt.Printf("ID: %d\tName: %s\n", db.ID, db.Name) - fmt.Printf("TenantID: %s\t State: %s\n", db.TenantID, db.State.String()) +func (rs *Databases) printDatabaseInfo(sb *strings.Builder, db *models.Database) { + fmt.Fprintln(sb, "=============================") + fmt.Fprintf(sb, "ID: %d\tName: %s\n", db.ID, db.Name) + fmt.Fprintf(sb, "TenantID: %s\t State: %s\n", db.TenantID, db.State.String()) } diff --git a/states/etcd/show/partition.go b/states/etcd/show/partition.go index 4e1e5a4..645bf15 100644 --- a/states/etcd/show/partition.go +++ b/states/etcd/show/partition.go @@ -3,8 +3,11 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" ) @@ -14,22 +17,37 @@ type PartitionParam struct { } // PartitionCommand returns command to list partition info for provided collection. -func (c *ComponentShow) PartitionCommand(ctx context.Context, p *PartitionParam) { +func (c *ComponentShow) PartitionCommand(ctx context.Context, p *PartitionParam) (*Partitions, error) { if p.CollectionID == 0 { - fmt.Println("please provided collection id") - return + return nil, errors.New("collection id not provided") } partitions, err := common.ListCollectionPartitions(ctx, c.client, c.basePath, p.CollectionID) if err != nil { - fmt.Println("failed to list partition info", err.Error()) + return nil, errors.Wrap(err, "failed to list partition info") } if len(partitions) == 0 { - fmt.Printf("no partition found for collection %d\n", p.CollectionID) + return nil, fmt.Errorf("no partition found for collection %d", p.CollectionID) } - for _, partition := range partitions { - fmt.Printf("Parition ID: %d\tName: %s\tState: %s\n", partition.ID, partition.Name, partition.State.String()) + return framework.NewListResult[Partitions](partitions), nil +} + +type Partitions struct { + framework.ListResultSet[*models.Partition] +} + +func (rs *Partitions) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, partition := range rs.Data { + fmt.Fprintf(sb, "Parition ID: %d\tName: %s\tState: %s\n", partition.ID, partition.Name, partition.State.String()) + } + fmt.Fprintf(sb, "--- Total Database(s): %d\n", len(rs.Data)) + return sb.String() + default: } + return "" } diff --git a/states/etcd/show/replica.go b/states/etcd/show/replica.go index 4463c8d..b89b2bc 100644 --- a/states/etcd/show/replica.go +++ b/states/etcd/show/replica.go @@ -3,7 +3,9 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" @@ -15,23 +17,37 @@ type ReplicaParam struct { } // ReplicaCommand returns command for show querycoord replicas. -func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) { +func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) (*Replicas, error) { replicas, err := common.ListReplica(ctx, c.client, c.basePath, p.CollectionID) if err != nil { - fmt.Println("failed to list replicas", err.Error()) - return + return nil, errors.Wrap(err, "failed to list replica info") } - for _, replica := range replicas { - printReplica(replica) + return framework.NewListResult[Replicas](replicas), nil +} + +type Replicas struct { + framework.ListResultSet[*models.Replica] +} + +func (rs *Replicas) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, replica := range rs.Data { + rs.printReplica(sb, replica) + } + return sb.String() + default: } + return "" } -func printReplica(replica *models.Replica) { - fmt.Println("================================================================================") - fmt.Printf("ReplicaID: %d CollectionID: %d version:%s\n", replica.ID, replica.CollectionID, replica.Version) - fmt.Printf("All Nodes:%v\n", replica.NodeIDs) +func (rs *Replicas) printReplica(sb *strings.Builder, replica *models.Replica) { + fmt.Fprintln(sb, "================================================================================") + fmt.Fprintf(sb, "ReplicaID: %d CollectionID: %d version:%s\n", replica.ID, replica.CollectionID, replica.Version) + fmt.Fprintf(sb, "All Nodes:%v\n", replica.NodeIDs) for _, shardReplica := range replica.ShardReplicas { - fmt.Printf("-- Shard Replica: Leader ID:%d(%s) Nodes:%v\n", shardReplica.LeaderID, shardReplica.LeaderAddr, shardReplica.NodeIDs) + fmt.Fprintf(sb, "-- Shard Replica: Leader ID:%d(%s) Nodes:%v\n", shardReplica.LeaderID, shardReplica.LeaderAddr, shardReplica.NodeIDs) } } diff --git a/states/etcd/show/session.go b/states/etcd/show/session.go index 6ea0303..c6d1b27 100644 --- a/states/etcd/show/session.go +++ b/states/etcd/show/session.go @@ -3,8 +3,11 @@ package show import ( "context" "fmt" + "strings" + "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" ) @@ -14,33 +17,28 @@ type SessionParam struct { // SessionCommand returns show session command. // usage: show session -func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) error { +func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) { sessions, err := common.ListSessions(c.client, c.basePath) if err != nil { - return err + return nil, errors.Wrap(err, "failed to list sessions") } - for _, session := range sessions { - fmt.Println(session.String()) - } - return nil + + return framework.NewListResult[Sessions](sessions), nil } -/* -func SessionCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "session", - Short: "list online milvus components", - Aliases: []string{"sessions"}, - RunE: func(cmd *cobra.Command, args []string) error { - sessions, err := common.ListSessions(cli, basePath) - if err != nil { - return err - } - for _, session := range sessions { - fmt.Println(session.String()) - } - return nil - }, +type Sessions struct { + framework.ListResultSet[*models.Session] +} + +func (rs *Sessions) PrintAs(format framework.Format) string { + switch format { + case framework.FormatDefault, framework.FormatPlain: + sb := &strings.Builder{} + for _, session := range rs.Data { + fmt.Fprintln(sb, session.String()) + } + return sb.String() + default: } - return cmd -}*/ + return "" +} diff --git a/states/exit.go b/states/exit.go index 6efccf6..106d182 100644 --- a/states/exit.go +++ b/states/exit.go @@ -49,7 +49,7 @@ type DisconnectParam struct { framework.ParamBase `use:"disconnect" desc:"disconnect from current etcd instance"` } -func (s *instanceState) DisconnectCommand(ctx context.Context, _ *DisconnectParam) { +func (s *InstanceState) DisconnectCommand(ctx context.Context, _ *DisconnectParam) { s.SetNext(Start(s.config)) s.Close() } diff --git a/states/instance.go b/states/instance.go index 1fdf289..648ec0c 100644 --- a/states/instance.go +++ b/states/instance.go @@ -14,8 +14,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -// instanceState provides command for single milvus instance. -type instanceState struct { +// InstanceState provides command for single milvus instance. +type InstanceState struct { cmdState *show.ComponentShow *remove.ComponentRemove @@ -28,7 +28,7 @@ type instanceState struct { basePath string } -func (s *instanceState) Close() { +func (s *InstanceState) Close() { if s.auditFile != nil { s.auditFile.Close() } @@ -36,7 +36,7 @@ func (s *instanceState) Close() { // SetupCommands setups the command. // also called after each command run to reset flag values. -func (s *instanceState) SetupCommands() { +func (s *InstanceState) SetupCommands() { cmd := &cobra.Command{} cli := s.client @@ -115,7 +115,7 @@ func (s *instanceState) SetupCommands() { } // getDryModeCmd enter dry-mode -func getDryModeCmd(cli clientv3.KV, state *instanceState, etcdState State) *cobra.Command { +func getDryModeCmd(cli clientv3.KV, state *InstanceState, etcdState State) *cobra.Command { cmd := &cobra.Command{ Use: "dry-mode", Short: "enter dry mode to select instance", @@ -139,7 +139,7 @@ func getInstanceState(cli clientv3.KV, instanceName, metaPath string, etcdState basePath := path.Join(instanceName, metaPath) // use audit kv - state := &instanceState{ + state := &InstanceState{ cmdState: cmdState{ label: fmt.Sprintf("Milvus(%s)", instanceName), }, diff --git a/states/states.go b/states/states.go index 7fc96fe..b32a487 100644 --- a/states/states.go +++ b/states/states.go @@ -207,7 +207,7 @@ func parseMethod(state State, mt reflect.Method) (*cobra.Command, []string, bool //fmt.Println(mt.Name) cp := reflect.New(paramType.Elem()).Interface().(framework.CmdParam) - fUse, fDesc := getCmdFromFlag(cp) + fUse, fDesc := GetCmdFromFlag(cp) if len(use) == 0 { use = fUse } @@ -218,7 +218,7 @@ func parseMethod(state State, mt reflect.Method) (*cobra.Command, []string, bool fnName := mt.Name use = strings.ToLower(fnName[:len(fnName)-8]) } - uses := parseUseSegments(use) + uses := ParseUseSegments(use) lastKw := uses[len(uses)-1] cmd := &cobra.Command{ @@ -242,19 +242,35 @@ func parseMethod(state State, mt reflect.Method) (*cobra.Command, []string, bool reflect.ValueOf(ctx), reflect.ValueOf(cp), }) - if len(results) > 0 { - if results[0].Type().Implements(reflect.TypeOf((*error)(nil)).Elem()) { - if !results[0].IsNil() { - err := results[0].Interface().(error) - fmt.Println(err.Error()) + // reverse order, check error first + for i := 0; i < len(results); i++ { + result := results[len(results)-i-1] + switch { + case result.Type().Implements(reflect.TypeOf((*error)(nil)).Elem()): + // error nil, skip + if result.IsNil() { + continue } + err := result.Interface().(error) + fmt.Println(err.Error()) + return + case result.Type().Implements(reflect.TypeOf((*framework.ResultSet)(nil)).Elem()): + if result.IsNil() { + continue + } + rs := result.Interface().(framework.ResultSet) + if preset, ok := rs.(*framework.PresetResultSet); ok { + fmt.Println(preset.String()) + return + } + fmt.Println(rs.PrintAs(framework.FormatDefault)) } } } return cmd, uses, true } -func getCmdFromFlag(p framework.CmdParam) (string, string) { +func GetCmdFromFlag(p framework.CmdParam) (string, string) { v := reflect.ValueOf(p) if v.Kind() != reflect.Pointer { fmt.Println("param is not pointer") @@ -279,7 +295,7 @@ func getCmdFromFlag(p framework.CmdParam) (string, string) { return tag.Get("use"), tag.Get("desc") } -func parseUseSegments(use string) []string { +func ParseUseSegments(use string) []string { parts := strings.Split(use, " ") last := "" result := make([]string, 0, len(parts))