diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java index 536da52c74a..f53867a03a0 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java @@ -266,6 +266,10 @@ public IndexStore buildStore() throws IOException, CommitFailedException { } public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) throws IOException, CommitFailedException { + return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE); + } + + public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint, long maxDurationSeconds) throws IOException, CommitFailedException { IncrementalStoreBuilder builder; IndexStore incrementalStore; Set indexDefinitions = indexerSupport.getIndexDefinitions(); @@ -308,6 +312,7 @@ public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) t try { builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), indexHelper, initialCheckpoint, finalCheckpoint) .withPreferredPathElements(preferredPathElements) + .withMaxDurationSeconds(maxDurationSeconds) .withPathPredicate(predicate) .withBlobStore(indexHelper.getGCBlobStore()); incrementalStore = builder.build(); diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java index cb87699c7a0..73625a2a2da 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java @@ -29,6 +29,7 @@ import java.io.BufferedWriter; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; public class IncrementalFlatFileStoreEditor implements Editor { @@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements Editor { private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter; private final Predicate predicate; private final IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy; + // if not 0, timeout if System.nanoTime() exceeds this value + private final long timeoutAtNanos; private static final int LINE_SEP_LENGTH = System.getProperty("line.separator").length(); public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate predicate, - IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy) { + IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy, long maxDurationSeconds) { this.bufferedWriter = bufferedWriter; this.entryWriter = entryWriter; this.predicate = predicate; this.incrementalFlatFileStoreStrategy = incrementalFlatFileStoreStrategy; + long timeout; + if (maxDurationSeconds == Long.MAX_VALUE) { + timeout = 0; + } else { + timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS); + log.info("Max duration: " + maxDurationSeconds + " timeout: " + timeout + " now: " + System.nanoTime()); + } + this.timeoutAtNanos = timeout; } @Override public void enter(NodeState before, NodeState after) { + checkTimeout(); } @Override @@ -112,4 +124,13 @@ private void writeToFile(NodeState e, IncrementalStoreOperand action) { throw new RuntimeException("Error while creating incremental store", ex); } } + + private void checkTimeout() { + if (timeoutAtNanos != 0) { + long now = System.nanoTime(); + if (now > timeoutAtNanos) { + throw new RuntimeException("Timeout"); + } + } + } } diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java index 1718e71f86b..3b33840c1ec 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java @@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo private long textSize = 0; private long entryCount = 0; private final Set preferredPathElements; + private final long maxDurationSeconds; public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir, Set preferredPathElements, @NotNull Compression algorithm, - Predicate pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) { + Predicate pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long maxDurationSeconds) { this.nodeStore = nodeStore; this.beforeCheckpoint = beforeCheckpoint; this.afterCheckpoint = afterCheckpoint; @@ -76,6 +77,7 @@ public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String bef this.entryWriter = entryWriter; this.preferredPathElements = preferredPathElements; this.comparator = new PathElementComparator(preferredPathElements); + this.maxDurationSeconds = maxDurationSeconds; } @Override @@ -85,7 +87,7 @@ public File createSortedStoreFile() throws IOException { try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, algorithm)) { NodeState before = Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint)); NodeState after = Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint)); - Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, after); + Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, maxDurationSeconds)), before, after); if (e != null) { log.error("Exception while building incremental store for checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e); throw new RuntimeException(e); diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java index 03565746e60..aa00438459e 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java @@ -50,6 +50,7 @@ public class IncrementalStoreBuilder { private final IndexHelper indexHelper; private final String initialCheckpoint; private final String finalCheckpoint; + private long maxDurationSeconds = Long.MAX_VALUE; private Predicate pathPredicate = path -> true; private Set preferredPathElements = Collections.emptySet(); private BlobStore blobStore; @@ -107,6 +108,10 @@ public IncrementalStoreBuilder withBlobStore(BlobStore blobStore) { return this; } + public IncrementalStoreBuilder withMaxDurationSeconds(long maxDurationSeconds) { + this.maxDurationSeconds = maxDurationSeconds; + return this; + } public IndexStore build() throws IOException, CompositeException { logFlags(); @@ -115,11 +120,12 @@ public IndexStore build() throws IOException, CompositeException { if (sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_FFS_STORE || sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) { IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new IncrementalFlatFileStoreNodeStateEntryWriter(blobStore); + IncrementalIndexStoreSortStrategy strategy = new IncrementalFlatFileStoreStrategy( indexHelper.getNodeStore(), initialCheckpoint, finalCheckpoint, - dir, preferredPathElements, algorithm, pathPredicate, entryWriter); + dir, preferredPathElements, algorithm, pathPredicate, entryWriter, maxDurationSeconds); File metadataFile = strategy.createMetadataFile(); File incrementalStoreFile = strategy.createSortedStoreFile(); long entryCount = strategy.getEntryCount(); @@ -147,4 +153,5 @@ private void logFlags() { log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP); log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4); } + } diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java index a10e6ad33d9..c1d3da92eae 100644 --- a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java +++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java @@ -401,7 +401,7 @@ private IncrementalFlatFileStoreStrategy createIncrementalStrategy(Backend backe readOnlyNodeStore.retrieve(finalCheckpoint); return new IncrementalFlatFileStoreStrategy( readOnlyNodeStore, initialCheckpoint, finalCheckpoint, sortFolder.getRoot(), preferredPathElements, - algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore)); + algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE); } private void createBaseContent(NodeStore rwNodeStore) throws CommitFailedException {