Skip to content

Commit

Permalink
enhance: Improve the prompt information when checking partition-key e…
Browse files Browse the repository at this point in the history
…rrors (#249)

See also #247

---------

Signed-off-by: Congqi.Xia <[email protected]>
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Mar 14, 2024
1 parent cbb1da9 commit 727b43a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
43 changes: 37 additions & 6 deletions states/check_partition_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/cockroachdb/errors"
"github.com/gosuri/uilive"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/proto/v2.0/schemapb"
Expand All @@ -22,20 +23,29 @@ import (

type CheckPartitionKeyParam struct {
framework.ParamBase `use:"check-partiton-key" desc:"check partition key field file"`
Storage string `name:"storage" default:"auto" desc:"storage service configuration mode"`
StopIfErr bool `name:"stopIfErr" default:"true"`
OutputPrimaryKey bool `name:"outputPK" default:"true" desc:"print error record primary key info in stdout mode"`
MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"`
OutputFormat string `name:"outputFmt" default:"stdout"`

CollectionID int64 `name:"collection" default:"0" desc:"target collection to scan, default scan all partition key collections"`
}

var errQuickExit = errors.New("quick exit")

func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPartitionKeyParam) error {
collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion())
collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(collection *models.Collection) bool {
return p.CollectionID == 0 || collection.ID == p.CollectionID
})
if err != nil {
return err
}

minioClient, bucketName, rootPath, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress)
var minioClient *minio.Client
var bucketName, rootPath string

minioClient, bucketName, rootPath, err = s.GetMinioClientFromCfg(ctx, p.MinioAddress)
if err != nil {
return err
}
Expand Down Expand Up @@ -110,6 +120,8 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
return field.FieldID, field
})

fmt.Printf("Start to check collection %s id = %d\n", collection.Schema.Name, collection.ID)

segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
return segment.CollectionID == collection.ID
})
Expand All @@ -121,7 +133,13 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
var collectionErrs int
var found bool

for _, segment := range segments {
fmt.Printf("Partition number: %d, Segment number %d\n", len(partitions), len(segments))
progressDisplay := uilive.New()
progressFmt := "Scan segment ... %d%%(%d/%d) %s\n"
progressDisplay.Start()
fmt.Fprintf(progressDisplay, progressFmt, 0, 0, len(segments), "")

for idx, segment := range segments {
if segment.State == models.SegmentStateDropped || segment.State == models.SegmentStateSegmentStateNone {
continue
}
Expand All @@ -133,6 +151,9 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
switch p.OutputFormat {
case "stdout":
selector = func(field int64) bool { return field == partKeyField.FieldID }
case "json-pk":
selector = func(field int64) bool { return field == partKeyField.FieldID }
fallthrough
case "json":
f, err = os.Create(fmt.Sprintf("%d-%d.json", collection.ID, segment.ID))
if err != nil {
Expand Down Expand Up @@ -193,11 +214,13 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
output[pkField.Name] = pk.GetValue()
switch p.OutputFormat {
case "stdout":
if p.OutputPrimaryKey {
fmt.Printf("PK %v partition does not follow partition key rule (%s=%v)\n", pk.GetValue(), partKeyField.Name, partKeyValue)
}
if p.StopIfErr {
return errQuickExit
}
fmt.Printf("PK %v partition does not follow partition key rule\n", pk)
case "json":
case "json", "json-pk":
bs, err := json.Marshal(output)
if err != nil {
fmt.Println(err.Error())
Expand Down Expand Up @@ -225,14 +248,22 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa
if p.StopIfErr && found {
break
}
progress := (idx + 1) * 100 / len(segments)
status := fmt.Sprintf("%d [%s]", segment.ID, colorReady.Sprint("done"))
if errCnt > 0 {
fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt)
collectionErrs += errCnt
status = fmt.Sprintf("%d [%s](%d)", segment.ID, colorError.Sprint("error"), errCnt)
}

fmt.Fprintf(progressDisplay, progressFmt, progress, idx+1, len(segments), status)
}
progressDisplay.Stop()
fmt.Println()
if p.StopIfErr {
if found {
fmt.Printf("Collection %s found partition key error\n", collection.Schema.Name)
} else {
fmt.Printf("Collection %s all data OK!\n", collection.Schema.Name)
}
} else {
fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs)
Expand Down
1 change: 1 addition & 0 deletions states/frame_screen.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewFrameScreen(lines int, display *uilive.Writer) *FrameScreen {
var (
colorPending = color.New(color.FgYellow)
colorReady = color.New(color.FgGreen)
colorError = color.New(color.FgRed)

levelColor = map[eventlog.Level]*color.Color{
eventlog.Level_Debug: color.New(color.FgGreen),
Expand Down

0 comments on commit 727b43a

Please sign in to comment.