diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 907c15f..ce9724b 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -91,6 +91,8 @@ func RemoveCommand(cli clientv3.KV, instanceName, basePath string) *cobra.Comman remove.SegmentCollectionDroppedCommand(cli, basePath), // remove etcd-config remove.EtcdConfigCommand(cli, instanceName), + // remove collection has been dropped + remove.CollectionCleanCommand(cli, basePath), ) return removeCmd diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index 6d0de95..82d9d92 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -22,10 +22,13 @@ import ( ) const ( + SnapshotPrefix = "snapshots" // CollectionMetaPrefix is prefix for rootcoord collection meta. CollectionMetaPrefix = `root-coord/collection` // DBCollectionMetaPrefix is prefix for rootcoord database collection meta DBCollectionMetaPrefix = `root-coord/database/collection-info` + // FieldMetaPrefix is prefix for rootcoord collection fields meta + FieldMetaPrefix = `root-coord/fields` // CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x CollectionLoadPrefix = "queryCoord-collectionMeta" // CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index 9faa005..707fbcd 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -17,12 +17,13 @@ import ( ) const ( - segmentMetaPrefix = "datacoord-meta/s" + SegmentMetaPrefix = "datacoord-meta/s" + SegmentStatsMetaPrefix = "datacoord-meta/statslog" ) // ListSegmentsVersion list segment info as specified version. func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) { - prefix := path.Join(basePath, segmentMetaPrefix) + "/" + prefix := path.Join(basePath, SegmentMetaPrefix) + "/" switch version { case models.LTEVersion2_1: segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix) @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli clientv3.KV, basePath string, segment datapbv2.Segme func ListSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, segmentMetaPrefix)+"/", clientv3.WithPrefix()) + resp, err := cli.Get(ctx, path.Join(basePath, SegmentMetaPrefix)+"/", clientv3.WithPrefix()) if err != nil { return nil, err } @@ -309,7 +310,7 @@ func RemoveSegmentByID(ctx context.Context, cli clientv3.KV, basePath string, co func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error { - prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/" + prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/" segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix) if err != nil { return err diff --git a/states/etcd/remove/collection_clean.go b/states/etcd/remove/collection_clean.go new file mode 100644 index 0000000..e307cb9 --- /dev/null +++ b/states/etcd/remove/collection_clean.go @@ -0,0 +1,154 @@ +package remove + +import ( + "context" + "fmt" + "path" + "strconv" + "strings" + + "github.com/samber/lo" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" +) + +var paginationSize = 2000 + +type ExcludePrefixOptions func(string) bool + +// CollectionCleanCommand returns command to remove +func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command { + cmd := &cobra.Command{ + Use: "collection-meta-leaked", + Short: "Remove leaked collection meta for collection has been dropped", + Run: func(cmd *cobra.Command, args []string) { + run, err := cmd.Flags().GetBool("run") + if err != nil { + fmt.Println(err.Error()) + return + } + + collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion()) + if err != nil { + fmt.Println(err.Error()) + return + } + + id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) { + fmt.Printf("existing collectionID %v\n", col.ID) + return strconv.FormatInt(col.ID, 10), col + }) + + cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error { + return walkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error { + sKey := string(k) + for _, opt := range opts { + if opt(sKey) { + return nil + } + } + + key := sKey[len(prefix):] + ss := strings.Split(key, "/") + collectionExist := false + for _, s := range ss { + if _, ok := id2Collection[s]; ok { + collectionExist = true + } + } + + if !collectionExist { + fmt.Println("clean meta key ", sKey) + if run { + _, err = cli.Delete(ctx, sKey) + return err + } + } + + return nil + }) + } + + // remove collection meta + // meta before database + collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix) + // with database + dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix) + // remove collection field meta + fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix) + fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix) + // remove collection partition meta + partitionsPrefix := path.Join(basePath, common.PartitionPrefix) + partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix) + prefixes := []string{ + collectionMetaPrefix, + dbCollectionMetaPrefix, + fieldsPrefix, + fieldsSnapShotPrefix, + partitionsPrefix, + partitionsSnapShotPrefix} + + for _, prefix := range prefixes { + fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix) + err = cleanMetaFn(context.TODO(), prefix) + if err != nil { + fmt.Println(err.Error()) + return + } + fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix) + } + + // remove segment meta + segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix) + segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix) + fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix) + err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool { + return strings.HasPrefix(key, segmentStatsPrefix) + }) + if err != nil { + fmt.Println(err.Error()) + return + } + + fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix) + }, + } + + cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command") + return cmd +} + +func walkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + batch := int64(paginationSize) + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(batch), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), + } + + key := prefix + for { + resp, err := cli.Get(ctx, key, opts...) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + if err = fn(kv.Key, kv.Value); err != nil { + return err + } + } + + if !resp.More { + break + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + + return nil +}