diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go index 0464ceea..d711794a 100644 --- a/states/backup_mock_connect.go +++ b/states/backup_mock_connect.go @@ -68,10 +68,6 @@ func (s *embedEtcdMockState) SetupCommands() { // remove [subcommand] options... // used for testing etcd.RemoveCommand(s.client, s.instanceName, rootPath), - // download-pk - getDownloadPKCmd(s.client, rootPath), - // inspect-pk - getInspectPKCmd(s.client, rootPath), // for testing etcd.RepairCommand(s.client, rootPath), @@ -262,7 +258,7 @@ func readFixLengthHeader[T proto.Message](rd *bufio.Reader, header T) error { lb := make([]byte, 8) lenRead, err := rd.Read(lb) if err == io.EOF || lenRead < 8 { - return fmt.Errorf("File does not contains valid header") + return errors.New("File does not contains valid header") } nextBytes := binary.LittleEndian.Uint64(lb) diff --git a/states/current_version.go b/states/current_version.go index 44f4a7a5..47f114e9 100644 --- a/states/current_version.go +++ b/states/current_version.go @@ -23,12 +23,12 @@ func CurrentVersionCommand() *cobra.Command { return cmd } -type setCurrentVersionParam struct { +type SetCurrentVersionParam struct { framework.ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"` newVersion string } -func (p *setCurrentVersionParam) ParseArgs(args []string) error { +func (p *SetCurrentVersionParam) ParseArgs(args []string) error { if len(args) != 1 { return errors.New("invalid parameter number") } @@ -36,7 +36,7 @@ func (p *setCurrentVersionParam) ParseArgs(args []string) error { return nil } -func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { +func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param *SetCurrentVersionParam) error { switch param.newVersion { case models.LTEVersion2_1: fallthrough @@ -49,39 +49,5 @@ func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setC default: fmt.Println("Invalid version string:", param.newVersion) } -} - -// SetCurrentVersionCommand returns command for set current-version. -func SetCurrentVersionCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "set", - } - - subCmd := &cobra.Command{ - Use: "current-version", - Run: func(_ *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Println("invalid parameter numbers") - return - } - - newVersion := args[0] - switch newVersion { - case models.LTEVersion2_1: - fallthrough - case "LTEVersion2_1": - etcdversion.SetVersion(models.LTEVersion2_1) - case models.GTEVersion2_2: - fallthrough - case "GTEVersion2_2": - etcdversion.SetVersion(models.GTEVersion2_2) - default: - fmt.Println("Invalid version string:", newVersion) - } - }, - } - - cmd.AddCommand(subCmd) - - return cmd + return nil } diff --git a/states/download_pk.go b/states/download_pk.go index 2d030d4d..23ed47e0 100644 --- a/states/download_pk.go +++ b/states/download_pk.go @@ -7,91 +7,112 @@ import ( "io" "os" "path" + "strings" + "time" + "github.com/cockroachdb/errors" "github.com/gosuri/uilive" "github.com/manifoldco/promptui" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" - "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" ) -func getDownloadPKCmd(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "download-pk", - Short: "download pk column of a collection", - RunE: func(cmd *cobra.Command, args []string) error { - collectionID, err := cmd.Flags().GetInt64("id") - if err != nil { - return err - } +type DownloadPKParam struct { + framework.ParamBase `use:"download-pk" desc:"download segment pk with provided collection/segment id"` + MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to download"` + SegmentID int64 `name:"segment" default:"0" desc:"segment id to download"` +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) - if err != nil { - fmt.Println("Collection not found for id", collectionID) - return nil - } +func (s *InstanceState) DownloadPKCommand(ctx context.Context, p *DownloadPKParam) error { + collection, err := common.GetCollectionByIDVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + return err + } + pkField, ok := collection.GetPKField() + if !ok { + return errors.New("pk field not found") + } - var pkID int64 = -1 + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { + return s.CollectionID == p.CollectionID && (p.SegmentID == 0 || p.SegmentID == s.ID) + }) + if err != nil { + return err + } - for _, field := range coll.Schema.Fields { - if field.IsPrimaryKey { - pkID = field.FieldID - break - } - } + minioClient, bucketName, rootPath, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress) + if err != nil { + return err + } - if pkID < 0 { - fmt.Println("collection pk not found") - return nil - } + return s.downloadPKs(ctx, minioClient, bucketName, rootPath, p.CollectionID, pkField.FieldID, segments, s.writeLogfile) +} - segments, err := common.ListSegments(cli, basePath, func(segment *datapb.SegmentInfo) bool { - return segment.CollectionID == collectionID - }) - if err != nil { - return err - } +func (s *InstanceState) writeLogfile(ctx context.Context, obj *minio.Object) error { + return nil +} - p := promptui.Prompt{ - Label: "BucketName", - } - bucketName, err := p.Run() - if err != nil { - return err - } +func (s *InstanceState) downloadPKs(ctx context.Context, cli *minio.Client, bucketName, rootPath string, collID int64, pkID int64, segments []*models.Segment, handler func(context.Context, *minio.Object) error) error { + folder := fmt.Sprintf("dlpks_%s", time.Now().Format("20060102150406")) + err := os.Mkdir(folder, 0o777) + if err != nil { + fmt.Println("Failed to create folder,", err.Error()) + } - minioClient, err := getMinioClient() - if err != nil { - fmt.Println("cannot get minio client", err.Error()) - return nil - } - exists, err := minioClient.BucketExists(context.Background(), bucketName) - if err != nil { - return err - } - if !exists { - fmt.Printf("bucket %s not exists\n", bucketName) - return nil - } + pd := uilive.New() + pf := "Downloading pk files ... %d%%(%d/%d)\n" + pd.Start() + fmt.Fprintf(pd, pf, 0, 0, len(segments)) + defer pd.Stop() - for _, segment := range segments { - common.FillFieldsIfV2(cli, basePath, segment) + count := 0 + for i, segment := range segments { + targetFolder := fmt.Sprintf("%s/%d", folder, segment.ID) + os.Mkdir(targetFolder, 0o777) + for _, fieldBinlog := range segment.GetBinlogs() { + if fieldBinlog.FieldID != pkID { + continue } - downloadPks(minioClient, bucketName, collectionID, pkID, segments) - return nil - }, + for _, binlog := range fieldBinlog.Binlogs { + logPath := strings.Replace(binlog.LogPath, "ROOT_PATH", rootPath, -1) + obj, err := cli.GetObject(ctx, bucketName, logPath, minio.GetObjectOptions{}) + if err != nil { + fmt.Println("failed to download file", bucketName, logPath) + return err + } + + name := path.Base(logPath) + + f, err := os.Create(path.Join(targetFolder, name)) + if err != nil { + fmt.Println("failed to open file") + return err + } + w := bufio.NewWriter(f) + r := bufio.NewReader(obj) + _, err = io.Copy(w, r) + if err != nil { + fmt.Println(err.Error()) + } + w.Flush() + f.Close() + count++ + } + } + progress := (i + 1) * 100 / len(segments) + fmt.Fprintf(pd, pf, progress, i+1, len(segments)) } - cmd.Flags().Int64("id", 0, "collection id to download") - return cmd + fmt.Println() + fmt.Printf("pk file download completed for collection :%d, %d file(s) downloaded\n", collID, count) + return nil } func getMinioClient() (*minio.Client, error) { @@ -166,56 +187,3 @@ func getMinioClient() (*minio.Client, error) { return minioClient, nil } - -func downloadPks(cli *minio.Client, bucketName string, collID, pkID int64, segments []*datapb.SegmentInfo) { - err := os.Mkdir(fmt.Sprintf("%d", collID), 0o777) - if err != nil { - fmt.Println("Failed to create folder,", err.Error()) - } - - pd := uilive.New() - pf := "Downloading pk files ... %d%%(%d/%d)\n" - pd.Start() - fmt.Fprintf(pd, pf, 0, 0, len(segments)) - defer pd.Stop() - - count := 0 - for i, segment := range segments { - for _, fieldBinlog := range segment.Binlogs { - if fieldBinlog.FieldID != pkID { - continue - } - - folder := fmt.Sprintf("%d/%d", collID, segment.ID) - err := os.MkdirAll(folder, 0o777) - if err != nil { - fmt.Println("Failed to create sub-folder", err.Error()) - return - } - - for _, binlog := range fieldBinlog.Binlogs { - obj, err := cli.GetObject(context.Background(), bucketName, binlog.GetLogPath(), minio.GetObjectOptions{}) - if err != nil { - fmt.Println("failed to download file", bucketName, binlog.GetLogPath()) - return - } - - name := path.Base(binlog.GetLogPath()) - - f, err := os.Create(path.Join(folder, name)) - if err != nil { - fmt.Println("failed to open file") - return - } - w := bufio.NewWriter(f) - r := bufio.NewReader(obj) - io.Copy(w, r) - count++ - } - } - progress := (i + 1) * 100 / len(segments) - fmt.Fprintf(pd, pf, progress, i+1, len(segments)) - } - fmt.Println() - fmt.Printf("pk file download completed for collection :%d, %d file(s) downloaded\n", collID, count) -} diff --git a/states/download_segment.go b/states/download_segment.go index 38024487..59405040 100644 --- a/states/download_segment.go +++ b/states/download_segment.go @@ -22,7 +22,7 @@ import ( type DownloadSegmentParam struct { framework.ParamBase `use:"download-segment" desc:"download segment file with provided segment id"` MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` - SegmentID int64 `name:"segment" default:"0" desc:"segment id to downloads"` + SegmentID int64 `name:"segment" default:"0" desc:"segment id to download"` } func (s *InstanceState) DownloadSegmentCommand(ctx context.Context, p *DownloadSegmentParam) error { diff --git a/states/inspect_primary_key.go b/states/inspect_primary_key.go index 2744f7af..91f55967 100644 --- a/states/inspect_primary_key.go +++ b/states/inspect_primary_key.go @@ -3,153 +3,208 @@ package states import ( "context" "fmt" + "io/fs" + "log" "os" - "path" - "strconv" + "path/filepath" + "strings" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" + "github.com/cockroachdb/errors" + "github.com/minio/minio-go/v7" - "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" - "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/storage" ) -func getInspectPKCmd(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "inspect-pk [segment id]", - Short: "inspect pk num&dup condition", - Run: func(cmd *cobra.Command, args []string) { - var segments []*datapb.SegmentInfo - var err error - if len(args) == 0 { - segments, err = common.ListSegments(cli, basePath, nil) - } else { - var segmentID int64 - segmentID, err = strconv.ParseInt(args[0], 10, 64) - if err != nil { - fmt.Println("failed to parse segment id") - cmd.Help() - return - } - segments, err = common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { return info.ID == segmentID }) +type InspectPKParam struct { + framework.ParamBase `use:"inspect-pk" desc:"inspect pk column remote or local"` + Remote bool `name:"remote" default:"false" desc:"inspect remote pk"` + LocalPath string `name:"localPath" default:"" desc:"local pk file path"` + // remote related params + CollectionID int64 `name:"collection" default:"0" desc:"collection id to inspect"` + SegmentID int64 `name:"segment" default:"0" desc:"segment id to inspect"` + MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` + + ResultLimit int64 `name:"resultLimit" default:"10" desc:"Dedup result print limit, default 10"` +} + +func (s *InstanceState) InspectPKCommand(ctx context.Context, p *InspectPKParam) error { + var intResult map[int64]int + var strResult map[string]int + var err error + if p.Remote { + intResult, strResult, err = s.inspectRemote(ctx, p) + } else { + intResult, strResult, err = s.inspectLocal(ctx, p) + } + + if err != nil { + return err + } + + total := len(intResult) + len(strResult) + if total == 0 { + fmt.Println("no duplicated pk found") + return nil + } + var i int64 + for pk, cnt := range intResult { + if i > p.ResultLimit { + break + } + fmt.Printf("PK %d duplicated %d times\n", pk, cnt+1) + i++ + } + for pk, cnt := range strResult { + if i > p.ResultLimit { + break + } + fmt.Printf("PK %s duplicated %d times\n", pk, cnt+1) + i++ + } + if i < int64(total) { + fmt.Println("...") + fmt.Printf("%d total duplicated pk value found\n", total) + } + + return nil +} + +func (s *InstanceState) DedupScanner() (map[int64]int, map[string]int, func(int64), func(string)) { + intIDs := make(map[int64]struct{}) + strIDs := make(map[string]struct{}) + intResult := make(map[int64]int) + strResult := make(map[string]int) + return intResult, strResult, + func(v int64) { + _, ok := intIDs[v] + if ok { + intResult[v]++ } - if err != nil { - fmt.Println("failed to parse argument:", err.Error()) - return + intIDs[v] = struct{}{} + }, func(v string) { + _, ok := strIDs[v] + if ok { + strResult[v]++ } + strIDs[v] = struct{}{} + } +} - // collid -> pk id - cachedCollection := make(map[int64]int64) +func (s *InstanceState) inspectRemote(ctx context.Context, p *InspectPKParam) (map[int64]int, map[string]int, error) { + fmt.Println("Using remote inspect mode") + if p.CollectionID == 0 { + return nil, nil, errors.New("collection id not provided") + } - globalMap := make(map[int64]int64) + collection, err := common.GetCollectionByIDVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + return nil, nil, err + } + pkField, ok := collection.GetPKField() + if !ok { + return nil, nil, errors.New("pk field not found") + } - for _, segment := range segments { - if segment.State != commonpb.SegmentState_Flushed { - continue - } - pkID, has := cachedCollection[segment.CollectionID] - if !has { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - coll, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), segment.GetCollectionID()) - if err != nil { - fmt.Println("Collection not found for id", segment.CollectionID) - return - } - - for _, field := range coll.Schema.Fields { - if field.IsPrimaryKey { - pkID = field.FieldID - break - } - } - - if pkID < 0 { - fmt.Println("collection pk not found") - return - } - } + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { + return p.SegmentID == 0 || p.SegmentID == s.ID + }) + if err != nil { + return nil, nil, err + } + + minioClient, bucketName, rootPath, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress) + if err != nil { + fmt.Println("Failed to create folder,", err.Error()) + } + + intResult, strResult, is, ss := s.DedupScanner() + + count := 0 + for _, segment := range segments { + for _, fieldBinlog := range segment.GetBinlogs() { + if fieldBinlog.FieldID != pkField.FieldID { + continue + } - common.FillFieldsIfV2(cli, basePath, segment) - for _, fieldBinlog := range segment.Binlogs { - if fieldBinlog.FieldID != pkID { - continue - } - total := 0 - for _, binlog := range fieldBinlog.Binlogs { - name := path.Base(binlog.GetLogPath()) - rp := fmt.Sprintf("%d/%d/%s", segment.CollectionID, segment.ID, name) - f, err := os.Open(rp) - if err != nil { - fmt.Println(err) - return - } - r, _, err := storage.NewBinlogReader(f) - if err != nil { - fmt.Println("fail to create binlog reader:", err.Error()) - continue - } - ids, err := r.NextInt64EventReader() - if err != nil { - fmt.Println("faild to read next event:", err.Error()) - continue - } - f.Close() - // binlog entries num = 0 skip - if binlog.EntriesNum != int64(len(ids)) && binlog.EntriesNum > 0 { - fmt.Printf("found mismatch segment id:%d, name:%s, binlog: %d, id-len%d\n", segment.ID, name, binlog.EntriesNum, len(ids)) - } - - /* - c := ddup(ids) - if c > 0 { - fmt.Printf("found %d dup entry for segment %d-%s", c, segment.ID, name) - }*/ - dr, c := globalDDup(segment.ID, ids, globalMap) - if len(dr) > 0 || c > 0 { - fmt.Printf("found %d dup entry for segment %d, distribution:%v\n", c, segment.ID, dr) - } - total += len(ids) - } - - if int64(total) != segment.NumOfRows { - fmt.Printf("found mismatch segment %d, info:%d, binlog count:%d", segment.ID, segment.NumOfRows, total) - } + for _, binlog := range fieldBinlog.Binlogs { + logPath := strings.Replace(binlog.LogPath, "ROOT_PATH", rootPath, -1) + log.Println("start to scan pk binlog: ", logPath) + obj, err := minioClient.GetObject(ctx, bucketName, logPath, minio.GetObjectOptions{}) + if err != nil { + fmt.Println("failed to open file:", bucketName, logPath) + return nil, nil, err } + + s.scanPKBinlogFile(obj, is, ss) + obj.Close() + + count++ } - }, + } } - return cmd + return intResult, strResult, nil } -func ddup(ids []int64) int { - m := make(map[int64]struct{}) - counter := 0 - for _, id := range ids { - _, has := m[id] - if has { - counter++ +func (s *InstanceState) inspectLocal(ctx context.Context, p *InspectPKParam) (map[int64]int, map[string]int, error) { + fmt.Println("Using local inspect mode, localPath: ", p.LocalPath) + intResult, strResult, is, ss := s.DedupScanner() + err := filepath.WalkDir(p.LocalPath, func(path string, d fs.DirEntry, err error) error { + if ctx.Err() != nil { + return nil } - m[id] = struct{}{} - } - return counter + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + fmt.Println("Start parsing binlog file: ", path) + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + var cnt int + s.scanPKBinlogFile(f, is, ss) + + fmt.Printf("%d values scanned\n", cnt) + + return nil + }) + return intResult, strResult, err } -func globalDDup(segmentID int64, ids []int64, m map[int64]int64) (map[int64]int, int) { - count := 0 - dr := make(map[int64]int) - for _, id := range ids { - origin, has := m[id] - if has { - dr[origin]++ - count++ - } else { - m[id] = segmentID +func (s *InstanceState) scanPKBinlogFile(f storage.ReadSeeker, scanInt func(v int64), scanVarchar func(v string)) error { + r, desc, err := storage.NewBinlogReader(f) + if err != nil { + fmt.Println(err.Error()) + return err + } + + switch desc.PayloadDataType { + case schemapb.DataType_Int64: + values, err := r.NextInt64EventReader() + if err != nil { + return err + } + for _, v := range values { + scanInt(v) + } + case schemapb.DataType_VarChar: + values, err := r.NextVarcharEventReader() + if err != nil { + return err + } + for _, v := range values { + scanVarchar(v) } } - return dr, count + return nil } diff --git a/states/instance.go b/states/instance.go index 9b1fb84a..1f366ff7 100644 --- a/states/instance.go +++ b/states/instance.go @@ -72,8 +72,6 @@ func (s *InstanceState) SetupCommands() { getBackupEtcdCmd(cli, basePath), // kill --component [component] --id [id] getEtcdKillCmd(cli, basePath), - // download-pk - getDownloadPKCmd(cli, basePath), // visit [component] [id] getVisitCmd(s, cli, basePath), // show-log-level diff --git a/storage/binlog_reader.go b/storage/binlog_reader.go index 102ebd06..745860cc 100644 --- a/storage/binlog_reader.go +++ b/storage/binlog_reader.go @@ -28,16 +28,16 @@ type BinlogReader struct { reader io.Reader } -type descriptorEvent struct { +type DescriptorEvent struct { descriptorEventHeader descriptorEventData } -func NewBinlogReader(f ReadSeeker) (*BinlogReader, descriptorEvent, error) { +func NewBinlogReader(f ReadSeeker) (*BinlogReader, DescriptorEvent, error) { reader := &BinlogReader{ reader: f, } - var de descriptorEvent + var de DescriptorEvent var err error if _, err = reader.readMagicNumber(f); err != nil { @@ -119,7 +119,7 @@ func (reader *BinlogReader) readMagicNumber(f ReadSeeker) (int32, error) { return magicNumber, err } -func (reader *BinlogReader) readDescriptorEvent(f ReadSeeker) (descriptorEvent, error) { +func (reader *BinlogReader) readDescriptorEvent(f ReadSeeker) (DescriptorEvent, error) { event, err := ReadDescriptorEvent(f) if err != nil { return event, err @@ -140,8 +140,8 @@ func readMagicNumber(buffer ReadSeeker) (int32, error) { } // ReadDescriptorEvent reads a descriptorEvent from buffer -func ReadDescriptorEvent(buffer ReadSeeker) (descriptorEvent, error) { - de := descriptorEvent{} +func ReadDescriptorEvent(buffer ReadSeeker) (DescriptorEvent, error) { + de := DescriptorEvent{} header, err := readDescriptorEventHeader(buffer) if err != nil { return de, err @@ -150,7 +150,7 @@ func ReadDescriptorEvent(buffer ReadSeeker) (descriptorEvent, error) { if err != nil { return de, err } - return descriptorEvent{ + return DescriptorEvent{ descriptorEventHeader: *header, descriptorEventData: *data, }, nil diff --git a/storage/index_reader.go b/storage/index_reader.go index 32595b6d..aabea63c 100644 --- a/storage/index_reader.go +++ b/storage/index_reader.go @@ -13,9 +13,9 @@ import ( type IndexReader struct{} -func NewIndexReader(f *os.File) (*IndexReader, descriptorEvent, error) { +func NewIndexReader(f *os.File) (*IndexReader, DescriptorEvent, error) { reader := &IndexReader{} - var de descriptorEvent + var de DescriptorEvent var err error _, err = readMagicNumber(f)