Skip to content

Commit

Permalink
Implement CMake WITH_THREAD_INSTRUMENTATION_PREVIEW flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff committed Jan 7, 2025
1 parent 71d6535 commit 2cd0fb4
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ New features:
shows how to use the feature,
and add application logic in the thread execution code path.

* Note that this feature is experimental,
protected by a WITH_THREAD_INSTRUMENTATION_PREVIEW
flag in CMake. Various runtime options structures,
as well as the thread instrumentation interface,
may change without notice before this feature is declared stable.

## [1.18 2024-11-25]

* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter
Expand Down
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ option(WITH_ASYNC_EXPORT_PREVIEW "Whether to enable async export" OFF)
option(WITH_METRICS_EXEMPLAR_PREVIEW
"Whether to enable exemplar within metrics" OFF)

# Experimental, so behind feature flag by default
option(WITH_THREAD_INSTRUMENTATION_PREVIEW
"Whether to enable thread instrumentation" OFF)

option(OPENTELEMETRY_SKIP_DYNAMIC_LOADING_TESTS
"Whether to build test libraries that are always linked as shared libs"
OFF)
Expand Down
5 changes: 5 additions & 0 deletions api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ if(WITH_METRICS_EXEMPLAR_PREVIEW)
INTERFACE ENABLE_METRICS_EXEMPLAR_PREVIEW)
endif()

if(WITH_THREAD_INSTRUMENTATION_PREVIEW)
target_compile_definitions(opentelemetry_api
INTERFACE ENABLE_THREAD_INSTRUMENTATION_PREVIEW)
endif()

if(WITH_OTLP_HTTP_COMPRESSION)
target_compile_definitions(opentelemetry_api
INTERFACE ENABLE_OTLP_COMPRESSION_PREVIEW)
Expand Down
4 changes: 4 additions & 0 deletions ci/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ elif [[ "$1" == "cmake.maintainer.sync.test" ]]; then
-DOTELCPP_MAINTAINER_MODE=ON \
-DWITH_NO_DEPRECATED_CODE=ON \
-DWITH_OTLP_HTTP_COMPRESSION=ON \
-DWITH_THREAD_INSTRUMENTATION_PREVIEW=ON \
${IWYU} \
"${SRC_DIR}"
eval "$MAKE_COMMAND"
Expand All @@ -153,6 +154,7 @@ elif [[ "$1" == "cmake.maintainer.async.test" ]]; then
-DOTELCPP_MAINTAINER_MODE=ON \
-DWITH_NO_DEPRECATED_CODE=ON \
-DWITH_OTLP_HTTP_COMPRESSION=ON \
-DWITH_THREAD_INSTRUMENTATION_PREVIEW=ON \
${IWYU} \
"${SRC_DIR}"
eval "$MAKE_COMMAND"
Expand All @@ -176,6 +178,7 @@ elif [[ "$1" == "cmake.maintainer.cpp11.async.test" ]]; then
-DOTELCPP_MAINTAINER_MODE=ON \
-DWITH_NO_DEPRECATED_CODE=ON \
-DWITH_OTLP_HTTP_COMPRESSION=ON \
-DWITH_THREAD_INSTRUMENTATION_PREVIEW=ON \
"${SRC_DIR}"
make -k -j $(nproc)
make test
Expand All @@ -199,6 +202,7 @@ elif [[ "$1" == "cmake.maintainer.abiv2.test" ]]; then
-DWITH_ABI_VERSION_1=OFF \
-DWITH_ABI_VERSION_2=ON \
-DWITH_OTLP_HTTP_COMPRESSION=ON \
-DWITH_THREAD_INSTRUMENTATION_PREVIEW=ON \
${IWYU} \
"${SRC_DIR}"
eval "$MAKE_COMMAND"
Expand Down
18 changes: 17 additions & 1 deletion examples/otlp/http_instrumented_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ namespace

std::mutex serialize;

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW

/**
The purpose of MyThreadInstrumentation is to demonstrate
how notifications are delivered to the application.
Expand Down Expand Up @@ -143,6 +145,8 @@ class MyThreadInstrumentation : public opentelemetry::sdk::common::ThreadInstrum
std::string priority_;
};

#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

opentelemetry::exporter::otlp::OtlpHttpExporterOptions tracer_opts;
opentelemetry::exporter::otlp::OtlpHttpMetricExporterOptions meter_opts;
opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
Expand All @@ -155,19 +159,23 @@ void InitTracer()
{
// Create OTLP exporter instance
opentelemetry::exporter::otlp::OtlpHttpExporterRuntimeOptions exp_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto exp_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("OtlpHttpExporter", "trace-net", "high"));
exp_rt_opts.thread_instrumentation = exp_instr;
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto exporter =
opentelemetry::exporter::otlp::OtlpHttpExporterFactory::Create(tracer_opts, exp_rt_opts);

// Create Processor instance
opentelemetry::sdk::trace::BatchSpanProcessorOptions pro_opts;
opentelemetry::sdk::trace::BatchSpanProcessorRuntimeOptions pro_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto pro_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("BatchSpanProcessor", "", "high"));
pro_rt_opts.thread_instrumentation = pro_instr;
auto processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(
std::move(exporter), pro_opts, pro_rt_opts);

// Create Provider instance
Expand Down Expand Up @@ -196,9 +204,11 @@ void InitMetrics()
{
// Create OTLP exporter instance
opentelemetry::exporter::otlp::OtlpHttpMetricExporterRuntimeOptions exp_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto exp_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("OtlpHttpMetricExporter", "metric-net", "medium"));
exp_rt_opts.thread_instrumentation = exp_instr;
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto exporter =
opentelemetry::exporter::otlp::OtlpHttpMetricExporterFactory::Create(meter_opts, exp_rt_opts);

Expand All @@ -211,12 +221,14 @@ void InitMetrics()
reader_options.export_timeout_millis = std::chrono::milliseconds(500);

opentelemetry::sdk::metrics::PeriodicExportingMetricReaderRuntimeOptions reader_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto reader_periodic_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("PeriodicExportingMetricReader(periodic)", "", "medium"));
auto reader_collect_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("PeriodicExportingMetricReader(collect)", "", "medium"));
reader_rt_opts.periodic_thread_instrumentation = reader_periodic_instr;
reader_rt_opts.collect_thread_instrumentation = reader_collect_instr;
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto reader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(
std::move(exporter), reader_options, reader_rt_opts);

Expand Down Expand Up @@ -247,18 +259,22 @@ void InitLogger()
{
// Create OTLP exporter instance
opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterRuntimeOptions exp_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto exp_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("OtlpHttpLogRecordExporter", "log-net", "low"));
exp_rt_opts.thread_instrumentation = exp_instr;
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto exporter = opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterFactory::Create(
logger_opts, exp_rt_opts);

// Create Processor instance
opentelemetry::sdk::logs::BatchLogRecordProcessorOptions pro_opts;
opentelemetry::sdk::logs::BatchLogRecordProcessorRuntimeOptions pro_rt_opts;
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
auto pro_instr = std::shared_ptr<opentelemetry::sdk::common::ThreadInstrumentation>(
new MyThreadInstrumentation("BatchLogRecordProcessor", "", "low"));
pro_rt_opts.thread_instrumentation = pro_instr;
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
auto processor = opentelemetry::sdk::logs::BatchLogRecordProcessorFactory::Create(
std::move(exporter), pro_opts, pro_rt_opts);

Expand Down
8 changes: 8 additions & 0 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1453,10 +1453,12 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
std::chrono::system_clock::now();
std::size_t last_record_count = 0;

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (thread_instrumentation != nullptr)
{
thread_instrumentation->OnStart();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

while (true)
{
Expand All @@ -1472,20 +1474,24 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
break;
}

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (thread_instrumentation != nullptr)
{
thread_instrumentation->BeforeWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

{
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (thread_instrumentation != nullptr)
{
thread_instrumentation->AfterWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

{
std::size_t current_record_count =
Expand Down Expand Up @@ -1516,10 +1522,12 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
background_flush_thread.swap(concurrency_file->background_flush_thread);
}

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (thread_instrumentation != nullptr)
{
thread_instrumentation->OnEnd();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

if (background_flush_thread && background_flush_thread->joinable())
{
Expand Down
8 changes: 8 additions & 0 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,12 @@ bool HttpClient::MaybeSpawnBackgroundThread()

background_thread_.reset(new std::thread(
[](HttpClient *self) {
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnStart();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

int still_running = 1;
std::chrono::system_clock::time_point last_free_job_timepoint =
Expand All @@ -464,10 +466,12 @@ bool HttpClient::MaybeSpawnBackgroundThread()
}
else if (still_running || need_wait_more)
{
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->BeforeWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
// timeout to do the rest jobs
Expand All @@ -482,10 +486,12 @@ bool HttpClient::MaybeSpawnBackgroundThread()
nullptr);
#endif

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->AfterWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
}

do
Expand Down Expand Up @@ -584,10 +590,12 @@ bool HttpClient::MaybeSpawnBackgroundThread()
// If there is no pending jobs, we can stop the background thread.
if (still_running == 0)
{
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (self->background_thread_instrumentation_ != nullptr)
{
self->background_thread_instrumentation_->OnEnd();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

if (self->background_thread_)
{
Expand Down
5 changes: 5 additions & 0 deletions sdk/include/opentelemetry/sdk/common/thread_instrumentation.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ class ThreadInstrumentation
ThreadInstrumentation() = default;
virtual ~ThreadInstrumentation() = default;

/*
* This feature is experimental, protected by a _PREVIEW flag.
*/
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
virtual void OnStart() {}
virtual void OnEnd() {}
virtual void BeforeWait() {}
virtual void AfterWait() {}
virtual void BeforeLoad() {}
virtual void AfterLoad() {}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
};

} // namespace common
Expand Down
12 changes: 12 additions & 0 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,23 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex

void BatchLogRecordProcessor::DoBackgroundWork()
{
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->OnStart();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

auto timeout = scheduled_delay_millis_;

while (true)
{
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->BeforeWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(synchronization_data_->cv_m);
Expand All @@ -205,10 +209,12 @@ void BatchLogRecordProcessor::DoBackgroundWork()
synchronization_data_->is_force_wakeup_background_worker.store(false,
std::memory_order_release);

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->AfterWait();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

if (synchronization_data_->is_shutdown.load() == true)
{
Expand All @@ -225,18 +231,22 @@ void BatchLogRecordProcessor::DoBackgroundWork()
timeout = scheduled_delay_millis_ - duration;
}

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->OnEnd();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
}

void BatchLogRecordProcessor::Export()
{
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->BeforeLoad();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */

do
{
Expand Down Expand Up @@ -277,10 +287,12 @@ void BatchLogRecordProcessor::Export()
NotifyCompletion(notify_force_flush, exporter_, synchronization_data_);
} while (true);

#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
if (worker_thread_instrumentation_ != nullptr)
{
worker_thread_instrumentation_->AfterLoad();
}
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
}

void BatchLogRecordProcessor::NotifyCompletion(
Expand Down
Loading

0 comments on commit 2cd0fb4

Please sign in to comment.