Skip to content

Commit

Permalink
Print fewer successful message at INFO level
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 committed Jun 5, 2024
1 parent d410f19 commit 0f0c734
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
* retried by this writer.
*/
private final Counter numRecordsSendPartialFailureCounter;
/** A counter to track the number of bulk requests that are sent to Elasticsearch. */
private final Counter numRequestSubmittedCounter;

private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
FatalExceptionClassifier.createChain(
Expand Down Expand Up @@ -110,12 +112,19 @@ public Elasticsearch8AsyncWriter(
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.numRecordsSendPartialFailureCounter =
metricGroup.counter("numRecordsSendPartialFailure");
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
}

@Override
protected void submitRequestEntries(
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
LOG.info("submitRequestEntries with {} items", requestEntries.size());
numRequestSubmittedCounter.inc();
// log progress message at INFO level every 100 requests to avoid too many log messages.
if (numRequestSubmittedCounter.getCount() % 100 == 0) {
LOG.info("Submitted {} requests", numRequestSubmittedCounter.getCount());
}

LOG.debug("submitRequestEntries with {} items", requestEntries.size());

BulkRequest.Builder br = new BulkRequest.Builder();
for (Operation operation : requestEntries) {
Expand Down Expand Up @@ -176,7 +185,7 @@ private void handlePartiallyFailedRequest(

private void handleSuccessfulRequest(
Consumer<List<Operation>> requestResult, BulkResponse response) {
LOG.info(
LOG.debug(
"The BulkRequest of {} operation(s) completed successfully. It took {}ms",
response.items().size(),
response.took());
Expand Down

0 comments on commit 0f0c734

Please sign in to comment.