Skip to content

Commit

Permalink
enhance: Make pprof fetching run in parallel (#264)
Browse files Browse the repository at this point in the history
Fetching pprof may take a long time, especially when the type is set to
`profile`, since birdwatcher will fetch each component for 30 seconds.

This PR makes this procedure run in parallel to improve the performance
of the `pprof` command.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored May 27, 2024
1 parent 0d596cb commit af72ef7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 32 deletions.
8 changes: 3 additions & 5 deletions states/download_pk.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,11 @@ func getDownloadPKCmd(cli clientv3.KV, basePath string) *cobra.Command {
return nil
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil
if err != nil {
return err
}

if !exists {
fmt.Printf("Bucket not exist\n")
fmt.Printf("bucket %s not exists\n", bucketName)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions states/etcd_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func getBackupEtcdCmd(cli clientv3.KV, basePath string) *cobra.Command {
// write backup header
// version 2 used for now
err = writeBackupHeader(w, 2)
if err != nil {
fmt.Println("write backup header failed: ", err.Error())
return
}

err = backupEtcdV2(cli, basePath, prefix, w, opt)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *InstanceState) SetupCommands() {
// fetch-metrics
getFetchMetricsCmd(cli, basePath),
// dry-mode
getDryModeCmd(cli, s, s.etcdState),
getDryModeCmd(s, s.etcdState),
etcd.DownloadCommand(cli, basePath),
)

Expand All @@ -118,7 +118,7 @@ func (s *InstanceState) SetupCommands() {
}

// getDryModeCmd enter dry-mode
func getDryModeCmd(cli clientv3.KV, state *InstanceState, etcdState State) *cobra.Command {
func getDryModeCmd(state *InstanceState, etcdState State) *cobra.Command {
cmd := &cobra.Command{
Use: "dry-mode",
Short: "enter dry mode to select instance",
Expand Down
92 changes: 68 additions & 24 deletions states/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"io"
"net/http"
"os"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
)

Expand Down Expand Up @@ -51,36 +53,78 @@ func (s *InstanceState) GetPprofCommand(ctx context.Context, p *PprofParam) erro
tw := tar.NewWriter(gw)
defer tw.Close()

for _, session := range sessions {
addr := session.IP()
// TODO add auto detection from configuration API
url := fmt.Sprintf("http://%s:%d/debug/pprof/%s?debug=0", addr, p.Port, p.Type)

// #nosec
resp, err := http.Get(url)
if err != nil {
return err
}
type pprofResult struct {
session *models.Session
data []byte
err error
}

bs, err := io.ReadAll(resp.Body)
if err != nil {
return err
ch := make(chan pprofResult, len(sessions))
signal := make(chan error, 1)

go func() {
for result := range ch {
if result.err != nil {
fmt.Println()
}
session := result.session
tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,

Name: fmt.Sprintf("%s_%d_%s", session.ServerName, session.ServerID, p.Type),
Size: int64(len(result.data)),
Mode: 0600,
})

_, err = tw.Write(result.data)
if err != nil {
signal <- err
}

fmt.Printf("%s pprof from %s-%d fetched, added into archive file\n", p.Type, session.ServerName, session.ServerID)
}
close(signal)
}()

tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,

Name: fmt.Sprintf("%s_%d_%s", session.ServerName, session.ServerID, p.Type),
Size: int64(len(bs)),
Mode: 0600,
})
wg := sync.WaitGroup{}
wg.Add(len(sessions))
for _, session := range sessions {
go func(session *models.Session) {
defer wg.Done()

result := pprofResult{
session: session,
}
addr := session.IP()
// TODO add auto detection from configuration API
url := fmt.Sprintf("http://%s:%d/debug/pprof/%s?debug=0", addr, p.Port, p.Type)

// #nosec
resp, err := http.Get(url)
if err != nil {
result.err = err
ch <- result
return
}

bs, err := io.ReadAll(resp.Body)
if err != nil {
result.err = err
ch <- result
return
}

result.data = bs
ch <- result
}(session)
}
wg.Wait()
close(ch)

_, err = tw.Write(bs)
for err := range signal {
if err != nil {
return err
fmt.Println("failed to write pprof:", err.Error())
}

fmt.Printf("%s pprof from %s-%d fetched, added into archive file\n", p.Type, session.ServerName, session.ServerID)
}

fmt.Printf("pprof metrics fetch done, write to archive file %s\n", filePath)
Expand Down
2 changes: 1 addition & 1 deletion states/visit.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func getMetrics(ctx context.Context, client metricsSource) (string, error) {
}

func getConfiguration(ctx context.Context, client configurationSource, id int64) ([]*commonpbv2.KeyValuePair, error) {
resp, err := client.ShowConfigurations(context.Background(), &internalpbv2.ShowConfigurationsRequest{
resp, err := client.ShowConfigurations(ctx, &internalpbv2.ShowConfigurationsRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: id,
Expand Down

0 comments on commit af72ef7

Please sign in to comment.