diff --git a/src/main/java/com/gravity9/mongocse/MongoChangeStreamWorker.java b/src/main/java/com/gravity9/mongocse/MongoChangeStreamWorker.java index 373b499..4a26cf1 100644 --- a/src/main/java/com/gravity9/mongocse/MongoChangeStreamWorker.java +++ b/src/main/java/com/gravity9/mongocse/MongoChangeStreamWorker.java @@ -2,12 +2,17 @@ import com.gravity9.mongocse.listener.ChangeStreamListener; import com.gravity9.mongocse.logging.LoggingUtil; +import com.mongodb.MongoCommandException; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; import org.bson.BsonDocument; import org.bson.BsonString; import org.bson.BsonValue; @@ -17,11 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - import static com.gravity9.mongocse.MongoExpressions.abs; import static com.gravity9.mongocse.MongoExpressions.cond; import static com.gravity9.mongocse.MongoExpressions.documentKey; @@ -100,12 +100,7 @@ public void run() { .fullDocumentBeforeChange(mongoConfig.getFullDocumentBeforeChange()) .maxAwaitTime(mongoConfig.getMaxAwaitTimeInMs(), MILLISECONDS); - if (resumeToken != null) { - log.info("Resuming change stream for partition {} on collection {} with token: {}", partition, mongoConfig.getCollectionName(), resumeToken); - watch.resumeAfter(buildResumeToken(resumeToken)); - } else { - log.info("No resume token found for partition {} on collection {}, starting fresh", partition, mongoConfig.getCollectionName()); - } + resumeIfTokenValid(watch); boolean firstCursorOpen = true; isReadingFromChangeStream = true; @@ -189,6 +184,20 @@ private static boolean canLogSensitiveData() { return log.isDebugEnabled(); } + private void resumeIfTokenValid(ChangeStreamIterable watch) { + if (resumeToken == null) { + log.info("No resume token found for partition {} on collection {}, starting fresh", partition, mongoConfig.getCollectionName()); + return; + } + + try { + log.info("Resuming change stream for partition {} on collection {} with token: {}", partition, mongoConfig.getCollectionName(), resumeToken); + watch.resumeAfter(buildResumeToken(resumeToken)); + } catch (MongoCommandException e) { + log.warn("Error while resuming with a saved token! Likely, token has expired from the opLog. Resuming fresh...", e); + } + } + private Optional getChangedDocumentId(ChangeStreamDocument document) { Document fullDocument = document.getFullDocument(); if (fullDocument != null && fullDocument.containsKey("_id")) {