Skip to content

Commit

Permalink
Handle expired resumeTokens #4
Browse files Browse the repository at this point in the history
    * Added support for handling expired tokens gently (start fresh)
  • Loading branch information
BartekGravity committed May 9, 2024
1 parent 69ebf28 commit dfa57a6
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions src/main/java/com/gravity9/mongocse/MongoChangeStreamWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

import com.gravity9.mongocse.listener.ChangeStreamListener;
import com.gravity9.mongocse.logging.LoggingUtil;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand All @@ -17,11 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;

import static com.gravity9.mongocse.MongoExpressions.abs;
import static com.gravity9.mongocse.MongoExpressions.cond;
import static com.gravity9.mongocse.MongoExpressions.documentKey;
Expand Down Expand Up @@ -100,12 +100,7 @@ public void run() {
.fullDocumentBeforeChange(mongoConfig.getFullDocumentBeforeChange())
.maxAwaitTime(mongoConfig.getMaxAwaitTimeInMs(), MILLISECONDS);

if (resumeToken != null) {
log.info("Resuming change stream for partition {} on collection {} with token: {}", partition, mongoConfig.getCollectionName(), resumeToken);
watch.resumeAfter(buildResumeToken(resumeToken));
} else {
log.info("No resume token found for partition {} on collection {}, starting fresh", partition, mongoConfig.getCollectionName());
}
resumeIfTokenValid(watch);

boolean firstCursorOpen = true;
isReadingFromChangeStream = true;
Expand Down Expand Up @@ -189,6 +184,20 @@ private static boolean canLogSensitiveData() {
return log.isDebugEnabled();
}

private void resumeIfTokenValid(ChangeStreamIterable<Document> watch) {
if (resumeToken == null) {
log.info("No resume token found for partition {} on collection {}, starting fresh", partition, mongoConfig.getCollectionName());
return;
}

try {
log.info("Resuming change stream for partition {} on collection {} with token: {}", partition, mongoConfig.getCollectionName(), resumeToken);
watch.resumeAfter(buildResumeToken(resumeToken));
} catch (MongoCommandException e) {
log.warn("Error while resuming with a saved token! Likely, token has expired from the opLog. Resuming fresh...", e);
}
}

private Optional<ObjectId> getChangedDocumentId(ChangeStreamDocument<Document> document) {
Document fullDocument = document.getFullDocument();
if (fullDocument != null && fullDocument.containsKey("_id")) {
Expand Down

0 comments on commit dfa57a6

Please sign in to comment.