Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Use auto oss access detection for download-segment #274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 52 additions & 141 deletions states/download_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,142 +8,47 @@ import (
"io"
"os"
"path"
"strconv"
"time"

"github.com/manifoldco/promptui"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "download-segment",
Short: "download segment file with provided segment id",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
fmt.Println("no segment id provided")
return
}

segSet := make(map[int64]struct{})
for _, arg := range args {
id, err := strconv.ParseInt(arg, 10, 64)
if err == nil {
//skip bad segment id for now
segSet[id] = struct{}{}
}
}

segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool {
_, ok := segSet[info.ID]
return ok
})
if err != nil {
fmt.Println("failed to list segment info", err.Error())
return
}

minioClient, bucketName, err := getMinioAccess()
if err != nil {
fmt.Println("failed to get minio access", err.Error())
return
}

folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
for _, segment := range segments {
common.FillFieldsIfV2(cli, basePath, segment)
downloadSegment(minioClient, bucketName, segment, nil, folder)
}

},
}

return cmd
// DownloadSegmentParam carries the parameters of the "download-segment"
// command. Each field's struct tags give its parameter name, default value and
// description text.
type DownloadSegmentParam struct {
// ParamBase is embedded; its tags carry the command's use string and description.
framework.ParamBase `use:"download-segment" desc:"download segment file with provided segment id"`
// MinioAddress optionally overrides the minio address; per its description,
// leaving it empty uses the value from milvus.yaml.
MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"`
// SegmentID is the id of the segment to download; the tag default is "0".
SegmentID int64 `name:"segment" default:"0" desc:"segment id to downloads"`
}

func getMinioWithInfo(addr string, ak, sk string, bucketName string, secure bool) (*minio.Client, string, error) {
cred := credentials.NewStaticV4(ak, sk, "")
minioClient, err := minio.New(addr, &minio.Options{
Creds: cred,
Secure: secure,
func (s *InstanceState) DownloadSegmentCommand(ctx context.Context, p *DownloadSegmentParam) error {
segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
return s.ID == p.SegmentID
})
if err != nil {
return nil, "", err
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}

func getMinioWithIAM(addr string, bucketName string, secure bool) (*minio.Client, string, error) {
cred := credentials.NewIAM("")
minioClient, err := minio.New(addr, &minio.Options{
Creds: cred,
Secure: secure,
})
if err != nil {
return nil, "", err
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}

func getMinioAccess() (*minio.Client, string, error) {
p := promptui.Prompt{
Label: "BucketName",
}
bucketName, err := p.Run()
if err != nil {
return nil, "", err
return err
}

minioClient, err := getMinioClient()
minioClient, bucketName, _, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress)
if err != nil {
fmt.Println("cannot get minio client", err.Error())
return nil, "", err

}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
return err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
for _, segment := range segments {
err := s.downloadSegment(ctx, minioClient, bucketName, segment, folder)
if err != nil {
return err
}
}

return minioClient, bucketName, nil
return nil
}

func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.SegmentInfo, indexMeta *indexpb.IndexMeta, folderPath string) error {

func (s *InstanceState) downloadSegment(ctx context.Context, minioClient *minio.Client, bucketName string, segment *models.Segment, folderPath string) error {
p := path.Join(folderPath, fmt.Sprintf("%d", segment.ID))
if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) {
err := os.MkdirAll(p, os.ModePerm)
Expand All @@ -155,7 +60,7 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme

fmt.Printf("Downloading Segment: %d ...\n", segment.ID)

for _, fieldBinlog := range segment.Binlogs {
for _, fieldBinlog := range segment.GetBinlogs() {
folder := fmt.Sprintf("%s/%d", p, fieldBinlog.FieldID)
err := os.MkdirAll(folder, 0777)
if err != nil {
Expand All @@ -164,13 +69,13 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
}

for _, binlog := range fieldBinlog.Binlogs {
obj, err := cli.GetObject(context.Background(), bucketName, binlog.GetLogPath(), minio.GetObjectOptions{})
obj, err := minioClient.GetObject(ctx, bucketName, binlog.LogPath, minio.GetObjectOptions{})
if err != nil {
fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.GetLogPath(), err.Error())
fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.LogPath, err.Error())
return err
}

name := path.Base(binlog.GetLogPath())
name := path.Base(binlog.LogPath)

f, err := os.Create(path.Join(folder, name))
if err != nil {
Expand All @@ -182,28 +87,34 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
io.Copy(w, r)
}
}
return nil
}

if indexMeta != nil {
fmt.Println("downloading index files ...")
folder := path.Join(p, "index")
for _, indexFile := range indexMeta.GetIndexFilePaths() {
obj, err := cli.GetObject(context.Background(), bucketName, indexFile, minio.GetObjectOptions{})
if err != nil {
fmt.Println("failed to download file", bucketName, indexFile)
//index not affect segment download result
continue
}
func getMinioAccess() (*minio.Client, string, error) {
p := promptui.Prompt{
Label: "BucketName",
}
bucketName, err := p.Run()
if err != nil {
return nil, "", err
}

minioClient, err := getMinioClient()
if err != nil {
fmt.Println("cannot get minio client", err.Error())
return nil, "", err

name := path.Base(indexFile)
f, err := os.Create(path.Join(folder, name))
if err != nil {
fmt.Println("failed to create index file")
continue
}
w := bufio.NewWriter(f)
r := bufio.NewReader(obj)
io.Copy(w, r)
}
}
return nil
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}
5 changes: 0 additions & 5 deletions states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func (s *InstanceState) SetupCommands() {
)

cmd.AddCommand(
// download-segment
getDownloadSegmentCmd(cli, basePath),
// show [subcommand] options...
showCmd,
// repair [subcommand] options...
Expand Down Expand Up @@ -94,9 +92,6 @@ func (s *InstanceState) SetupCommands() {
// probe
GetProbeCmd(cli, basePath),

// set current-version
SetCurrentVersionCommand(),

// remove-segment-by-id
//removeSegmentByID(cli, basePath),
// garbage-collect
Expand Down
3 changes: 3 additions & 0 deletions states/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
useSSL = config.GetValue()
}
}
if minioAddr != "" {
addr = minioAddr
}

mp := oss.MinioClientParam{
CloudProvider: cloudProvider,
Expand Down
Loading