From 68f3f6a7423c9fcea415deb16552c2e899a4c247 Mon Sep 17 00:00:00 2001 From: Arpita Mathur Date: Tue, 9 Mar 2021 17:17:26 +0530 Subject: [PATCH 1/4] Add FaultDetectionStats to collect latency and failure metrics --- .../performanceanalyzer/ESResources.java | 11 + .../PerformanceAnalyzerPlugin.java | 43 +--- .../FaultDetectionMetricsCollector.java | 185 --------------- .../FaultDetectionStatsCollector.java | 220 ++++++++++++++++++ .../tracker/MetricsTracker.java | 47 ++++ .../performanceanalyzer/util/Utils.java | 15 +- ...=> FaultDetectionStatsCollectorTests.java} | 44 ++-- 7 files changed, 302 insertions(+), 263 deletions(-) delete mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java rename src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/{FaultDetectionMetricsCollectorTests.java => FaultDetectionStatsCollectorTests.java} (59%) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/ESResources.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/ESResources.java index 8f98c596..a72f495b 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/ESResources.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/ESResources.java @@ -16,6 +16,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.settings.Settings; @@ -34,6 +35,7 @@ public final class ESResources { private java.nio.file.Path configPath; private String pluginFileLocation; private Client client; + private Discovery discovery; private ESResources() { threadPool = null; @@ -44,6 +46,7 @@ private ESResources() { environment = null; configPath = null; pluginFileLocation = null; + discovery = null; } public void setPluginFileLocation(String pluginFileLocation) { @@ -117,4 +120,12 @@ public void setClient(final Client client) { public Client getClient() { return client; } + + public void setDiscovery(Discovery discovery) { + this.discovery = discovery; + } + + public Discovery getDiscovery() { + return discovery; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 06bb6c77..8611da03 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -18,26 +18,7 @@ import static java.util.Collections.singletonList; import com.amazon.opendistro.elasticsearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CircuitBreakerCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.DisksCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.GCInfoCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.HeapMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MetricsPurgeActivity; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NetworkInterfaceCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeDetailsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsFixedShardsMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.OSMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ScheduledMetricCollectorsExecutor; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.*; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; @@ -197,8 +178,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NetworkInterfaceCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance()); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new FaultDetectionMetricsCollector( - performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ShardStateCollector( performanceAnalyzerController,configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterThrottlingMetricsCollector( @@ -210,6 +189,13 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa } catch (ClassNotFoundException e) { LOG.info("Shard IndexingPressure not present in this ES version. Skipping ShardIndexingPressureMetricsCollector"); } + try { + Class.forName(FaultDetectionStatsCollector.FAULT_DETECTION_STATS_CLASS_NAME); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new FaultDetectionStatsCollector( + performanceAnalyzerController,configOverridesWrapper)); + } catch (ClassNotFoundException e) { + LOG.info("Fault Detection Stats not present in this ES version. Skipping FaultDetectionStatsCollector"); + } scheduledMetricCollectorsExecutor.start(); EventLog eventLog = new EventLog(); @@ -243,18 +229,7 @@ public void onIndexModule(IndexModule indexModule) { //follower check, leader check public void onDiscovery(Discovery discovery) { - try { - Class listenerInjector = Class.forName(LISTENER_INJECTOR_CLASS_PATH); - Object listenerInjectorInstance = listenerInjector.getDeclaredConstructor().newInstance(); - Method addListenerMethod = listenerInjectorInstance.getClass().getMethod(ADD_FAULT_DETECTION_METHOD, - Discovery.class); - addListenerMethod.invoke(listenerInjectorInstance, discovery); - } catch (InstantiationException | InvocationTargetException | NoSuchMethodException | - IllegalAccessException e) { - LOG.debug("Exception while calling addFaultDetectionListener in Discovery"); - } catch (ClassNotFoundException e) { - LOG.debug("No Class for ListenerInjector detected"); - } + ESResources.INSTANCE.setDiscovery(discovery); } //- shardbulk diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java deleted file mode 100644 index 8488c1da..00000000 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; - -import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jooq.tools.StringUtils; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics.addMetricEntry; - -public class FaultDetectionMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor { - public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP. - get(FaultDetectionMetricsCollector.class).samplingInterval; - private static final int KEYS_PATH_LENGTH = 3; - private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsCollector.class); - private static final String FAULT_DETECTION_HANDLER_NAME = - "com.amazon.opendistro.elasticsearch.performanceanalyzer.handler.ClusterFaultDetectionStatsHandler"; - private static final String FAULT_DETECTION_HANDLER_METRIC_QUEUE = "metricQueue"; - private final ConfigOverridesWrapper configOverridesWrapper; - private final PerformanceAnalyzerController controller; - private StringBuilder value; - private static final ObjectMapper mapper = new ObjectMapper(); - - public FaultDetectionMetricsCollector(PerformanceAnalyzerController controller, - ConfigOverridesWrapper configOverridesWrapper) { - super(SAMPLING_TIME_INTERVAL, "FaultDetectionMetricsCollector"); - value = new StringBuilder(); - this.configOverridesWrapper = configOverridesWrapper; - this.controller = controller; - } - - @Override - @SuppressWarnings("unchecked") - public void collectMetrics(long startTime) { - if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) { - return; - } - long mCurrT = System.currentTimeMillis(); - Class faultDetectionHandler = null; - try { - faultDetectionHandler = Class.forName(FAULT_DETECTION_HANDLER_NAME); - } catch (ClassNotFoundException e) { - LOG.debug("No Handler Detected for Fault Detection. Skipping FaultDetectionMetricsCollector"); - return; - } - try { - BlockingQueue metricQueue = (BlockingQueue) - getFaultDetectionHandlerMetricsQueue(faultDetectionHandler).get(null); - List metrics = new ArrayList<>(); - metricQueue.drainTo(metrics); - - List faultDetectionContextsList = new ArrayList<>(); - for(String metric : metrics) { - faultDetectionContextsList.add(mapper.readValue(metric, ClusterFaultDetectionContext.class)); - } - - for(ClusterFaultDetectionContext clusterFaultDetectionContext : faultDetectionContextsList) { - value.setLength(0); - value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()); - addMetricEntry(value, AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID - .toString(), clusterFaultDetectionContext.getSourceNodeId()); - addMetricEntry(value, AllMetrics.FaultDetectionDimension.TARGET_NODE_ID - .toString(), clusterFaultDetectionContext.getTargetNodeId()); - - if(StringUtils.isEmpty(clusterFaultDetectionContext.getStartTime())) { - addMetricEntry(value, AllMetrics.CommonMetric.FINISH_TIME.toString(), - clusterFaultDetectionContext.getFinishTime()); - addMetricEntry(value, PerformanceAnalyzerMetrics.FAULT, - clusterFaultDetectionContext.getFault()); - saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(), - clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.FINISH_FILE_NAME); - } else { - addMetricEntry(value, AllMetrics.CommonMetric.START_TIME.toString(), - clusterFaultDetectionContext.getStartTime()); - saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(), - clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.START_FILE_NAME); - } - } - PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat( - WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME, "", - System.currentTimeMillis() - mCurrT); - } catch (Exception ex) { - PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( - ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR, "", - System.currentTimeMillis() - mCurrT); - LOG.debug("Exception in Collecting FaultDetection Metrics: {} for startTime {}", - () -> ex.toString(), () -> startTime); - } - } - - Field getFaultDetectionHandlerMetricsQueue(Class faultDetectionHandler) throws Exception { - Field metricsQueue = faultDetectionHandler.getDeclaredField(FAULT_DETECTION_HANDLER_METRIC_QUEUE); - metricsQueue.setAccessible(true); - return metricsQueue; - } - - /** Sample Event - * ^fault_detection/follower_check/7627/finish - * current_time:1601486201861 - * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a - * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a - * FinishTime:1566413987986 - * fault:0$ - * - * @param startTime time at which collector is called - * @param keysPath List of string that would make up the metrics path - * @return metric path - */ - @Override - public String getMetricsPath(long startTime, String... keysPath) { - if (keysPath.length != KEYS_PATH_LENGTH) { - throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH); - } - - return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sFaultDetection, - keysPath[0], keysPath[1], keysPath[2]); - } - - public static class ClusterFaultDetectionContext { - String type; - String sourceNodeId; - String targetNodeId; - String requestId; - String fault; - String startTime; - String finishTime; - - public String getType() { - return this.type; - } - - public String getSourceNodeId() { - return this.sourceNodeId; - } - - public String getTargetNodeId() { - return this.targetNodeId; - } - - public String getFault() { - return this.fault; - } - - public String getStartTime() { - return this.startTime; - } - - public String getFinishTime() { - return this.finishTime; - } - - public String getRequestId() { - return this.requestId; - } - - } -} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java new file mode 100644 index 00000000..6d6a99e9 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java @@ -0,0 +1,220 @@ +/* + * Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.Utils; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.tracker.MetricsTracker; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.coordination.FollowersChecker; +import org.elasticsearch.cluster.coordination.LeaderChecker; +import org.elasticsearch.discovery.Discovery; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class FaultDetectionStatsCollector extends PerformanceAnalyzerMetricsCollector implements + MetricsProcessor { + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(FaultDetectionStatsCollector.class).samplingInterval; + private static final int KEYS_PATH_LENGTH = 0; + private static final Logger LOG = LogManager.getLogger(FaultDetectionStatsCollector.class); + public static final String FAULT_DETECTION_STATS_CLASS_NAME = "org.elasticsearch.cluster.coordination.FaultDetectionStats"; + private static final String GET_STATS_METHOD_NAME = "getStats"; + private static final String FOLLOWERS_CHECKER_FIELD = "followersChecker"; + private static final String LEADER_CHECKER_FIELD = "leaderChecker"; + private static final ObjectMapper mapper = new ObjectMapper(); + private final StringBuilder value; + private final PerformanceAnalyzerController controller; + private final ConfigOverridesWrapper configOverridesWrapper; + + private MetricsTracker leaderCheckTracker; + private MetricsTracker followerCheckTracker; + + public FaultDetectionStatsCollector(PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super(SAMPLING_TIME_INTERVAL, FaultDetectionStatsCollector.class.getSimpleName()); + value = new StringBuilder(); + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + this.leaderCheckTracker = new MetricsTracker(); + this.followerCheckTracker = new MetricsTracker(); + } + + @Override + public void collectMetrics(long startTime) { + if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) { + return; + } + try { + long mCurrT = System.currentTimeMillis(); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + FaultDetectionStats followerCheckStats = mapper.readValue( + mapper.writeValueAsString(getFollowerCheckStats()), FaultDetectionStats.class); + FaultDetectionStats leaderCheckStats = mapper.readValue( + mapper.writeValueAsString(getLeaderCheckStats()), FaultDetectionStats.class); + System.out.println("Arpita in collector"); + FaultDetectionMetrics faultDetectionMetrics = new FaultDetectionMetrics(); + if (followerCheckStats != null) { + faultDetectionMetrics.setFollowerCheckMetrics(computeLatency(followerCheckStats, followerCheckTracker), + computeFailure(followerCheckStats, followerCheckTracker)); + this.followerCheckTracker = new MetricsTracker(followerCheckStats.timeTakenInMillis, + followerCheckStats.failedCount, followerCheckStats.totalCount); + } + if (leaderCheckStats != null) { + faultDetectionMetrics.setLeaderCheckMetrics(computeLatency(leaderCheckStats, leaderCheckTracker), + computeFailure(leaderCheckStats, leaderCheckTracker)); + this.leaderCheckTracker = new MetricsTracker(leaderCheckStats.timeTakenInMillis, + leaderCheckStats.failedCount, leaderCheckStats.totalCount); + } + StringBuilder value = new StringBuilder(); + value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + value.append(faultDetectionMetrics.serialize()); + + saveMetricValues(value.toString(), startTime); + + PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat( + WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME, "", + System.currentTimeMillis() - mCurrT); + } + catch (NoSuchFieldException | InvocationTargetException | IllegalAccessException | NoSuchMethodException + | JsonProcessingException ex) { + PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( + ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR, "", 1); + LOG.debug("Exception in Collecting Fault Detection Stats: {} for startTime {}", + () -> ex.toString(), () -> startTime); + } + } + + private Object getLeaderCheckStats() throws InvocationTargetException, IllegalAccessException, + NoSuchMethodException, NoSuchFieldException { + Method method = LeaderChecker.class.getMethod(GET_STATS_METHOD_NAME); + Discovery discovery = ESResources.INSTANCE.getDiscovery(); + if(discovery instanceof Coordinator) { + Coordinator coordinator = (Coordinator) discovery; + Field leaderCheckerField = Coordinator.class.getDeclaredField(LEADER_CHECKER_FIELD); + leaderCheckerField.setAccessible(true); + LeaderChecker leaderChecker = (LeaderChecker) leaderCheckerField.get(coordinator); + return method.invoke(leaderChecker); + } + return null; + } + + private Object getFollowerCheckStats() throws InvocationTargetException, IllegalAccessException, + NoSuchMethodException, NoSuchFieldException { + Method method = FollowersChecker.class.getMethod(GET_STATS_METHOD_NAME); + Discovery discovery = ESResources.INSTANCE.getDiscovery(); + if(discovery instanceof Coordinator) { + Coordinator coordinator = (Coordinator) discovery; + Field followerCheckerField = Coordinator.class.getDeclaredField(FOLLOWERS_CHECKER_FIELD); + followerCheckerField.setAccessible(true); + FollowersChecker followersChecker = (FollowersChecker) followerCheckerField.get(coordinator); + return method.invoke(followersChecker); + } + return null; + } + + private double computeLatency(final FaultDetectionStats currentMetrics, MetricsTracker tracker) { + final double rate = computeRate(currentMetrics.totalCount, tracker); + if(rate == 0) { + return 0D; + } + return (currentMetrics.timeTakenInMillis - tracker.getPrevTimeTakenInMillis()) / rate; + } + + private double computeRate(final double currentTotalCount, MetricsTracker tracker) { + return currentTotalCount - tracker.getPrevTotalCount(); + } + + private double computeFailure(final FaultDetectionStats currentMetrics, MetricsTracker tracker) { + return currentMetrics.failedCount - tracker.getPrevFailedCount(); + } + + @Override + public String getMetricsPath(long startTime, String... keysPath) { + if (keysPath.length != KEYS_PATH_LENGTH) { + throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH); + } + return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sFaultDetection); + } + + public static class FaultDetectionStats { + private long totalCount; + private long timeTakenInMillis; + private long failedCount; + } + + public static class FaultDetectionMetrics extends MetricStatus { + private double followerCheckTimeTakenInMillis; + private double leaderCheckTimeTakenInMillis; + private double followerCheckFailedCount; + private double leaderCheckFailedCount; + + public FaultDetectionMetrics() { + + } + + @JsonProperty(AllMetrics.FaultDetectionMetric.Constants.FOLLOWER_CHECK_LATENCY) + public double getFollowerCheckTimeTakenInMillis() { + return followerCheckTimeTakenInMillis; + } + + @JsonProperty(AllMetrics.FaultDetectionMetric.Constants.FOLLOWER_CHECK_FAILURE) + public double getFollowerCheckFailedCount() { + return followerCheckFailedCount; + } + + @JsonProperty(AllMetrics.FaultDetectionMetric.Constants.LEADER_CHECK_LATENCY) + public double getLeaderCheckTimeTakenInMillis() { + return leaderCheckTimeTakenInMillis; + } + + @JsonProperty(AllMetrics.FaultDetectionMetric.Constants.LEADER_CHECK_FAILURE) + public double getLeaderCheckFailedCount() { + return leaderCheckFailedCount; + } + + public void setFollowerCheckMetrics(double latency, double failed) { + this.followerCheckTimeTakenInMillis = latency; + this.followerCheckFailedCount = failed; + } + + public void setLeaderCheckMetrics(double latency, double failed) { + this.leaderCheckTimeTakenInMillis = latency; + this.leaderCheckFailedCount = failed; + } + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java new file mode 100644 index 00000000..0ed34ad7 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.tracker; + +/** + * Class to track previous latency and failure metrics to calculate point in time metrics. These values are updated + * everytime collector calls respective stats API. + */ +public class MetricsTracker { + private double prevTimeTakenInMillis; + private double prevFailedCount; + private double prevTotalCount; + + public MetricsTracker(double timeInMillis, double failedCount, double totalCount) { + this.prevTimeTakenInMillis = timeInMillis; + this.prevFailedCount = failedCount; + this.prevTotalCount = totalCount; + } + + public MetricsTracker() { + } + + public double getPrevTimeTakenInMillis() { + return prevTimeTakenInMillis; + } + + public double getPrevFailedCount() { + return prevFailedCount; + } + + public double getPrevTotalCount() { + return prevTotalCount; + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java index 68049ea0..737ddfaf 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java @@ -16,19 +16,8 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.util; import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.*; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CircuitBreakerCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeDetailsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsFixedShardsMetricsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; @@ -55,10 +44,10 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(NodeStatsFixedShardsMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0, 0)); MetricsConfiguration.CONFIG_MAP.put(MasterServiceMetrics.class, cdefault); - MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(MasterThrottlingMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, cdefault); } // These methods are utility functions for the Node Stat Metrics Collectors. These methods are used by both the all diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java similarity index 59% rename from src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTests.java rename to src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java index df807f25..d386d974 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; - import org.junit.Test; import org.mockito.Mockito; @@ -31,49 +30,32 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class FaultDetectionMetricsCollectorTests extends CustomMetricsLocationTestBase { +public class FaultDetectionStatsCollectorTests extends CustomMetricsLocationTestBase { @Test public void testFaultDetectionMetrics() { - MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, MetricsConfiguration.cdefault); + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, MetricsConfiguration.cdefault); System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); - FaultDetectionMetricsCollector faultDetectionMetricsCollector = new FaultDetectionMetricsCollector( - controller, configOverrides); - Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionMetricsCollector")) + Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionStatsCollector")) .thenReturn(true); - faultDetectionMetricsCollector.saveMetricValues("fault_detection", startTimeInMills, - "follower_check", "65432", "start"); + FaultDetectionStatsCollector faultDetectionStatsCollector = new FaultDetectionStatsCollector( + controller, configOverrides); + faultDetectionStatsCollector.saveMetricValues("testMetric", startTimeInMills); + List metrics = new ArrayList<>(); PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); - assertEquals(1, metrics.size()); - assertEquals("fault_detection", metrics.get(0).value); + assertEquals("testMetric", metrics.get(0).value); try { - faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills); + faultDetectionStatsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, "123"); assertTrue("Negative scenario test: Should have been a RuntimeException", true); } catch (RuntimeException ex) { - //- expecting exception...0 values passed; 3 expected - } - - try { - faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, - "leader_check"); - assertTrue("Negative scenario test: Should have been a RuntimeException", true); - } catch (RuntimeException ex) { - //- expecting exception...1 values passed; 3 expected - } - - try { - faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, - "leader_check", "823765423"); - assertTrue("Negative scenario test: Should have been a RuntimeException", true); - } catch (RuntimeException ex) { - //- expecting exception...2 values passed; 0 expected + //- expecting exception...1 values passed; 0 expected } } -} - +} \ No newline at end of file From badbb824e16974cff9047640ed1b4910dd79adc4 Mon Sep 17 00:00:00 2001 From: Arpita Mathur Date: Tue, 9 Mar 2021 17:37:27 +0530 Subject: [PATCH 2/4] Add FaultDetectionStats to collect latency and failure metrics --- .../PerformanceAnalyzerPlugin.java | 21 ++++++++++++++++++- .../FaultDetectionStatsCollector.java | 1 - .../performanceanalyzer/util/Utils.java | 13 +++++++++++- .../FaultDetectionStatsCollectorTests.java | 2 +- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 8611da03..afdb7c4f 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -18,7 +18,26 @@ import static java.util.Collections.singletonList; import com.amazon.opendistro.elasticsearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.*; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CircuitBreakerCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.DisksCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionStatsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.GCInfoCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.HeapMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MetricsPurgeActivity; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NetworkInterfaceCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeDetailsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsFixedShardsMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.OSMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ScheduledMetricCollectorsExecutor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java index 6d6a99e9..2e5b80e8 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java @@ -85,7 +85,6 @@ public void collectMetrics(long startTime) { mapper.writeValueAsString(getFollowerCheckStats()), FaultDetectionStats.class); FaultDetectionStats leaderCheckStats = mapper.readValue( mapper.writeValueAsString(getLeaderCheckStats()), FaultDetectionStats.class); - System.out.println("Arpita in collector"); FaultDetectionMetrics faultDetectionMetrics = new FaultDetectionMetrics(); if (followerCheckStats != null) { faultDetectionMetrics.setFollowerCheckMetrics(computeLatency(followerCheckStats, followerCheckTracker), diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java index 737ddfaf..a0abc0c4 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java @@ -16,8 +16,19 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.util; import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.*; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionStatsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CircuitBreakerCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeDetailsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.NodeStatsFixedShardsMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java index d386d974..8064ccd7 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java @@ -58,4 +58,4 @@ public void testFaultDetectionMetrics() { //- expecting exception...1 values passed; 0 expected } } -} \ No newline at end of file +} From 3436998c26b1d5378404d9d1835d1c8db21fbd8f Mon Sep 17 00:00:00 2001 From: Arpita Mathur Date: Wed, 17 Mar 2021 18:09:28 +0530 Subject: [PATCH 3/4] Add FaultDetectionStats to collect latency and failure metrics --- .../FaultDetectionStatsCollector.java | 94 +++++++---- .../tracker/MetricsTracker.java | 47 ------ .../FaultDetectionStatsCollectorTests.java | 148 ++++++++++++++++-- 3 files changed, 203 insertions(+), 86 deletions(-) delete mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java index 2e5b80e8..254f0c6e 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollector.java @@ -21,18 +21,17 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.Utils; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.tracker.MetricsTracker; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.coordination.Coordinator; @@ -54,22 +53,25 @@ public class FaultDetectionStatsCollector extends PerformanceAnalyzerMetricsColl private static final String GET_STATS_METHOD_NAME = "getStats"; private static final String FOLLOWERS_CHECKER_FIELD = "followersChecker"; private static final String LEADER_CHECKER_FIELD = "leaderChecker"; - private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper; + private static volatile FaultDetectionStats prevFollowerCheckStats = new FaultDetectionStats(); + private static volatile FaultDetectionStats prevLeaderCheckStats = new FaultDetectionStats(); private final StringBuilder value; private final PerformanceAnalyzerController controller; private final ConfigOverridesWrapper configOverridesWrapper; - private MetricsTracker leaderCheckTracker; - private MetricsTracker followerCheckTracker; + static { + mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } public FaultDetectionStatsCollector(PerformanceAnalyzerController controller, - ConfigOverridesWrapper configOverridesWrapper) { + ConfigOverridesWrapper configOverridesWrapper) { super(SAMPLING_TIME_INTERVAL, FaultDetectionStatsCollector.class.getSimpleName()); value = new StringBuilder(); this.controller = controller; this.configOverridesWrapper = configOverridesWrapper; - this.leaderCheckTracker = new MetricsTracker(); - this.followerCheckTracker = new MetricsTracker(); } @Override @@ -79,37 +81,36 @@ public void collectMetrics(long startTime) { } try { long mCurrT = System.currentTimeMillis(); - mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + FaultDetectionStats followerCheckStats = mapper.readValue( mapper.writeValueAsString(getFollowerCheckStats()), FaultDetectionStats.class); FaultDetectionStats leaderCheckStats = mapper.readValue( mapper.writeValueAsString(getLeaderCheckStats()), FaultDetectionStats.class); + FaultDetectionMetrics faultDetectionMetrics = new FaultDetectionMetrics(); if (followerCheckStats != null) { - faultDetectionMetrics.setFollowerCheckMetrics(computeLatency(followerCheckStats, followerCheckTracker), - computeFailure(followerCheckStats, followerCheckTracker)); - this.followerCheckTracker = new MetricsTracker(followerCheckStats.timeTakenInMillis, - followerCheckStats.failedCount, followerCheckStats.totalCount); + faultDetectionMetrics.setFollowerCheckMetrics( + computeLatency(followerCheckStats, FaultDetectionStatsCollector.prevFollowerCheckStats), + computeFailure(followerCheckStats, FaultDetectionStatsCollector.prevFollowerCheckStats)); + FaultDetectionStatsCollector.prevFollowerCheckStats = followerCheckStats; } if (leaderCheckStats != null) { - faultDetectionMetrics.setLeaderCheckMetrics(computeLatency(leaderCheckStats, leaderCheckTracker), - computeFailure(leaderCheckStats, leaderCheckTracker)); - this.leaderCheckTracker = new MetricsTracker(leaderCheckStats.timeTakenInMillis, - leaderCheckStats.failedCount, leaderCheckStats.totalCount); + faultDetectionMetrics.setLeaderCheckMetrics( + computeLatency(leaderCheckStats, FaultDetectionStatsCollector.prevLeaderCheckStats), + computeFailure(leaderCheckStats, FaultDetectionStatsCollector.prevLeaderCheckStats)); + FaultDetectionStatsCollector.prevLeaderCheckStats = leaderCheckStats; } + StringBuilder value = new StringBuilder(); value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); value.append(faultDetectionMetrics.serialize()); - saveMetricValues(value.toString(), startTime); PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat( WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME, "", System.currentTimeMillis() - mCurrT); - } - catch (NoSuchFieldException | InvocationTargetException | IllegalAccessException | NoSuchMethodException + } catch (NoSuchFieldException | InvocationTargetException | IllegalAccessException | NoSuchMethodException | JsonProcessingException ex) { PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR, "", 1); @@ -118,7 +119,8 @@ public void collectMetrics(long startTime) { } } - private Object getLeaderCheckStats() throws InvocationTargetException, IllegalAccessException, + @VisibleForTesting + public Object getLeaderCheckStats() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, NoSuchFieldException { Method method = LeaderChecker.class.getMethod(GET_STATS_METHOD_NAME); Discovery discovery = ESResources.INSTANCE.getDiscovery(); @@ -132,7 +134,8 @@ private Object getLeaderCheckStats() throws InvocationTargetException, IllegalAc return null; } - private Object getFollowerCheckStats() throws InvocationTargetException, IllegalAccessException, + @VisibleForTesting + public Object getFollowerCheckStats() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, NoSuchFieldException { Method method = FollowersChecker.class.getMethod(GET_STATS_METHOD_NAME); Discovery discovery = ESResources.INSTANCE.getDiscovery(); @@ -146,20 +149,38 @@ private Object getFollowerCheckStats() throws InvocationTargetException, Illegal return null; } - private double computeLatency(final FaultDetectionStats currentMetrics, MetricsTracker tracker) { - final double rate = computeRate(currentMetrics.totalCount, tracker); + /** + * FaultDetectionStats is ES is a tracker for total time taken for fault detection and the + * number of times it has failed. To calculate point in time metric, + * we will have to store its previous state and calculate the diff to get the point in time latency. + * This might return as 0 if there is no fault detection operation since last retrieval. + * + * @param currentMetrics Current fault detection stats in ES + * @return point in time latency. + */ + private double computeLatency(final FaultDetectionStats currentMetrics, FaultDetectionStats prevStats) { + final double rate = computeRate(currentMetrics.totalCount, prevStats); if(rate == 0) { return 0D; } - return (currentMetrics.timeTakenInMillis - tracker.getPrevTimeTakenInMillis()) / rate; + return (currentMetrics.timeTakenInMillis - prevStats.timeTakenInMillis) / rate; } - private double computeRate(final double currentTotalCount, MetricsTracker tracker) { - return currentTotalCount - tracker.getPrevTotalCount(); + private double computeRate(final double currentTotalCount, FaultDetectionStats prevStats) { + return currentTotalCount - prevStats.totalCount; } - private double computeFailure(final FaultDetectionStats currentMetrics, MetricsTracker tracker) { - return currentMetrics.failedCount - tracker.getPrevFailedCount(); + /** + * FaultDetectionStats is ES is a tracker for total time taken for fault detection and the + * number of times it has failed. To calculate point in time metric, + * we will have to store its previous state and calculate the diff to get the point in time latency. + * This might return as 0 if there is no fault detection operation since last retrieval. + * + * @param currentMetrics Current fault detection stats in ES + * @return point in time latency. + */ + private double computeFailure(final FaultDetectionStats currentMetrics, FaultDetectionStats prevStats) { + return currentMetrics.failedCount - prevStats.failedCount; } @Override @@ -170,10 +191,23 @@ public String getMetricsPath(long startTime, String... keysPath) { return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sFaultDetection); } + public void resetFaultDetectionStats() { + FaultDetectionStatsCollector.prevFollowerCheckStats = new FaultDetectionStats(); + FaultDetectionStatsCollector.prevLeaderCheckStats = new FaultDetectionStats(); + } + public static class FaultDetectionStats { private long totalCount; private long timeTakenInMillis; private long failedCount; + + public FaultDetectionStats(long totalCount, long timeTakenInMillis, long failedCount) { + this.totalCount = totalCount; + this.timeTakenInMillis = timeTakenInMillis; + this.failedCount = failedCount; + } + + public FaultDetectionStats() {} } public static class FaultDetectionMetrics extends MetricStatus { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java deleted file mode 100644 index 0ed34ad7..00000000 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tracker/MetricsTracker.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistro.elasticsearch.performanceanalyzer.tracker; - -/** - * Class to track previous latency and failure metrics to calculate point in time metrics. These values are updated - * everytime collector calls respective stats API. - */ -public class MetricsTracker { - private double prevTimeTakenInMillis; - private double prevFailedCount; - private double prevTotalCount; - - public MetricsTracker(double timeInMillis, double failedCount, double totalCount) { - this.prevTimeTakenInMillis = timeInMillis; - this.prevFailedCount = failedCount; - this.prevTotalCount = totalCount; - } - - public MetricsTracker() { - } - - public double getPrevTimeTakenInMillis() { - return prevTimeTakenInMillis; - } - - public double getPrevFailedCount() { - return prevFailedCount; - } - - public double getPrevTotalCount() { - return prevTotalCount; - } -} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java index 8064ccd7..6fbacc6c 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java @@ -18,44 +18,174 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; import org.mockito.Mockito; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class FaultDetectionStatsCollectorTests extends CustomMetricsLocationTestBase { + private static final ObjectMapper mapper = new ObjectMapper(); @Test - public void testFaultDetectionMetrics() { + public void testFaultDetectionStats_saveMetricValues() { MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, MetricsConfiguration.cdefault); System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); - long startTimeInMills = 1153721339; - PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + PerformanceAnalyzerController controller = Mockito.mock( + PerformanceAnalyzerController.class); ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); - Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionStatsCollector")) - .thenReturn(true); FaultDetectionStatsCollector faultDetectionStatsCollector = new FaultDetectionStatsCollector( controller, configOverrides); - faultDetectionStatsCollector.saveMetricValues("testMetric", startTimeInMills); + Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionStatsCollector")) + .thenReturn(true); + faultDetectionStatsCollector.saveMetricValues("fault_detection", startTimeInMills); List metrics = new ArrayList<>(); PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + assertEquals(1, metrics.size()); - assertEquals("testMetric", metrics.get(0).value); + assertEquals("fault_detection", metrics.get(0).value); try { - faultDetectionStatsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, "123"); + faultDetectionStatsCollector.saveMetricValues("fault_detection", startTimeInMills, "dummy"); assertTrue("Negative scenario test: Should have been a RuntimeException", true); } catch (RuntimeException ex) { //- expecting exception...1 values passed; 0 expected } } -} + + @SuppressWarnings("unchecked") + @Test + public void testFaultDetectionStats_collectMetrics() throws NoSuchMethodException, IllegalAccessException, + InvocationTargetException, JsonProcessingException, NoSuchFieldException { + System.out.println("test 1"); + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; + PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + FaultDetectionStatsCollector faultDetectionStatsCollector = new + FaultDetectionStatsCollector(controller, configOverrides); + FaultDetectionStatsCollector spyCollector = Mockito.spy(faultDetectionStatsCollector); + + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(23L, 15L, 2L)) + .when(spyCollector).getFollowerCheckStats(); + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(27L, 19L, 5L)) + .when(spyCollector).getLeaderCheckStats(); + + Mockito.when(controller.isCollectorEnabled(configOverrides, + FaultDetectionStatsCollector.class.getSimpleName())).thenReturn(true); + + spyCollector.collectMetrics(startTimeInMills); + + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + + assertEquals(1, metrics.size()); + String[] lines = metrics.get(0).value.split(System.lineSeparator()); + Map map = mapper.readValue(lines[1], Map.class); + assertEquals(0.6521739130434783, map.get(AllMetrics.FaultDetectionMetric + .FOLLOWER_CHECK_LATENCY.toString())); + assertEquals(2.0, map.get(AllMetrics.FaultDetectionMetric + .FOLLOWER_CHECK_FAILURE.toString())); + assertEquals(0.7037037037037037, map.get(AllMetrics.FaultDetectionMetric + .LEADER_CHECK_LATENCY.toString())); + assertEquals(5.0, map.get(AllMetrics.FaultDetectionMetric + .LEADER_CHECK_FAILURE.toString())); + } + + @SuppressWarnings("unchecked") + @Test + public void testFaultDetectionStats_collectMetricsWithPreviousClusterApplierMetrics() throws NoSuchMethodException, IllegalAccessException, + InvocationTargetException, JsonProcessingException, NoSuchFieldException { + System.out.println("test 2"); + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; + PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + FaultDetectionStatsCollector faultDetectionStatsCollector = new + FaultDetectionStatsCollector(controller, configOverrides); + FaultDetectionStatsCollector spyCollector = Mockito.spy(faultDetectionStatsCollector); + + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(23L, 15L, 2L)) + .when(spyCollector).getFollowerCheckStats(); + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(27L, 19L, 5L)) + .when(spyCollector).getLeaderCheckStats(); + + Mockito.when(controller.isCollectorEnabled(configOverrides, + FaultDetectionStatsCollector.class.getSimpleName())).thenReturn(true); + + spyCollector.resetFaultDetectionStats(); + spyCollector.collectMetrics(startTimeInMills); + + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + + assertEquals(1, metrics.size()); + String[] lines = metrics.get(0).value.split(System.lineSeparator()); + Map map = mapper.readValue(lines[1], Map.class); + assertEquals(0.6521739130434783, map.get(AllMetrics.FaultDetectionMetric + .FOLLOWER_CHECK_LATENCY.toString())); + assertEquals(2.0, map.get(AllMetrics.FaultDetectionMetric + .FOLLOWER_CHECK_FAILURE.toString())); + assertEquals(0.7037037037037037, map.get(AllMetrics.FaultDetectionMetric + .LEADER_CHECK_LATENCY.toString())); + assertEquals(5.0, map.get(AllMetrics.FaultDetectionMetric + .LEADER_CHECK_FAILURE.toString())); + + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(24L, 17L, 2L)) + .when(spyCollector).getFollowerCheckStats(); + Mockito.doReturn(new FaultDetectionStatsCollector.FaultDetectionStats(30L, 22L, 6L)) + .when(spyCollector).getLeaderCheckStats(); + + spyCollector.collectMetrics(startTimeInMills); + + metrics.clear(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + + assertEquals(1, metrics.size()); + lines = metrics.get(0).value.split(System.lineSeparator()); + map = mapper.readValue(lines[1], Map.class); + assertEquals(2.0, map.get(AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString())); + assertEquals(0.0, map.get(AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString())); + assertEquals(1.0, map.get(AllMetrics.FaultDetectionMetric.LEADER_CHECK_LATENCY.toString())); + assertEquals(1.0, map.get(AllMetrics.FaultDetectionMetric.LEADER_CHECK_FAILURE.toString())); + + + } + + @SuppressWarnings("unchecked") + @Test + public void testFaultDetectionStats_collectMetrics_ClassNotFoundException() { + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionStatsCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; + PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + FaultDetectionStatsCollector faultDetectionStatsCollector = new + FaultDetectionStatsCollector(controller, configOverrides); + FaultDetectionStatsCollector spyCollector = Mockito.spy(faultDetectionStatsCollector); + Mockito.when(controller.isCollectorEnabled(configOverrides, + FaultDetectionStatsCollector.class.getSimpleName())).thenReturn(true); + + spyCollector.collectMetrics(startTimeInMills); + + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + // No method found to get cluster state applier thread stats. Skipping ClusterApplierServiceStatsCollector. + assertEquals(0, metrics.size()); + } +} \ No newline at end of file From 8454c30c2372462fd453898f38bf17f1b1f123cc Mon Sep 17 00:00:00 2001 From: Arpita Mathur Date: Wed, 17 Mar 2021 18:11:58 +0530 Subject: [PATCH 4/4] Add FaultDetectionStats to collect latency and failure metrics --- .../collectors/FaultDetectionStatsCollectorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java index 6fbacc6c..606927aa 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionStatsCollectorTests.java @@ -188,4 +188,4 @@ public void testFaultDetectionStats_collectMetrics_ClassNotFoundException() { // No method found to get cluster state applier thread stats. Skipping ClusterApplierServiceStatsCollector. assertEquals(0, metrics.size()); } -} \ No newline at end of file +}