Skip to content

Commit

Permalink
Add remove segment-orphan command (#163)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jul 6, 2023
1 parent 350cd9a commit 677b3f6
Show file tree
Hide file tree
Showing 35 changed files with 788 additions and 800 deletions.
18 changes: 18 additions & 0 deletions framework/param.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package framework

// CmdParam is the interface definition for command parameter.
// ParamBase in this file supplies no-op defaults for both methods.
type CmdParam interface {
// ParseArgs takes the argument strings for the command and returns a
// non-nil error when they cannot be accepted.
ParseArgs(args []string) error
// Desc returns two description strings for the command.
// NOTE(review): presumably these are the short and long descriptions
// (compare the `use`/`desc` struct tags on parameter structs) — confirm
// against the framework code that consumes them.
Desc() (string, string)
}

// ParamBase implements CmdParam for commands that need no argument parsing.
// Parameter structs embed it to inherit these no-op defaults.
type ParamBase struct{}

// ParseArgs ignores the supplied arguments and always succeeds.
func (ParamBase) ParseArgs(args []string) error {
	return nil
}

// Desc reports a pair of empty description strings.
func (ParamBase) Desc() (string, string) {
	return "", ""
}
24 changes: 17 additions & 7 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/milvus-io/birdwatcher/configs"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd"
"github.com/milvus-io/birdwatcher/states/etcd/remove"
"github.com/milvus-io/birdwatcher/states/etcd/show"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
Expand All @@ -27,6 +29,8 @@ const (

type embedEtcdMockState struct {
cmdState
*show.ComponentShow
*remove.ComponentRemove
client *clientv3.Client
server *embed.Etcd
instanceName string
Expand Down Expand Up @@ -89,6 +93,9 @@ func (s *embedEtcdMockState) SetupCommands() {
// SetInstance points the mock state at the given instance name: it updates
// the label and stored instance name, rebuilds the show/remove components
// against the instance's meta root path, and finally calls SetupCommands.
func (s *embedEtcdMockState) SetInstance(instanceName string) {
	// Root path shared by both components below.
	metaRoot := path.Join(instanceName, metaPath)

	s.instanceName = instanceName
	s.cmdState.label = fmt.Sprintf("Backup(%s)", instanceName)

	s.ComponentShow = show.NewComponent(s.client, s.config, metaRoot)
	s.ComponentRemove = remove.NewComponent(s.client, s.config, metaRoot)

	// Run last so command setup sees the refreshed fields.
	s.SetupCommands()
}

Expand Down Expand Up @@ -168,16 +175,20 @@ func (s *embedEtcdMockState) readWorkspaceMeta(path string) {

func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName string, config *configs.Config) State {

basePath := path.Join(instanceName, metaPath)

state := &embedEtcdMockState{
cmdState: cmdState{
label: fmt.Sprintf("Backup(%s)", instanceName),
},
instanceName: instanceName,
server: server,
client: cli,
metrics: make(map[string][]byte),
defaultMetrics: make(map[string][]byte),
config: config,
ComponentShow: show.NewComponent(cli, config, basePath),
ComponentRemove: remove.NewComponent(cli, config, basePath),
instanceName: instanceName,
server: server,
client: cli,
metrics: make(map[string][]byte),
defaultMetrics: make(map[string][]byte),
config: config,
}

state.SetupCommands()
Expand All @@ -186,7 +197,6 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName
}

func getEmbedEtcdInstanceV2(server *embed.Etcd, config *configs.Config) *embedEtcdMockState {

client := v3client.New(server.Server)
state := &embedEtcdMockState{
cmdState: cmdState{},
Expand Down
5 changes: 3 additions & 2 deletions states/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/milvus-io/birdwatcher/framework"
datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
Expand All @@ -18,8 +19,8 @@ import (
)

type GetConfigurationParam struct {
ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"`
Format string `name:"format" default:"line" desc:"output format"`
framework.ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"`
Format string `name:"format" default:"line" desc:"output format"`
}

func (s *instanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error {
Expand Down
5 changes: 3 additions & 2 deletions states/current_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/spf13/cobra"
Expand All @@ -22,8 +23,8 @@ func CurrentVersionCommand() *cobra.Command {
}

type setCurrentVersionParam struct {
ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"`
newVersion string
framework.ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"`
newVersion string
}

func (p *setCurrentVersionParam) ParseArgs(args []string) error {
Expand Down
30 changes: 1 addition & 29 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,10 @@ func ShowCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

showCmd.AddCommand(
// show database
show.DatabaseCommand(cli, basePath),
// show collection
show.CollectionCommand(cli, basePath),
// show collection-history
show.CollectionHistoryCommand(cli, basePath),
// show sessions
show.SessionCommand(cli, basePath),
// show segments
show.SegmentCommand(cli, basePath),
// v2.1 legacy commands
// show segment-loaded
show.SegmentLoadedCommand(cli, basePath),
// show index
show.IndexCommand(cli, basePath),
// show segment-index
show.SegmentIndexCommand(cli, basePath),
// show partition
show.PartitionCommand(cli, basePath),

// show replica
show.ReplicaCommand(cli, basePath),
// show checkpoint
show.CheckpointCommand(cli, basePath),
// show channel-watched
show.ChannelWatchedCommand(cli, basePath),

// show collection-loaded
show.CollectionLoadedCommand(cli, basePath),

show.ConfigEtcdCommand(cli, basePath),

// v2.1 legacy commands
// show querycoord-tasks
show.QueryCoordTasks(cli, basePath),
// show querycoord-channels
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str

// with database, dbID unknown here
prefix = path.Join(basePath, DBCollectionMetaPrefix)
resp, err = cli.Get(ctx, prefix, clientv3.WithPrefix())
resp, _ = cli.Get(ctx, prefix, clientv3.WithPrefix())
suffix := strconv.FormatInt(collID, 10)
for _, kv := range resp.Kvs {
if strings.HasSuffix(string(kv.Key), suffix) {
Expand All @@ -137,7 +137,7 @@ func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath str
}

if len(result) != 1 {
return nil, fmt.Errorf("collection %d not found in etcd", collID)
return nil, fmt.Errorf("collection %d not found in etcd %w", collID, ErrCollectionNotFound)
}

kv := result[0]
Expand Down
5 changes: 1 addition & 4 deletions states/etcd/common/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (
)

// ListIndex lists all indexes that satisfy every given filter.
func ListIndex(cli clientv3.KV, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
func ListIndex(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) {
prefix := path.Join(basePath, "indexes") + "/"

result, _, err := ListProtoObjects(ctx, cli, prefix, filters...)
return result, err
}
Expand Down
17 changes: 6 additions & 11 deletions states/etcd/common/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/proto/v2.0/milvuspb"
Expand All @@ -13,15 +12,15 @@ import (
)

// ListReplica lists the current replica info.
func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*models.Replica, error) {
v1Results, err := listReplicas(cli, basePath, func(replica *milvuspb.ReplicaInfo) bool {
func ListReplica(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64) ([]*models.Replica, error) {
v1Results, err := listReplicas(ctx, cli, basePath, func(replica *milvuspb.ReplicaInfo) bool {
return collectionID == 0 || replica.GetCollectionID() == collectionID
})
if err != nil {
fmt.Println(err.Error())
}

v2Results, err := listQCReplicas(cli, basePath, func(replica *querypb.Replica) bool {
v2Results, err := listQCReplicas(ctx, cli, basePath, func(replica *querypb.Replica) bool {
return collectionID == 0 || replica.GetCollectionID() == collectionID
})
if err != nil {
Expand All @@ -45,6 +44,7 @@ func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*model
NodeIDs: r.GetNodeIds(),
ResourceGroup: "n/a",
Version: "<=2.1.4",
ShardReplicas: srs,
})
}

Expand All @@ -60,9 +60,7 @@ func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*model
return results, nil

}
func listReplicas(cli clientv3.KV, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
func listReplicas(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) {
prefix := path.Join(basePath, "queryCoord-ReplicaMeta")

replicas, _, err := ListProtoObjects(ctx, cli, prefix, filters...)
Expand All @@ -74,10 +72,7 @@ func listReplicas(cli clientv3.KV, basePath string, filters ...func(*milvuspb.Re
return replicas, nil
}

func listQCReplicas(cli clientv3.KV, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

func listQCReplicas(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) {
prefix := path.Join(basePath, "querycoord-replica")

replicas, _, err := ListProtoObjects(ctx, cli, prefix, filters...)
Expand Down
57 changes: 57 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,60 @@ func RemoveSegment(cli clientv3.KV, basePath string, info *datapb.SegmentInfo) e

return err
}

// RemoveSegmentByID deletes the etcd meta of one segment, addressed by its
// collection, partition and segment IDs.
//
// The segment key under "datacoord-meta/s" is deleted first; if that fails the
// error is returned immediately and nothing else is touched. The binlog,
// deltalog and statslog entries are then deleted by prefix on a best-effort
// basis: each failure is printed and the remaining prefixes are still
// attempted. The first such failure is returned, so a nil result means every
// delete call succeeded.
func RemoveSegmentByID(ctx context.Context, cli clientv3.KV, basePath string, collectionID, partitionID, segmentID int64) error {
	// Every key family uses the same "<collection>/<partition>/<segment>" tail.
	suffix := fmt.Sprintf("%d/%d/%d", collectionID, partitionID, segmentID)

	if _, err := cli.Delete(ctx, path.Join(basePath, "datacoord-meta/s", suffix)); err != nil {
		return err
	}

	var firstErr error
	for _, kind := range []string{"binlog", "deltalog", "statslog"} {
		// The trailing "/" pins the prefix to this exact segment ID. Without
		// it, deleting segment 12 would also wipe the logs of segments 120,
		// 1234, ... because WithPrefix matches on raw key bytes.
		// NOTE(review): assumes log keys live below "<segmentID>/" (e.g. one
		// entry per field) — confirm against the datacoord key layout.
		prefix := path.Join(basePath, "datacoord-meta", kind, suffix) + "/"
		if _, err := cli.Delete(ctx, prefix, clientv3.WithPrefix()); err != nil {
			fmt.Printf("failed to delete %ss from etcd for segment %d, err: %s\n", kind, segmentID, err.Error())
			if firstErr == nil {
				firstErr = fmt.Errorf("delete %ss of segment %d: %w", kind, segmentID, err)
			}
		}
	}

	return firstErr
}

// UpdateSegments applies fn to every segment meta listed under the given
// collection and writes each (possibly modified) segment back to the matching
// key returned by ListProtoObjects. It stops at, and returns, the first list,
// marshal or put error.
func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {
	collectionPrefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/"

	infos, keys, listErr := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, collectionPrefix)
	if listErr != nil {
		return listErr
	}

	for i := range infos {
		// fn works on a private copy of the listed value.
		current := infos[i]
		fn(&current)

		payload, marshalErr := proto.Marshal(&current)
		if marshalErr != nil {
			return marshalErr
		}

		if _, putErr := cli.Put(ctx, keys[i], string(payload)); putErr != nil {
			return putErr
		}
	}

	return nil
}
20 changes: 20 additions & 0 deletions states/etcd/remove/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package remove

import (
"github.com/milvus-io/birdwatcher/configs"
clientv3 "go.etcd.io/etcd/client/v3"
)

// ComponentRemove holds the etcd KV client, configuration and base path that
// the remove commands defined as its methods operate on.
type ComponentRemove struct {
// client is the etcd KV handle used for reads and deletes.
client clientv3.KV
// config is the configuration passed to NewComponent.
config *configs.Config
// basePath is the etcd key prefix the commands work under.
basePath string
}

// NewComponent builds a ComponentRemove bound to the given etcd KV client,
// configuration and base path.
func NewComponent(cli clientv3.KV, config *configs.Config, basePath string) *ComponentRemove {
	component := new(ComponentRemove)
	component.client = cli
	component.config = config
	component.basePath = basePath
	return component
}
53 changes: 53 additions & 0 deletions states/etcd/remove/segment_orphan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package remove

import (
"context"
"errors"
"fmt"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/samber/lo"
)

// SegmentOrphan is the parameter struct of the "remove segment-orphan"
// command handled by ComponentRemove.SegmentOrphanCommand.
type SegmentOrphan struct {
framework.ParamBase `use:"remove segment-orphan" desc:"remove orphan segments that collection meta already gone"`
// CollectionID restricts the scan to one collection; 0 means all collections.
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
// Run must be true for segments to actually be removed; otherwise the
// command only prints what it found.
Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"`
}

// SegmentOrphanCommand finds segments whose collection meta is missing from
// etcd and, when p.Run is set, removes their segment meta.
//
// Segments are listed (restricted to p.CollectionID unless it is 0), grouped
// by collection ID, and each collection is then looked up. Only a lookup that
// fails with common.ErrCollectionNotFound marks the group as orphaned. Any
// other lookup error is printed and the group is left untouched, because the
// collection may still exist. Without p.Run the command is a dry run that only
// prints the orphan segment IDs.
//
// It returns an error only when listing the segments fails; per-segment
// removal failures are printed and do not abort the command.
func (c *ComponentRemove) SegmentOrphanCommand(ctx context.Context, p *SegmentOrphan) error {
	segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
		return p.CollectionID == 0 || segment.CollectionID == p.CollectionID
	})
	if err != nil {
		return err
	}

	groups := lo.GroupBy(segments, func(segment *models.Segment) int64 {
		return segment.CollectionID
	})

	for collectionID, candidates := range groups {
		_, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), collectionID)
		if err == nil {
			// Collection meta exists, so these segments are not orphans.
			continue
		}
		if !errors.Is(err, common.ErrCollectionNotFound) {
			// Unknown failure: never treat it as "collection gone".
			fmt.Printf("failed to check collection %d, skip its segments, err: %s\n", collectionID, err.Error())
			continue
		}

		// print segments
		fmt.Printf("Collection %d missing, orphan segments: %v\n", collectionID, lo.Map(candidates, func(segment *models.Segment, idx int) int64 {
			return segment.ID
		}))

		if !p.Run {
			// Dry run: report only.
			continue
		}

		for _, segment := range candidates {
			if err := common.RemoveSegmentByID(ctx, c.client, c.basePath, segment.CollectionID, segment.PartitionID, segment.ID); err != nil {
				fmt.Printf("failed to remove segment %d, err: %s\n", segment.ID, err.Error())
			}
		}
	}
	return nil
}
2 changes: 1 addition & 1 deletion states/etcd/repair/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command {
return
}

indexBuildInfo, err := common.ListIndex(cli, basePath)
indexBuildInfo, err := common.ListIndex(context.Background(), cli, basePath)
if err != nil {
fmt.Println(err.Error())
return
Expand Down
Loading

0 comments on commit 677b3f6

Please sign in to comment.