From af72ef7f55e07981634eb51cc74f42a2eb470870 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 27 May 2024 17:11:37 +0800 Subject: [PATCH] enhance: Make pprof fetching run in parallel (#264) Fetching pprof may take a long time, especially when the type is set to `profile`, since birdwatcher fetches each component for 30 seconds. This PR makes the procedure run in parallel to improve the performance of the `pprof` command. --------- Signed-off-by: Congqi Xia --- states/download_pk.go | 8 ++-- states/etcd_backup.go | 4 ++ states/instance.go | 4 +- states/pprof.go | 92 ++++++++++++++++++++++++++++++++----------- states/visit.go | 2 +- 5 files changed, 78 insertions(+), 32 deletions(-) diff --git a/states/download_pk.go b/states/download_pk.go index 6b62e4c..aae09c1 100644 --- a/states/download_pk.go +++ b/states/download_pk.go @@ -73,13 +73,11 @@ func getDownloadPKCmd(cli clientv3.KV, basePath string) *cobra.Command { return nil } exists, err := minioClient.BucketExists(context.Background(), bucketName) - if !exists { - fmt.Printf("bucket %s not exists\n", bucketName) - return nil + if err != nil { + return err } - if !exists { - fmt.Printf("Bucket not exist\n") + fmt.Printf("bucket %s not exists\n", bucketName) return nil } diff --git a/states/etcd_backup.go b/states/etcd_backup.go index 2dfff17..18fddb6 100644 --- a/states/etcd_backup.go +++ b/states/etcd_backup.go @@ -109,6 +109,10 @@ func getBackupEtcdCmd(cli clientv3.KV, basePath string) *cobra.Command { // write backup header // version 2 used for now err = writeBackupHeader(w, 2) + if err != nil { + fmt.Println("write backup header failed: ", err.Error()) + return + } err = backupEtcdV2(cli, basePath, prefix, w, opt) if err != nil { diff --git a/states/instance.go b/states/instance.go index 21986df..38e7cd0 100644 --- a/states/instance.go +++ b/states/instance.go @@ -107,7 +107,7 @@ func (s *InstanceState) SetupCommands() { // fetch-metrics getFetchMetricsCmd(cli, basePath), // dry-mode - getDryModeCmd(cli, 
s, s.etcdState), + getDryModeCmd(s, s.etcdState), etcd.DownloadCommand(cli, basePath), ) @@ -118,7 +118,7 @@ func (s *InstanceState) SetupCommands() { } // getDryModeCmd enter dry-mode -func getDryModeCmd(cli clientv3.KV, state *InstanceState, etcdState State) *cobra.Command { +func getDryModeCmd(state *InstanceState, etcdState State) *cobra.Command { cmd := &cobra.Command{ Use: "dry-mode", Short: "enter dry mode to select instance", diff --git a/states/pprof.go b/states/pprof.go index c32c23e..64efe9a 100644 --- a/states/pprof.go +++ b/states/pprof.go @@ -8,10 +8,12 @@ import ( "io" "net/http" "os" + "sync" "time" "github.com/cockroachdb/errors" "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" ) @@ -51,36 +53,78 @@ func (s *InstanceState) GetPprofCommand(ctx context.Context, p *PprofParam) erro tw := tar.NewWriter(gw) defer tw.Close() - for _, session := range sessions { - addr := session.IP() - // TODO add auto detection from configuration API - url := fmt.Sprintf("http://%s:%d/debug/pprof/%s?debug=0", addr, p.Port, p.Type) - - // #nosec - resp, err := http.Get(url) - if err != nil { - return err - } + type pprofResult struct { + session *models.Session + data []byte + err error + } - bs, err := io.ReadAll(resp.Body) - if err != nil { - return err + ch := make(chan pprofResult, len(sessions)) + signal := make(chan error, 1) + + go func() { + for result := range ch { + if result.err != nil { + fmt.Println() + } + session := result.session + tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + + Name: fmt.Sprintf("%s_%d_%s", session.ServerName, session.ServerID, p.Type), + Size: int64(len(result.data)), + Mode: 0600, + }) + + _, err = tw.Write(result.data) + if err != nil { + signal <- err + } + + fmt.Printf("%s pprof from %s-%d fetched, added into archive file\n", p.Type, session.ServerName, session.ServerID) } + close(signal) + }() - tw.WriteHeader(&tar.Header{ - 
Typeflag: tar.TypeReg, - - Name: fmt.Sprintf("%s_%d_%s", session.ServerName, session.ServerID, p.Type), - Size: int64(len(bs)), - Mode: 0600, - }) + wg := sync.WaitGroup{} + wg.Add(len(sessions)) + for _, session := range sessions { + go func(session *models.Session) { + defer wg.Done() + + result := pprofResult{ + session: session, + } + addr := session.IP() + // TODO add auto detection from configuration API + url := fmt.Sprintf("http://%s:%d/debug/pprof/%s?debug=0", addr, p.Port, p.Type) + + // #nosec + resp, err := http.Get(url) + if err != nil { + result.err = err + ch <- result + return + } + + bs, err := io.ReadAll(resp.Body) + if err != nil { + result.err = err + ch <- result + return + } + + result.data = bs + ch <- result + }(session) + } + wg.Wait() + close(ch) - _, err = tw.Write(bs) + for err := range signal { if err != nil { - return err + fmt.Println("failed to write pprof:", err.Error()) } - - fmt.Printf("%s pprof from %s-%d fetched, added into archive file\n", p.Type, session.ServerName, session.ServerID) } fmt.Printf("pprof metrics fetch done, write to archive file %s\n", filePath) diff --git a/states/visit.go b/states/visit.go index 2b03b62..e918b1e 100644 --- a/states/visit.go +++ b/states/visit.go @@ -181,7 +181,7 @@ func getMetrics(ctx context.Context, client metricsSource) (string, error) { } func getConfiguration(ctx context.Context, client configurationSource, id int64) ([]*commonpbv2.KeyValuePair, error) { - resp, err := client.ShowConfigurations(context.Background(), &internalpbv2.ShowConfigurationsRequest{ + resp, err := client.ShowConfigurations(ctx, &internalpbv2.ShowConfigurationsRequest{ Base: &commonpbv2.MsgBase{ SourceID: -1, TargetID: id,