diff --git a/states/download_pk.go b/states/download_pk.go index 976ecaf..6b62e4c 100644 --- a/states/download_pk.go +++ b/states/download_pk.go @@ -12,6 +12,7 @@ import ( "github.com/manifoldco/promptui" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/spf13/cobra" @@ -28,7 +29,9 @@ func getDownloadPKCmd(cli clientv3.KV, basePath string) *cobra.Command { return err } - coll, err := common.GetCollectionByID(cli, basePath, collectionID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) if err != nil { fmt.Println("Collection not found for id", collectionID) return nil diff --git a/states/etcd/common/channel.go b/states/etcd/common/channel.go index 241a824..1fba247 100644 --- a/states/etcd/common/channel.go +++ b/states/etcd/common/channel.go @@ -58,9 +58,7 @@ func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, ver return nil, err } result = lo.Map(infos, func(info datapbv2.ChannelWatchInfo, idx int) *models.ChannelWatch { - fmt.Println(info.String()) return models.GetChannelWatchInfo[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *internalpbv2.MsgPosition](&info, paths[idx]) - }) default: return nil, errors.New("version not supported") diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index bc4c698..340ae86 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -112,39 +112,6 @@ func ListCollectionsVersion(ctx context.Context, cli clientv3.KV, basePath strin } } -// GetCollectionByID returns collection info from etcd with provided id. -func GetCollectionByID(cli clientv3.KV, basePath string, collID int64) (*etcdpb.CollectionInfo, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, CollectionMetaPrefix, strconv.FormatInt(collID, 10))) - - if err != nil { - return nil, err - } - - if len(resp.Kvs) != 1 { - return nil, errors.New("invalid collection id") - } - - if bytes.Equal(resp.Kvs[0].Value, CollectionTombstone) { - return nil, fmt.Errorf("%w, collection id: %d", ErrCollectionDropped, collID) - } - - coll := &etcdpb.CollectionInfo{} - - err = proto.Unmarshal(resp.Kvs[0].Value, coll) - if err != nil { - return nil, err - } - - err = FillFieldSchemaIfEmpty(cli, basePath, coll) - if err != nil { - return nil, err - } - - return coll, nil -} - // GetCollectionByIDVersion retruns collection info from etcd with provided version & id. func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, collID int64) (*models.Collection, error) { diff --git a/states/etcd/repair/channel.go b/states/etcd/repair/channel.go index f1835b2..c345880 100644 --- a/states/etcd/repair/channel.go +++ b/states/etcd/repair/channel.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" @@ -36,15 +37,17 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - coll, err := common.GetCollectionByID(cli, basePath, collID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collID) if err != nil { fmt.Println("collection not found") return } chans := make(map[string]struct{}) - for _, vchan := range coll.GetVirtualChannelNames() { - chans[vchan] = struct{}{} + for _, c := range coll.Channels { + chans[c.VirtualName] = struct{}{} } infos, _, err := common.ListChannelWatchV1(cli, basePath) diff --git a/states/etcd/repair/checkpoint.go b/states/etcd/repair/checkpoint.go index e7713ba..585e6d1 100644 --- a/states/etcd/repair/checkpoint.go +++ b/states/etcd/repair/checkpoint.go @@ -10,12 +10,13 @@ import ( "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/mq" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" - "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" ) @@ -46,7 +47,11 @@ func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - coll, err := common.GetCollectionByID(cli, basePath, collID) + //coll, err := common.GetCollectionByID(cli, basePath, collID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collID) + if err != nil { fmt.Println("failed to get collection", err.Error()) return @@ -85,20 +90,20 @@ func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { return cmd } -func setCheckPointWithLatestMsgID(cli clientv3.KV, basePath string, coll *etcdpb.CollectionInfo, mqType, address, vchannel string) { - for _, ch := range coll.GetVirtualChannelNames() { - if ch == vchannel { - pChannel := ToPhysicalChannel(ch) +func setCheckPointWithLatestMsgID(cli clientv3.KV, basePath string, coll *models.Collection, mqType, address, vchannel string) { + for _, ch := range coll.Channels { + if ch.VirtualName == vchannel { + pChannel := ch.PhysicalName cp, err := getLatestFromPChannel(mqType, address, vchannel) if err != nil { - fmt.Printf("vchannel:%s -> pchannel:%s, get latest msgID faile, err:%s\n", ch, pChannel, err.Error()) + fmt.Printf("vchannel:%s -> pchannel:%s, get latest msgID faile, err:%s\n", ch.VirtualName, pChannel, err.Error()) return } - err = saveChannelCheckpoint(cli, basePath, ch, cp) + err = saveChannelCheckpoint(cli, basePath, ch.VirtualName, cp) t, _ := utils.ParseTS(cp.GetTimestamp()) if err != nil { - fmt.Printf("failed to set latest msgID(ts:%v) for vchannel:%s", t, ch) + fmt.Printf("failed to set latest msgID(ts:%v) for vchannel:%s", t, ch.VirtualName) return } fmt.Printf("vchannel:%s set to latest msgID(ts:%v) finshed\n", vchannel, t) @@ -108,7 +113,7 @@ func setCheckPointWithLatestMsgID(cli clientv3.KV, basePath string, coll *etcdpb fmt.Printf("vchannel:%s doesn't exists in collection: %d\n", vchannel, coll.ID) } -func setCheckPointWithLatestCheckPoint(cli clientv3.KV, basePath string, coll *etcdpb.CollectionInfo, vchannel string) { +func setCheckPointWithLatestCheckPoint(cli clientv3.KV, basePath string, coll *models.Collection, vchannel string) { pChannelName2LatestCP, err := getLatestCheckpointFromPChannel(cli, basePath) if err != nil { fmt.Println("failed to get latest cp of all pchannel", err.Error()) @@ -121,19 +126,19 @@ func setCheckPointWithLatestCheckPoint(cli clientv3.KV, basePath string, coll *e fmt.Printf("pchannel: %s, the lastest checkpoint ts: %v\n", k, t) } - for _, ch := range coll.GetVirtualChannelNames() { - if ch == vchannel { - pChannel := ToPhysicalChannel(ch) + for _, ch := range coll.Channels { + if ch.VirtualName == vchannel { + pChannel := ch.PhysicalName cp, ok := pChannelName2LatestCP[pChannel] if !ok { - fmt.Printf("vchannel:%s -> pchannel:%s, the pchannel doesn't exists\n", ch, pChannel) + fmt.Printf("vchannel:%s -> pchannel:%s, the pchannel doesn't exists\n", ch.VirtualName, pChannel) return } - err := saveChannelCheckpoint(cli, basePath, ch, cp) + err := saveChannelCheckpoint(cli, basePath, ch.VirtualName, cp) t, _ := utils.ParseTS(cp.GetTimestamp()) if err != nil { - fmt.Printf("failed to set latest checkpoint(ts:%v) for vchannel:%s", t, ch) + fmt.Printf("failed to set latest checkpoint(ts:%v) for vchannel:%s", t, ch.VirtualName) return } fmt.Printf("vchannel:%s set to latest checkpoint(ts:%v) finshed\n", vchannel, t) diff --git a/states/etcd/repair/segment.go b/states/etcd/repair/segment.go index 6d26db8..ac11028 100644 --- a/states/etcd/repair/segment.go +++ b/states/etcd/repair/segment.go @@ -6,12 +6,13 @@ import ( "path" "github.com/golang/protobuf/proto" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" "github.com/milvus-io/birdwatcher/proto/v2.0/indexpb" - "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -86,7 +87,7 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { buildID2Info[info.IndexBuildID] = info } - collections := make(map[int64]*etcdpb.CollectionInfo) + collections := make(map[int64]*models.Collection) targetOld := make(map[int64]*datapb.SegmentInfo) target := make(map[int64]*datapb.SegmentInfo) @@ -105,7 +106,11 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { coll, ok := collections[segment.CollectionID] if !ok { - coll, err = common.GetCollectionByID(cli, basePath, segment.CollectionID) + //coll, err = common.GetCollectionByID(cli, basePath, segment.CollectionID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collID) + if err != nil { fmt.Printf("failed to query collection(id=%d) info error: %s", segment.CollectionID, err.Error()) continue @@ -117,9 +122,9 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { for _, segIdx := range segIdxs { var valid bool - for _, field := range coll.GetSchema().GetFields() { - if field.GetFieldID() == segIdx.GetFieldID() { - if field.GetDataType() == schemapb.DataType_FloatVector || field.GetDataType() == schemapb.DataType_BinaryVector { + for _, field := range coll.Schema.Fields { + if field.FieldID == segIdx.GetFieldID() { + if field.DataType == models.DataTypeFloatVector || field.DataType == models.DataTypeBinaryVector { valid = true } break diff --git a/states/etcd/show/checkpoint.go b/states/etcd/show/checkpoint.go index d5ddbc2..dc39b4b 100644 --- a/states/etcd/show/checkpoint.go +++ b/states/etcd/show/checkpoint.go @@ -9,6 +9,7 @@ import ( "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" @@ -28,27 +29,27 @@ func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - coll, err := common.GetCollectionByID(cli, basePath, collID) + coll, err := common.GetCollectionByIDVersion(context.Background(), cli, basePath, etcdversion.GetVersion(), collID) if err != nil { fmt.Println("failed to get collection", err.Error()) return } - for _, vchannel := range coll.GetVirtualChannelNames() { + for _, channel := range coll.Channels { var cp *internalpb.MsgPosition var segmentID int64 var err error - cp, err = getChannelCheckpoint(cli, basePath, vchannel) + cp, err = getChannelCheckpoint(cli, basePath, channel.VirtualName) if err != nil { - cp, segmentID, err = getCheckpointFromSegments(cli, basePath, collID, vchannel) + cp, segmentID, err = getCheckpointFromSegments(cli, basePath, collID, channel.VirtualName) } if cp == nil { - fmt.Printf("vchannel %s position nil\n", vchannel) + fmt.Printf("vchannel %s position nil\n", channel.VirtualName) } else { t, _ := utils.ParseTS(cp.GetTimestamp()) - fmt.Printf("vchannel %s seek to %v, cp channel: %s", vchannel, t, cp.ChannelName) + fmt.Printf("vchannel %s seek to %v, cp channel: %s", channel.VirtualName, t, cp.ChannelName) if segmentID > 0 { fmt.Printf(", for segment ID:%d\n", segmentID) } else { diff --git a/states/force_release.go b/states/force_release.go index 74b0f55..9b1b7c0 100644 --- a/states/force_release.go +++ b/states/force_release.go @@ -72,7 +72,7 @@ func getReleaseDroppedCollectionCmd(cli clientv3.KV, basePath string) *cobra.Com var missing []int64 for _, info := range collectionLoadInfos { - _, err := common.GetCollectionByID(cli, basePath, info.CollectionID) + _, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), info.CollectionID) if err != nil { missing = append(missing, info.CollectionID) } diff --git a/states/inspect_primary_key.go b/states/inspect_primary_key.go index 0233f95..ed11526 100644 --- a/states/inspect_primary_key.go +++ b/states/inspect_primary_key.go @@ -1,6 +1,7 @@ package states import ( + "context" "fmt" "os" "path" @@ -9,6 +10,7 @@ import ( "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/storage" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" @@ -50,7 +52,10 @@ func getInspectPKCmd(cli clientv3.KV, basePath string) *cobra.Command { } pkID, has := cachedCollection[segment.CollectionID] if !has { - coll, err := common.GetCollectionByID(cli, basePath, segment.CollectionID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), segment.GetCollectionID()) + if err != nil { fmt.Println("Collection not found for id", segment.CollectionID) return