Skip to content

Commit

Permalink
[backport] Improve sender log condition to record valuable debug info
Browse files: browse the repository at this point in the history
  • Loading branch information
yyuuttaaoo authored Mar 27, 2024
1 parent dd92bc8 commit 1c000b9
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 64 deletions.
4 changes: 2 additions & 2 deletions core/common/LogFileCollectOffsetIndicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ void LogFileCollectOffsetIndicator::RecordFileOffset(LoggroupTimeValue* data) {
devInode,
data->mLogGroupContext.mFuseMode,
fd,
data->mLastUpdateTime);
data->mEnqueueTime);
iter = mLogFileOffsetInfoMap.insert(std::make_pair(logFileInfo, logFileOffsetInfo)).first;
}
LogFileOffsetInfo* logFileOffsetInfo = iter->second;
logFileOffsetInfo->mLastUpdateTime = data->mLastUpdateTime;
logFileOffsetInfo->mLastUpdateTime = data->mEnqueueTime;

LogFileOffsetInfoNode node(seqNum,
fileInfoPtr->offset,
Expand Down
3 changes: 0 additions & 3 deletions core/common/LogstoreSenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastNetworkErrorCount >= INT32_FLAG(max_client_send_error_count)) {
mLastNetworkErrorCount = INT32_FLAG(max_client_send_error_count);
mNetworkValidFlag = false;
LOG_WARNING(sLogger,
("Network fail, disable ", this->mRegion)("retry interval", mNetworkRetryInterval));
}
break;
case LogstoreSenderInfo::SendResult_QuotaFail:
Expand All @@ -155,7 +153,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastQuotaExceedCount >= INT32_FLAG(max_client_quota_exceed_count)) {
mLastQuotaExceedCount = INT32_FLAG(max_client_quota_exceed_count);
mQuotaValidFlag = false;
LOG_WARNING(sLogger, ("QuotaF fail, disable ", this->mRegion)("retry interval", mQuotaRetryInterval));
}
break;
default:
Expand Down
28 changes: 20 additions & 8 deletions core/common/LogstoreSenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct LogstoreSenderStatistics {
};

struct LoggroupTimeValue {
int32_t mLastUpdateTime;
int32_t mEnqueueTime;
SEND_DATA_TYPE mDataType;
std::string mLogData;
int32_t mRawSize;
Expand All @@ -63,6 +63,7 @@ struct LoggroupTimeValue {

int32_t mSendRetryTimes;
int32_t mLastSendTime;
int32_t mLastLogWarningTime;
std::string mAliuid;
std::string mRegion;
std::string mShardHashKey;
Expand Down Expand Up @@ -99,9 +100,10 @@ struct LoggroupTimeValue {
mDataType = dataType;
mLogLines = lines;
mRawSize = rawSize;
mLastUpdateTime = lastUpdateTime;
mEnqueueTime = lastUpdateTime;
mSendRetryTimes = 0;
mLastSendTime = 0;
mLastLogWarningTime = 0;
mLogData.clear();
mShardHashKey = shardHashKey;
mStatus = LoggroupSendStatus_Idle;
Expand Down Expand Up @@ -400,11 +402,11 @@ class SingleLogstoreSenderManager : public SingleLogstoreFeedbackQueue<LoggroupT
continue;
}

if (item->mLastUpdateTime < minSendTime) {
minSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime < minSendTime) {
minSendTime = item->mEnqueueTime;
}
if (item->mLastUpdateTime > maxSendTime) {
maxSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime > maxSendTime) {
maxSendTime = item->mEnqueueTime;
}
++statisticsItem.mSendQueueSize;
}
Expand All @@ -417,15 +419,25 @@ class SingleLogstoreSenderManager : public SingleLogstoreFeedbackQueue<LoggroupT

int32_t OnSendDone(LoggroupTimeValue* item, LogstoreSenderInfo::SendResult sendRst, bool& needTrigger) {
needTrigger = mSenderInfo.RecordSendResult(sendRst, mSenderStatistics);
if (!mSenderInfo.mNetworkValidFlag) {
APSARA_LOG_WARNING(sLogger,
("Network fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mNetworkRetryInterval));
}
if (!mSenderInfo.mQuotaValidFlag) {
APSARA_LOG_WARNING(sLogger,
("Quota fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mQuotaRetryInterval));
}
// if send error, reset status to idle, and wait to send again
// network fail or quota fail
if (sendRst != LogstoreSenderInfo::SendResult_OK && sendRst != LogstoreSenderInfo::SendResult_Buffered
&& sendRst != LogstoreSenderInfo::SendResult_DiscardFail) {
item->mStatus = LoggroupSendStatus_Idle;
return 0;
}
if (mSenderStatistics.mMaxSendSuccessTime < item->mLastUpdateTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mLastUpdateTime;
if (mSenderStatistics.mMaxSendSuccessTime < item->mEnqueueTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mEnqueueTime;
}
// else remove item except buffered
return RemoveItem(item, sendRst != LogstoreSenderInfo::SendResult_Buffered);
Expand Down
3 changes: 2 additions & 1 deletion core/monitor/LogIntegrity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ void LogIntegrity::Notify(LoggroupTimeValue* data, bool flag) {
PTScopedLock lock(mLogIntegrityMapLock);
LogIntegrityInfo* info = NULL;
if (FindLogIntegrityInfo(region, projectName, logstore, filename, info)) {
info->mLastUpdateTime = data->mLastUpdateTime;
info->mLastUpdateTime = data->mEnqueueTime;

info->SetStatus(data->mLogGroupContext.mSeqNum,
data->mLogLines,
flag ? LogTimeInfo::LogIntegrityStatus_SendOK : LogTimeInfo::LogIntegrityStatus_SendFail);
Expand Down
Loading

0 comments on commit 1c000b9

Please sign in to comment.