From 2e171e34c8203057e3aa8ad667c39ce7d98a0949 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Tue, 15 Oct 2024 18:06:31 +0800
Subject: [PATCH] enhance: enable check_qn_collection_leak

Signed-off-by: Wei Liu
---
 models/metrics_info.go                        | 196 ++++++++++++++++++
 models/quota_metrics.go                       |  73 +++++++
 models/topology.go                            | 119 +++++++++++
 states/etcd/commands.go                       |   2 +
 .../etcd/repair/check_qn_collection_leak.go   | 146 +++++++++++++
 5 files changed, 536 insertions(+)
 create mode 100644 models/metrics_info.go
 create mode 100644 models/quota_metrics.go
 create mode 100644 models/topology.go
 create mode 100644 states/etcd/repair/check_qn_collection_leak.go

diff --git a/models/metrics_info.go b/models/metrics_info.go
new file mode 100644
index 0000000..4392d64
--- /dev/null
+++ b/models/metrics_info.go
@@ -0,0 +1,196 @@
+package models
+
+import (
+	"encoding/json"
+)
+
+// ComponentInfos is an empty interface, so any component info value can be passed where it is expected.
+type ComponentInfos interface{}
+
+// MarshalComponentInfos encodes infos with json.Marshal and returns the result as a string.
+func MarshalComponentInfos(infos ComponentInfos) (string, error) {
+	binary, err := json.Marshal(infos)
+	return string(binary), err
+}
+
+// UnmarshalComponentInfos decodes the JSON string s into infos; json.Unmarshal requires infos to be a non-nil pointer.
+func UnmarshalComponentInfos(s string, infos ComponentInfos) error {
+	return json.Unmarshal([]byte(s), infos)
+}
+
+// HardwareMetrics records the hardware information of nodes.
+type HardwareMetrics struct {
+	IP           string  `json:"ip"`
+	CPUCoreCount int     `json:"cpu_core_count"`
+	CPUCoreUsage float64 `json:"cpu_core_usage"`
+	Memory       uint64  `json:"memory"`
+	MemoryUsage  uint64  `json:"memory_usage"`
+
+	// open question: how to measure disk and disk usage in distributed storage
+	Disk      uint64 `json:"disk"`
+	DiskUsage uint64 `json:"disk_usage"`
+}
+
+const (
+	// GitCommitEnvKey defines the key to retrieve the commit corresponding to the current milvus version
+	// from the metrics information
+	GitCommitEnvKey = "MILVUS_GIT_COMMIT"
+
+	// DeployModeEnvKey defines the key to retrieve the current milvus deployment mode
+	// from the metrics information
+	DeployModeEnvKey = "DEPLOY_MODE"
+
+	// ClusterDeployMode represents distributed deployment mode
+	ClusterDeployMode = "DISTRIBUTED"
+
+	// StandaloneDeployMode represents the stand-alone deployment mode
+	StandaloneDeployMode = "STANDALONE"
+
+	// GitBuildTagsEnvKey defines the key for the git build tags
+	GitBuildTagsEnvKey = "MILVUS_GIT_BUILD_TAGS"
+
+	// MilvusBuildTimeEnvKey defines the key for the build time
+	MilvusBuildTimeEnvKey = "MILVUS_BUILD_TIME"
+
+	// MilvusUsedGoVersion defines the key for the Go version that was used
+	MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION"
+)
+
+// DeployMetrics records the deploy information of nodes.
+type DeployMetrics struct {
+	SystemVersion string `json:"system_version"`
+	DeployMode    string `json:"deploy_mode"`
+	BuildVersion  string `json:"build_version"`
+	BuildTime     string `json:"build_time"`
+	UsedGoVersion string `json:"used_go_version"`
+}
+
+// BaseComponentInfos contains basic information that all components should have.
+type BaseComponentInfos struct {
+	HasError      bool            `json:"has_error"`
+	ErrorReason   string          `json:"error_reason"`
+	Name          string          `json:"name"`
+	HardwareInfos HardwareMetrics `json:"hardware_infos"`
+	SystemInfo    DeployMetrics   `json:"system_info"`
+	CreatedTime   string          `json:"created_time"`
+	UpdatedTime   string          `json:"updated_time"`
+	Type          string          `json:"type"`
+	ID            int64           `json:"id"`
+}
+
+// QueryNodeConfiguration records the configuration of QueryNode.
+type QueryNodeConfiguration struct {
+	SimdType string `json:"simd_type"`
+}
+
+type QueryNodeCollectionMetrics struct {
+	CollectionRows map[int64]int64 // presumably collection ID -> row count; confirm with the producer
+}
+
+// QueryNodeInfos is the component info of a QueryNode; QuotaMetrics and CollectionMetrics are pointers and may be nil.
+type QueryNodeInfos struct {
+	BaseComponentInfos
+	SystemConfigurations QueryNodeConfiguration      `json:"system_configurations"`
+	QuotaMetrics         *QueryNodeQuotaMetrics      `json:"quota_metrics"`
+	CollectionMetrics    *QueryNodeCollectionMetrics `json:"collection_metrics"`
+}
+
+// QueryCoordConfiguration records the configuration of QueryCoord.
+type QueryCoordConfiguration struct {
+	SearchChannelPrefix       string `json:"search_channel_prefix"`
+	SearchResultChannelPrefix string `json:"search_result_channel_prefix"`
+}
+
+// QueryCoordInfos is the component info of a QueryCoord.
+type QueryCoordInfos struct {
+	BaseComponentInfos
+	SystemConfigurations QueryCoordConfiguration `json:"system_configurations"`
+}
+
+// ProxyConfiguration records the configuration of Proxy.
+type ProxyConfiguration struct {
+	DefaultPartitionName string `json:"default_partition_name"`
+	DefaultIndexName     string `json:"default_index_name"`
+}
+
+// ProxyInfos is the component info of a Proxy; QuotaMetrics is a pointer and may be nil.
+type ProxyInfos struct {
+	BaseComponentInfos
+	SystemConfigurations ProxyConfiguration `json:"system_configurations"`
+	QuotaMetrics         *ProxyQuotaMetrics `json:"quota_metrics"`
+}
+
+// IndexNodeConfiguration records the configuration of IndexNode.
+type IndexNodeConfiguration struct {
+	MinioBucketName string `json:"minio_bucket_name"`
+
+	SimdType string `json:"simd_type"`
+}
+
+// IndexNodeInfos is the component info of an IndexNode.
+type IndexNodeInfos struct {
+	BaseComponentInfos
+	SystemConfigurations IndexNodeConfiguration `json:"system_configurations"`
+}
+
+// IndexCoordConfiguration records the configuration of IndexCoord.
+type IndexCoordConfiguration struct {
+	MinioBucketName string `json:"minio_bucket_name"`
+}
+
+// IndexCoordInfos is the component info of an IndexCoord.
+type IndexCoordInfos struct {
+	BaseComponentInfos
+	SystemConfigurations IndexCoordConfiguration `json:"system_configurations"`
+}
+
+// DataNodeConfiguration records the configuration of DataNode.
+type DataNodeConfiguration struct {
+	FlushInsertBufferSize int64 `json:"flush_insert_buffer_size"`
+}
+
+// DataNodeInfos is the component info of a DataNode; QuotaMetrics is a pointer and may be nil.
+type DataNodeInfos struct {
+	BaseComponentInfos
+	SystemConfigurations DataNodeConfiguration `json:"system_configurations"`
+	QuotaMetrics         *DataNodeQuotaMetrics `json:"quota_metrics"`
+}
+
+// DataCoordConfiguration records the configuration of DataCoord.
+type DataCoordConfiguration struct {
+	SegmentMaxSize float64 `json:"segment_max_size"`
+}
+
+type DataCoordIndexInfo struct {
+	NumEntitiesIndexed int64
+	IndexName          string
+	FieldID            int64
+}
+
+type DataCoordCollectionInfo struct {
+	NumEntitiesTotal int64
+	IndexInfo        []*DataCoordIndexInfo
+}
+
+type DataCoordCollectionMetrics struct {
+	Collections map[int64]*DataCoordCollectionInfo // presumably keyed by collection ID; confirm with the producer
+}
+
+// DataCoordInfos is the component info of a DataCoord; QuotaMetrics and CollectionMetrics are pointers and may be nil.
+type DataCoordInfos struct {
+	BaseComponentInfos
+	SystemConfigurations DataCoordConfiguration      `json:"system_configurations"`
+	QuotaMetrics         *DataCoordQuotaMetrics      `json:"quota_metrics"`
+	CollectionMetrics    *DataCoordCollectionMetrics `json:"collection_metrics"`
+}
+
+// RootCoordConfiguration records the configuration of RootCoord.
+type RootCoordConfiguration struct {
+	MinSegmentSizeToEnableIndex int64 `json:"min_segment_size_to_enable_index"`
+}
+
+// RootCoordInfos is the component info of a RootCoord.
+type RootCoordInfos struct {
+	BaseComponentInfos
+	SystemConfigurations RootCoordConfiguration `json:"system_configurations"`
+}
diff --git a/models/quota_metrics.go b/models/quota_metrics.go
new file mode 100644
index 0000000..5d0ee6e
--- /dev/null
+++ b/models/quota_metrics.go
@@ -0,0 +1,73 @@
+package models
+
+// RateMetricLabel is an alias of string used for the labels of rate metrics collected from nodes.
+type RateMetricLabel = string
+
+const (
+	ReadResultThroughput    RateMetricLabel = "ReadResultThroughput"
+	InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput"
+	DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput"
+)
+
+const (
+	UnsolvedQueueType string = "Unsolved"
+	ReadyQueueType    string = "Ready"
+	ReceiveQueueType  string = "Receive"
+	ExecuteQueueType  string = "Execute"
+)
+
+// RateMetric contains a RateMetricLabel and a float rate.
+type RateMetric struct {
+	Label RateMetricLabel
+	Rate  float64
+}
+
+// FlowGraphMetric contains a minimal timestamp of flow graph and the number of flow graphs.
+type FlowGraphMetric struct {
+	MinFlowGraphChannel string
+	MinFlowGraphTt      uint64
+	NumFlowGraph        int
+}
+
+// NodeEffect contains a node ID and the IDs of the collections it affects.
+type NodeEffect struct {
+	NodeID        int64
+	CollectionIDs []int64
+}
+
+// QueryNodeQuotaMetrics are metrics of QueryNode.
+type QueryNodeQuotaMetrics struct {
+	Hms                 HardwareMetrics
+	Rms                 []RateMetric
+	Fgm                 FlowGraphMetric
+	GrowingSegmentsSize int64
+	Effect              NodeEffect
+	DeleteBufferInfo    DeleteBufferInfo
+}
+
+type DeleteBufferInfo struct {
+	CollectionDeleteBufferNum  map[int64]int64 // presumably keyed by collection ID; confirm with the producer
+	CollectionDeleteBufferSize map[int64]int64 // presumably keyed by collection ID; confirm with the producer
+}
+
+type DataCoordQuotaMetrics struct {
+	TotalBinlogSize      int64
+	CollectionBinlogSize map[int64]int64
+	PartitionsBinlogSize map[int64]map[int64]int64
+	// l0 segments
+	CollectionL0RowCount map[int64]int64
+}
+
+// DataNodeQuotaMetrics are metrics of DataNode.
+type DataNodeQuotaMetrics struct {
+	Hms    HardwareMetrics
+	Rms    []RateMetric
+	Fgm    FlowGraphMetric
+	Effect NodeEffect
+}
+
+// ProxyQuotaMetrics are metrics of Proxy.
+type ProxyQuotaMetrics struct {
+	Hms HardwareMetrics
+	Rms []RateMetric
+}
diff --git a/models/topology.go b/models/topology.go
new file mode 100644
index 0000000..cfdb399
--- /dev/null
+++ b/models/topology.go
@@ -0,0 +1,119 @@
+package models
+
+import (
+	"encoding/json"
+	"strconv"
+)
+
+// In the topology graph, every node name consists of the role name followed by the node ID,
+// for example Proxy1 or DataCoord3.
+
+// ConstructComponentName returns role followed by the decimal form of id, e.g. ("Proxy", 1) yields "Proxy1".
+func ConstructComponentName(role string, id int64) string {
+	return role + strconv.FormatInt(id, 10) // FormatInt keeps all 64 bits; int(id) would truncate where int is 32-bit
+}
+
+// Topology is an empty interface, so any topology value can be passed where it is expected.
+type Topology interface{}
+
+// MarshalTopology encodes topology with json.Marshal and returns the result as a string.
+func MarshalTopology(topology Topology) (string, error) {
+	binary, err := json.Marshal(topology)
+	return string(binary), err
+}
+
+// UnmarshalTopology decodes the JSON string s into topology; json.Unmarshal requires topology to be a non-nil pointer.
+func UnmarshalTopology(s string, topology Topology) error {
+	return json.Unmarshal([]byte(s), topology)
+}
+
+// QueryClusterTopology shows the topology between QueryCoord and QueryNodes
+type QueryClusterTopology struct {
+	Self           QueryCoordInfos  `json:"self"`
+	ConnectedNodes []QueryNodeInfos `json:"connected_nodes"`
+}
+
+// ConnectionType is the type of connection between nodes
+type ConnectionType = string
+
+// ConnectionType definitions
+const (
+	CoordConnectToNode ConnectionType = "manage"
+	Forward            ConnectionType = "forward"
+)
+
+// ConnectionTargetType is the type of connection target
+type ConnectionTargetType = string
+
+// ConnectionInfo contains info of connection target
+type ConnectionInfo struct {
+	TargetName string               `json:"target_name"`
+	TargetType ConnectionTargetType `json:"target_type"`
+}
+
+// ConnTopology contains connection topology
+// TODO(dragondriver)
+// necessary to show all connection edge in topology graph?
+// for example, in system, Proxy connects to RootCoord and RootCoord also connects to Proxy,
+// if we do so, the connection relationship may be confusing.
+// ConnTopology shows how different components connect to each other.
+type ConnTopology struct {
+	Name                string           `json:"name"`
+	ConnectedComponents []ConnectionInfo `json:"connected_components"`
+}
+
+// QueryCoordTopology shows the whole metrics of query cluster
+type QueryCoordTopology struct {
+	Cluster     QueryClusterTopology `json:"cluster"`
+	Connections ConnTopology         `json:"connections"`
+}
+
+// IndexClusterTopology shows the topology between IndexCoord and IndexNodes
+type IndexClusterTopology struct {
+	Self           IndexCoordInfos  `json:"self"`
+	ConnectedNodes []IndexNodeInfos `json:"connected_nodes"`
+}
+
+// IndexCoordTopology shows the whole metrics of index cluster
+type IndexCoordTopology struct {
+	Cluster     IndexClusterTopology `json:"cluster"`
+	Connections ConnTopology         `json:"connections"`
+}
+
+// DataClusterTopology shows the topology between DataCoord and DataNodes
+type DataClusterTopology struct {
+	Self                DataCoordInfos   `json:"self"`
+	ConnectedDataNodes  []DataNodeInfos  `json:"connected_data_nodes"`
+	ConnectedIndexNodes []IndexNodeInfos `json:"connected_index_nodes"`
+}
+
+// DataCoordTopology shows the whole metrics of data cluster
+type DataCoordTopology struct {
+	Cluster     DataClusterTopology `json:"cluster"`
+	Connections ConnTopology        `json:"connections"`
+}
+
+// RootCoordTopology shows the whole metrics of root coordinator
+type RootCoordTopology struct {
+	Self        RootCoordInfos `json:"self"`
+	Connections ConnTopology   `json:"connections"`
+}
+
+// ConnectionEdge contains connection's id, type and target type
+type ConnectionEdge struct {
+	ConnectedIdentifier int                  `json:"connected_identifier"`
+	Type                ConnectionType       `json:"type"`
+	TargetType          ConnectionTargetType `json:"target_type"` // RootCoord, DataCoord ...
+}
+
+// SystemTopologyNode is a node in system topology graph.
+type SystemTopologyNode struct {
+	Identifier int              `json:"identifier"` // unique in the SystemTopology graph
+	Connected  []ConnectionEdge `json:"connected"`
+	Infos      ComponentInfos   `json:"infos"`
+}
+
+// SystemTopology shows the system topology
+type SystemTopology struct {
+	NodesInfo []SystemTopologyNode `json:"nodes_info"`
+}
diff --git a/states/etcd/commands.go b/states/etcd/commands.go
index 4f56dad..65e3937 100644
--- a/states/etcd/commands.go
+++ b/states/etcd/commands.go
@@ -57,6 +57,8 @@ func RepairCommand(cli clientv3.KV, basePath string) *cobra.Command {
 		repair.AddIndexParamsCommand(cli, basePath),
 		// repair manual compaction
 		repair.ManualCompactionCommand(cli, basePath),
+		// check querynode collection leak
+		repair.CheckQNCollectionLeak(cli, basePath),
 	)
 
 	return repairCmd
diff --git a/states/etcd/repair/check_qn_collection_leak.go b/states/etcd/repair/check_qn_collection_leak.go
new file mode 100644
index 0000000..90ada45
--- /dev/null
+++ b/states/etcd/repair/check_qn_collection_leak.go
@@ -0,0 +1,174 @@
+package repair
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/milvus-io/birdwatcher/models"
+	commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
+	"github.com/milvus-io/birdwatcher/proto/v2.2/milvuspb"
+	querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
+	rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb"
+	"github.com/milvus-io/birdwatcher/states/etcd/common"
+	"github.com/samber/lo"
+	"github.com/spf13/cobra"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/grpc"
+)
+
+// CheckQNCollectionLeak returns the check_qn_collection_leak command.
+//
+// The command lists sessions under basePath, asks rootcoord for its collection IDs
+// (ShowCollections), fetches the "system_info" metrics from querycoord, and prints,
+// for every connected querynode, the collection IDs found in the node's
+// QuotaMetrics.Effect.CollectionIDs that are absent from the rootcoord list.
+func CheckQNCollectionLeak(cli clientv3.KV, basePath string) *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "check_qn_collection_leak",
+		Short: "check whether querynode has collection leak",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			fmt.Printf("check querynode collection leak\n")
+			sessions, err := common.ListSessions(cli, basePath)
+			if err != nil {
+				fmt.Printf("failed to list sessions\n")
+				return err
+			}
+
+			rcClient, err := getRootCoordClient(sessions)
+			if err != nil {
+				fmt.Printf("failed to get rootcoord client\n")
+				return err
+			}
+
+			resp1, err := rcClient.ShowCollections(context.Background(), &milvuspb.ShowCollectionsRequest{
+				Base: &commonpbv2.MsgBase{
+					SourceID: -1,
+					MsgType:  commonpbv2.MsgType_ShowCollections,
+				},
+			})
+			if err != nil {
+				fmt.Printf("failed to get rootcoord collections, err=%s\n", err.Error())
+				return err
+			}
+			collectionsOnRC := resp1.CollectionIds
+			fmt.Printf("rootcoord collections: %v\n", collectionsOnRC)
+
+			qcClient, err := getQueryCoordClient(sessions)
+			if err != nil {
+				fmt.Printf("failed to get querycoord client\n")
+				return err
+			}
+			req, err := ConstructRequestByMetricType("system_info")
+			if err != nil {
+				return err
+			}
+			resp, err := qcClient.GetMetrics(context.Background(), req)
+			if err != nil {
+				fmt.Printf("failed to get querycoord metrics, err=%s\n", err.Error())
+				return err
+			}
+
+			queryCoordTopology := &models.QueryCoordTopology{}
+			if err := json.Unmarshal([]byte(resp.GetResponse()), queryCoordTopology); err != nil {
+				fmt.Printf("failed to unmarshal querycoord metrics, len=%d\n", len(resp.GetResponse()))
+				return err
+			}
+
+			for _, qnMetrics := range queryCoordTopology.Cluster.ConnectedNodes {
+				// QuotaMetrics is a pointer filled in by json.Unmarshal; it stays nil
+				// when the node's entry carries no "quota_metrics" value.
+				if qnMetrics.QuotaMetrics == nil {
+					fmt.Printf("querynode %d reports no quota metrics, skipped\n", qnMetrics.ID)
+					continue
+				}
+				collectionsOnQN := qnMetrics.QuotaMetrics.Effect.CollectionIDs
+				// the second result holds the IDs that appear only on the querynode side
+				_, leakCollections := lo.Difference(collectionsOnRC, collectionsOnQN)
+				if len(leakCollections) > 0 {
+					fmt.Printf("querynode %d has leak collections: %v\n", qnMetrics.ID, leakCollections)
+				}
+			}
+
+			return nil
+		},
+	}
+	// NOTE(review): the flag is registered but RunE never reads it.
+	cmd.Flags().Int64("collection", 0, "collection id to filter with")
+	return cmd
+}
+
+// getQueryCoordClient dials the first session whose server name equals "querycoord"
+// (compared case-insensitively) and returns a QueryCoord client on that connection.
+// A session that cannot be dialed within the 2s timeout is reported and skipped.
+// NOTE(review): the dialed connection is never closed and is not exposed to the caller.
+func getQueryCoordClient(sessions []*models.Session) (querypbv2.QueryCoordClient, error) {
+	for _, session := range sessions {
+		if strings.ToLower(session.ServerName) != "querycoord" {
+			continue
+		}
+
+		opts := []grpc.DialOption{
+			grpc.WithInsecure(),
+			grpc.WithBlock(),
+			grpc.WithTimeout(2 * time.Second),
+		}
+
+		conn, err := grpc.DialContext(context.Background(), session.Address, opts...)
+		if err != nil {
+			fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
+			continue
+		}
+
+		client := querypbv2.NewQueryCoordClient(conn)
+		return client, nil
+	}
+	return nil, errors.New("querycoord session not found")
+}
+
+// getRootCoordClient dials the first session whose server name equals "rootcoord"
+// (compared case-insensitively) and returns a RootCoord client on that connection.
+// A session that cannot be dialed within the 2s timeout is reported and skipped.
+// NOTE(review): the dialed connection is never closed and is not exposed to the caller.
+func getRootCoordClient(sessions []*models.Session) (rootcoordpbv2.RootCoordClient, error) {
+	for _, session := range sessions {
+		if strings.ToLower(session.ServerName) != "rootcoord" {
+			continue
+		}
+
+		opts := []grpc.DialOption{
+			grpc.WithInsecure(),
+			grpc.WithBlock(),
+			grpc.WithTimeout(2 * time.Second),
+		}
+
+		conn, err := grpc.DialContext(context.Background(), session.Address, opts...)
+		if err != nil {
+			fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
+			continue
+		}
+
+		client := rootcoordpbv2.NewRootCoordClient(conn)
+		return client, nil
+	}
+	return nil, errors.New("rootcoord session not found")
+}
+
+// ConstructRequestByMetricType builds a GetMetricsRequest whose Request field is the
+// JSON object {"metric_type": metricType}. A marshalling failure is returned wrapped,
+// so callers can still reach the underlying error with errors.Is / errors.As.
+func ConstructRequestByMetricType(metricType string) (*milvuspb.GetMetricsRequest, error) {
+	m := make(map[string]interface{})
+	m["metric_type"] = metricType
+	binary, err := json.Marshal(m)
+	if err != nil {
+		return nil, fmt.Errorf("failed to construct request by metric type %s: %w", metricType, err)
+	}
+	// TODO:: switch metricType to different msgType and return err when metricType is not supported
+	return &milvuspb.GetMetricsRequest{
+		Request: string(binary),
+	}, nil
+}