From d662844a680d95f4d47bde933c72e995a56304a4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 27 Jul 2023 17:32:58 +0800 Subject: [PATCH] Add listen events command (#171) Signed-off-by: Congqi Xia --- eventlog/event_log.pb.go | 290 +++++++++++++++++++++++++++++++++++++++ eventlog/event_log.proto | 26 ++++ eventlog/listener.go | 67 +++++++++ go.mod | 1 + models/session.go | 13 +- states/configuration.go | 85 +----------- states/frame_screen.go | 90 ++++++++++++ states/management.go | 231 +++++++++++++++++++++++++++++++ 8 files changed, 718 insertions(+), 85 deletions(-) create mode 100644 eventlog/event_log.pb.go create mode 100644 eventlog/event_log.proto create mode 100644 eventlog/listener.go create mode 100644 states/frame_screen.go create mode 100644 states/management.go diff --git a/eventlog/event_log.pb.go b/eventlog/event_log.pb.go new file mode 100644 index 0000000..f00cccc --- /dev/null +++ b/eventlog/event_log.pb.go @@ -0,0 +1,290 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: event_log.proto + +package eventlog + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Level int32 + +const ( + Level_Undefined Level = 0 + Level_Debug Level = 1 + Level_Info Level = 2 + Level_Warn Level = 3 + Level_Error Level = 4 +) + +var Level_name = map[int32]string{ + 0: "Undefined", + 1: "Debug", + 2: "Info", + 3: "Warn", + 4: "Error", +} + +var Level_value = map[string]int32{ + "Undefined": 0, + "Debug": 1, + "Info": 2, + "Warn": 3, + "Error": 4, +} + +func (x Level) String() string { + return proto.EnumName(Level_name, int32(x)) +} + +func (Level) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{0} +} + +type ListenRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListenRequest) Reset() { *m = ListenRequest{} } +func (m *ListenRequest) String() string { return proto.CompactTextString(m) } +func (*ListenRequest) ProtoMessage() {} +func (*ListenRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{0} +} + +func (m *ListenRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListenRequest.Unmarshal(m, b) +} +func (m *ListenRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListenRequest.Marshal(b, m, deterministic) +} +func (m *ListenRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListenRequest.Merge(m, src) +} +func (m *ListenRequest) XXX_Size() int { + return xxx_messageInfo_ListenRequest.Size(m) +} +func (m *ListenRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListenRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListenRequest proto.InternalMessageInfo + +type Event struct { + Level Level `protobuf:"varint,1,opt,name=level,proto3,enum=milvus.proto.eventlog.Level" json:"level,omitempty"` + Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + Ts int64 `protobuf:"varint,4,opt,name=ts,proto3" json:"ts,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Event) Reset() { *m = Event{} } +func (m *Event) String() string { return proto.CompactTextString(m) } +func (*Event) ProtoMessage() {} +func (*Event) Descriptor() ([]byte, []int) { + return fileDescriptor_443313318a2fd90c, []int{1} +} + +func (m *Event) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Event.Unmarshal(m, b) +} +func (m *Event) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Event.Marshal(b, m, deterministic) +} +func (m *Event) XXX_Merge(src proto.Message) { + xxx_messageInfo_Event.Merge(m, src) +} +func (m *Event) XXX_Size() int { + return xxx_messageInfo_Event.Size(m) +} +func (m *Event) XXX_DiscardUnknown() { + xxx_messageInfo_Event.DiscardUnknown(m) +} + +var xxx_messageInfo_Event proto.InternalMessageInfo + +func (m *Event) GetLevel() Level { + if m != nil { + return m.Level + } + return Level_Undefined +} + +func (m *Event) GetType() int32 { + if m != nil { + return m.Type + } + return 0 +} + +func (m *Event) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Event) GetTs() int64 { + if m != nil { + return m.Ts + } + return 0 +} + +func init() { + proto.RegisterEnum("milvus.proto.eventlog.Level", Level_name, Level_value) + proto.RegisterType((*ListenRequest)(nil), "milvus.proto.eventlog.ListenRequest") + proto.RegisterType((*Event)(nil), "milvus.proto.eventlog.Event") +} + +func init() { proto.RegisterFile("event_log.proto", fileDescriptor_443313318a2fd90c) } + +var fileDescriptor_443313318a2fd90c = []byte{ + // 280 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0x4f, 0x4b, 0xc3, 0x40, + 0x14, 0xc4, 0xbb, 0xf9, 0x53, 0xec, 0xc3, 0x36, 0x61, 0x41, 0x08, 0xe2, 0x21, 0x04, 0x0f, 0x51, + 0x31, 0x91, 0xf8, 0x05, 0x44, 0xec, 0x41, 0xc8, 0x41, 0x22, 0x22, 0x78, 0x91, 0xfc, 0x79, 0x4d, + 0x17, 0xd2, 0xdd, 0xba, 0xbb, 0x89, 0xf8, 0xed, 0x25, 0x1b, 0x04, 0x0f, 0xed, 0x6d, 0x76, 0x67, + 0xde, 0x8f, 0x19, 0xf0, 0x70, 0x40, 0xae, 0x3f, 0x3b, 0xd1, 0x26, 0x7b, 0x29, 0xb4, 0xa0, 0x67, + 0x3b, 0xd6, 0x0d, 0xbd, 0x9a, 0x5e, 0x89, 0x71, 0x3b, 0xd1, 0x46, 0x1e, 0x2c, 0x73, 0xa6, 0x34, + 0xf2, 0x02, 0xbf, 0x7a, 0x54, 0x3a, 0x52, 0xe0, 0xae, 0x47, 0x93, 0x66, 0xe0, 0x76, 0x38, 0x60, + 0x17, 0x90, 0x90, 0xc4, 0xab, 0xec, 0x22, 0x39, 0x08, 0x48, 0xf2, 0x31, 0x53, 0x4c, 0x51, 0x4a, + 0xc1, 0xd1, 0x3f, 0x7b, 0x0c, 0xac, 0x90, 0xc4, 0x6e, 0x61, 0xf4, 0xf8, 0xd7, 0x94, 0xba, 0x0c, + 0xec, 0x90, 0xc4, 0xa7, 0x85, 0xd1, 0x74, 0x05, 0x96, 0x56, 0x81, 0x13, 0x92, 0xd8, 0x2e, 0x2c, + 0xad, 0xae, 0x1f, 0xc0, 0x35, 0x1c, 0xba, 0x84, 0xc5, 0x1b, 0x6f, 0x70, 0xc3, 0x38, 0x36, 0xfe, + 0x8c, 0x2e, 0xc0, 0x7d, 0xc2, 0xaa, 0x6f, 0x7d, 0x42, 0x4f, 0xc0, 0x79, 0xe6, 0x1b, 0xe1, 0x5b, + 0xa3, 0x7a, 0x2f, 0x25, 0xf7, 0xed, 0xd1, 0x5e, 0x4b, 0x29, 0xa4, 0xef, 0x64, 0x35, 0x78, 0xa6, + 0x76, 0x2e, 0xda, 0x57, 0x94, 0x03, 0xab, 0x91, 0xbe, 0xc0, 0x7c, 0x9a, 0x46, 0x2f, 0x8f, 0x75, + 0xff, 0xbf, 0xfc, 0xfc, 0xd8, 0x42, 0xc3, 0x8d, 0x66, 0x77, 0xe4, 0xf1, 0xe6, 0xe3, 0xaa, 0x65, + 0x7a, 0xdb, 0x57, 0x49, 0x2d, 0x76, 0xe9, 0x94, 0xbe, 0x65, 0x22, 0xad, 0x98, 0x6c, 0xbe, 0x4b, + 0x5d, 0x6f, 0x51, 0xa6, 0x7f, 0x67, 0xd5, 0xdc, 0x60, 0xee, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, + 0x4e, 0x13, 0x4d, 0xa6, 0x8a, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// EventLogServiceClient is the client API for EventLogService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type EventLogServiceClient interface { + Listen(ctx context.Context, in *ListenRequest, opts ...grpc.CallOption) (EventLogService_ListenClient, error) +} + +type eventLogServiceClient struct { + cc *grpc.ClientConn +} + +func NewEventLogServiceClient(cc *grpc.ClientConn) EventLogServiceClient { + return &eventLogServiceClient{cc} +} + +func (c *eventLogServiceClient) Listen(ctx context.Context, in *ListenRequest, opts ...grpc.CallOption) (EventLogService_ListenClient, error) { + stream, err := c.cc.NewStream(ctx, &_EventLogService_serviceDesc.Streams[0], "/milvus.proto.eventlog.EventLogService/Listen", opts...) + if err != nil { + return nil, err + } + x := &eventLogServiceListenClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type EventLogService_ListenClient interface { + Recv() (*Event, error) + grpc.ClientStream +} + +type eventLogServiceListenClient struct { + grpc.ClientStream +} + +func (x *eventLogServiceListenClient) Recv() (*Event, error) { + m := new(Event) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// EventLogServiceServer is the server API for EventLogService service. +type EventLogServiceServer interface { + Listen(*ListenRequest, EventLogService_ListenServer) error +} + +// UnimplementedEventLogServiceServer can be embedded to have forward compatible implementations. +type UnimplementedEventLogServiceServer struct { +} + +func (*UnimplementedEventLogServiceServer) Listen(req *ListenRequest, srv EventLogService_ListenServer) error { + return status.Errorf(codes.Unimplemented, "method Listen not implemented") +} + +func RegisterEventLogServiceServer(s *grpc.Server, srv EventLogServiceServer) { + s.RegisterService(&_EventLogService_serviceDesc, srv) +} + +func _EventLogService_Listen_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ListenRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EventLogServiceServer).Listen(m, &eventLogServiceListenServer{stream}) +} + +type EventLogService_ListenServer interface { + Send(*Event) error + grpc.ServerStream +} + +type eventLogServiceListenServer struct { + grpc.ServerStream +} + +func (x *eventLogServiceListenServer) Send(m *Event) error { + return x.ServerStream.SendMsg(m) +} + +var _EventLogService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "milvus.proto.eventlog.EventLogService", + HandlerType: (*EventLogServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Listen", + Handler: _EventLogService_Listen_Handler, + ServerStreams: true, + }, + }, + Metadata: "event_log.proto", +} diff --git a/eventlog/event_log.proto b/eventlog/event_log.proto new file mode 100644 index 0000000..68bac4d --- /dev/null +++ b/eventlog/event_log.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +package milvus.proto.eventlog; + +option go_package = "github.com/milvus-io/birdwatcher/eventlog"; + +service EventLogService { + rpc Listen(ListenRequest) returns(stream Event) {} +} + +message ListenRequest { +} + +message Event { + Level level = 1; + int32 type = 2; + bytes data = 3; + int64 ts = 4; +} + +enum Level { + Undefined = 0; + Debug = 1; + Info = 2; + Warn = 3; + Error = 4; +} diff --git a/eventlog/listener.go b/eventlog/listener.go new file mode 100644 index 0000000..61730d8 --- /dev/null +++ b/eventlog/listener.go @@ -0,0 +1,67 @@ +package eventlog + +import ( + "context" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type Listener struct { + conn *grpc.ClientConn + client EventLogServiceClient + stream EventLogService_ListenClient + closed chan struct{} +} + +func NewListener(ctx context.Context, addr string) (*Listener, error) { + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithTimeout(time.Second), + } + + conn, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + return nil, err + } + + client := NewEventLogServiceClient(conn) + return &Listener{ + conn: conn, + client: client, + }, nil +} + +func (l *Listener) Start(ctx context.Context) (<-chan *Event, error) { + ch := make(chan *Event, 100) + s, err := l.client.Listen(ctx, &ListenRequest{}) + if err != nil { + return nil, err + } + + go func() { + defer close(ch) + for { + evt, err := s.Recv() + if err != nil { + return + } + select { + case ch <- evt: + case <-l.closed: + return + } + } + }() + + return ch, nil +} + +func (l *Listener) Stop() { + close(l.closed) + if l.stream != nil { + l.stream.CloseSend() + } +} diff --git a/go.mod b/go.mod index b408d89..d6b8299 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/c-bata/go-prompt v0.2.6 github.com/cockroachdb/errors v1.9.1 github.com/confluentinc/confluent-kafka-go v1.9.1 + github.com/fatih/color v1.7.0 github.com/gin-gonic/gin v1.9.1 github.com/golang/protobuf v1.5.2 github.com/gosuri/uilive v0.0.4 diff --git a/models/session.go b/models/session.go index 2497bbf..123442b 100644 --- a/models/session.go +++ b/models/session.go @@ -1,6 +1,9 @@ package models -import "fmt" +import ( + "fmt" + "net" +) // Session is the json model for milvus session struct in etcd. type Session struct { @@ -14,3 +17,11 @@ type Session struct { func (s Session) String() string { return fmt.Sprintf("Session:%s, ServerID: %d, Version: %s, Address: %s", s.ServerName, s.ServerID, s.Version, s.Address) } + +func (s Session) IP() string { + addr, err := net.ResolveTCPAddr("tcp", s.Address) + if err != nil { + return "" + } + return addr.IP.To4().String() +} diff --git a/states/configuration.go b/states/configuration.go index 8fcd43f..f4d0f0b 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -13,8 +13,6 @@ import ( querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" ) @@ -61,6 +59,7 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi client = indexpbv2.NewIndexNodeClient(conn) } if client == nil { + fmt.Println("client nil", session.String()) continue } @@ -88,85 +87,3 @@ func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfi } return nil } - -// GetConfigurationCommand returns command to iterate all online components and fetch configurations. -func GetConfigurationCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "configurations", - Short: "iterate all online components and inspect configuration", - Run: func(cmd *cobra.Command, args []string) { - format, err := cmd.Flags().GetString("format") - if err != nil { - fmt.Println(err.Error()) - return - } - sessions, err := common.ListSessions(cli, basePath) - if err != nil { - fmt.Println(err.Error()) - return - } - - results := make(map[string]map[string]string) - - for _, session := range sessions { - opts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithTimeout(2 * time.Second), - } - - conn, err := grpc.DialContext(context.Background(), session.Address, opts...) - if err != nil { - fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) - continue - } - var client configurationSource - switch strings.ToLower(session.ServerName) { - case "rootcoord": - client = rootcoordpbv2.NewRootCoordClient(conn) - case "datacoord": - client = datapbv2.NewDataCoordClient(conn) - case "indexcoord": - client = indexpbv2.NewIndexCoordClient(conn) - case "querycoord": - client = querypbv2.NewQueryCoordClient(conn) - case "datanode": - client = datapbv2.NewDataNodeClient(conn) - case "querynode": - client = querypbv2.NewQueryNodeClient(conn) - case "indexnode": - client = indexpbv2.NewIndexNodeClient(conn) - } - if client == nil { - continue - } - - configurations, err := getConfiguration(context.Background(), client, session.ServerID) - if err != nil { - continue - } - - results[fmt.Sprintf("%s-%d", session.ServerName, session.ServerID)] = common.KVListMap(configurations) - } - - switch strings.ToLower(format) { - case "json": - bs, _ := json.MarshalIndent(results, "", "\t") - fmt.Println(string(bs)) - case "line": - fallthrough - default: - for comp, configs := range results { - fmt.Println("Component", comp) - for key, value := range configs { - fmt.Printf("%s: %s\n", key, value) - } - } - } - - }, - } - - cmd.Flags().String("format", "line", "output format") - return cmd -} diff --git a/states/frame_screen.go b/states/frame_screen.go new file mode 100644 index 0000000..a1d99d0 --- /dev/null +++ b/states/frame_screen.go @@ -0,0 +1,90 @@ +package states + +import ( + "fmt" + "io" + "sync" + "time" + + "github.com/fatih/color" + "github.com/gosuri/uilive" + "github.com/milvus-io/birdwatcher/eventlog" + "go.uber.org/atomic" +) + +type FrameScreen struct { + display *uilive.Writer + lastPrint atomic.Int64 + lines []io.Writer + mut sync.Mutex +} + +func NewFrameScreen(lines int, display *uilive.Writer) *FrameScreen { + if lines <= 0 { + lines = 1 + } + ws := make([]io.Writer, lines) + ws[0] = display + for i := 1; i < lines; i++ { + ws[i] = display.Newline() + } + + return &FrameScreen{ + display: display, + lines: ws, + } +} + +var ( + colorPending = color.New(color.FgYellow) + colorReady = color.New(color.FgGreen) + + levelColor = map[eventlog.Level]*color.Color{ + eventlog.Level_Debug: color.New(color.FgGreen), + eventlog.Level_Info: color.New(color.FgBlue), + eventlog.Level_Warn: color.New(color.FgYellow), + eventlog.Level_Error: color.New(color.FgRed), + } +) + +func (s *FrameScreen) printEvent(evt *eventlog.Event) { + s.mut.Lock() + defer s.mut.Unlock() + + lvl := evt.GetLevel() + fmt.Printf("[%s][%s]%s\n", time.Unix(0, evt.GetTs()).Format("01/02 15:04:05"), levelColor[lvl].Sprint(lvl.String()), string(evt.Data)) +} + +func (s *FrameScreen) printEvents(display *uilive.Writer, m *sync.Map, events []*eventlog.Event) { + s.mut.Lock() + defer s.mut.Unlock() + t := time.Now() + last := time.Unix(0, s.lastPrint.Load()) + if t.Sub(last) < time.Millisecond*50 { + return + } + + s.lastPrint.Store(t.UnixNano()) + _, rcOk := m.Load("rootcoord") + rctext := colorPending.Sprint("Connecting") + if rcOk { + rctext = colorReady.Sprint(" Ready ") + } + _, qcOk := m.Load("querycoord") + qctext := colorPending.Sprintf("Connecting") + if qcOk { + qctext = colorReady.Sprintf(" Ready ") + } + fmt.Fprintf(display, fmt.Sprintf("RootCoord[%s] QueryCoord[%s]\n", rctext, qctext)) + + start := 0 + if len(events) > 10 { + start = len(events) - 10 + } + + for i := start; i < len(events); i++ { + evt := events[i] + lvl := evt.GetLevel() + fmt.Fprintf(s.lines[i+1], fmt.Sprintf("[%s][%s]%s\n", time.Unix(0, evt.GetTs()).Format("01/02 15:04:05"), levelColor[lvl].Sprint(lvl.String()), string(evt.Data))) + } +} diff --git a/states/management.go b/states/management.go new file mode 100644 index 0000000..700fa40 --- /dev/null +++ b/states/management.go @@ -0,0 +1,231 @@ +package states + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/cockroachdb/errors" + "github.com/milvus-io/birdwatcher/eventlog" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" + datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" + indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb" + querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" + rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb" + "github.com/milvus-io/birdwatcher/states/etcd/common" + "github.com/samber/lo" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type ListMetricsPortParam struct { + framework.ParamBase `use:"list metrics-port" desc:"list metrics port for online components"` +} + +// ListMetricsPortCommand returns command logic listing metrics port for all online components. +func (s *InstanceState) ListMetricsPortCommand(ctx context.Context, p *ListMetricsPortParam) error { + sessions, err := common.ListSessions(s.client, s.basePath) + if err != nil { + return errors.Wrap(err, "failed to list sessions") + } + + for _, session := range sessions { + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } + + conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + if err != nil { + fmt.Printf("failed to connect to Server(%d) addr: %s, err: %s\n", session.ServerID, session.Address, err.Error()) + continue + } + + source := getConfigurationSource(session, conn) + if source == nil { + // fmt.Println("source nil", session.String()) + continue + } + items, _ := getConfiguration(ctx, source, session.ServerID) + for _, item := range items { + if item.GetKey() == "commonmetricsport" { + fmt.Println(session.ServerName, session.IP(), item.GetValue()) + } + } + + } + + return nil +} + +type ListenEventParam struct { + framework.ParamBase `use:"listen-events"` + Localhost bool `name:"localhost" default:"false" desc:"localhost components"` +} + +// ListenEventsCommand returns command logic listen events from grpc event logger. +func (s *InstanceState) ListenEventsCommand(ctx context.Context, p *ListenEventParam) error { + listeners, err := s.prepareListenerClients(ctx) + if err != nil { + return err + } + + var wg sync.WaitGroup + var mut sync.Mutex + wg.Add(len(listeners)) + + for _, listener := range listeners { + go func(listener *eventlog.Listener) { + defer wg.Done() + ch, err := listener.Start(ctx) + if err != nil { + fmt.Println(err.Error()) + return + } + for evt := range ch { + // screen.printEvent(event) + mut.Lock() + lvl := evt.GetLevel() + fmt.Printf("[%s][%s]%s\n", time.Unix(0, evt.GetTs()).Format("01/02 15:04:05"), levelColor[lvl].Sprint(lvl.String()), string(evt.Data)) + mut.Unlock() + } + }(listener) + } + + // block until cancel + <-ctx.Done() + wg.Wait() + return nil +} + +type portResp struct { + Status int `json:"status"` + Port int `json:"port"` +} + +func getEventLogPort(ctx context.Context, ip string, metricPort string) int { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/eventlog", ip, metricPort), nil) + if err != nil { + return -1 + } + + resp, err := http.DefaultClient.Do(req) + + if err != nil { + return -1 + } + bs, err := ioutil.ReadAll(resp.Body) + if err != nil { + return -1 + } + r := portResp{} + json.Unmarshal(bs, &r) + if r.Status != http.StatusOK { + return -1 + } + return r.Port +} + +func (s *InstanceState) prepareListenerClients(ctx context.Context) ([]*eventlog.Listener, error) { + sessions, err := common.ListSessions(s.client, s.basePath) + if err != nil { + return nil, errors.Wrap(err, "failed to list sessions") + } + + var m sync.Map + var wg sync.WaitGroup + wg.Add(len(sessions)) + + for _, session := range sessions { + go func(session *models.Session) { + defer wg.Done() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } + + conn, err := grpc.DialContext(ctx, session.Address, opts...) + if err != nil { + fmt.Printf("failed to connect to Server(%d) addr: %s, err: %s\n", session.ServerID, session.Address, err.Error()) + return + } + + // create configuration source + source := getConfigurationSource(session, conn) + if source == nil { + return + } + + // fetch configuration items from source + items, err := getConfiguration(ctx, source, session.ServerID) + if err != nil { + return + } + + items = lo.Filter(items, func(kv *commonpbv2.KeyValuePair, _ int) bool { + return kv.GetKey() == "commonmetricsport" + }) + + if len(items) != 1 { + return + } + + item := items[0] + ip := session.IP() + port := getEventLogPort(ctx, ip, item.GetValue()) + if port == -1 { + return + } + addr := fmt.Sprintf("%s:%d", ip, port) + + listener, err := eventlog.NewListener(ctx, addr) + if err != nil { + return + } + m.Store(addr, listener) + + }(session) + } + + wg.Wait() + + var listeners []*eventlog.Listener + m.Range(func(key, value any) bool { + listener := value.(*eventlog.Listener) + listeners = append(listeners, listener) + return true + }) + + return listeners, nil +} + +func getConfigurationSource(session *models.Session, conn *grpc.ClientConn) configurationSource { + var client configurationSource + switch session.ServerName { + case "datacoord": + client = datapbv2.NewDataCoordClient(conn) + case "datanode": + client = datapbv2.NewDataNodeClient(conn) + case "indexcoord": + client = indexpbv2.NewIndexCoordClient(conn) + case "indexnode": + client = indexpbv2.NewIndexNodeClient(conn) + case "querycoord": + client = querypbv2.NewQueryCoordClient(conn) + case "querynode": + client = querypbv2.NewQueryNodeClient(conn) + case "rootcoord": + client = rootcoordpbv2.NewRootCoordClient(conn) + // case "proxy": + //client:= milvuspb.NewMilvusServiceClient(conn) + //state.SetNext(getProxy) + case "milvus": + } + return client +}