
Refactoring FSM to reduce code duplication
9aman committed Jan 7, 2025
1 parent b04cad4 commit 2495730
Showing 3 changed files with 72 additions and 161 deletions.
@@ -264,7 +264,7 @@ public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionPr
return committerDecidedCommit(instanceId, offset, now);

case COMMITTER_NOTIFIED:
return committerNotifiedCommit(instanceId, offset, now);
return committerNotifiedCommit(reqParams, now);

case COMMITTER_UPLOADING:
return committerUploadingCommit(instanceId, offset, now);
@@ -624,8 +624,10 @@ private SegmentCompletionProtocol.Response committerNotifiedConsumed(String inst
* We have notified the committer. If we get a consumed message from another server, we can ask them to
* catchup (if the offset is lower). If anything else, then we pretty much ask them to hold.
*/
protected SegmentCompletionProtocol.Response committerNotifiedCommit(String instanceId,
StreamPartitionMsgOffset offset, long now) {
protected SegmentCompletionProtocol.Response committerNotifiedCommit(
SegmentCompletionProtocol.Request.Params reqParams, long now) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
SegmentCompletionProtocol.Response response = null;
response = checkBadCommitRequest(instanceId, offset, now);
if (response != null) {
@@ -757,20 +759,23 @@ protected SegmentCompletionProtocol.Response processConsumedAfterCommitStart(Str
+ "now={}", _state, instanceId, offset, now);
// Ask them to hold, just in case the committer fails for some reason..
return abortAndReturnHold(now, instanceId, offset);
}
// Common case: A different instance is reporting.
return handleNonWinnerCase(instanceId, offset);
}

protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId,
StreamPartitionMsgOffset offset) {
if (offset.compareTo(_winningOffset) == 0) {
// Wait until winner has posted the segment before asking this server to KEEP the segment.
return hold(instanceId, offset);
} else if (offset.compareTo(_winningOffset) < 0) {
return catchup(instanceId, offset);
} else {
// Common case: A different instance is reporting.
if (offset.compareTo(_winningOffset) == 0) {
// Wait until winner has posted the segment before asking this server to KEEP the segment.
response = hold(instanceId, offset);
} else if (offset.compareTo(_winningOffset) < 0) {
response = catchup(instanceId, offset);
} else {
// We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
// committer fails.
response = hold(instanceId, offset);
}
// We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
// committer fails.
return hold(instanceId, offset);
}
return response;
}

protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
@@ -805,7 +810,7 @@ protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProt
.constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName),
_segmentName.getSegmentName()));
}
_segmentManager.commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor);
commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor);
} catch (Exception e) {
_logger
.error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(),
@@ -818,6 +823,11 @@ protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProt
return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
}

protected void commitSegmentMetadata(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
_segmentManager.commitSegmentMetadata(realtimeTableName, committingSegmentDescriptor);
}

private SegmentCompletionProtocol.Response processCommitWhileUploading(String instanceId,
StreamPartitionMsgOffset offset, long now) {
_logger.info("Processing segmentCommit({}, {})", instanceId, offset);
@@ -831,7 +841,7 @@ private SegmentCompletionProtocol.Response processCommitWhileUploading(String in
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
}

private SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset,
protected SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset,
long now) {
SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset);
if (response != null) {
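
For reference, the refactoring in this file follows a template-method shape: the blocking FSM keeps the shared commit flow and exposes protected hooks (commitSegmentMetadata, handleNonWinnerCase, and a committerNotifiedCommit that receives the full request params) for subclasses to override. Below is a minimal, self-contained sketch of that shape, with simplified class names and plain string responses standing in for the real Pinot types; it is an illustration, not the actual Pinot code.

// Simplified sketch: the blocking FSM owns the commit flow and the pauseless
// FSM only overrides the metadata hook and the non-winner decision.
abstract class BaseCompletionFsm {
    protected long _winningOffset = 100L;

    // Shared flow: upload handling stays here; only the metadata write differs.
    public String commitSegment(String tableName, String descriptor) {
        commitSegmentMetadata(tableName, descriptor);
        return "COMMIT_SUCCESS";
    }

    // Hook overridden by the pauseless FSM to finish its two-phase metadata commit.
    protected void commitSegmentMetadata(String tableName, String descriptor) {
        System.out.println("commitSegmentMetadata(" + tableName + ", " + descriptor + ")");
    }

    // Hook for servers other than the chosen committer reporting segmentConsumed().
    protected String handleNonWinnerCase(String instanceId, long offset) {
        if (offset == _winningOffset) {
            return "HOLD";      // wait until the winner has posted the segment
        } else if (offset < _winningOffset) {
            return "CATCHUP";   // behind the winner: catch up to its offset
        }
        return "HOLD";          // ahead of the winner: hold in case the committer fails
    }
}

class PauselessCompletionFsm extends BaseCompletionFsm {
    @Override
    protected void commitSegmentMetadata(String tableName, String descriptor) {
        // Pauseless ingestion finishes with the end-of-commit metadata update.
        System.out.println("commitSegmentEndMetadata(" + tableName + ", " + descriptor + ")");
    }

    @Override
    protected String handleNonWinnerCase(String instanceId, long offset) {
        if (offset == _winningOffset) {
            // ZK metadata is already updated and a new consuming segment exists,
            // so the non-winner can keep its segment and move on.
            return "KEEP";
        }
        return super.handleNonWinnerCase(instanceId, offset);
    }

    public static void main(String[] args) {
        PauselessCompletionFsm fsm = new PauselessCompletionFsm();
        System.out.println(fsm.handleNonWinnerCase("Server_1", 100L));  // KEEP
        fsm.commitSegment("myTable_REALTIME", "segmentDescriptor");
    }
}
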
@@ -21,7 +21,6 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -44,73 +43,39 @@ public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManag
}
}

/*
* A server has sent segmentConsumed() message. The caller will save the segment if we return
* COMMIT_CONTINUE. We need to verify that it is the same server that we notified as the winner
* and the offset is the same as what is coming in with the commit. We can then move to
* COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
*
* In case of discrepancy we move the state machine to ABORTED state so that this FSM is removed
* from the map, and things start over. In this case, we respond to the server with a 'hold' so
* that they re-transmit their segmentConsumed() message and start over.
*/
@Override
public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams) {
protected SegmentCompletionProtocol.Response committerNotifiedCommit(
SegmentCompletionProtocol.Request.Params reqParams, long now) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
long now = _segmentCompletionManager.getCurrentTimeMs();
if (_excludedServerStateMap.contains(instanceId)) {
_logger.warn("Not accepting commit from {} since it had stoppd consuming", instanceId);
return SegmentCompletionProtocol.RESP_FAILED;
SegmentCompletionProtocol.Response response = checkBadCommitRequest(instanceId, offset, now);
if (response != null) {
return response;
}
synchronized (this) {
_logger.info("Processing segmentCommitStart({}, {})", instanceId, offset);
switch (_state) {
case PARTIAL_CONSUMING:
return partialConsumingCommit(instanceId, offset, now);

case HOLDING:
return holdingCommit(instanceId, offset, now);

case COMMITTER_DECIDED:
return committerDecidedCommit(instanceId, offset, now);

case COMMITTER_NOTIFIED:
SegmentCompletionProtocol.Response response = committerNotifiedCommit(instanceId, offset, now);
try {
if (response == SegmentCompletionProtocol.RESP_COMMIT_CONTINUE) {
CommittingSegmentDescriptor committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
LOGGER.info(
"Starting to commit changes to ZK and ideal state for the segment:{} as the leader has been selected",
_segmentName);
_segmentManager.commitSegmentStartMetadata(
TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()),
committingSegmentDescriptor);
}
} catch (Exception e) {
// this aims to handle the failures during commitSegmentStartMetadata
// we abort the state machine to allow commit protocol to start from the beginning
// the server would then retry the commit protocol from the start
return abortAndReturnFailed();
}
return response;
case COMMITTER_UPLOADING:
return committerUploadingCommit(instanceId, offset, now);

case COMMITTING:
return committingCommit(instanceId, offset, now);

case COMMITTED:
return committedCommit(instanceId, offset);

case ABORTED:
return hold(instanceId, offset);

default:
return fail(instanceId, offset);
}
try {
CommittingSegmentDescriptor committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
LOGGER.info(
"Starting to commit changes to ZK and ideal state for the segment:{} during pauseles ingestion as the "
+ "leader has been selected", _segmentName);
_segmentManager.commitSegmentStartMetadata(
TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()), committingSegmentDescriptor);
} catch (Exception e) {
// this aims to handle the failures during commitSegmentStartMetadata
// we abort the state machine to allow commit protocol to start from the beginning
// the server would then retry the commit protocol from the start
return abortAndReturnFailed();
}
_logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset);
_state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
long commitTimeMs = now - _startTimeMs;
if (commitTimeMs > _initialCommitTimeMs) {
// We assume that the commit time holds for all partitions. It is possible, though, that one partition
// commits at a lower time than another partition, and the two partitions are going simultaneously,
// and we may not get the maximum value all the time.
_segmentCompletionManager.setCommitTime(_segmentName.getTableName(), commitTimeMs);
}
return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
}

@Override
@@ -136,91 +101,26 @@ public SegmentCompletionProtocol.Response extendBuildTime(final String instanceI
}
}

protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
@Override
protected void commitSegmentMetadata(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
StreamPartitionMsgOffset offset =
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) {
// State changed while we were out of sync. return a failed commit.
_logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state,
_segmentName.getSegmentName(), _winner, _winningOffset);
return SegmentCompletionProtocol.RESP_FAILED;
}
_logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
_state = BlockingSegmentCompletionFSMState.COMMITTING;
// In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation,
// so we need to move the segment file to its permanent location first before committing the metadata.
// The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata
// store.
try {
_segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor);
} catch (Exception e) {
_logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(),
e);
return SegmentCompletionProtocol.RESP_FAILED;
}
try {
// Convert to a controller uri if the segment location uses local file scheme.
if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME
.equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme())) {
committingSegmentDescriptor.setSegmentLocation(URIUtils
.constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName),
_segmentName.getSegmentName()));
}
_segmentManager.commitSegmentEndMetadata(_realtimeTableName, committingSegmentDescriptor);
} catch (Exception e) {
_logger
.error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(),
e);
return SegmentCompletionProtocol.RESP_FAILED;
}

_state = BlockingSegmentCompletionFSMState.COMMITTED;
_logger.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
_segmentManager.commitSegmentEndMetadata(realtimeTableName, committingSegmentDescriptor);
}


@Override
// A common method when the state is > COMMITTER_NOTIFIED.
protected SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId,
StreamPartitionMsgOffset offset, long now) {
SegmentCompletionProtocol.Response response;
// We have already picked a winner, and may or may not have heard from them.
// Common case here is that another server is coming back to us with its offset. We either respond back with
// HOLD or CATCHUP.
// It may be that we never heard from the committer, or the committer is taking too long to commit the segment.
// In that case, we abort the FSM and start afresh (i.e, return HOLD).
// If the winner is coming back again, then we have some more conditions to look at.
response = abortIfTooLateAndReturnHold(now, instanceId, offset);
if (response != null) {
return response;
}
if (instanceId.equals(_winner)) {
// The winner is coming back to report its offset. Take a decision based on the offset reported, and whether we
// already notified them
// Winner is supposedly already in the commit call. Something wrong.
LOGGER.warn(
"{}:Aborting FSM because winner is reporting a segment while it is also committing instance={} offset={} "
+ "now={}", _state, instanceId, offset, now);
// Ask them to hold, just in case the committer fails for some reason..
return abortAndReturnHold(now, instanceId, offset);
protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, StreamPartitionMsgOffset offset) {
// Common case: A different instance is reporting.
if (offset.compareTo(_winningOffset) == 0) {
// The winner has already updated the segment's ZK metadata for the committing segment.
// Additionally, a new consuming segment has been created for pauseless ingestion.
// Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment.
return keep(instanceId, offset);
} else if (offset.compareTo(_winningOffset) < 0) {
return catchup(instanceId, offset);
} else {
// Common case: A different instance is reporting.
if (offset.compareTo(_winningOffset) == 0) {
// The winner has already updated the segment's ZK metadata for the committing segment.
// Additionally, a new consuming segment has been created for pauseless ingestion.
// Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment.
response = keep(instanceId, offset);
} else if (offset.compareTo(_winningOffset) < 0) {
response = catchup(instanceId, offset);
} else {
// We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
// committer fails.
response = hold(instanceId, offset);
}
// We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
// committer fails.
return hold(instanceId, offset);
}
return response;
}
}
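
The pauseless override of committerNotifiedCommit above validates the request, persists the start-of-commit metadata, and only then moves the FSM to COMMITTER_UPLOADING, aborting if the metadata write fails so the server restarts the protocol from the beginning. A condensed, self-contained sketch of that control flow follows; the helper methods and string states are placeholders, not the real Pinot APIs.

// Condensed control flow of the pauseless commit-start handling; the helper
// methods here stand in for the real request checks and ZK metadata writes.
class PauselessCommitStartFlow {
    private String _state = "COMMITTER_NOTIFIED";

    public String committerNotifiedCommit(String instanceId, long offset, long nowMs) {
        String bad = checkBadCommitRequest(instanceId, offset, nowMs);
        if (bad != null) {
            return bad;                       // stale or mismatched commit request
        }
        try {
            // Record the start of the commit so a new consuming segment can be
            // created before the segment upload finishes (pauseless ingestion).
            commitSegmentStartMetadata(instanceId, offset);
        } catch (Exception e) {
            _state = "ABORTED";
            return "FAILED";                  // server retries the protocol from scratch
        }
        _state = "COMMITTER_UPLOADING";
        return "COMMIT_CONTINUE";
    }

    private String checkBadCommitRequest(String instanceId, long offset, long nowMs) {
        return null;                          // placeholder: accept every request
    }

    private void commitSegmentStartMetadata(String instanceId, long offset) {
        System.out.println("commitSegmentStartMetadata(" + instanceId + ", " + offset + ")");
    }

    public static void main(String[] args) {
        PauselessCommitStartFlow flow = new PauselessCommitStartFlow();
        System.out.println(flow.committerNotifiedCommit("Server_0", 120L, System.currentTimeMillis()));
    }
}
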
@@ -99,7 +99,8 @@ public void setUp()
TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
// Replace stream config from indexing config to ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
