From b2fd4f7bb4149705527e454f61d636d551894f24 Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 25 Sep 2024 16:06:55 +0800 Subject: [PATCH] support batch remove segment meta with filters Signed-off-by: jaime --- go.mod | 1 - states/etcd/common/segment.go | 61 ++++++++++++++++ states/etcd/remove/collection_clean.go | 33 +-------- states/etcd/remove/segment.go | 98 +++++++++++++++++++------- 4 files changed, 133 insertions(+), 60 deletions(-) diff --git a/go.mod b/go.mod index 4786a8ae..a38f4c29 100644 --- a/go.mod +++ b/go.mod @@ -121,7 +121,6 @@ require ( github.com/rs/xid v1.2.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/stretchr/objx v0.5.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index cb87f34a..ab13d5ac 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" @@ -22,6 +23,8 @@ const ( SegmentStatsMetaPrefix = "datacoord-meta/statslog" ) +var ErrReachMaxNumOfWalkSegment = errors.New("reach max number of the walked segments") + // ListSegmentsVersion list segment info as specified version. func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) { prefix := path.Join(basePath, SegmentMetaPrefix) + "/" @@ -332,3 +335,61 @@ func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, colle } return nil } + +// WalkAllSegments walk all segment info from etcd with func +func WalkAllSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool, op func(*datapb.SegmentInfo) error, limit int64) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + cnt := int64(0) + return WalkWithPrefix(ctx, cli, path.Join(basePath, SegmentMetaPrefix)+"/", 1000, func(k []byte, v []byte) error { + info := &datapb.SegmentInfo{} + err := proto.Unmarshal(v, info) + if err != nil { + return err + } + + if filter == nil || filter(info) { + err = op(info) + if err != nil { + return err + } + cnt++ + if cnt >= limit { + return ErrReachMaxNumOfWalkSegment + } + } + return nil + }) +} + +func WalkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + batch := int64(paginationSize) + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(batch), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), + } + + key := prefix + for { + resp, err := cli.Get(ctx, key, opts...) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + if err = fn(kv.Key, kv.Value); err != nil { + return err + } + } + + if !resp.More { + break + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + + return nil +} diff --git a/states/etcd/remove/collection_clean.go b/states/etcd/remove/collection_clean.go index 9edf5ad6..694cbbe6 100644 --- a/states/etcd/remove/collection_clean.go +++ b/states/etcd/remove/collection_clean.go @@ -44,7 +44,7 @@ func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command { }) cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error { - return walkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error { + return common.WalkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error { sKey := string(k) for _, opt := range opts { if opt(sKey) { @@ -122,34 +122,3 @@ func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command") return cmd } - -func walkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error { - batch := int64(paginationSize) - opts := []clientv3.OpOption{ - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), - clientv3.WithLimit(batch), - clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), - } - - key := prefix - for { - resp, err := cli.Get(ctx, key, opts...) - if err != nil { - return err - } - - for _, kv := range resp.Kvs { - if err = fn(kv.Key, kv.Value); err != nil { - return err - } - } - - if !resp.More { - break - } - // move to next key - key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) - } - - return nil -} diff --git a/states/etcd/remove/segment.go b/states/etcd/remove/segment.go index 9f062a72..a6be4f36 100644 --- a/states/etcd/remove/segment.go +++ b/states/etcd/remove/segment.go @@ -2,9 +2,12 @@ package remove import ( "fmt" + "math" "os" + "strings" "time" + "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" @@ -17,9 +20,14 @@ import ( func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment", - Short: "Remove segment from meta with specified segment id", + Short: "Remove segment from meta with specified filters", Run: func(cmd *cobra.Command, args []string) { - targetSegmentID, err := cmd.Flags().GetInt64("segment") + targetSegmentID, err := cmd.Flags().GetInt64("segmentID") + if err != nil { + fmt.Println(err.Error()) + return + } + collectionID, err := cmd.Flags().GetInt64("collectionID") if err != nil { fmt.Println(err.Error()) return @@ -30,51 +38,86 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { return } - segments, err := common.ListSegments(cli, basePath, func(segmentInfo *datapb.SegmentInfo) bool { - return segmentInfo.GetID() == targetSegmentID - }) + maxNum, err := cmd.Flags().GetInt64("maxNum") if err != nil { - fmt.Println("failed to list segments", err.Error()) + fmt.Println(err.Error()) return } - if len(segments) != 1 { - fmt.Printf("failed to get segment with id %d, get %d result(s)\n", targetSegmentID, len(segments)) + state, err := cmd.Flags().GetString("state") + if err != nil { + fmt.Println(err.Error()) return } - // dry run, display segment first - if !run { - // show.PrintSegmentInfo(segments[0], false) - fmt.Printf("segment info %v", segments[0]) - return + backupDir := fmt.Sprintf("segments-backup_%d", time.Now().UnixMilli()) + + filterFunc := func(segmentInfo *datapb.SegmentInfo) bool { + return (collectionID == 0 || segmentInfo.CollectionID == collectionID) && + (targetSegmentID == 0 || segmentInfo.GetID() == targetSegmentID) && + (state == "" || strings.EqualFold(segmentInfo.State.String(), state)) } - // TODO put audit log - info := segments[0] - backupSegmentInfo(info) - fmt.Println("[WARNING] about to remove segment from etcd") - err = common.RemoveSegment(cli, basePath, info) - if err != nil { - fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error()) + removedCnt := 0 + dryRunCount := 0 + opFunc := func(info *datapb.SegmentInfo) error { + // dry run, display segment first + if !run { + dryRunCount++ + fmt.Printf("dry run segment:%d collectionID:%d state:%s\n", info.ID, info.CollectionID, info.State.String()) + return nil + } + + if err = backupSegmentInfo(info, backupDir); err != nil { + return err + } + + if err = common.RemoveSegment(cli, basePath, info); err != nil { + fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error()) + return err + } + + removedCnt++ + fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID()) + return nil + } + + err = common.WalkAllSegments(cli, basePath, filterFunc, opFunc, maxNum) + if err != nil && !errors.Is(err, common.ErrReachMaxNumOfWalkSegment) { + fmt.Printf("WalkAllSegmentsfailed, err: %s\n", err.Error()) + } + + if !run { + fmt.Println("dry run segments, total count:", dryRunCount) return } - fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID()) + fmt.Println("Remove segments succeeds, total count:", removedCnt) }, } cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta") - cmd.Flags().Int64("segment", 0, "segment id to remove") + cmd.Flags().Int64("segmentID", 0, "segment id") + cmd.Flags().Int64("collectionID", 0, "collection id") + cmd.Flags().String("state", "", "segment state") + cmd.Flags().Int64("maxNum", math.MaxInt64, "max number of segment to remove") return cmd } -func backupSegmentInfo(info *datapb.SegmentInfo) { +func backupSegmentInfo(info *datapb.SegmentInfo, backupDir string) error { + if _, err := os.Stat(backupDir); errors.Is(err, os.ErrNotExist) { + err := os.MkdirAll(backupDir, os.ModePerm) + if err != nil { + fmt.Println("Failed to create folder,", err.Error()) + return err + } + } + now := time.Now() - filePath := fmt.Sprintf("bw_etcd_segment_%d.%s.bak", info.GetID(), now.Format("060102-150405")) + filePath := fmt.Sprintf("%s/bw_etcd_segment_%d.%s.bak", backupDir, info.GetID(), now.Format("060102-150405")) f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) if err != nil { fmt.Println("failed to open backup segment file", err.Error()) - return + return err } defer f.Close() @@ -82,8 +125,9 @@ func backupSegmentInfo(info *datapb.SegmentInfo) { bs, err := proto.Marshal(info) if err != nil { fmt.Println("failed to marshal backup segment", err.Error()) - return + return err } - f.Write(bs) + _, err = f.Write(bs) + return err }