Skip to content

Commit

Permalink
Fix ApplicationState sub state transfer (#177)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 8, 2023
1 parent 7737b6a commit 155a239
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 187 deletions.
25 changes: 17 additions & 8 deletions framework/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type State interface {
Ctx() (context.Context, context.CancelFunc)
Label() string
Process(cmd string) (State, error)
CanProcess(cmd string) bool
Close()
SetNext(state State)
NextState() State
Suggestions(input string) map[string]string
SetupCommands()
IsEnding() bool
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *CmdState) SetupCommands() {
}
}

// mergeFunctionCommands parses all member methods for provided state and add it into cmd.
// MergeFunctionCommands parses all member methods for provided state and add it into cmd.
func (s *CmdState) MergeFunctionCommands(cmd *cobra.Command, state State) {
items := parseFunctionCommands(state)
for _, item := range items {
Expand All @@ -115,6 +115,17 @@ func (s *CmdState) MergeFunctionCommands(cmd *cobra.Command, state State) {
}
}

func (s *CmdState) MergeCobraCommands(base *cobra.Command, cmds ...*cobra.Command) {
for _, cmd := range cmds {
target, _, err := base.Find([]string{cmd.Use})
if err != nil || (target != nil && target.Use == base.Use) {
base.AddCommand(cmd)
continue
}
s.MergeCobraCommands(target, cmd.Commands()...)
}
}

// Label returns the display label for current cli.
func (s *CmdState) Label() string {
return s.label
Expand Down Expand Up @@ -157,17 +168,15 @@ func (s *CmdState) Process(cmd string) (State, error) {
return s, nil
}

func (s *CmdState) CanProcess(cmd string) bool {
args := strings.Split(cmd, " ")
target, _, err := s.RootCmd.Find(args)
return target != nil && err == nil
}

// SetNext simple method to set next state.
func (s *CmdState) SetNext(state State) {
s.nextState = state
}

func (s *CmdState) NextState() State {
return s.nextState
}

// Close empty method to implement State.
func (s *CmdState) Close() {}

Expand Down
50 changes: 34 additions & 16 deletions states/app_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package states

import (
"context"
"fmt"
"strings"

"github.com/milvus-io/birdwatcher/configs"
Expand Down Expand Up @@ -41,35 +42,32 @@ func (app *ApplicationState) Label() string {
}

func (app *ApplicationState) Process(cmd string) (framework.State, error) {
app.config.Logger.Println("[INFO] begin to process command", cmd)
app.core.Process(cmd)
// perform sub state transfer
for key, state := range app.states {
if !state.CanProcess(cmd) {
continue
}
next, err := state.Process(cmd)
if err != nil {
return nil, err
next := state.NextState()
if next != nil {
state.SetNext(nil)
app.states[key] = next
}

app.states[key] = next
return app, nil
}
app.core.Process(cmd)

return app, nil
}

func (app *ApplicationState) CanProcess(cmd string) bool {
return true
}

func (app *ApplicationState) Close() {
for _, state := range app.states {
state.Close()
}
}

func (app *ApplicationState) SetNext(state framework.State) {
app.config.Logger.Println("SetNext called for ApplicationState, which is not expected.")
app.config.Logger.Println("[WARNING] SetNext called for ApplicationState, which is not expected.")
}

func (app *ApplicationState) NextState() framework.State {
return app
}

func (app *ApplicationState) SetTagNext(tag string, state framework.State) {
Expand All @@ -93,8 +91,8 @@ func (app *ApplicationState) Suggestions(input string) map[string]string {
// initialize or reset command after execution.
func (app *ApplicationState) SetupCommands() {
cmd := app.core.GetCmd()
app.core.UpdateState(cmd, app, app.SetupCommands)

app.core.UpdateState(cmd, app, app.SetupCommands)
for _, state := range app.states {
state.SetupCommands()
}
Expand Down Expand Up @@ -126,3 +124,23 @@ type exitParam struct {
func (app *ApplicationState) ExitCommand(ctx context.Context, _ *exitParam) {
app.SetTagNext("exit", &exitState{})
}

type debugParam struct {
framework.ParamBase `use:"debug commands" desc:"debug current command tree"`
}

func (app *ApplicationState) DebugCommand(ctx context.Context, p *debugParam) {
for _, cmd := range app.core.RootCmd.Commands() {
app.printCommands(cmd, 0)
}
}

func (app *ApplicationState) printCommands(cmd *cobra.Command, level int) {
for i := 0; i < level; i++ {
fmt.Print("\t")
}
fmt.Println(cmd.Use)
for _, subCmd := range cmd.Commands() {
app.printCommands(subCmd, level+1)
}
}
14 changes: 11 additions & 3 deletions states/balance_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -98,12 +99,19 @@ func getAllQueryNodeDistributions(queryNodes []*models.Session) []*querypbv2.Get
distributions := make([]*querypbv2.GetDataDistributionResponse, 0)
for _, queryNode := range queryNodes {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(2 * time.Second),
}

conn, err := grpc.DialContext(context.Background(), queryNode.Address, opts...)
var conn *grpc.ClientConn
var err error
func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

conn, err = grpc.DialContext(ctx, queryNode.Address, opts...)
}()

if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", queryNode.ServerName, queryNode.ServerID, err.Error())
continue
Expand Down
17 changes: 13 additions & 4 deletions states/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type GetConfigurationParam struct {
framework.ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"`
Format string `name:"format" default:"line" desc:"output format"`
DialTimeout int64 `name:"dialTimeout" default:"2" desc:"grpc dial timeout in seconds"`
}

func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error {
Expand All @@ -31,16 +33,23 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(2 * time.Second),
}

conn, err := grpc.DialContext(ctx, session.Address, opts...)
var conn *grpc.ClientConn
var err error
func() {
dialCtx, cancel := context.WithTimeout(ctx, time.Duration(p.DialTimeout)*time.Second)
defer cancel()

conn, err = grpc.DialContext(dialCtx, session.Address, opts...)
}()
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
}

var client configurationSource
switch strings.ToLower(session.ServerName) {
case "rootcoord":
Expand All @@ -63,7 +72,7 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi
continue
}

configurations, err := getConfiguration(context.Background(), client, session.ServerID)
configurations, err := getConfiguration(ctx, client, session.ServerID)
if err != nil {
continue
}
Expand Down
54 changes: 8 additions & 46 deletions states/current_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,15 @@ import (
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/spf13/cobra"
)

// CurrentVersionCommand returns command for show current-version.
func CurrentVersionCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "current-version",
Run: func(_ *cobra.Command, args []string) {
fmt.Println("Current Version:", etcdversion.GetVersion())
},
}
return cmd
type ShowCurrentVersionParam struct {
framework.ParamBase `use:"show current-version" desc:"display current Milvus Meta data version"`
}

// ShowCurrentVersionCommand returns command for show current-version.
func (app *ApplicationState) ShowCurrentVersionCommand(ctx context.Context, p *ShowCurrentVersionParam) {
fmt.Println("Current Version:", etcdversion.GetVersion())
}

type setCurrentVersionParam struct {
Expand All @@ -35,7 +32,7 @@ func (p *setCurrentVersionParam) ParseArgs(args []string) error {
return nil
}

func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) {
func (app *ApplicationState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) {
switch param.newVersion {
case models.LTEVersion2_1:
fallthrough
Expand All @@ -49,38 +46,3 @@ func (s *InstanceState) SetCurrentVersionCommand(ctx context.Context, param setC
fmt.Println("Invalid version string:", param.newVersion)
}
}

// SetCurrentVersionCommand returns command for set current-version.
func SetCurrentVersionCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "set",
}

subCmd := &cobra.Command{
Use: "current-version",
Run: func(_ *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println("invalid parameter numbers")
return
}

newVersion := args[0]
switch newVersion {
case models.LTEVersion2_1:
fallthrough
case "LTEVersion2_1":
etcdversion.SetVersion(models.LTEVersion2_1)
case models.GTEVersion2_2:
fallthrough
case "GTEVersion2_2":
etcdversion.SetVersion(models.GTEVersion2_2)
default:
fmt.Println("Invalid version string:", newVersion)
}
},
}

cmd.AddCommand(subCmd)

return cmd
}
13 changes: 10 additions & 3 deletions states/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// GetDistributionCommand returns command to iterate all querynodes to list distribution.
Expand All @@ -30,12 +31,18 @@ func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command {

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(2 * time.Second),
}

conn, err := grpc.DialContext(context.Background(), session.Address, opts...)
var conn *grpc.ClientConn
var err error
func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

conn, err = grpc.DialContext(ctx, session.Address, opts...)
}()
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
Expand Down
Loading

0 comments on commit 155a239

Please sign in to comment.