Skip to content

Commit

Permalink
enhance: Support removing import job (#329)
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 11, 2024
1 parent 8b31891 commit 0c6b1a9
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 11 deletions.
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func RemoveCommand(cli clientv3.KV, instanceName, basePath string) *cobra.Comman
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
// remove import job
remove.ImportJob(cli, basePath),
)

return removeCmd
Expand Down
8 changes: 4 additions & 4 deletions states/etcd/common/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ const (
)

// ListImportJobs list import jobs.
func ListImportJobs(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*datapb.ImportJob) bool) ([]*datapb.ImportJob, error) {
func ListImportJobs(ctx context.Context, cli clientv3.KV, basePath string, filters ...func(*datapb.ImportJob) bool) ([]*datapb.ImportJob, []string, error) {
prefix := path.Join(basePath, ImportJobPrefix) + "/"
jobs, _, err := ListProtoObjects[datapb.ImportJob](ctx, cli, prefix)
jobs, keys, err := ListProtoObjects[datapb.ImportJob](ctx, cli, prefix)
if err != nil {
return nil, err
return nil, nil, err
}

return lo.FilterMap(jobs, func(job datapb.ImportJob, idx int) (*datapb.ImportJob, bool) {
Expand All @@ -31,7 +31,7 @@ func ListImportJobs(ctx context.Context, cli clientv3.KV, basePath string, filte
}
}
return &job, true
}), nil
}), keys, nil
}

// ListPreImportTasks list pre-import tasks.
Expand Down
84 changes: 84 additions & 0 deletions states/etcd/remove/bulkinsert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package remove

import (
"context"
"fmt"
"strconv"
"time"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/milvus-io/birdwatcher/states/etcd/show"
)

// ImportJob returns remove import job command.
func ImportJob(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "import-job",
Short: "Remove import job from datacoord meta with specified job id",
Run: func(cmd *cobra.Command, args []string) {
jobIDStr, err := cmd.Flags().GetString("job")
if err != nil {
fmt.Println(err.Error())
return
}
jobID, err := strconv.ParseInt(jobIDStr, 10, 64)
if err != nil {
fmt.Printf("invalid jobID %s, err=%v\n", jobIDStr, err)
return
}

run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

jobs, paths, err := common.ListImportJobs(context.Background(), cli, basePath, func(job *datapb.ImportJob) bool {
return job.GetJobID() == jobID
})
if err != nil {
fmt.Println("failed to list bulkinsert jobs, err=", err.Error())
return
}
if len(jobs) != len(paths) {
fmt.Printf("unaligned jobs and paths, len(jobs)=%d, len(paths)=%d", len(jobs), len(paths))
return
}
if len(jobs) == 0 {
fmt.Printf("cannot find target import job %s\n", jobIDStr)
return
}
if len(jobs) > 1 {
fmt.Printf("unexpected import job, expect 1, but got %d\n", len(jobs))
return
}

targetJob := jobs[0]
targetPath := paths[0]
fmt.Printf("selected target import job, jobID: %d, key=%s\n", targetJob.GetJobID(), targetPath)
show.PrintDetailedImportJob(context.Background(), cli, basePath, targetJob, false)

if !run {
return
}

fmt.Printf("Start to delete import job...\n")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
_, err = cli.Delete(ctx, targetPath)
cancel()
if err != nil {
fmt.Printf("failed to delete import job %d, error: %s\n", targetJob.GetJobID(), err.Error())
return
}
fmt.Printf("remove import job %d done\n", targetJob.GetJobID())
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to remove import job from meta")
cmd.Flags().String("job", "", "import job id to remove")
return cmd
}
15 changes: 8 additions & 7 deletions states/etcd/show/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
Expand All @@ -27,7 +28,7 @@ type ImportJobParam struct {

// BulkInsertCommand returns show bulkinsert command.
func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam) error {
jobs, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool {
return (p.JobID == 0 || job.GetJobID() == p.JobID) &&
(p.CollectionID == 0 || job.GetCollectionID() == p.CollectionID) &&
(p.State == "" || strings.EqualFold(job.GetState().String(), p.State))
Expand All @@ -50,9 +51,9 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam
fmt.Println("Please specify the job ID (-job={JobID}) to show detailed info.")
return nil
}
c.PrintDetailedImportJob(ctx, job, p.ShowAllFiles)
PrintDetailedImportJob(ctx, c.client, c.basePath, job, p.ShowAllFiles)
} else {
c.PrintSimpleImportJob(job)
PrintSimpleImportJob(job)
}
}
fmt.Printf("\n")
Expand All @@ -69,7 +70,7 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam
return nil
}

func (c *ComponentShow) PrintSimpleImportJob(job *datapb.ImportJob) {
func PrintSimpleImportJob(job *datapb.ImportJob) {
str := fmt.Sprintf("JobID: %d DBID: %d CollectionID: %d State: %s StartTime: %s",
job.GetJobID(), job.GetDbID(), job.GetCollectionID(), job.State.String(), job.GetStartTime())
if job.GetState() == internalpb.ImportJobState_Failed {
Expand All @@ -81,16 +82,16 @@ func (c *ComponentShow) PrintSimpleImportJob(job *datapb.ImportJob) {
fmt.Println(str)
}

func (c *ComponentShow) PrintDetailedImportJob(ctx context.Context, job *datapb.ImportJob, showAllFiles bool) {
func PrintDetailedImportJob(ctx context.Context, client clientv3.KV, basePath string, job *datapb.ImportJob, showAllFiles bool) {
// Get job's tasks.
preimportTasks, err := common.ListPreImportTasks(ctx, c.client, c.basePath, func(task *datapb.PreImportTask) bool {
preimportTasks, err := common.ListPreImportTasks(ctx, client, basePath, func(task *datapb.PreImportTask) bool {
return task.GetJobID() == job.GetJobID()
})
if err != nil {
fmt.Println("failed to list preimport tasks, err=", err.Error())
return
}
importTasks, err := common.ListImportTasks(ctx, c.client, c.basePath, func(task *datapb.ImportTaskV2) bool {
importTasks, err := common.ListImportTasks(ctx, client, basePath, func(task *datapb.ImportTaskV2) bool {
return task.GetJobID() == job.GetJobID()
})
if err != nil {
Expand Down

0 comments on commit 0c6b1a9

Please sign in to comment.