Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPORTER] Optimize OTLP HTTP compression #3178

Merged
merged 19 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ OtlpHttpClient::createSession(
// Parse uri and store it to cache
if (http_uri_.empty())
{
auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url));
const auto parse_url = opentelemetry::ext::http::common::UrlParser(options_.url);
if (!parse_url.success_)
{
std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url;
Expand Down
128 changes: 104 additions & 24 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

#include <curl/curl.h>
#include <curl/curlver.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -57,11 +60,85 @@ nostd::shared_ptr<HttpCurlGlobalInitializer> HttpCurlGlobalInitializer::GetInsta
return shared_initializer;
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
// Original source:
// https://stackoverflow.com/questions/12398377/is-it-possible-to-have-zlib-read-from-and-write-to-the-same-memory-buffer/12412863#12412863
// Compresses the `len` bytes stored at `buf` with deflate(strm, Z_FINISH) and writes the
// compressed stream back into the same buffer, so that no second full-size output buffer is needed.
//
// strm     zlib stream that the caller has already initialised; this function sets the
//          next_in/avail_in/next_out/avail_out fields itself and does not call deflateEnd().
// buf      holds the input on entry and receives the compressed output.
// len      number of input bytes in buf.
// max_len  in:  number of bytes of buf that may be written (raised to len when smaller);
//          out: number of bytes written to buf. Not updated with an output size when the function
//               returns early with Z_STREAM_ERROR or Z_MEM_ERROR.
//
// Returns Z_OK when deflate() reported Z_STREAM_END, Z_BUF_ERROR when the output did not fit into
// *max_len bytes, Z_MEM_ERROR when the fallback allocation failed, and otherwise the code returned
// by deflate(). When an error is returned the input in buf may already have been overwritten.
int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max_len)
{
  using AvailType = decltype(z_stream::avail_out);

  // must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the
  // worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this
  // code after the deflateReset(), then the 11 needs to be increased to accommodate the resulting
  // gzip header size plus one
  std::array<unsigned char, 11> temp{};

  // kick start the process with a temporary output buffer -- this allows deflate to consume a large
  // chunk of input data in order to make room for output data there
  strm->next_in  = buf;
  strm->avail_in = len;
  if (*max_len < len)
  {
    *max_len = len;
  }
  strm->next_out  = temp.data();
  strm->avail_out = (std::min)(static_cast<AvailType>(temp.size()), *max_len);
  auto ret        = deflate(strm, Z_FINISH);
  if (ret == Z_STREAM_ERROR)
  {
    return ret;
  }

  // if we can, copy the temporary output data to the consumed portion of the input buffer, and then
  // continue to write up to the start of the consumed input for as long as possible
  auto have = strm->next_out - temp.data();  // number of bytes in temp[]
  if (have <= static_cast<decltype(have)>(strm->avail_in ? len - strm->avail_in : *max_len))
  {
    std::memcpy(buf, temp.data(), static_cast<std::size_t>(have));
    strm->next_out = buf + have;
    have           = 0;
    while (ret == Z_OK)
    {
      // While input remains, output may only grow up to the unread input; once all input has been
      // consumed the whole of buf (up to *max_len) is available.
      strm->avail_out = static_cast<AvailType>(strm->avail_in ? strm->next_in - strm->next_out
                                                              : (buf + *max_len) - strm->next_out);
      ret             = deflate(strm, Z_FINISH);
    }
    if (ret != Z_BUF_ERROR || strm->avail_in == 0)
    {
      *max_len = static_cast<uint32_t>(strm->next_out - buf);
      return ret == Z_STREAM_END ? Z_OK : ret;
    }
  }

  // the output caught up with the input due to insufficiently compressible data -- copy the
  // remaining input data into an allocated buffer and complete the compression from there to the
  // now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB
  // for the default deflate memLevel of 8, or when *max_len is too small and less than the length
  // of the header plus one byte)
  //
  // The holding buffer is declared as a plain mutable byte pointer: deriving its type from
  // z_stream::next_in would yield a pointer-to-const when zlib is built with ZLIB_CONST, and the
  // memcpy below needs a writable destination.
  auto *hold = static_cast<unsigned char *>(
      strm->zalloc(strm->opaque, strm->avail_in, 1));  // allocated buffer to hold input data
  if (hold == nullptr)
  {
    return Z_MEM_ERROR;
  }
  std::memcpy(hold, strm->next_in, strm->avail_in);
  strm->next_in = hold;
  if (have)
  {
    // the first branch was skipped, so the bytes produced into temp[] still have to be placed
    std::memcpy(buf, temp.data(), static_cast<std::size_t>(have));
    strm->next_out = buf + have;
  }
  strm->avail_out = static_cast<AvailType>((buf + *max_len) - strm->next_out);
  ret             = deflate(strm, Z_FINISH);
  strm->zfree(strm->opaque, hold);
  *max_len = static_cast<uint32_t>(strm->next_out - buf);
  // Z_OK after Z_FINISH means deflate() still has pending output, i.e. the buffer was too small.
  return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret);
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW

void Session::SendRequest(
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept
{
is_session_active_.store(true, std::memory_order_release);
std::string url = host_ + std::string(http_request_->uri_);
const auto &url = host_ + http_request_->uri_;
auto callback_ptr = callback.get();
bool reuse_connection = false;

Expand All @@ -76,44 +153,47 @@ void Session::SendRequest(
if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip)
{
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
http_request_->AddHeader("Content-Encoding", "gzip");

opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size());
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = static_cast<uInt>(http_request_->body_.size());
zs.next_in = http_request_->body_.data();
zs.avail_out = static_cast<uInt>(compressed_body.size());
zs.next_out = compressed_body.data();
z_stream zs{};
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

// ZLIB: Have to manually specify 16 bits for the Gzip headers
const int window_bits = 15 + 16;
static constexpr int kWindowBits = MAX_WBITS + 16;
static constexpr int kMemLevel = MAX_MEM_LEVEL;
Copy link
Contributor Author

@chusitoo chusitoo Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, MAX_MEM_LEVEL appears to be 9 and, from the official documentation

The memLevel parameter specifies how much memory should be allocated for the internal compression state. memLevel=1 uses minimum memory but is slow and reduces compression ratio; memLevel=9 uses maximum memory for optimal speed. The default value is 8.

The default level is not exposed, unfortunately (header zutil.h not available for including), but the code to drive the default is:

#if MAX_MEM_LEVEL >= 8
#  define DEF_MEM_LEVEL 8
#else
#  define DEF_MEM_LEVEL  MAX_MEM_LEVEL
#endif
/* default memLevel */

My intuition was to not hardcode the "default" value 8 and instead use the max level setting provided by the macro, but I can revert this back to a hardcoded value of 8 as in the current compression code.


int stream =
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
Z_DEFAULT_STRATEGY);

if (stream == Z_OK)
{
deflate(&zs, Z_FINISH);
deflateEnd(&zs);
compressed_body.resize(zs.total_out);
http_request_->SetBody(compressed_body);
auto size = static_cast<uInt>(http_request_->body_.size());
auto max_size = size;
stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max_size);

if (stream == Z_OK)
{
http_request_->AddHeader("Content-Encoding", "gzip");
http_request_->body_.resize(max_size);
}
}
else

if (stream != Z_OK)
{
if (callback)
{
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, "");
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed,
zs.msg ? zs.msg : "");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a precaution; the documentation states that msg is null when there are no errors. Given that we check for an error return code before even attempting to use this variable, the ternary if is probably redundant, but it does not hurt keeping it as another safety net in case the invariant is broken by the custom deflateInPlace code or future versions of the library, for instance.

The list of possible messages to be reported:

z_const char * const z_errmsg[10] = {
    (z_const char *)"need dictionary",     /* Z_NEED_DICT       2  */
    (z_const char *)"stream end",          /* Z_STREAM_END      1  */
    (z_const char *)"",                    /* Z_OK              0  */
    (z_const char *)"file error",          /* Z_ERRNO         (-1) */
    (z_const char *)"stream error",        /* Z_STREAM_ERROR  (-2) */
    (z_const char *)"data error",          /* Z_DATA_ERROR    (-3) */
    (z_const char *)"insufficient memory", /* Z_MEM_ERROR     (-4) */
    (z_const char *)"buffer error",        /* Z_BUF_ERROR     (-5) */
    (z_const char *)"incompatible version",/* Z_VERSION_ERROR (-6) */
    (z_const char *)""
};

}
is_session_active_.store(false, std::memory_order_release);
}

deflateEnd(&zs);
#else
OTEL_INTERNAL_LOG_ERROR(
"[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the "
"OTLP HTTP Exporter");
#endif
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
}

curl_operation_.reset(new HttpOperation(
Expand Down Expand Up @@ -226,7 +306,7 @@ HttpClient::~HttpClient()
std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSession(
nostd::string_view url) noexcept
{
auto parsedUrl = common::UrlParser(std::string(url));
const auto parsedUrl = common::UrlParser(std::string(url));
if (!parsedUrl.success_)
{
return std::make_shared<Session>(*this);
Expand Down
4 changes: 1 addition & 3 deletions ext/src/http/client/curl/http_operation_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method,
{
for (auto &kv : this->request_headers_)
{
std::string header = std::string(kv.first);
header += ": ";
header += std::string(kv.second);
const auto header = std::string(kv.first).append(": ").append(kv.second);
curl_resource_.headers_chunk =
curl_slist_append(curl_resource_.headers_chunk, header.c_str());
}
Expand Down
127 changes: 126 additions & 1 deletion ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -558,7 +559,6 @@ TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}

TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
{
{
Expand All @@ -581,3 +581,128 @@ TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
}
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
// Event handler for the gzip tests: remembers that an event was delivered together with the most
// recently reported session state and reason text, and discards response payloads.
struct GzipEventHandler : public CustomEventHandler
{
  // Set once OnEvent() has been invoked at least once.
  bool is_called_ = false;
  // Last reported state; starts as -1 cast to SessionState (presumably a value no real event
  // carries -- confirm against the SessionState enum).
  http_client::SessionState state_ = static_cast<http_client::SessionState>(-1);
  // Last reported reason, copied out of the string_view passed to OnEvent().
  std::string reason_;

  ~GzipEventHandler() override = default;

  // Responses are deliberately ignored; only session events are recorded.
  void OnResponse(http_client::Response & /* response */) noexcept override {}

  void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
  {
    reason_    = static_cast<std::string>(reason);
    state_     = state;
    is_called_ = true;
  }
};

// Posts a 500-byte body filled by std::iota with gzip compression requested, then checks that the
// handler ended in the Response state with an empty reason and that the request body held by the
// curl Request object is smaller than the original payload.
TEST_F(BasicCurlHttpTests, GzipCompressibleData)
{
  received_requests_.clear();
  auto session_manager = http_client::HttpClientFactory::Create();
  // Fatal assertion: session_manager is dereferenced immediately below.
  ASSERT_TRUE(session_manager != nullptr);

  auto session = session_manager->CreateSession("http://127.0.0.1:19000");
  auto request = session->CreateRequest();
  request->SetUri("post/");
  request->SetMethod(http_client::Method::Post);

  const auto original_size = 500UL;
  http_client::Body body(original_size);
  std::iota(body.begin(), body.end(), 0);
  request->SetBody(body);
  request->AddHeader("Content-Type", "text/plain");
  request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
  auto handler = std::make_shared<GzipEventHandler>();
  session->SendRequest(handler);
  ASSERT_TRUE(waitForRequests(30, 1));
  session->FinishSession();
  ASSERT_TRUE(handler->is_called_);
  ASSERT_EQ(handler->state_, http_client::SessionState::Response);
  ASSERT_TRUE(handler->reason_.empty());

  // Inspect the concrete curl request to see the body as it was left after SendRequest().
  auto http_request =
      dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
  ASSERT_TRUE(http_request != nullptr);
  ASSERT_LT(http_request->body_.size(), original_size);

  session_manager->CancelAllSessions();
  session_manager->FinishAllSessions();
}

// Posts a fixed, pre-generated random payload with gzip compression requested, then checks that
// the handler ended in the Response state with an empty reason and that the request body held by
// the curl Request object still has the original size.
TEST_F(BasicCurlHttpTests, GzipIncompressibleData)
{
  received_requests_.clear();
  auto session_manager = http_client::HttpClientFactory::Create();
  // Fatal assertion: session_manager is dereferenced immediately below.
  ASSERT_TRUE(session_manager != nullptr);

  auto session = session_manager->CreateSession("http://127.0.0.1:19000");
  auto request = session->CreateRequest();
  request->SetUri("post/");
  request->SetMethod(http_client::Method::Post);

  // Random data generated using code snippet below.
  //   const auto original_size = 500UL;
  //   http_client::Body body(original_size);
  //   std::random_device rd;
  //   std::mt19937 gen(rd());
  //   std::uniform_int_distribution<> uid(1, 255);
  //   std::generate(body.begin(), body.end(), [&]() { return uid(gen); });

  // The input values are fixed to make the test repeatable in the event that some distributions
  // might yield results that are, in fact, compressible.
  http_client::Body body = {
      140, 198, 12,  56,  165, 185, 173, 20,  13,  83,  127, 223, 77,  38,  224, 43,  236, 10,  178,
      75,  169, 157, 136, 199, 74,  30,  148, 195, 51,  30,  225, 21,  121, 219, 7,   155, 198, 121,
      205, 102, 80,  38,  132, 202, 45,  229, 206, 90,  150, 202, 53,  221, 54,  37,  172, 90,  238,
      248, 191, 240, 109, 227, 248, 41,  251, 121, 35,  226, 107, 122, 15,  242, 203, 45,  64,  195,
      186, 23,  1,   158, 61,  196, 182, 26,  201, 47,  211, 241, 251, 209, 255, 170, 181, 192, 89,
      133, 176, 60,  178, 97,  168, 223, 152, 9,   118, 98,  169, 240, 170, 15,  13,  161, 24,  57,
      123, 117, 230, 30,  244, 117, 238, 255, 198, 232, 95,  148, 37,  61,  67,  103, 31,  240, 52,
      21,  145, 175, 201, 86,  19,  61,  228, 76,  131, 185, 111, 149, 203, 143, 16,  142, 95,  173,
      42,  106, 39,  203, 116, 235, 20,  162, 112, 173, 112, 70,  126, 191, 210, 219, 90,  145, 126,
      118, 43,  241, 101, 66,  175, 179, 5,   233, 208, 164, 180, 83,  214, 194, 173, 29,  179, 149,
      75,  202, 17,  152, 139, 130, 94,  247, 142, 249, 159, 224, 205, 131, 93,  82,  186, 226, 210,
      84,  17,  212, 155, 61,  226, 103, 152, 37,  3,   193, 216, 219, 203, 101, 99,  33,  59,  38,
      106, 62,  232, 127, 44,  125, 90,  169, 148, 238, 34,  106, 12,  221, 90,  173, 67,  122, 232,
      161, 89,  198, 43,  241, 195, 248, 219, 35,  47,  200, 11,  227, 168, 246, 243, 103, 38,  17,
      203, 237, 203, 158, 204, 89,  231, 19,  24,  25,  199, 160, 233, 43,  117, 144, 196, 117, 152,
      42,  121, 189, 217, 202, 221, 250, 157, 237, 47,  29,  64,  32,  10,  32,  243, 28,  114, 158,
      228, 102, 36,  191, 139, 217, 161, 162, 186, 19,  141, 212, 49,  1,   239, 153, 107, 249, 31,
      235, 138, 73,  80,  58,  152, 15,  149, 50,  42,  84,  75,  95,  82,  56,  86,  143, 45,  214,
      11,  184, 164, 181, 249, 74,  184, 26,  207, 165, 162, 240, 154, 90,  56,  175, 72,  4,   166,
      188, 78,  232, 87,  243, 50,  59,  62,  175, 213, 210, 182, 31,  123, 91,  118, 98,  249, 23,
      170, 240, 228, 236, 121, 87,  132, 129, 250, 41,  227, 204, 250, 147, 145, 109, 149, 210, 21,
      174, 165, 127, 234, 64,  211, 52,  93,  126, 117, 231, 216, 210, 15,  16,  2,   167, 215, 178,
      104, 245, 119, 211, 235, 120, 135, 202, 117, 150, 101, 94,  201, 136, 179, 205, 167, 212, 236,
      7,   178, 132, 228, 65,  230, 90,  171, 109, 31,  83,  31,  210, 123, 136, 76,  186, 81,  205,
      63,  35,  21,  121, 152, 22,  242, 199, 106, 217, 199, 211, 206, 165, 88,  77,  112, 108, 193,
      122, 8,   193, 74,  91,  50,  6,   156, 185, 165, 15,  92,  116, 3,   18,  244, 165, 191, 2,
      183, 9,   164, 116, 75,  127};
  const auto original_size = body.size();

  request->SetBody(body);
  request->AddHeader("Content-Type", "text/plain");
  request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
  auto handler = std::make_shared<GzipEventHandler>();
  session->SendRequest(handler);
  ASSERT_TRUE(waitForRequests(30, 1));
  session->FinishSession();
  ASSERT_TRUE(handler->is_called_);
  ASSERT_EQ(handler->state_, http_client::SessionState::Response);
  ASSERT_TRUE(handler->reason_.empty());

  // Inspect the concrete curl request to see the body as it was left after SendRequest().
  auto http_request =
      dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
  ASSERT_TRUE(http_request != nullptr);
  ASSERT_EQ(http_request->body_.size(), original_size);

  session_manager->CancelAllSessions();
  session_manager->FinishAllSessions();
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
Loading