diff --git a/core/aggregator/Aggregator.cpp b/core/aggregator/Aggregator.cpp index 234643517f..a42dd6bfb3 100644 --- a/core/aggregator/Aggregator.cpp +++ b/core/aggregator/Aggregator.cpp @@ -48,7 +48,7 @@ bool MergeItem::IsReady() { bool PackageListMergeBuffer::IsReady(int32_t curTime) { // should use 2 * INT32_FLAG(batch_send_interval)), package list interval should > merge item interval return (mTotalRawBytes >= INT32_FLAG(batch_send_metric_size)) - || (mItemCount > 0 && (curTime - mFirstItemTime) >= 2 * INT32_FLAG(batch_send_interval)); + || (mItemCount > 0 && (curTime - mFirstItemTime) >= INT32_FLAG(batch_send_interval)); } diff --git a/core/processor/daemon/LogProcess.cpp b/core/processor/daemon/LogProcess.cpp index ab7140707c..c63e18ab10 100644 --- a/core/processor/daemon/LogProcess.cpp +++ b/core/processor/daemon/LogProcess.cpp @@ -306,7 +306,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { continue; } - std::vector logGroupList; + std::vector> logGroupList; ProcessProfile profile; profile.readBytes = readBytes; int32_t parseStartTime = (int32_t)time(NULL); @@ -321,9 +321,8 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { int32_t parseEndTime = (int32_t)time(NULL); int logSize = 0; - for (auto pLogGroup : logGroupList) { - sls_logs::LogGroup logGroup = *pLogGroup; - logSize +=logGroup.logs_size(); + for (auto& pLogGroup : logGroupList) { + logSize += pLogGroup->logs_size(); } // add lines count s_processLines += profile.splitLines; @@ -375,7 +374,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { } sls_logs::SlsCompressType compressType = sdk::Client::GetCompressType(compressStr); - for (auto pLogGroup : logGroupList) { + for (auto& pLogGroup : logGroupList) { LogGroupContext context(flusherSLS->mRegion, projectName, flusherSLS->mLogstore, @@ -389,7 +388,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { logBuffer->exactlyOnceCheckpoint); if (!Sender::Instance()->Send(projectName, logFileReader->GetSourceId(), - *pLogGroup, + *(pLogGroup.get()), logFileReader->GetLogGroupKey(), flusherSLS, flusherSLS->mBatch.mMergeType, @@ -403,12 +402,11 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { category, pipeline->GetContext().GetRegion()); LOG_ERROR(sLogger, - ("push file data into batch map fail, discard logs", (*pLogGroup).logs_size())( + ("push file data into batch map fail, discard logs", pLogGroup->logs_size())( "project", projectName)("logstore", category)("filename", convertedPath)); } - delete pLogGroup; } LogFileProfiler::GetInstance()->AddProfilingData(pipeline->Name(), @@ -434,6 +432,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { "parse_failures", profile.parseFailures)("parse_time_failures", profile.parseTimeFailures)( "regex_match_failures", profile.regexMatchFailures)("history_failures", profile.historyFailures)); } + logGroupList.clear(); } } LOG_WARNING(sLogger, ("LogProcessThread", "Exit")("threadNo", threadNo)); @@ -442,7 +441,7 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { int LogProcess::ProcessBuffer(std::shared_ptr& logBuffer, LogFileReaderPtr& logFileReader, - std::vector& resultGroupList, + std::vector>& resultGroupList, ProcessProfile& profile) { auto pipeline = PipelineManager::GetInstance()->FindPipelineByName( logFileReader->GetConfigName()); // pipeline should be set in the loggroup by input @@ -483,11 +482,12 @@ int LogProcess::ProcessBuffer(std::shared_ptr& logBuffer, for (auto& eventGroup : eventGroupList) { // fill protobuf - sls_logs::LogGroup* resultGroup = new sls_logs::LogGroup(); - resultGroupList.emplace_back(resultGroup); + resultGroupList.emplace_back(new sls_logs::LogGroup()); + auto& resultGroup = resultGroupList.back(); FillLogGroupLogs(eventGroup, *resultGroup, pipeline->GetContext().GetGlobalConfig().mEnableTimestampNanosecond); FillLogGroupTags(eventGroup, logFileReader, *resultGroup); if (pipeline->IsFlushingThroughGoPipeline()) { + // LogGroup will be deleted outside LogtailPlugin::GetInstance()->ProcessLogGroup( logFileReader->GetConfigName(), *resultGroup, logFileReader->GetSourceId()); return 1; diff --git a/core/processor/daemon/LogProcess.h b/core/processor/daemon/LogProcess.h index bf9ea0d2a0..a7879f484b 100644 --- a/core/processor/daemon/LogProcess.h +++ b/core/processor/daemon/LogProcess.h @@ -94,7 +94,7 @@ class LogProcess : public LogRunnable { */ int ProcessBuffer(std::shared_ptr& logBuffer, LogFileReaderPtr& logFileReader, - std::vector& logGroupList, + std::vector>& resultGroupList, ProcessProfile& profile); /** * @retval 0 if continue processing by C++, 1 if processed by Go diff --git a/core/sender/Sender.cpp b/core/sender/Sender.cpp index b766edd278..134eab18fa 100644 --- a/core/sender/Sender.cpp +++ b/core/sender/Sender.cpp @@ -97,7 +97,7 @@ DEFINE_FLAG_INT32(test_unavailable_endpoint_interval, "test unavailable endpoint DEFINE_FLAG_INT32(sending_cost_time_alarm_interval, "sending log group cost too much time, second", 3); DEFINE_FLAG_INT32(log_group_wait_in_queue_alarm_interval, "log group wait in queue alarm interval, may blocked by concurrency or quota, second", - 3); + 6); DEFINE_FLAG_STRING(data_endpoint_policy, "policy for switching between data server endpoints, possible options include " "'designated_first'(default) and 'designated_locked'",