diff --git a/states/binlog.go b/states/binlog.go index 7c89cc2..24f7fed 100644 --- a/states/binlog.go +++ b/states/binlog.go @@ -49,7 +49,10 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo }) var f *os.File var pqWriter *storage.ParquetWriter + var quick bool // load rowid, ts, pk only switch p.OutputFormat { + case "stdout": + quick = true case "json": f, err = os.Create(fmt.Sprintf("%d.json", collection.ID)) if err != nil { @@ -70,9 +73,9 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo return err } - s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, func(readers map[int64]*storage.BinlogReader) { + s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, quick, func(readers map[int64]*storage.BinlogReader) { - iter, err := NewBinlogIterator(collection, readers) + iter, err := NewBinlogIterator(collection, readers, quick) if err != nil { fmt.Println("failed to create iterator", err.Error()) return @@ -123,7 +126,7 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo } // ScanBinlogs scans provided segment with delete record excluded. -func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Client, bucketName string, rootPath string, collection *models.Collection, segment *models.Segment, fn func(map[int64]*storage.BinlogReader)) { +func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Client, bucketName string, rootPath string, collection *models.Collection, segment *models.Segment, quick bool, fn func(map[int64]*storage.BinlogReader)) { pkField, has := lo.Find(collection.Schema.Fields, func(field models.FieldSchema) bool { return field.IsPrimaryKey }) @@ -141,6 +144,9 @@ func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Clie for idx := range pkFieldData.Binlogs { field2Binlog := make(map[int64]*storage.BinlogReader) for _, fieldBinlog := range segment.GetBinlogs() { + if quick && fieldBinlog.FieldID != 0 && fieldBinlog.FieldID != 1 && fieldBinlog.FieldID != pkField.FieldID { + continue + } binlog := fieldBinlog.Binlogs[idx] filePath := strings.Replace(binlog.LogPath, "ROOT_PATH", rootPath, -1) object, err := minioClient.GetObject(ctx, bucketName, filePath, minio.GetObjectOptions{}) @@ -175,7 +181,7 @@ type BinlogIterator struct { rows int } -func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage.BinlogReader) (*BinlogIterator, error) { +func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage.BinlogReader, quick bool) (*BinlogIterator, error) { rowIDReader := readers[0] rowIDs, err := rowIDReader.NextInt64EventReader() if err != nil { @@ -214,16 +220,18 @@ func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage return field.FieldID, field }) data := make(map[int64][]any) - for fieldID, reader := range readers { - if fieldID < 100 || fieldID == pkField.FieldID { - continue - } - field := idField[fieldID] - column, err := readerToSlice(reader, field) - if err != nil { - return nil, err + if !quick { + for fieldID, reader := range readers { + if fieldID < 100 || fieldID == pkField.FieldID { + continue + } + field := idField[fieldID] + column, err := readerToSlice(reader, field) + if err != nil { + return nil, err + } + data[fieldID] = column } - data[fieldID] = column } return &BinlogIterator{ diff --git a/states/check_partition_key.go b/states/check_partition_key.go index a80ca3d..1d8ab31 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -27,6 +27,8 @@ type CheckPartitionKeyParam struct { OutputFormat string `name:"outputFmt" default:"stdout"` } +var errQuickExit = errors.New("quick exit") + func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPartitionKeyParam) error { collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion()) if err != nil { @@ -117,6 +119,7 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa } var collectionErrs int + var found bool for _, segment := range segments { if segment.State == models.SegmentStateDropped || segment.State == models.SegmentStateSegmentStateNone { @@ -126,7 +129,10 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa err := func() error { var f *os.File var pqWriter *storage.ParquetWriter + var quick bool switch p.OutputFormat { + case "stdout": + quick = true case "json": f, err = os.Create(fmt.Sprintf("%d-%d.json", collection.ID, segment.ID)) if err != nil { @@ -138,16 +144,15 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa return err } pqWriter = storage.NewParquetWriter(collection) - } deltalog, err := s.DownloadDeltalogs(ctx, minioClient, bucketName, rootPath, collection, segment) if err != nil { return err } - s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, func(readers map[int64]*storage.BinlogReader) { + s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, quick, func(readers map[int64]*storage.BinlogReader) { targetIndex := partIdx[segment.PartitionID] - iter, err := NewBinlogIterator(collection, readers) + iter, err := NewBinlogIterator(collection, readers, quick) if err != nil { fmt.Println("failed to create iterator", err.Error()) return @@ -180,6 +185,7 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa return nil } errCnt++ + found = true output := lo.MapKeys(data, func(v any, k int64) string { return idField[k].Name @@ -187,7 +193,11 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa output[pkField.Name] = pk.GetValue() switch p.OutputFormat { case "stdout": - fmt.Printf("PK %v partition does not follow partition key rule\n", pk) + if p.StopIfErr { + return errQuickExit + } else { + fmt.Printf("PK %v partition does not follow partition key rule\n", pk) + } case "json": bs, err := json.Marshal(output) if err != nil { @@ -204,22 +214,31 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa } return nil }) - if err != nil { + if err != nil && !errors.Is(err, errQuickExit) { fmt.Println(err.Error()) } }) return nil }() - if err != nil { + if err != nil && !errors.Is(err, errQuickExit) { return err } + if p.StopIfErr && found { + break + } if errCnt > 0 { fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt) collectionErrs += errCnt } } + if p.StopIfErr { + if found { + fmt.Printf("Collection %s found partition key error\n", collection.Schema.Name) + } + } else { + fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs) + } - fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs) } return nil }