Skip to content

Commit

Permalink
enhance: Add schema flags for channel watch command
Browse files Browse the repository at this point in the history
Add `withoutSchema` & `printSchema` flag for `show channel-watch`
command

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Aug 26, 2024
1 parent 89193d8 commit ed10c2e
Showing 1 changed file with 55 additions and 11 deletions.
66 changes: 55 additions & 11 deletions states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package show

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
Expand All @@ -18,38 +19,51 @@ import (

type ChannelWatchedParam struct {
framework.ParamBase `use:"show channel-watch" desc:"display channel watching info from data coord meta store" alias:"channel-watched"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
Format string `name:"format" default:"" desc:"output format"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
WithoutSchema bool `name:"withoutSchema" default:"false" desc:"filter channel watch info with not schema"`
PrintSchema bool `name:"printSchema" default:"false" desc:"print schema info stored in watch info"`
}

// ChannelWatchedCommand return show channel-watched commands.
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*ChannelsWatched, error) {
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil)
})
if err != nil {
return nil, errors.Wrap(err, "failed to list channel watch info")
}

return framework.NewListResult[ChannelsWatched](infos), nil
rs := framework.NewListResult[ChannelsWatched](infos)
rs.printSchema = p.PrintSchema

return framework.NewPresetResultSet(rs, framework.NameFormat(p.Format)), nil
}

type ChannelsWatched struct {
framework.ListResultSet[*models.ChannelWatch]
printSchema bool
}

func (rs *ChannelsWatched) PrintAs(format framework.Format) string {
switch format {
case framework.FormatDefault, framework.FormatPlain:
sb := &strings.Builder{}
for _, info := range rs.Data {
sb := &strings.Builder{}
for _, info := range rs.Data {
switch format {
case framework.FormatDefault, framework.FormatPlain:
rs.printChannelWatchInfo(sb, info)
case framework.FormatJSON:
rs.printChannelWatchInfoJSON(sb, info)
default:
}
}

switch format {
case framework.FormatDefault, framework.FormatPlain:
fmt.Fprintf(sb, "--- Total Channels: %d\n", len(rs.Data))
return sb.String()
default:
}
return ""

return sb.String()
}

func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *models.ChannelWatch) {
Expand All @@ -71,11 +85,16 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode
fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds)
fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds)

fmt.Fprintf(sb, "Fields:\n")
// skip schema print
if !rs.printSchema {
return
}

if info.Schema == nil {
fmt.Fprintf(sb, "### Collection schema is empty!!! ###\n")
return
}
fmt.Fprintf(sb, "Fields:\n")
fields := info.Schema.Fields
sort.Slice(fields, func(i, j int) bool {
return fields[i].FieldID < fields[j].FieldID
Expand Down Expand Up @@ -106,3 +125,28 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode

fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema)
}

func (rs *ChannelsWatched) printChannelWatchInfoJSON(sb *strings.Builder, info *models.ChannelWatch) {
m := make(map[string]any)
m["key"] = info.Key()
m["channel_name"] = info.Vchan.ChannelName

pos := info.Vchan.SeekPosition
if pos != nil {
startTime, _ := utils.ParseTS(pos.Timestamp)
m["position_id"] = pos.MsgID
m["position_time"] = startTime
}

if rs.printSchema {
m["schema"] = info.Schema
}

bs, err := json.Marshal(m)
if err != nil {
fmt.Println("failed to marshal watch info json:", err.Error())
return
}

fmt.Fprintln(sb, string(bs))
}

0 comments on commit ed10c2e

Please sign in to comment.