Skip to content

Commit

Permalink
Fix log group memory leak when using Golang plugins and reduce alarm …
Browse files Browse the repository at this point in the history
…frequency in LOGSTORE send mode (#1313)

1. Fixed the issue where the logGroup pointer was not released when using Golang plugins.
2. Increase alarm threshold and reduce 'package list interval' configuration to fix the alarm of LOG_GROUP_WAIT_TOO_LONG_ALARM in LOGSTORE mode.
  • Loading branch information
linrunqi08 authored Jan 8, 2024
1 parent c3886fa commit 2c2ae3d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/aggregator/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ bool MergeItem::IsReady() {
bool PackageListMergeBuffer::IsReady(int32_t curTime) {
// should use 2 * INT32_FLAG(batch_send_interval)), package list interval should > merge item interval
return (mTotalRawBytes >= INT32_FLAG(batch_send_metric_size))
|| (mItemCount > 0 && (curTime - mFirstItemTime) >= 2 * INT32_FLAG(batch_send_interval));
|| (mItemCount > 0 && (curTime - mFirstItemTime) >= INT32_FLAG(batch_send_interval));
}


Expand Down
22 changes: 11 additions & 11 deletions core/processor/daemon/LogProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
continue;
}

std::vector<sls_logs::LogGroup*> logGroupList;
std::vector<std::unique_ptr<sls_logs::LogGroup>> logGroupList;
ProcessProfile profile;
profile.readBytes = readBytes;
int32_t parseStartTime = (int32_t)time(NULL);
Expand All @@ -321,9 +321,8 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
int32_t parseEndTime = (int32_t)time(NULL);

int logSize = 0;
for (auto pLogGroup : logGroupList) {
sls_logs::LogGroup logGroup = *pLogGroup;
logSize +=logGroup.logs_size();
for (auto& pLogGroup : logGroupList) {
logSize += pLogGroup->logs_size();
}
// add lines count
s_processLines += profile.splitLines;
Expand Down Expand Up @@ -375,7 +374,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
}
sls_logs::SlsCompressType compressType = sdk::Client::GetCompressType(compressStr);

for (auto pLogGroup : logGroupList) {
for (auto& pLogGroup : logGroupList) {
LogGroupContext context(flusherSLS->mRegion,
projectName,
flusherSLS->mLogstore,
Expand All @@ -389,7 +388,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
logBuffer->exactlyOnceCheckpoint);
if (!Sender::Instance()->Send(projectName,
logFileReader->GetSourceId(),
*pLogGroup,
*(pLogGroup.get()),
logFileReader->GetLogGroupKey(),
flusherSLS,
flusherSLS->mBatch.mMergeType,
Expand All @@ -403,12 +402,11 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
category,
pipeline->GetContext().GetRegion());
LOG_ERROR(sLogger,
("push file data into batch map fail, discard logs", (*pLogGroup).logs_size())(
("push file data into batch map fail, discard logs", pLogGroup->logs_size())(
"project", projectName)("logstore", category)("filename", convertedPath));


}
delete pLogGroup;
}

LogFileProfiler::GetInstance()->AddProfilingData(pipeline->Name(),
Expand All @@ -434,6 +432,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
"parse_failures", profile.parseFailures)("parse_time_failures", profile.parseTimeFailures)(
"regex_match_failures", profile.regexMatchFailures)("history_failures", profile.historyFailures));
}
logGroupList.clear();
}
}
LOG_WARNING(sLogger, ("LogProcessThread", "Exit")("threadNo", threadNo));
Expand All @@ -442,7 +441,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {

int LogProcess::ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,
LogFileReaderPtr& logFileReader,
std::vector<sls_logs::LogGroup*>& resultGroupList,
std::vector<std::unique_ptr<sls_logs::LogGroup>>& resultGroupList,
ProcessProfile& profile) {
auto pipeline = PipelineManager::GetInstance()->FindPipelineByName(
logFileReader->GetConfigName()); // pipeline should be set in the loggroup by input
Expand Down Expand Up @@ -483,11 +482,12 @@ int LogProcess::ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,

for (auto& eventGroup : eventGroupList) {
// fill protobuf
sls_logs::LogGroup* resultGroup = new sls_logs::LogGroup();
resultGroupList.emplace_back(resultGroup);
resultGroupList.emplace_back(new sls_logs::LogGroup());
auto& resultGroup = resultGroupList.back();
FillLogGroupLogs(eventGroup, *resultGroup, pipeline->GetContext().GetGlobalConfig().mEnableTimestampNanosecond);
FillLogGroupTags(eventGroup, logFileReader, *resultGroup);
if (pipeline->IsFlushingThroughGoPipeline()) {
// LogGroup will be deleted outside
LogtailPlugin::GetInstance()->ProcessLogGroup(
logFileReader->GetConfigName(), *resultGroup, logFileReader->GetSourceId());
return 1;
Expand Down
2 changes: 1 addition & 1 deletion core/processor/daemon/LogProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class LogProcess : public LogRunnable {
*/
int ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,
LogFileReaderPtr& logFileReader,
std::vector<sls_logs::LogGroup*>& logGroupList,
std::vector<std::unique_ptr<sls_logs::LogGroup>>& resultGroupList,
ProcessProfile& profile);
/**
* @retval 0 if continue processing by C++, 1 if processed by Go
Expand Down
2 changes: 1 addition & 1 deletion core/sender/Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ DEFINE_FLAG_INT32(test_unavailable_endpoint_interval, "test unavailable endpoint
DEFINE_FLAG_INT32(sending_cost_time_alarm_interval, "sending log group cost too much time, second", 3);
DEFINE_FLAG_INT32(log_group_wait_in_queue_alarm_interval,
"log group wait in queue alarm interval, may blocked by concurrency or quota, second",
3);
6);
DEFINE_FLAG_STRING(data_endpoint_policy,
"policy for switching between data server endpoints, possible options include "
"'designated_first'(default) and 'designated_locked'",
Expand Down

0 comments on commit 2c2ae3d

Please sign in to comment.