diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java index 44fcddf682a80..3c97291b9da95 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/LoadExecutor.java @@ -168,11 +168,11 @@ private void beginTxn() throws Exception { private void commitAndPublishTxn() throws Exception { timeTrace.commitTxnTimeMs = System.currentTimeMillis(); - Pair pair = getDbAndTable(); + Database database = getDb(); long publishTimeoutMs = streamLoadInfo.getTimeout() * 1000L - (timeTrace.commitTxnTimeMs - timeTrace.beginTxnTimeMs); boolean publishSuccess = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().commitAndPublishTransaction( - pair.first, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null); + database, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null); if (!publishSuccess) { LOG.warn("Publish timeout, txn_id: {}, label: {}, total timeout: {} ms, publish timeout: {} ms", txnId, label, streamLoadInfo.getTimeout() * 1000, publishTimeoutMs); @@ -184,9 +184,9 @@ private void abortTxn(Throwable reason) { return; } try { - Pair pair = getDbAndTable(); + Database database = getDb(); GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction( - pair.first.getId(), txnId, reason == null ? "" : reason.getMessage()); + database.getId(), txnId, reason == null ? "" : reason.getMessage()); } catch (Exception e) { LOG.error("Failed to abort transaction {}", txnId, e); } @@ -262,6 +262,16 @@ private void executeLoad() throws Exception { } } + private Database getDb() throws Exception { + GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); + Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName()); + if (db == null) { + throw new LoadException(String.format("Database %s does not exist", tableId.getDbName())); + } + + return db; + } + private Pair getDbAndTable() throws Exception { GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName()); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TxnStateDispatcher.java b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TxnStateDispatcher.java index 7b80807f9d63c..fef24f1d89607 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TxnStateDispatcher.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/batchwrite/TxnStateDispatcher.java @@ -26,7 +26,7 @@ import com.starrocks.system.ComputeNode; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.transaction.TransactionStateSnapshot; -import jline.internal.Nullable; +import com.starrocks.transaction.TransactionStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +37,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; /** Dispatch transaction state to backends after the load finished. */ public class TxnStateDispatcher { @@ -144,15 +145,16 @@ private DispatchResult dispatchTxnState(String dbName, long txnId, long backendI } TNetworkAddress address = new TNetworkAddress(computeNode.getHost(), computeNode.getBrpcPort()); Database db = globalStateMgr.getLocalMetastore().getDb(dbName); - if (db == null) { - return DispatchResult.fail(DispatchStatus.ABORT, "can't find database"); - } TransactionStateSnapshot state; - try { - state = globalStateMgr.getGlobalTransactionMgr().getTxnState(db, txnId); - } catch (Throwable e) { - return DispatchResult.fail(DispatchStatus.ABORT, - "can't get txn state, exception: " + e.getMessage()); + if (db == null) { + state = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, "can't find database " + dbName); + } else { + try { + state = globalStateMgr.getGlobalTransactionMgr().getTxnState(db, txnId); + } catch (Throwable e) { + state = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, + "can't get txn state, exception: " + e.getMessage()); + } } try { TransactionStatePB statePB = new TransactionStatePB(); diff --git a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/TxnStateDispatcherTest.java b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/TxnStateDispatcherTest.java index 8c33d9211c688..b74052a65afb6 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/TxnStateDispatcherTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/batchwrite/TxnStateDispatcherTest.java @@ -206,8 +206,15 @@ public void testDbNotExist() throws Exception { result = null; } }; - TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.fail( - TxnStateDispatcher.DispatchStatus.ABORT, "can't find database"); + TransactionStateSnapshot txnState = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, "can't find database db1"); + TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.success(txnState); + PUpdateTransactionStateResponse response = new PUpdateTransactionStateResponse(); + StatusPB statusPB = new StatusPB(); + statusPB.setStatusCode(0); + response.setResults(Collections.singletonList(statusPB)); + CompletableFuture future = new CompletableFuture<>(); + future.complete(response); + backendService.addResponseFuture(future); testDispatchBase("db1", 1, 1, expected, 0); } @@ -224,8 +231,16 @@ public void testGetTxnStateFail() throws Exception { result = new Exception("artificial failure"); } }; - TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.fail( - TxnStateDispatcher.DispatchStatus.ABORT, "can't get txn state, exception: artificial failure"); + TransactionStateSnapshot txnState = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, + "can't get txn state, exception: artificial failure"); + TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.success(txnState); + PUpdateTransactionStateResponse response = new PUpdateTransactionStateResponse(); + StatusPB statusPB = new StatusPB(); + statusPB.setStatusCode(0); + response.setResults(Collections.singletonList(statusPB)); + CompletableFuture future = new CompletableFuture<>(); + future.complete(response); + backendService.addResponseFuture(future); testDispatchBase("db1", 1, 1, expected, 0); }