From fe4efa2b1d4fcfd68ae76fc292fa66530d96f710 Mon Sep 17 00:00:00 2001
From: Congqi Xia <congqi.xia@zilliz.com>
Date: Tue, 18 Jun 2024 16:35:48 +0800
Subject: [PATCH] enhance: Use automatic OSS access detection for download-segment

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
---
 states/download_segment.go | 193 ++++++++++---------------------------
 states/instance.go         |   5 -
 states/minio.go            |   3 +
 3 files changed, 55 insertions(+), 146 deletions(-)

diff --git a/states/download_segment.go b/states/download_segment.go
index 3bb89f3..ddd05c3 100644
--- a/states/download_segment.go
+++ b/states/download_segment.go
@@ -8,142 +8,47 @@ import (
 	"io"
 	"os"
 	"path"
-	"strconv"
 	"time"
 
 	"github.com/manifoldco/promptui"
-	"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
-	"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
+	"github.com/milvus-io/birdwatcher/framework"
+	"github.com/milvus-io/birdwatcher/models"
 	"github.com/milvus-io/birdwatcher/states/etcd/common"
+	etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
 	"github.com/minio/minio-go/v7"
-	"github.com/minio/minio-go/v7/pkg/credentials"
-	"github.com/spf13/cobra"
-	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
-func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command {
-	cmd := &cobra.Command{
-		Use:   "download-segment",
-		Short: "download segment file with provided segment id",
-		Run: func(cmd *cobra.Command, args []string) {
-			if len(args) == 0 {
-				fmt.Println("no segment id provided")
-				return
-			}
-
-			segSet := make(map[int64]struct{})
-			for _, arg := range args {
-				id, err := strconv.ParseInt(arg, 10, 64)
-				if err == nil {
-					//skip bad segment id for now
-					segSet[id] = struct{}{}
-				}
-			}
-
-			segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool {
-				_, ok := segSet[info.ID]
-				return ok
-			})
-			if err != nil {
-				fmt.Println("failed to list segment info", err.Error())
-				return
-			}
-
-			minioClient, bucketName, err := getMinioAccess()
-			if err != nil {
-				fmt.Println("failed to get minio access", err.Error())
-				return
-			}
-
-			folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
-			for _, segment := range segments {
-				common.FillFieldsIfV2(cli, basePath, segment)
-				downloadSegment(minioClient, bucketName, segment, nil, folder)
-			}
-
-		},
-	}
-
-	return cmd
+type DownloadSegmentParam struct {
+	framework.ParamBase `use:"download-segment" desc:"download segment file with provided segment id"`
+	MinioAddress        string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"`
+	SegmentID           int64  `name:"segment" default:"0" desc:"segment id to downloads"`
 }
 
-func getMinioWithInfo(addr string, ak, sk string, bucketName string, secure bool) (*minio.Client, string, error) {
-	cred := credentials.NewStaticV4(ak, sk, "")
-	minioClient, err := minio.New(addr, &minio.Options{
-		Creds:  cred,
-		Secure: secure,
+func (s *InstanceState) DownloadSegmentCommand(ctx context.Context, p *DownloadSegmentParam) error {
+	segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
+		return s.ID == p.SegmentID
 	})
 	if err != nil {
-		return nil, "", err
-	}
-	exists, err := minioClient.BucketExists(context.Background(), bucketName)
-	if !exists {
-		fmt.Printf("bucket %s not exists\n", bucketName)
-		return nil, "", err
-	}
-
-	if !exists {
-		fmt.Printf("Bucket not exist\n")
-		return nil, "", errors.New("bucket not exists")
-	}
-
-	return minioClient, bucketName, nil
-}
-
-func getMinioWithIAM(addr string, bucketName string, secure bool) (*minio.Client, string, error) {
-	cred := credentials.NewIAM("")
-	minioClient, err := minio.New(addr, &minio.Options{
-		Creds:  cred,
-		Secure: secure,
-	})
-	if err != nil {
-		return nil, "", err
-	}
-	exists, err := minioClient.BucketExists(context.Background(), bucketName)
-	if !exists {
-		fmt.Printf("bucket %s not exists\n", bucketName)
-		return nil, "", err
-	}
-
-	if !exists {
-		fmt.Printf("Bucket not exist\n")
-		return nil, "", errors.New("bucket not exists")
-	}
-
-	return minioClient, bucketName, nil
-}
-
-func getMinioAccess() (*minio.Client, string, error) {
-	p := promptui.Prompt{
-		Label: "BucketName",
-	}
-	bucketName, err := p.Run()
-	if err != nil {
-		return nil, "", err
+		return err
 	}
 
-	minioClient, err := getMinioClient()
+	minioClient, bucketName, _, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress)
 	if err != nil {
-		fmt.Println("cannot get minio client", err.Error())
-		return nil, "", err
-
-	}
-	exists, err := minioClient.BucketExists(context.Background(), bucketName)
-	if !exists {
-		fmt.Printf("bucket %s not exists\n", bucketName)
-		return nil, "", err
+		return err
 	}
 
-	if !exists {
-		fmt.Printf("Bucket not exist\n")
-		return nil, "", errors.New("bucket not exists")
+	folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
+	for _, segment := range segments {
+		err := s.downloadSegment(ctx, minioClient, bucketName, segment, folder)
+		if err != nil {
+			return err
+		}
 	}
 
-	return minioClient, bucketName, nil
+	return nil
 }
 
-func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.SegmentInfo, indexMeta *indexpb.IndexMeta, folderPath string) error {
-
+func (s *InstanceState) downloadSegment(ctx context.Context, minioClient *minio.Client, bucketName string, segment *models.Segment, folderPath string) error {
 	p := path.Join(folderPath, fmt.Sprintf("%d", segment.ID))
 	if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) {
 		err := os.MkdirAll(p, os.ModePerm)
@@ -155,7 +60,7 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
 
 	fmt.Printf("Downloading Segment: %d ...\n", segment.ID)
 
-	for _, fieldBinlog := range segment.Binlogs {
+	for _, fieldBinlog := range segment.GetBinlogs() {
 		folder := fmt.Sprintf("%s/%d", p, fieldBinlog.FieldID)
 		err := os.MkdirAll(folder, 0777)
 		if err != nil {
@@ -164,13 +69,13 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
 		}
 
 		for _, binlog := range fieldBinlog.Binlogs {
-			obj, err := cli.GetObject(context.Background(), bucketName, binlog.GetLogPath(), minio.GetObjectOptions{})
+			obj, err := minioClient.GetObject(ctx, bucketName, binlog.LogPath, minio.GetObjectOptions{})
 			if err != nil {
-				fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.GetLogPath(), err.Error())
+				fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.LogPath, err.Error())
 				return err
 			}
 
-			name := path.Base(binlog.GetLogPath())
+			name := path.Base(binlog.LogPath)
 
 			f, err := os.Create(path.Join(folder, name))
 			if err != nil {
@@ -182,28 +87,34 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
 			io.Copy(w, r)
 		}
 	}
+	return nil
+}
 
-	if indexMeta != nil {
-		fmt.Println("downloading index files ...")
-		folder := path.Join(p, "index")
-		for _, indexFile := range indexMeta.GetIndexFilePaths() {
-			obj, err := cli.GetObject(context.Background(), bucketName, indexFile, minio.GetObjectOptions{})
-			if err != nil {
-				fmt.Println("failed to download file", bucketName, indexFile)
-				//index not affect segment download result
-				continue
-			}
+func getMinioAccess() (*minio.Client, string, error) {
+	p := promptui.Prompt{
+		Label: "BucketName",
+	}
+	bucketName, err := p.Run()
+	if err != nil {
+		return nil, "", err
+	}
+
+	minioClient, err := getMinioClient()
+	if err != nil {
+		fmt.Println("cannot get minio client", err.Error())
+		return nil, "", err
 
-			name := path.Base(indexFile)
-			f, err := os.Create(path.Join(folder, name))
-			if err != nil {
-				fmt.Println("failed to create index file")
-				continue
-			}
-			w := bufio.NewWriter(f)
-			r := bufio.NewReader(obj)
-			io.Copy(w, r)
-		}
 	}
-	return nil
+	exists, err := minioClient.BucketExists(context.Background(), bucketName)
+	if !exists {
+		fmt.Printf("bucket %s not exists\n", bucketName)
+		return nil, "", err
+	}
+
+	if !exists {
+		fmt.Printf("Bucket not exist\n")
+		return nil, "", errors.New("bucket not exists")
+	}
+
+	return minioClient, bucketName, nil
 }
diff --git a/states/instance.go b/states/instance.go
index 25a322a..52b98e9 100644
--- a/states/instance.go
+++ b/states/instance.go
@@ -56,8 +56,6 @@ func (s *InstanceState) SetupCommands() {
 	)
 
 	cmd.AddCommand(
-		// download-segment
-		getDownloadSegmentCmd(cli, basePath),
 		// show [subcommand] options...
 		showCmd,
 		// repair [subcommand] options...
@@ -94,9 +92,6 @@ func (s *InstanceState) SetupCommands() {
 		// probe
 		GetProbeCmd(cli, basePath),
 
-		// set current-version
-		SetCurrentVersionCommand(),
-
 		// remove-segment-by-id
 		//removeSegmentByID(cli, basePath),
 		// garbage-collect
diff --git a/states/minio.go b/states/minio.go
index b4a37ac..272f6e1 100644
--- a/states/minio.go
+++ b/states/minio.go
@@ -94,6 +94,9 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
 			useSSL = config.GetValue()
 		}
 	}
+	if minioAddr != "" {
+		addr = minioAddr
+	}
 
 	mp := oss.MinioClientParam{
 		CloudProvider: cloudProvider,