Skip to content

Commit

Permalink
FE fix some core case
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jan 14, 2025
1 parent 0938f2b commit 1e63772
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ private void beginTxn() throws Exception {

private void commitAndPublishTxn() throws Exception {
timeTrace.commitTxnTimeMs = System.currentTimeMillis();
Pair<Database, OlapTable> pair = getDbAndTable();
Database database = getDb();
long publishTimeoutMs =
streamLoadInfo.getTimeout() * 1000L - (timeTrace.commitTxnTimeMs - timeTrace.beginTxnTimeMs);
boolean publishSuccess = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().commitAndPublishTransaction(
pair.first, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null);
database, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null);
if (!publishSuccess) {
LOG.warn("Publish timeout, txn_id: {}, label: {}, total timeout: {} ms, publish timeout: {} ms",
txnId, label, streamLoadInfo.getTimeout() * 1000, publishTimeoutMs);
Expand All @@ -184,9 +184,9 @@ private void abortTxn(Throwable reason) {
return;
}
try {
Pair<Database, OlapTable> pair = getDbAndTable();
Database database = getDb();
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction(
pair.first.getId(), txnId, reason == null ? "" : reason.getMessage());
database.getId(), txnId, reason == null ? "" : reason.getMessage());
} catch (Exception e) {
LOG.error("Failed to abort transaction {}", txnId, e);
}
Expand Down Expand Up @@ -262,6 +262,16 @@ private void executeLoad() throws Exception {
}
}

private Database getDb() throws Exception {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName());
if (db == null) {
throw new LoadException(String.format("Database %s does not exist", tableId.getDbName()));
}

return db;
}

private Pair<Database, OlapTable> getDbAndTable() throws Exception {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TransactionStateSnapshot;
import jline.internal.Nullable;
import com.starrocks.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +37,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;

/** Dispatch transaction state to backends after the load finished. */
public class TxnStateDispatcher {
Expand Down Expand Up @@ -144,15 +145,16 @@ private DispatchResult dispatchTxnState(String dbName, long txnId, long backendI
}
TNetworkAddress address = new TNetworkAddress(computeNode.getHost(), computeNode.getBrpcPort());
Database db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
return DispatchResult.fail(DispatchStatus.ABORT, "can't find database");
}
TransactionStateSnapshot state;
try {
state = globalStateMgr.getGlobalTransactionMgr().getTxnState(db, txnId);
} catch (Throwable e) {
return DispatchResult.fail(DispatchStatus.ABORT,
"can't get txn state, exception: " + e.getMessage());
if (db == null) {
state = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, "can't find database " + dbName);
} else {
try {
state = globalStateMgr.getGlobalTransactionMgr().getTxnState(db, txnId);
} catch (Throwable e) {
state = new TransactionStateSnapshot(TransactionStatus.UNKNOWN,
"can't get txn state, exception: " + e.getMessage());
}
}
try {
TransactionStatePB statePB = new TransactionStatePB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,15 @@ public void testDbNotExist() throws Exception {
result = null;
}
};
TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.fail(
TxnStateDispatcher.DispatchStatus.ABORT, "can't find database");
TransactionStateSnapshot txnState = new TransactionStateSnapshot(TransactionStatus.UNKNOWN, "can't find database db1");
TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.success(txnState);
PUpdateTransactionStateResponse response = new PUpdateTransactionStateResponse();
StatusPB statusPB = new StatusPB();
statusPB.setStatusCode(0);
response.setResults(Collections.singletonList(statusPB));
CompletableFuture<PUpdateTransactionStateResponse> future = new CompletableFuture<>();
future.complete(response);
backendService.addResponseFuture(future);
testDispatchBase("db1", 1, 1, expected, 0);
}

Expand All @@ -224,8 +231,16 @@ public void testGetTxnStateFail() throws Exception {
result = new Exception("artificial failure");
}
};
TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.fail(
TxnStateDispatcher.DispatchStatus.ABORT, "can't get txn state, exception: artificial failure");
TransactionStateSnapshot txnState = new TransactionStateSnapshot(TransactionStatus.UNKNOWN,
"can't get txn state, exception: artificial failure");
TxnStateDispatcher.DispatchResult expected = TxnStateDispatcher.DispatchResult.success(txnState);
PUpdateTransactionStateResponse response = new PUpdateTransactionStateResponse();
StatusPB statusPB = new StatusPB();
statusPB.setStatusCode(0);
response.setResults(Collections.singletonList(statusPB));
CompletableFuture<PUpdateTransactionStateResponse> future = new CompletableFuture<>();
future.complete(response);
backendService.addResponseFuture(future);
testDispatchBase("db1", 1, 1, expected, 0);
}

Expand Down

0 comments on commit 1e63772

Please sign in to comment.