diff --git a/framework/state.go b/framework/state.go index e5d1708..8db7a78 100644 --- a/framework/state.go +++ b/framework/state.go @@ -18,9 +18,9 @@ type State interface { Ctx() (context.Context, context.CancelFunc) Label() string Process(cmd string) (State, error) - CanProcess(cmd string) bool Close() SetNext(state State) + NextState() State Suggestions(input string) map[string]string SetupCommands() IsEnding() bool @@ -97,7 +97,7 @@ func (s *CmdState) SetupCommands() { } } -// mergeFunctionCommands parses all member methods for provided state and add it into cmd. +// MergeFunctionCommands parses all member methods for provided state and add it into cmd. func (s *CmdState) MergeFunctionCommands(cmd *cobra.Command, state State) { items := parseFunctionCommands(state) for _, item := range items { @@ -115,6 +115,17 @@ func (s *CmdState) MergeFunctionCommands(cmd *cobra.Command, state State) { } } +func (s *CmdState) MergeCobraCommands(base *cobra.Command, cmds ...*cobra.Command) { + for _, cmd := range cmds { + target, _, err := base.Find([]string{cmd.Use}) + if err != nil || (target != nil && target.Use == base.Use) { + base.AddCommand(cmd) + continue + } + s.MergeCobraCommands(target, cmd.Commands()...) + } +} + // Label returns the display label for current cli. func (s *CmdState) Label() string { return s.label @@ -157,17 +168,15 @@ func (s *CmdState) Process(cmd string) (State, error) { return s, nil } -func (s *CmdState) CanProcess(cmd string) bool { - args := strings.Split(cmd, " ") - target, _, err := s.RootCmd.Find(args) - return target != nil && err == nil -} - // SetNext simple method to set next state. func (s *CmdState) SetNext(state State) { s.nextState = state } +func (s *CmdState) NextState() State { + return s.nextState +} + // Close empty method to implement State. func (s *CmdState) Close() {} diff --git a/states/app_state.go b/states/app_state.go index b413cf8..9ede242 100644 --- a/states/app_state.go +++ b/states/app_state.go @@ -2,6 +2,7 @@ package states import ( "context" + "fmt" "strings" "github.com/milvus-io/birdwatcher/configs" @@ -41,27 +42,20 @@ func (app *ApplicationState) Label() string { } func (app *ApplicationState) Process(cmd string) (framework.State, error) { + app.config.Logger.Println("[INFO] begin to process command", cmd) + app.core.Process(cmd) + // perform sub state transfer for key, state := range app.states { - if !state.CanProcess(cmd) { - continue - } - next, err := state.Process(cmd) - if err != nil { - return nil, err + next := state.NextState() + if next != nil { + state.SetNext(nil) + app.states[key] = next } - - app.states[key] = next - return app, nil } - app.core.Process(cmd) return app, nil } -func (app *ApplicationState) CanProcess(cmd string) bool { - return true -} - func (app *ApplicationState) Close() { for _, state := range app.states { state.Close() @@ -69,7 +63,11 @@ func (app *ApplicationState) Close() { } func (app *ApplicationState) SetNext(state framework.State) { - app.config.Logger.Println("SetNext called for ApplicationState, which is not expected.") + app.config.Logger.Println("[WARNING] SetNext called for ApplicationState, which is not expected.") +} + +func (app *ApplicationState) NextState() framework.State { + return app } func (app *ApplicationState) SetTagNext(tag string, state framework.State) { @@ -93,8 +91,8 @@ func (app *ApplicationState) Suggestions(input string) map[string]string { // initialize or reset command after execution. func (app *ApplicationState) SetupCommands() { cmd := app.core.GetCmd() - app.core.UpdateState(cmd, app, app.SetupCommands) + app.core.UpdateState(cmd, app, app.SetupCommands) for _, state := range app.states { state.SetupCommands() } @@ -126,3 +124,23 @@ type exitParam struct { func (app *ApplicationState) ExitCommand(ctx context.Context, _ *exitParam) { app.SetTagNext("exit", &exitState{}) } + +type debugParam struct { + framework.ParamBase `use:"debug commands" desc:"debug current command tree"` +} + +func (app *ApplicationState) DebugCommand(ctx context.Context, p *debugParam) { + for _, cmd := range app.core.RootCmd.Commands() { + app.printCommands(cmd, 0) + } +} + +func (app *ApplicationState) printCommands(cmd *cobra.Command, level int) { + for i := 0; i < level; i++ { + fmt.Print("\t") + } + fmt.Println(cmd.Use) + for _, subCmd := range cmd.Commands() { + app.printCommands(subCmd, level+1) + } +} diff --git a/states/balance_explain.go b/states/balance_explain.go index 2a101d5..01a5320 100644 --- a/states/balance_explain.go +++ b/states/balance_explain.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) const ( @@ -98,12 +99,19 @@ func getAllQueryNodeDistributions(queryNodes []*models.Session) []*querypbv2.Get distributions := make([]*querypbv2.GetDataDistributionResponse, 0) for _, queryNode := range queryNodes { opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(context.Background(), queryNode.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err = grpc.DialContext(ctx, queryNode.Address, opts...) + }() + if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", queryNode.ServerName, queryNode.ServerID, err.Error()) continue diff --git a/states/configuration.go b/states/configuration.go index f4d0f0b..8653bf3 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -14,11 +14,13 @@ import ( rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb" "github.com/milvus-io/birdwatcher/states/etcd/common" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type GetConfigurationParam struct { framework.ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"` Format string `name:"format" default:"line" desc:"output format"` + DialTimeout int64 `name:"dialTimeout" default:"2" desc:"grpc dial timeout in seconds"` } func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { @@ -31,16 +33,23 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi for _, session := range sessions { opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(ctx, session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + dialCtx, cancel := context.WithTimeout(ctx, time.Duration(p.DialTimeout)*time.Second) + defer cancel() + + conn, err = grpc.DialContext(dialCtx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) continue } + var client configurationSource switch strings.ToLower(session.ServerName) { case "rootcoord": @@ -63,7 +72,7 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi continue } - configurations, err := getConfiguration(context.Background(), client, session.ServerID) + configurations, err := getConfiguration(ctx, client, session.ServerID) if err != nil { continue } diff --git a/states/current_version.go b/states/current_version.go index f655112..f42121e 100644 --- a/states/current_version.go +++ b/states/current_version.go @@ -8,18 +8,15 @@ import ( "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" - "github.com/spf13/cobra" ) -// CurrentVersionCommand returns command for show current-version. -func CurrentVersionCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "current-version", - Run: func(_ *cobra.Command, args []string) { - fmt.Println("Current Version:", etcdversion.GetVersion()) - }, - } - return cmd +type ShowCurrentVersionParam struct { + framework.ParamBase `use:"show current-version" desc:"display current Milvus Meta data version"` +} + +// ShowCurrentVersionCommand returns command for show current-version. +func (app *ApplicationState) ShowCurrentVersionCommand(ctx context.Context, p *ShowCurrentVersionParam) { + fmt.Println("Current Version:", etcdversion.GetVersion()) } type setCurrentVersionParam struct { @@ -35,7 +32,7 @@ func (p *setCurrentVersionParam) ParseArgs(args []string) error { return nil } -func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { +func (app *ApplicationState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) { switch param.newVersion { case models.LTEVersion2_1: fallthrough @@ -49,38 +46,3 @@ func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setC fmt.Println("Invalid version string:", param.newVersion) } } - -// SetCurrentVersionCommand returns command for set current-version. -func SetCurrentVersionCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "set", - } - - subCmd := &cobra.Command{ - Use: "current-version", - Run: func(_ *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Println("invalid parameter numbers") - return - } - - newVersion := args[0] - switch newVersion { - case models.LTEVersion2_1: - fallthrough - case "LTEVersion2_1": - etcdversion.SetVersion(models.LTEVersion2_1) - case models.GTEVersion2_2: - fallthrough - case "GTEVersion2_2": - etcdversion.SetVersion(models.GTEVersion2_2) - default: - fmt.Println("Invalid version string:", newVersion) - } - }, - } - - cmd.AddCommand(subCmd) - - return cmd -} diff --git a/states/distribution.go b/states/distribution.go index bde9c97..16b2bf4 100644 --- a/states/distribution.go +++ b/states/distribution.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // GetDistributionCommand returns command to iterate all querynodes to list distribution. @@ -30,12 +31,18 @@ func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command { for _, session := range sessions { opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err = grpc.DialContext(ctx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) continue diff --git a/states/etcd_backup.go b/states/etcd_backup.go index 2dfff17..b5d4e49 100644 --- a/states/etcd_backup.go +++ b/states/etcd_backup.go @@ -7,7 +7,6 @@ import ( "context" "encoding/binary" "encoding/json" - "errors" "fmt" "io" "os" @@ -15,8 +14,10 @@ import ( "strings" "time" + "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/gosuri/uilive" + "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" @@ -29,10 +30,9 @@ import ( querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - "github.com/spf13/pflag" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type milvusComponent string @@ -69,62 +69,62 @@ func (c *milvusComponent) Type() string { return "MilvusComponent" } -// getBackupEtcdCmd returns command for backup etcd -// usage: backup [component] [options...] -func getBackupEtcdCmd(cli clientv3.KV, basePath string) *cobra.Command { - - component := compAll - cmd := &cobra.Command{ - Use: "backup", - Short: "backup etcd key-values", - Run: func(cmd *cobra.Command, args []string) { - opt := &backupOption{} - if err := opt.getArgs(cmd.Flags()); err != nil { - fmt.Println(err.Error()) - return - } +type BackupParam struct { + framework.ParamBase `use:"backup" desc:"backup etcd key-values"` + // Component string `name:""` + component milvusComponent + IgnoreRevision bool `name:"ignoreRevision" default:"false" desc:"backup ignore revision change, ONLY shall works with no nodes online"` + BatchSize int64 `name:"batchSize" default:"100" desc:"batch fetch size for etcd backup operation"` +} - prefix := "" - switch component { - case compAll: - prefix = "" - case compQueryCoord: - prefix = `queryCoord-` - default: - fmt.Printf("component %s not supported for separate backup, use ALL instead\n", component) - return - } +func (p *BackupParam) ParseArgs(args []string) error { + if len(args) == 0 { + p.component.Set("ALL") + return nil + } - f, err := getBackupFile(component.String()) - if err != nil { - fmt.Println("failed to open backup file:", err.Error()) - return - } - defer f.Close() + return p.component.Set(args[0]) +} - gw := gzip.NewWriter(f) - defer gw.Close() - w := bufio.NewWriter(gw) +// getBackupEtcdCmd returns command for backup etcd +// usage: backup [component] [options...] +func (s *InstanceState) BackupCommand(ctx context.Context, p *BackupParam) error { + prefix := "" + switch p.component { + case compAll: + prefix = "" + case compQueryCoord: + prefix = `queryCoord-` + default: + return fmt.Errorf("component %s not supported for separate backup, use ALL instead", p.component.String()) + } - // write backup header - // version 2 used for now - err = writeBackupHeader(w, 2) + f, err := getBackupFile(p.component.String()) + if err != nil { + return errors.Wrap(err, "failed to open backup file") + } + defer f.Close() - err = backupEtcdV2(cli, basePath, prefix, w, opt) - if err != nil { - fmt.Printf("backup etcd failed, error: %v\n", err) - } - backupMetrics(cli, basePath, w) - backupConfiguration(cli, basePath, w) - backupAppMetrics(cli, basePath, w) - fmt.Printf("backup for prefix done, stored in file: %s\n", f.Name()) - }, + gw := gzip.NewWriter(f) + defer gw.Close() + w := bufio.NewWriter(gw) + + // write backup header + // version 2 used for now + err = writeBackupHeader(w, 2) + if err != nil { + return errors.Wrap(err, "failed to write backup file header") } - cmd.Flags().Var(&component, "ALL", "component to backup") - cmd.Flags().Bool("ignoreRevision", false, "backup ignore revision change, ONLY shall works with no nodes online") - cmd.Flags().Int("batchSize", 100, "batch fetch size for etcd backup operation") - return cmd + err = backupEtcdV2(s.client, s.basePath, prefix, w, p) + if err != nil { + fmt.Printf("backup etcd failed, error: %v\n", err) + } + backupMetrics(s.client, s.basePath, w) + backupConfiguration(s.client, s.basePath, w) + backupAppMetrics(s.client, s.basePath, w) + fmt.Printf("backup for prefix done, stored in file: %s\n", f.Name()) + return nil } func getBackupFile(component string) (*os.File, error) { @@ -151,22 +151,7 @@ func writeBackupHeader(w io.Writer, version int32) error { return nil } -type backupOption struct { - ignoreRevision bool - batchSize int -} - -func (opt *backupOption) getArgs(fs *pflag.FlagSet) error { - var err error - opt.ignoreRevision, err = fs.GetBool("ignoreRevision") - if err != nil { - return err - } - opt.batchSize, err = fs.GetInt("batchSize") - return err -} - -func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, opt *backupOption) error { +func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, opt *BackupParam) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, path.Join(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix()) @@ -174,7 +159,7 @@ func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, opt *ba return err } - if opt.ignoreRevision { + if opt.IgnoreRevision { fmt.Println("WARNING!!! doing backup ignore revision! please make sure no instance of milvus is online!") } @@ -214,8 +199,8 @@ func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, opt *ba progressDisplay.Start() fmt.Fprintf(progressDisplay, progressFmt, 0, 0, cnt) - options := []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithLimit(int64(opt.batchSize))} - if !opt.ignoreRevision { + options := []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithLimit(opt.BatchSize)} + if !opt.IgnoreRevision { options = append(options, clientv3.WithRev(rev)) } @@ -323,12 +308,18 @@ func backupAppMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { for _, session := range sessions { opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err = grpc.DialContext(ctx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) continue @@ -393,7 +384,7 @@ func backupConfiguration(cli clientv3.KV, basePath string, w *bufio.Writer) erro for _, session := range sessions { opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(2 * time.Second), } diff --git a/states/instance.go b/states/instance.go index 55be278..8b49b09 100644 --- a/states/instance.go +++ b/states/instance.go @@ -1,6 +1,7 @@ package states import ( + "context" "fmt" "os" "path" @@ -11,7 +12,6 @@ import ( "github.com/milvus-io/birdwatcher/states/etcd/audit" "github.com/milvus-io/birdwatcher/states/etcd/remove" "github.com/milvus-io/birdwatcher/states/etcd/show" - "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -47,13 +47,11 @@ func (s *InstanceState) SetupCommands() { showCmd := etcd.ShowCommand(cli, basePath) showCmd.AddCommand( - // show current-version - CurrentVersionCommand(), // show segment-loaded-grpc GetDistributionCommand(cli, basePath), ) - cmd.AddCommand( + s.MergeCobraCommands(cmd, // download-segment getDownloadSegmentCmd(cli, basePath), // show [subcommand] options... @@ -67,8 +65,6 @@ func (s *InstanceState) SetupCommands() { // restore [subcommand] options... // etcd.RestoreCommand(cli, basePath), - // backup [component] - getBackupEtcdCmd(cli, basePath), // kill --component [component] --id [id] getEtcdKillCmd(cli, basePath), // force-release @@ -94,9 +90,6 @@ func (s *InstanceState) SetupCommands() { // probe GetProbeCmd(cli, basePath), - // set current-version - SetCurrentVersionCommand(), - // remove-segment-by-id //removeSegmentByID(cli, basePath), // garbage-collect @@ -108,8 +101,6 @@ func (s *InstanceState) SetupCommands() { getWebCmd(s, cli, basePath), // fetch-metrics getFetchMetricsCmd(cli, basePath), - // dry-mode - getDryModeCmd(cli, s, s.etcdState), ) //cmd.AddCommand(etcd.RawCommands(cli)...) @@ -117,16 +108,13 @@ func (s *InstanceState) SetupCommands() { s.UpdateState(cmd, s, s.SetupCommands) } -// getDryModeCmd enter dry-mode -func getDryModeCmd(cli clientv3.KV, state *InstanceState, etcdState framework.State) *cobra.Command { - cmd := &cobra.Command{ - Use: "dry-mode", - Short: "enter dry mode to select instance", - Run: func(*cobra.Command, []string) { - state.SetNext(etcdState) - }, - } - return cmd +type DryModeParam struct { + framework.ParamBase `use:"dry-mode" desc:"enter dry mode to select instance"` +} + +// DryModeCommand implement `dry-mode` command to enter etcd "dry mode". +func (s *InstanceState) DryModeCommand(ctx context.Context, p *DryModeParam) { + s.SetNext(s.etcdState) } func getInstanceState(parent *framework.CmdState, cli clientv3.KV, instanceName, metaPath string, etcdState framework.State, config *configs.Config) framework.State { diff --git a/states/management.go b/states/management.go index 700fa40..7a9a255 100644 --- a/states/management.go +++ b/states/management.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "sync" "time" @@ -26,6 +26,7 @@ import ( type ListMetricsPortParam struct { framework.ParamBase `use:"list metrics-port" desc:"list metrics port for online components"` + DialTimeout int64 `name:"dialTimeout" default:"2" desc:"grpc dial timeout in seconds"` } // ListMetricsPortCommand returns command logic listing metrics port for all online components. @@ -41,7 +42,13 @@ func (s *InstanceState) ListMetricsPortCommand(ctx context.Context, p *ListMetri grpc.WithBlock(), } - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + dialCtx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + conn, err = grpc.DialContext(dialCtx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect to Server(%d) addr: %s, err: %s\n", session.ServerID, session.Address, err.Error()) continue @@ -49,7 +56,6 @@ func (s *InstanceState) ListMetricsPortCommand(ctx context.Context, p *ListMetri source := getConfigurationSource(session, conn) if source == nil { - // fmt.Println("source nil", session.String()) continue } items, _ := getConfiguration(ctx, source, session.ServerID) @@ -58,7 +64,6 @@ func (s *InstanceState) ListMetricsPortCommand(ctx context.Context, p *ListMetri fmt.Println(session.ServerName, session.IP(), item.GetValue()) } } - } return nil @@ -120,7 +125,7 @@ func getEventLogPort(ctx context.Context, ip string, metricPort string) int { if err != nil { return -1 } - bs, err := ioutil.ReadAll(resp.Body) + bs, err := io.ReadAll(resp.Body) if err != nil { return -1 } diff --git a/states/probe.go b/states/probe.go index ed8c0a0..04e3720 100644 --- a/states/probe.go +++ b/states/probe.go @@ -28,6 +28,7 @@ import ( "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func GetProbeCmd(cli clientv3.KV, basePath string) *cobra.Command { @@ -330,12 +331,18 @@ func getQueryCoordClient(sessions []*models.Session) (querypbv2.QueryCoordClient } opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err = grpc.DialContext(ctx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) continue @@ -357,12 +364,18 @@ func getQueryNodeClients(sessions []*models.Session) (map[int64]querypbv2.QueryN continue } opts := []grpc.DialOption{ - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), } - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + var conn *grpc.ClientConn + var err error + func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err = grpc.DialContext(ctx, session.Address, opts...) + }() if err != nil { fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) continue