diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java index c94ae28b..8b202a7a 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java @@ -67,6 +67,8 @@ public class Elasticsearch8AsyncWriter extends AsyncSinkWriter requestEntries, Consumer> requestResult) { - LOG.info("submitRequestEntries with {} items", requestEntries.size()); + numRequestSubmittedCounter.inc(); + // log progress message at INFO level every 100 requests to avoid too many log messages. + if (numRequestSubmittedCounter.getCount() % 100 == 0) { + LOG.info("Submitted {} requests", numRequestSubmittedCounter.getCount()); + } + + LOG.debug("submitRequestEntries with {} items", requestEntries.size()); BulkRequest.Builder br = new BulkRequest.Builder(); for (Operation operation : requestEntries) { @@ -176,7 +185,7 @@ private void handlePartiallyFailedRequest( private void handleSuccessfulRequest( Consumer> requestResult, BulkResponse response) { - LOG.info( + LOG.debug( "The BulkRequest of {} operation(s) completed successfully. It took {}ms", response.items().size(), response.took());