Skip to content

Commit

Permalink
Add remove leaked collection meta cmd
Browse files Browse the repository at this point in the history
Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 committed Dec 1, 2023
1 parent 7c2b8ec commit 56c31a8
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 4 deletions.
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func RemoveCommand(cli clientv3.KV, instanceName, basePath string) *cobra.Comman
remove.SegmentCollectionDroppedCommand(cli, basePath),
// remove etcd-config
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
)

return removeCmd
Expand Down
3 changes: 3 additions & 0 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
)

const (
SnapshotPrefix = "snapshots"
// CollectionMetaPrefix is prefix for rootcoord collection meta.
CollectionMetaPrefix = `root-coord/collection`
// DBCollectionMetaPrefix is prefix for rootcoord database collection meta
DBCollectionMetaPrefix = `root-coord/database/collection-info`
// FieldMetaPrefix is prefix for rootcoord collection fields meta
FieldMetaPrefix = `root-coord/fields`
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
Expand Down
9 changes: 5 additions & 4 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
)

const (
segmentMetaPrefix = "datacoord-meta/s"
SegmentMetaPrefix = "datacoord-meta/s"
SegmentStatsMetaPrefix = "datacoord-meta/statslog"
)

// ListSegmentsVersion list segment info as specified version.
func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) {
prefix := path.Join(basePath, segmentMetaPrefix) + "/"
prefix := path.Join(basePath, SegmentMetaPrefix) + "/"
switch version {
case models.LTEVersion2_1:
segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix)
Expand Down Expand Up @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli clientv3.KV, basePath string, segment datapbv2.Segme
func ListSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := cli.Get(ctx, path.Join(basePath, segmentMetaPrefix)+"/", clientv3.WithPrefix())
resp, err := cli.Get(ctx, path.Join(basePath, SegmentMetaPrefix)+"/", clientv3.WithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +310,7 @@ func RemoveSegmentByID(ctx context.Context, cli clientv3.KV, basePath string, co

func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/"
prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix)
if err != nil {
return err
Expand Down
154 changes: 154 additions & 0 deletions states/etcd/remove/collection_clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package remove

import (
"context"
"fmt"
"path"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

var paginationSize = 2000

type ExcludePrefixOptions func(string) bool

// CollectionCleanCommand returns command to remove
func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "collection-meta-leaked",
Short: "Remove leaked collection meta for collection has been dropped",
Run: func(cmd *cobra.Command, args []string) {
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion())
if err != nil {
fmt.Println(err.Error())
return
}

id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) {
fmt.Printf("existing collectionID %v\n", col.ID)
return strconv.FormatInt(col.ID, 10), col
})

cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error {
return walkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error {
sKey := string(k)
for _, opt := range opts {
if opt(sKey) {
return nil
}
}

key := sKey[len(prefix):]
ss := strings.Split(key, "/")
collectionExist := false
for _, s := range ss {
if _, ok := id2Collection[s]; ok {
collectionExist = true
}
}

if !collectionExist {
fmt.Println("clean meta key ", sKey)
if run {
_, err = cli.Delete(ctx, sKey)
return err
}
}

return nil
})
}

// remove collection meta
// meta before database
collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix)
// with database
dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix)
// remove collection field meta
fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix)
fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix)
// remove collection partition meta
partitionsPrefix := path.Join(basePath, common.PartitionPrefix)
partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix)
prefixes := []string{
collectionMetaPrefix,
dbCollectionMetaPrefix,
fieldsPrefix,
fieldsSnapShotPrefix,
partitionsPrefix,
partitionsSnapShotPrefix}

for _, prefix := range prefixes {
fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix)
err = cleanMetaFn(context.TODO(), prefix)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix)
}

// remove segment meta
segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix)
segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix)
fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix)
err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool {
return strings.HasPrefix(key, segmentStatsPrefix)
})
if err != nil {
fmt.Println(err.Error())
return
}

fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix)
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command")
return cmd
}

func walkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := cli.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}

0 comments on commit 56c31a8

Please sign in to comment.