Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add schema flags for channel watch command #306

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package show

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
Expand All @@ -18,38 +19,51 @@ import (

type ChannelWatchedParam struct {
framework.ParamBase `use:"show channel-watch" desc:"display channel watching info from data coord meta store" alias:"channel-watched"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
Format string `name:"format" default:"" desc:"output format"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
WithoutSchema bool `name:"withoutSchema" default:"false" desc:"filter channel watch info with not schema"`
PrintSchema bool `name:"printSchema" default:"false" desc:"print schema info stored in watch info"`
}

// ChannelWatchedCommand return show channel-watched commands.
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*ChannelsWatched, error) {
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil)
})
if err != nil {
return nil, errors.Wrap(err, "failed to list channel watch info")
}

return framework.NewListResult[ChannelsWatched](infos), nil
rs := framework.NewListResult[ChannelsWatched](infos)
rs.printSchema = p.PrintSchema

return framework.NewPresetResultSet(rs, framework.NameFormat(p.Format)), nil
}

type ChannelsWatched struct {
framework.ListResultSet[*models.ChannelWatch]
printSchema bool
}

func (rs *ChannelsWatched) PrintAs(format framework.Format) string {
switch format {
case framework.FormatDefault, framework.FormatPlain:
sb := &strings.Builder{}
for _, info := range rs.Data {
sb := &strings.Builder{}
for _, info := range rs.Data {
switch format {
case framework.FormatDefault, framework.FormatPlain:
rs.printChannelWatchInfo(sb, info)
case framework.FormatJSON:
rs.printChannelWatchInfoJSON(sb, info)
default:
}
}

switch format {
case framework.FormatDefault, framework.FormatPlain:
fmt.Fprintf(sb, "--- Total Channels: %d\n", len(rs.Data))
return sb.String()
default:
}
return ""

return sb.String()
}

func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *models.ChannelWatch) {
Expand All @@ -71,11 +85,16 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode
fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds)
fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds)

fmt.Fprintf(sb, "Fields:\n")
// skip schema print
if !rs.printSchema {
return
}

if info.Schema == nil {
fmt.Fprintf(sb, "### Collection schema is empty!!! ###\n")
return
}
fmt.Fprintf(sb, "Fields:\n")
fields := info.Schema.Fields
sort.Slice(fields, func(i, j int) bool {
return fields[i].FieldID < fields[j].FieldID
Expand Down Expand Up @@ -106,3 +125,28 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode

fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema)
}

func (rs *ChannelsWatched) printChannelWatchInfoJSON(sb *strings.Builder, info *models.ChannelWatch) {
m := make(map[string]any)
m["key"] = info.Key()
m["channel_name"] = info.Vchan.ChannelName

pos := info.Vchan.SeekPosition
if pos != nil {
startTime, _ := utils.ParseTS(pos.Timestamp)
m["position_id"] = pos.MsgID
m["position_time"] = startTime
}

if rs.printSchema {
m["schema"] = info.Schema
}

bs, err := json.Marshal(m)
if err != nil {
fmt.Println("failed to marshal watch info json:", err.Error())
return
}

fmt.Fprintln(sb, string(bs))
}
Loading