Skip to content

Commit

Permalink
[POC] Better control of threads executed by opentelemetry-cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff committed Nov 26, 2024
1 parent fe68d51 commit 3975855
Show file tree
Hide file tree
Showing 25 changed files with 254 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/exporter_utils.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

// forward declare google::protobuf::Message
Expand Down Expand Up @@ -83,28 +84,32 @@ struct OtlpHttpClientOptions
// User agent
std::string user_agent;

inline OtlpHttpClientOptions(nostd::string_view input_url,
bool input_ssl_insecure_skip_verify,
nostd::string_view input_ssl_ca_cert_path,
nostd::string_view input_ssl_ca_cert_string,
nostd::string_view input_ssl_client_key_path,
nostd::string_view input_ssl_client_key_string,
nostd::string_view input_ssl_client_cert_path,
nostd::string_view input_ssl_client_cert_string,
nostd::string_view input_ssl_min_tls,
nostd::string_view input_ssl_max_tls,
nostd::string_view input_ssl_cipher,
nostd::string_view input_ssl_cipher_suite,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
nostd::string_view input_compression,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers,
std::size_t input_concurrent_sessions = 64,
std::size_t input_max_requests_per_connection = 8,
nostd::string_view input_user_agent = GetOtlpDefaultUserAgent())
std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation;

inline OtlpHttpClientOptions(
nostd::string_view input_url,
bool input_ssl_insecure_skip_verify,
nostd::string_view input_ssl_ca_cert_path,
nostd::string_view input_ssl_ca_cert_string,
nostd::string_view input_ssl_client_key_path,
nostd::string_view input_ssl_client_key_string,
nostd::string_view input_ssl_client_cert_path,
nostd::string_view input_ssl_client_cert_string,
nostd::string_view input_ssl_min_tls,
nostd::string_view input_ssl_max_tls,
nostd::string_view input_ssl_cipher,
nostd::string_view input_ssl_cipher_suite,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
nostd::string_view input_compression,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers,
const std::shared_ptr<sdk::common::ThreadInstrumentation> &input_thread_instrumentation,
std::size_t input_concurrent_sessions = 64,
std::size_t input_max_requests_per_connection = 8,
nostd::string_view input_user_agent = GetOtlpDefaultUserAgent())
: url(input_url),
ssl_options(input_url,
input_ssl_insecure_skip_verify,
Expand All @@ -127,7 +132,8 @@ struct OtlpHttpClientOptions
http_headers(input_http_headers),
max_concurrent_requests(input_concurrent_sessions),
max_requests_per_connection(input_max_requests_per_connection),
user_agent(input_user_agent)
user_agent(input_user_agent),
thread_instrumentation(input_thread_instrumentation)
{}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -101,6 +103,8 @@ struct OPENTELEMETRY_EXPORT OtlpHttpExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -101,6 +103,8 @@ struct OPENTELEMETRY_EXPORT OtlpHttpLogRecordExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "opentelemetry/exporters/otlp/otlp_environment.h"
#include "opentelemetry/exporters/otlp/otlp_http.h"
#include "opentelemetry/exporters/otlp/otlp_preferred_temporality.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -104,6 +106,8 @@ struct OPENTELEMETRY_EXPORT OtlpHttpMetricExporterOptions

/** Compression type. */
std::string compression;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation;
};

} // namespace otlp
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ void ConvertListFieldToJson(nlohmann::json &value,
OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options)
: is_shutdown_(false),
options_(options),
http_client_(http_client::HttpClientFactory::Create()),
http_client_(http_client::HttpClientFactory::Create(options.thread_instrumentation)),
start_session_counter_(0),
finished_session_counter_(0)
{
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options)
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/src/otlp_http_log_record_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ OtlpHttpLogRecordExporter::OtlpHttpLogRecordExporter(
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/src/otlp_http_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptio
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers
options.http_headers,
options.thread_instrumentation
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests,
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/test/otlp_http_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static nostd::span<T, N> MakeSpan(T (&array)[N])
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -70,7 +71,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/test/otlp_http_log_record_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static nostd::span<T, N> MakeSpan(T (&array)[N])
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpLogRecordExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -69,7 +70,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
3 changes: 2 additions & 1 deletion exporters/otlp/test/otlp_http_metric_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static IntegerType JsonToInteger(nlohmann::json value)
OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type,
bool async_mode)
{
std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation> not_instrumented;
OtlpHttpMetricExporterOptions options;
options.content_type = content_type;
options.console_debug = true;
Expand All @@ -76,7 +77,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t
"", /* ssl_cipher */
"", /* ssl_cipher_suite */
options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name,
options.console_debug, options.timeout, options.http_headers);
options.console_debug, options.timeout, options.http_headers, not_instrumented);
if (!async_mode)
{
otlp_http_client_options.max_concurrent_requests = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -304,6 +305,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
public:
// The call (curl_global_init) is not thread safe. Ensure this is called only once.
HttpClient();
HttpClient(const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation);
~HttpClient() override;

std::shared_ptr<opentelemetry::ext::http::client::Session> CreateSession(
Expand Down Expand Up @@ -366,6 +368,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

std::mutex background_thread_m_;
std::unique_ptr<std::thread> background_thread_;
std::shared_ptr<sdk::common::ThreadInstrumentation> background_thread_instrumentation_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
Expand All @@ -17,6 +20,8 @@ class HttpClientFactory
static std::shared_ptr<HttpClientSync> CreateSync();

static std::shared_ptr<HttpClient> Create();
static std::shared_ptr<HttpClient> Create(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation);
};
} // namespace client
} // namespace http
Expand Down
23 changes: 22 additions & 1 deletion ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,18 @@ HttpClient::HttpClient()
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()),
background_thread_instrumentation_(nullptr)
{}

HttpClient::HttpClient(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation)
: multi_handle_(curl_multi_init()),
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()),
background_thread_instrumentation_(thread_instrumentation)
{}

HttpClient::~HttpClient()
Expand Down Expand Up @@ -345,6 +356,11 @@ void HttpClient::MaybeSpawnBackgroundThread()

background_thread_.reset(new std::thread(
[](HttpClient *self) {
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnStart();
}

int still_running = 1;
while (true)
{
Expand Down Expand Up @@ -452,6 +468,11 @@ void HttpClient::MaybeSpawnBackgroundThread()
}
}
}

if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnEnd();
}
},
this));
}
Expand Down
6 changes: 6 additions & 0 deletions ext/src/http/client/curl/http_client_factory_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::Create(
return std::make_shared<http_client::curl::HttpClient>();
}

std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::Create(
const std::shared_ptr<sdk::common::ThreadInstrumentation> &thread_instrumentation)
{
return std::make_shared<http_client::curl::HttpClient>(thread_instrumentation);
}

std::shared_ptr<http_client::HttpClientSync> http_client::HttpClientFactory::CreateSync()
{
return std::make_shared<http_client::curl::HttpClientSync>();
Expand Down
30 changes: 30 additions & 0 deletions sdk/include/opentelemetry/sdk/common/thread_instrumentation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace common
{

class ThreadInstrumentation
{
public:
ThreadInstrumentation() = default;
virtual ~ThreadInstrumentation() = default;

virtual void OnStart() = 0;
virtual void OnEnd() = 0;
virtual void BeforeWait() = 0;
virtual void AfterWait() = 0;
virtual void BeforeLoad() = 0;
virtual void AfterLoad() = 0;
};

} // namespace common
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor

/* The background worker thread */
std::thread worker_thread_;
std::shared_ptr<sdk::common::ThreadInstrumentation> worker_thread_instrumentation_;
};

} // namespace logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <chrono>
#include <cstddef>

#include "opentelemetry/sdk/common/thread_instrumentation.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
Expand Down Expand Up @@ -34,6 +35,8 @@ struct BatchLogRecordProcessorOptions
* equal to max_queue_size.
*/
size_t max_export_batch_size = 512;

std::shared_ptr<sdk::common::ThreadInstrumentation> thread_instrumentation;
};

} // namespace logs
Expand Down
Loading

0 comments on commit 3975855

Please sign in to comment.