Skip to content

Commit

Permalink
Fix exporting of zero value networks statsbeat (#1155)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Aug 18, 2022
1 parent 891666a commit d0f9965
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 56 deletions.
4 changes: 3 additions & 1 deletion contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
- Add storage existence checks to storing and transmitting in exporter
([#1150](https://github.com/census-instrumentation/opencensus-python/pull/1150))
- Add 502 and 504 status codes as retriable
([#1150](https://github.com/census-instrumentation/opencensus-python/pull/1150))
([#1153](https://github.com/census-instrumentation/opencensus-python/pull/1153))
- Fix statsbeat bug - exporting zero values for network statsbeat
([#1155](https://github.com/census-instrumentation/opencensus-python/pull/1155))

## 1.1.6
Released 2022-08-03
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TransportStatusCode:

class TransportMixin(object):

# check to see if collecting requests information related to statsbeats
# check to see whether its the case of stats collection
def _check_stats_collection(self):
return state.is_statsbeat_enabled() and \
not state.get_statsbeat_shutdown() and \
Expand Down Expand Up @@ -334,15 +334,16 @@ def _statsbeat_failure_reached_threshold():

def _update_requests_map(type, value=None):
if value is None:
value = 0 # error state
value = 0
with _requests_lock:
if type == "count":
_requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501
elif type == "duration":
elif type == "duration": # value will be duration
_requests_map['duration'] = _requests_map.get('duration', 0) + value # noqa: E501
elif type == "success":
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
else:
# value will be a key (status_code/error message)
prev = 0
if _requests_map.get(type):
prev = _requests_map.get(type).get(value, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import logging
import os
import random
import threading
import time
Expand Down Expand Up @@ -66,9 +65,6 @@ def __init__(self, **options):
self._queue = Queue(capacity=self.options.queue_capacity)
self._worker = Worker(self._queue, self)
self._worker.start()
# start statsbeat on exporter instantiation
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
statsbeat.collect_statsbeat_metrics(self.options)
# For redirects
self._consecutive_redirects = 0 # To prevent circular redirects

Expand Down Expand Up @@ -187,9 +183,15 @@ def filter(self, record):
return random.random() < self.probability


class AzureLogHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
class AzureLogHandler(BaseLogHandler, TransportMixin, ProcessorMixin):
"""Handler for logging to Microsoft Azure Monitor."""

def __init__(self, **options):
super(AzureLogHandler, self).__init__(**options)
# start statsbeat on exporter instantiation
if self._check_stats_collection():
statsbeat.collect_statsbeat_metrics(self.options)

def log_record_to_envelope(self, record):
envelope = create_envelope(self.options.instrumentation_key, record)

Expand Down Expand Up @@ -257,6 +259,12 @@ def log_record_to_envelope(self, record):
class AzureEventHandler(TransportMixin, ProcessorMixin, BaseLogHandler):
"""Handler for sending custom events to Microsoft Azure Monitor."""

def __init__(self, **options):
super(AzureEventHandler, self).__init__(**options)
# start statsbeat on exporter instantiation
if self._check_stats_collection():
statsbeat.collect_statsbeat_metrics(self.options)

def log_record_to_envelope(self, record):
envelope = create_envelope(self.options.instrumentation_key, record)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import atexit
import os

from opencensus.common import utils as common_utils
from opencensus.ext.azure.common import Options, utils
Expand All @@ -30,6 +29,9 @@
TransportStatusCode,
)
from opencensus.ext.azure.metrics_exporter import standard_metrics
from opencensus.ext.azure.statsbeat.statsbeat_metrics import (
_NETWORK_STATSBEAT_NAMES,
)
from opencensus.metrics import transport
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
from opencensus.stats import stats as stats_module
Expand Down Expand Up @@ -99,44 +101,47 @@ def metric_to_envelopes(self, metric):
# Each time series will be uniquely identified by its
# label values
for time_series in metric.time_series:
# Using stats, time_series should only have one
# point which contains the aggregated value
data_point = self._create_data_points(
time_series, md)[0]
# if statsbeat exporter, ignore points with 0 value
if self._is_stats and data_point.value == 0:
continue
# time_series should only have one point which
# contains the aggregated value
# time_series point list is never empty
point = time_series.points[0]
# we ignore None and 0 values for network statsbeats
if self._is_stats_exporter():
if md.name in _NETWORK_STATSBEAT_NAMES:
if not point.value.value:
continue
data_point = DataPoint(
ns=md.name,
name=md.name,
value=point.value.value
)
# The timestamp is when the metric was recorded
timestamp = time_series.points[0].timestamp
timestamp = point.timestamp
# Get the properties using label keys from metric
# and label values of the time series
properties = self._create_properties(time_series, md)
envelopes.append(self._create_envelope(data_point,
timestamp,
properties))
properties = self._create_properties(
time_series,
md.label_keys
)
envelopes.append(
self._create_envelope(
data_point,
timestamp,
properties
)
)
return envelopes

def _create_data_points(self, time_series, metric_descriptor):
"""Convert a metric's OC time series to list of Azure data points."""
data_points = []
for point in time_series.points:
# TODO: Possibly encode namespace in name
data_point = DataPoint(ns=metric_descriptor.name,
name=metric_descriptor.name,
value=point.value.value)
data_points.append(data_point)
return data_points

def _create_properties(self, time_series, metric_descriptor):
def _create_properties(self, time_series, label_keys):
properties = {}
# We construct a properties map from the label keys and values. We
# assume the ordering is already correct
for i in range(len(metric_descriptor.label_keys)):
for i in range(len(label_keys)):
if time_series.label_values[i].value is None:
value = "null"
else:
value = time_series.label_values[i].value
properties[metric_descriptor.label_keys[i].key] = value
properties[label_keys[i].key] = value
return properties

def _create_envelope(self, data_point, timestamp, properties):
Expand Down Expand Up @@ -177,8 +182,9 @@ def new_metrics_exporter(**options):
producers,
exporter,
interval=exporter.options.export_interval)
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
# start statsbeat on exporter instantiation
if exporter._check_stats_collection():
# Import here to avoid circular dependencies
from opencensus.ext.azure.statsbeat import statsbeat
# Stats will track the user's ikey
statsbeat.collect_statsbeat_metrics(exporter.options)
return exporter
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@
_REQ_THROTTLE_NAME = "Throttle Count"
_REQ_EXCEPTION_NAME = "Exception Count"

_NETWORK_STATSBEAT_NAMES = (
_REQ_SUCCESS_NAME,
_REQ_FAILURE_NAME,
_REQ_DURATION_NAME,
_REQ_RETRY_NAME,
_REQ_THROTTLE_NAME,
_REQ_EXCEPTION_NAME,
)

_ENDPOINT_TYPES = ["breeze"]
_RP_NAMES = ["appsvc", "functions", "vm", "unknown"]

Expand Down Expand Up @@ -368,13 +377,15 @@ def _get_network_metrics(self):
properties.pop()

stats_metric = metric.get_metric(datetime.datetime.utcnow())
# Only export metric if status/exc_type was present
# metric will be None if status_code or exc_type is invalid
# for success count, this will never be None
if stats_metric is not None:
# we handle not exporting of None and 0 values in the exporter
metrics.append(stats_metric)
return metrics

def _get_feature_metric(self):
# Don't export if value is 0
# Don't export if feature list is None
if self._feature is _StatsbeatFeature.NONE:
return None
properties = self._get_common_properties()
Expand All @@ -385,7 +396,7 @@ def _get_feature_metric(self):

def _get_instrumentation_metric(self):
integrations = get_integrations()
# Don't export if value is 0
# Don't export if instrumentation list is None
if integrations is _Integrations.NONE:
return None
properties = self._get_common_properties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import atexit
import json
import logging
import os

from opencensus.common.schedule import QueueExitEvent
from opencensus.ext.azure.common import Options, utils
Expand Down Expand Up @@ -56,13 +55,14 @@
STACKTRACE = attributes_helper.COMMON_ATTRIBUTES['STACKTRACE']


class AzureExporter(BaseExporter, ProcessorMixin, TransportMixin):
class AzureExporter(BaseExporter, TransportMixin, ProcessorMixin):
"""An exporter that sends traces to Microsoft Azure Monitor.
:param options: Options for the exporter.
"""

def __init__(self, **options):
super(AzureExporter, self).__init__(**options)
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
self.storage = None
Expand All @@ -75,10 +75,9 @@ def __init__(self, **options):
source=self.__class__.__name__,
)
self._telemetry_processors = []
super(AzureExporter, self).__init__(**options)
atexit.register(self._stop, self.options.grace_period)
# start statsbeat on exporter instantiation
if not os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"):
if self._check_stats_collection():
statsbeat.collect_statsbeat_metrics(self.options)
# For redirects
self._consecutive_redirects = 0 # To prevent circular redirects
Expand Down
Loading

0 comments on commit d0f9965

Please sign in to comment.