Skip to content

Commit

Permalink
[Enhancement] Prepare/commit transaction stream load asynchronously (#…
Browse files Browse the repository at this point in the history
…328)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Dec 29, 2023
1 parent 2320c40 commit 7fe8cd2
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -141,6 +142,11 @@ public void close() {
}
}

@Override
public ExecutorService getExecutorService() {
return executorService;
}

@Override
public boolean begin(TableRegion region) {
region.setLabel(region.getLabelGenerator().next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.starrocks.data.load.stream.properties.StreamLoadProperties;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public interface StreamLoader {
Expand All @@ -43,4 +44,8 @@ public interface StreamLoader {
boolean prepare(StreamLoadSnapshot snapshot);
boolean commit(StreamLoadSnapshot snapshot);
boolean rollback(StreamLoadSnapshot snapshot);

default ExecutorService getExecutorService() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,44 +235,74 @@ public boolean flush(FlushReason reason) {
return false;
}

// Commit the load asynchronously
// 1. commit() should not be called concurrently
// 2. because commit is executed asynchronously, the caller should poll the
// method to see if it executes successfully
// 3. a true returned value indicates a successful commit, and a false value
// indicates the commit should not be triggered, such as it is FLUSHING,
// or it's still doing commit asynchronously
public boolean commit() {
boolean commitTriggered = false;
if (!state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
return false;
if (state.get() != State.COMMITTING) {
return false;
}
commitTriggered = true;
}

boolean commitSuccess;
if (label != null) {
StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(database, table, label);
try {
if (!streamLoader.prepare(transaction)) {
String errorMsg = "Failed to prepare transaction, please check taskmanager log for details, " + transaction;
throw new StreamLoadFailException(errorMsg);
}

if (!streamLoader.commit(transaction)) {
String errorMsg = "Failed to commit transaction, please check taskmanager log for details, " + transaction;
throw new StreamLoadFailException(errorMsg);
}
} catch (Exception e) {
LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", database, table, label, e);
fail(e);
if (commitTriggered) {
// label will be set to null after commit executes successfully
if (label == null) {
state.compareAndSet(State.COMMITTING, State.ACTIVE);
return true;
} else {
// wait for the commit to finish
return false;
}
}

label = null;
long commitTime = System.currentTimeMillis();
long commitDuration = commitTime - lastCommitTimeMills;
lastCommitTimeMills = commitTime;
commitSuccess = true;
LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, commitDuration);
} else {
if (label == null) {
// if the data has never been flushed (label == null), the commit should fail so that StreamLoadManagerV2#init
// will schedule to flush the data first, and then trigger commit again
commitSuccess = cacheBytes.get() == 0;
boolean commitSuccess = cacheBytes.get() == 0;
state.compareAndSet(State.COMMITTING, State.ACTIVE);
return commitSuccess;
}

try {
streamLoader.getExecutorService().submit(this::doCommit);
} catch (Exception e) {
LOG.error("Failed to submit commit task, db: {}, table: {}, label: {}", database, table, label, e);
throw e;
}

// wait for the commit to finish
return false;
}

private void doCommit() {
StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(database, table, label);
try {
if (!streamLoader.prepare(transaction)) {
String errorMsg = "Failed to prepare transaction, please check taskmanager log for details, " + transaction;
throw new StreamLoadFailException(errorMsg);
}

if (!streamLoader.commit(transaction)) {
String errorMsg = "Failed to commit transaction, please check taskmanager log for details, " + transaction;
throw new StreamLoadFailException(errorMsg);
}
} catch (Throwable e) {
LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", database, table, label, e);
fail(e);
}

state.compareAndSet(State.COMMITTING, State.ACTIVE);
return commitSuccess;
long commitTime = System.currentTimeMillis();
long commitDuration = commitTime - lastCommitTimeMills;
lastCommitTimeMills = commitTime;
label = null;
LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, commitDuration);
}

@Override
Expand Down

0 comments on commit 7fe8cd2

Please sign in to comment.