Skip to content

Commit

Permalink
Support quick stop
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi.Xia <[email protected]>
  • Loading branch information
congqixia committed Feb 26, 2024
1 parent 377a548 commit c057684
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
34 changes: 21 additions & 13 deletions states/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo
})
var f *os.File
var pqWriter *storage.ParquetWriter
var quick bool // load rowid, ts, pk only
switch p.OutputFormat {
case "stdout":
quick = true
case "json":
f, err = os.Create(fmt.Sprintf("%d.json", collection.ID))
if err != nil {
Expand All @@ -70,9 +73,9 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo
return err
}

s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, func(readers map[int64]*storage.BinlogReader) {
s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, quick, func(readers map[int64]*storage.BinlogReader) {

iter, err := NewBinlogIterator(collection, readers)
iter, err := NewBinlogIterator(collection, readers, quick)
if err != nil {
fmt.Println("failed to create iterator", err.Error())
return
Expand Down Expand Up @@ -123,7 +126,7 @@ func (s *InstanceState) TestScanBinlogsCommand(ctx context.Context, p *ScanBinlo
}

// ScanBinlogs scans provided segment with delete record excluded.
func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Client, bucketName string, rootPath string, collection *models.Collection, segment *models.Segment, fn func(map[int64]*storage.BinlogReader)) {
func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Client, bucketName string, rootPath string, collection *models.Collection, segment *models.Segment, quick bool, fn func(map[int64]*storage.BinlogReader)) {
pkField, has := lo.Find(collection.Schema.Fields, func(field models.FieldSchema) bool {
return field.IsPrimaryKey
})
Expand All @@ -141,6 +144,9 @@ func (s *InstanceState) ScanBinlogs(ctx context.Context, minioClient *minio.Clie
for idx := range pkFieldData.Binlogs {
field2Binlog := make(map[int64]*storage.BinlogReader)
for _, fieldBinlog := range segment.GetBinlogs() {
if quick && fieldBinlog.FieldID != 0 && fieldBinlog.FieldID != 1 && fieldBinlog.FieldID != pkField.FieldID {
continue
}
binlog := fieldBinlog.Binlogs[idx]
filePath := strings.Replace(binlog.LogPath, "ROOT_PATH", rootPath, -1)
object, err := minioClient.GetObject(ctx, bucketName, filePath, minio.GetObjectOptions{})
Expand Down Expand Up @@ -175,7 +181,7 @@ type BinlogIterator struct {
rows int
}

func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage.BinlogReader) (*BinlogIterator, error) {
func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage.BinlogReader, quick bool) (*BinlogIterator, error) {
rowIDReader := readers[0]
rowIDs, err := rowIDReader.NextInt64EventReader()
if err != nil {
Expand Down Expand Up @@ -214,16 +220,18 @@ func NewBinlogIterator(collection *models.Collection, readers map[int64]*storage
return field.FieldID, field
})
data := make(map[int64][]any)
for fieldID, reader := range readers {
if fieldID < 100 || fieldID == pkField.FieldID {
continue
}
field := idField[fieldID]
column, err := readerToSlice(reader, field)
if err != nil {
return nil, err
if !quick {
for fieldID, reader := range readers {
if fieldID < 100 || fieldID == pkField.FieldID {
continue
}
field := idField[fieldID]
column, err := readerToSlice(reader, field)
if err != nil {
return nil, err
}
data[fieldID] = column
}
data[fieldID] = column
}

return &BinlogIterator{
Expand Down
33 changes: 26 additions & 7 deletions states/check_partition_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type CheckPartitionKeyParam struct {
OutputFormat string `name:"outputFmt" default:"stdout"`
}

// errQuickExit is a sentinel error returned from the per-row check callback
// when StopIfErr is set and a partition-key violation is printed in "stdout"
// mode. Callers test for it with errors.Is so that it is not printed or
// propagated as a real failure.
var errQuickExit = errors.New("quick exit")

func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPartitionKeyParam) error {
collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion())
if err != nil {
Expand Down Expand Up @@ -117,6 +119,7 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
}

var collectionErrs int
var found bool

for _, segment := range segments {
if segment.State == models.SegmentStateDropped || segment.State == models.SegmentStateSegmentStateNone {
Expand All @@ -126,7 +129,10 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
err := func() error {
var f *os.File
var pqWriter *storage.ParquetWriter
var quick bool
switch p.OutputFormat {
case "stdout":
quick = true
case "json":
f, err = os.Create(fmt.Sprintf("%d-%d.json", collection.ID, segment.ID))
if err != nil {
Expand All @@ -138,16 +144,15 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
return err
}
pqWriter = storage.NewParquetWriter(collection)

}
deltalog, err := s.DownloadDeltalogs(ctx, minioClient, bucketName, rootPath, collection, segment)
if err != nil {
return err
}

s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, func(readers map[int64]*storage.BinlogReader) {
s.ScanBinlogs(ctx, minioClient, bucketName, rootPath, collection, segment, quick, func(readers map[int64]*storage.BinlogReader) {
targetIndex := partIdx[segment.PartitionID]
iter, err := NewBinlogIterator(collection, readers)
iter, err := NewBinlogIterator(collection, readers, quick)
if err != nil {
fmt.Println("failed to create iterator", err.Error())
return
Expand Down Expand Up @@ -180,14 +185,19 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
return nil
}
errCnt++
found = true

output := lo.MapKeys(data, func(v any, k int64) string {
return idField[k].Name
})
output[pkField.Name] = pk.GetValue()
switch p.OutputFormat {
case "stdout":
fmt.Printf("PK %v partition does not follow partition key rule\n", pk)
if p.StopIfErr {
return errQuickExit
} else {

Check warning on line 198 in states/check_partition_key.go (GitHub Actions / lint; view workflow job for this annotation):
indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
fmt.Printf("PK %v partition does not follow partition key rule\n", pk)
}
case "json":
bs, err := json.Marshal(output)
if err != nil {
Expand All @@ -204,22 +214,31 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
}
return nil
})
if err != nil {
if err != nil && !errors.Is(err, errQuickExit) {
fmt.Println(err.Error())
}
})
return nil
}()
if err != nil {
if err != nil && !errors.Is(err, errQuickExit) {
return err
}
if p.StopIfErr && found {
break
}
if errCnt > 0 {
fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt)
collectionErrs += errCnt
}
}
if p.StopIfErr {
if found {
fmt.Printf("Collection %s found partition key error\n", collection.Schema.Name)
}
} else {
fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs)
}

fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs)
}
return nil
}
Expand Down

0 comments on commit c057684

Please sign in to comment.