From ed10c2e8bed9cf6a616022d6e3ee43e1c8a50d30 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 26 Aug 2024 10:39:11 +0800 Subject: [PATCH] enhance: Add schema flags for channel watch command Add `withoutSchema` & `printSchema` flag for `show channel-watch` command Signed-off-by: Congqi Xia --- states/etcd/show/channel_watched.go | 66 ++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index a98965c..c9724be 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -2,6 +2,7 @@ package show import ( "context" + "encoding/json" "fmt" "sort" "strings" @@ -18,38 +19,51 @@ import ( type ChannelWatchedParam struct { framework.ParamBase `use:"show channel-watch" desc:"display channel watching info from data coord meta store" alias:"channel-watched"` - CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` + Format string `name:"format" default:"" desc:"output format"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` + WithoutSchema bool `name:"withoutSchema" default:"false" desc:"filter channel watch info with not schema"` + PrintSchema bool `name:"printSchema" default:"false" desc:"print schema info stored in watch info"` } // ChannelWatchedCommand return show channel-watched commands. -func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*ChannelsWatched, error) { +func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) { infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { - return p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID + return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil) }) if err != nil { return nil, errors.Wrap(err, "failed to list channel watch info") } - return framework.NewListResult[ChannelsWatched](infos), nil + rs := framework.NewListResult[ChannelsWatched](infos) + rs.printSchema = p.PrintSchema + + return framework.NewPresetResultSet(rs, framework.NameFormat(p.Format)), nil } type ChannelsWatched struct { framework.ListResultSet[*models.ChannelWatch] + printSchema bool } func (rs *ChannelsWatched) PrintAs(format framework.Format) string { - switch format { - case framework.FormatDefault, framework.FormatPlain: - sb := &strings.Builder{} - for _, info := range rs.Data { + sb := &strings.Builder{} + for _, info := range rs.Data { + switch format { + case framework.FormatDefault, framework.FormatPlain: rs.printChannelWatchInfo(sb, info) + case framework.FormatJSON: + rs.printChannelWatchInfoJSON(sb, info) + default: } + } + switch format { + case framework.FormatDefault, framework.FormatPlain: fmt.Fprintf(sb, "--- Total Channels: %d\n", len(rs.Data)) - return sb.String() default: } - return "" + + return sb.String() } func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *models.ChannelWatch) { @@ -71,11 +85,16 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds) fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds) - fmt.Fprintf(sb, "Fields:\n") + // skip schema print + if !rs.printSchema { + return + } + if info.Schema == nil { fmt.Fprintf(sb, "### Collection schema is empty!!! ###\n") return } + fmt.Fprintf(sb, "Fields:\n") fields := info.Schema.Fields sort.Slice(fields, func(i, j int) bool { return fields[i].FieldID < fields[j].FieldID @@ -106,3 +125,28 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema) } + +func (rs *ChannelsWatched) printChannelWatchInfoJSON(sb *strings.Builder, info *models.ChannelWatch) { + m := make(map[string]any) + m["key"] = info.Key() + m["channel_name"] = info.Vchan.ChannelName + + pos := info.Vchan.SeekPosition + if pos != nil { + startTime, _ := utils.ParseTS(pos.Timestamp) + m["position_id"] = pos.MsgID + m["position_time"] = startTime + } + + if rs.printSchema { + m["schema"] = info.Schema + } + + bs, err := json.Marshal(m) + if err != nil { + fmt.Println("failed to marshal watch info json:", err.Error()) + return + } + + fmt.Fprintln(sb, string(bs)) +}