diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java index 04a1dd1a440..fc48095c854 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java @@ -264,7 +264,7 @@ public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionPr return committerDecidedCommit(instanceId, offset, now); case COMMITTER_NOTIFIED: - return committerNotifiedCommit(instanceId, offset, now); + return committerNotifiedCommit(reqParams, now); case COMMITTER_UPLOADING: return committerUploadingCommit(instanceId, offset, now); @@ -624,8 +624,10 @@ private SegmentCompletionProtocol.Response committerNotifiedConsumed(String inst * We have notified the committer. If we get a consumed message from another server, we can ask them to * catchup (if the offset is lower). If anything else, then we pretty much ask them to hold. */ - protected SegmentCompletionProtocol.Response committerNotifiedCommit(String instanceId, - StreamPartitionMsgOffset offset, long now) { + protected SegmentCompletionProtocol.Response committerNotifiedCommit( + SegmentCompletionProtocol.Request.Params reqParams, long now) { + String instanceId = reqParams.getInstanceId(); + StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); SegmentCompletionProtocol.Response response = null; response = checkBadCommitRequest(instanceId, offset, now); if (response != null) { @@ -757,20 +759,23 @@ protected SegmentCompletionProtocol.Response processConsumedAfterCommitStart(Str + "now={}", _state, instanceId, offset, now); // Ask them to hold, just in case the committer fails for some reason.. return abortAndReturnHold(now, instanceId, offset); + } + // Common case: A different instance is reporting. + return handleNonWinnerCase(instanceId, offset); + } + + protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, + StreamPartitionMsgOffset offset) { + if (offset.compareTo(_winningOffset) == 0) { + // Wait until winner has posted the segment before asking this server to KEEP the segment. + return hold(instanceId, offset); + } else if (offset.compareTo(_winningOffset) < 0) { + return catchup(instanceId, offset); } else { - // Common case: A different instance is reporting. - if (offset.compareTo(_winningOffset) == 0) { - // Wait until winner has posted the segment before asking this server to KEEP the segment. - response = hold(instanceId, offset); - } else if (offset.compareTo(_winningOffset) < 0) { - response = catchup(instanceId, offset); - } else { - // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the - // committer fails. - response = hold(instanceId, offset); - } + // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the + // committer fails. + return hold(instanceId, offset); } - return response; } protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, @@ -805,7 +810,7 @@ protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProt .constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName), _segmentName.getSegmentName())); } - _segmentManager.commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor); + commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor); } catch (Exception e) { _logger .error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(), @@ -818,6 +823,11 @@ protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProt return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS; } + protected void commitSegmentMetadata(String realtimeTableName, + CommittingSegmentDescriptor committingSegmentDescriptor) { + _segmentManager.commitSegmentMetadata(realtimeTableName, committingSegmentDescriptor); + } + private SegmentCompletionProtocol.Response processCommitWhileUploading(String instanceId, StreamPartitionMsgOffset offset, long now) { _logger.info("Processing segmentCommit({}, {})", instanceId, offset); @@ -831,7 +841,7 @@ private SegmentCompletionProtocol.Response processCommitWhileUploading(String in .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); } - private SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset, + protected SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, StreamPartitionMsgOffset offset, long now) { SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset); if (response != null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java index 137a3fb5fe7..f1ca0ece26e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java @@ -21,7 +21,6 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; @@ -44,73 +43,39 @@ public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManag } } - /* - * A server has sent segmentConsumed() message. The caller will save the segment if we return - * COMMIT_CONTINUE. We need to verify that it is the same server that we notified as the winner - * and the offset is the same as what is coming in with the commit. We can then move to - * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call. - * - * In case of discrepancy we move the state machine to ABORTED state so that this FSM is removed - * from the map, and things start over. In this case, we respond to the server with a 'hold' so - * that they re-transmit their segmentConsumed() message and start over. - */ @Override - public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams) { + protected SegmentCompletionProtocol.Response committerNotifiedCommit( + SegmentCompletionProtocol.Request.Params reqParams, long now) { String instanceId = reqParams.getInstanceId(); StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); - long now = _segmentCompletionManager.getCurrentTimeMs(); - if (_excludedServerStateMap.contains(instanceId)) { - _logger.warn("Not accepting commit from {} since it had stoppd consuming", instanceId); - return SegmentCompletionProtocol.RESP_FAILED; + SegmentCompletionProtocol.Response response = checkBadCommitRequest(instanceId, offset, now); + if (response != null) { + return response; } - synchronized (this) { - _logger.info("Processing segmentCommitStart({}, {})", instanceId, offset); - switch (_state) { - case PARTIAL_CONSUMING: - return partialConsumingCommit(instanceId, offset, now); - - case HOLDING: - return holdingCommit(instanceId, offset, now); - - case COMMITTER_DECIDED: - return committerDecidedCommit(instanceId, offset, now); - - case COMMITTER_NOTIFIED: - SegmentCompletionProtocol.Response response = committerNotifiedCommit(instanceId, offset, now); - try { - if (response == SegmentCompletionProtocol.RESP_COMMIT_CONTINUE) { - CommittingSegmentDescriptor committingSegmentDescriptor = - CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams); - LOGGER.info( - "Starting to commit changes to ZK and ideal state for the segment:{} as the leader has been selected", - _segmentName); - _segmentManager.commitSegmentStartMetadata( - TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()), - committingSegmentDescriptor); - } - } catch (Exception e) { - // this aims to handle the failures during commitSegmentStartMetadata - // we abort the state machine to allow commit protocol to start from the beginning - // the server would then retry the commit protocol from the start - return abortAndReturnFailed(); - } - return response; - case COMMITTER_UPLOADING: - return committerUploadingCommit(instanceId, offset, now); - - case COMMITTING: - return committingCommit(instanceId, offset, now); - - case COMMITTED: - return committedCommit(instanceId, offset); - - case ABORTED: - return hold(instanceId, offset); - - default: - return fail(instanceId, offset); - } + try { + CommittingSegmentDescriptor committingSegmentDescriptor = + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams); + LOGGER.info( + "Starting to commit changes to ZK and ideal state for the segment:{} during pauseles ingestion as the " + + "leader has been selected", _segmentName); + _segmentManager.commitSegmentStartMetadata( + TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()), committingSegmentDescriptor); + } catch (Exception e) { + // this aims to handle the failures during commitSegmentStartMetadata + // we abort the state machine to allow commit protocol to start from the beginning + // the server would then retry the commit protocol from the start + return abortAndReturnFailed(); } + _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset); + _state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING; + long commitTimeMs = now - _startTimeMs; + if (commitTimeMs > _initialCommitTimeMs) { + // We assume that the commit time holds for all partitions. It is possible, though, that one partition + // commits at a lower time than another partition, and the two partitions are going simultaneously, + // and we may not get the maximum value all the time. + _segmentCompletionManager.setCommitTime(_segmentName.getTableName(), commitTimeMs); + } + return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE; } @Override @@ -136,91 +101,26 @@ public SegmentCompletionProtocol.Response extendBuildTime(final String instanceI } } - protected SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, + @Override + protected void commitSegmentMetadata(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { - String instanceId = reqParams.getInstanceId(); - StreamPartitionMsgOffset offset = - _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset()); - if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) { - // State changed while we were out of sync. return a failed commit. - _logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state, - _segmentName.getSegmentName(), _winner, _winningOffset); - return SegmentCompletionProtocol.RESP_FAILED; - } - _logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId); - _state = BlockingSegmentCompletionFSMState.COMMITTING; - // In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation, - // so we need to move the segment file to its permanent location first before committing the metadata. - // The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata - // store. - try { - _segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor); - } catch (Exception e) { - _logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(), - e); - return SegmentCompletionProtocol.RESP_FAILED; - } - try { - // Convert to a controller uri if the segment location uses local file scheme. - if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME - .equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme())) { - committingSegmentDescriptor.setSegmentLocation(URIUtils - .constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName), - _segmentName.getSegmentName())); - } - _segmentManager.commitSegmentEndMetadata(_realtimeTableName, committingSegmentDescriptor); - } catch (Exception e) { - _logger - .error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(), - e); - return SegmentCompletionProtocol.RESP_FAILED; - } - - _state = BlockingSegmentCompletionFSMState.COMMITTED; - _logger.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId); - return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS; + _segmentManager.commitSegmentEndMetadata(realtimeTableName, committingSegmentDescriptor); } - @Override - // A common method when the state is > COMMITTER_NOTIFIED. - protected SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId, - StreamPartitionMsgOffset offset, long now) { - SegmentCompletionProtocol.Response response; - // We have already picked a winner, and may or many not have heard from them. - // Common case here is that another server is coming back to us with its offset. We either respond back with - // HOLD or CATCHUP. - // It may be that we never heard from the committer, or the committer is taking too long to commit the segment. - // In that case, we abort the FSM and start afresh (i.e, return HOLD). - // If the winner is coming back again, then we have some more conditions to look at. - response = abortIfTooLateAndReturnHold(now, instanceId, offset); - if (response != null) { - return response; - } - if (instanceId.equals(_winner)) { - // The winner is coming back to report its offset. Take a decision based on the offset reported, and whether we - // already notified them - // Winner is supposedly already in the commit call. Something wrong. - LOGGER.warn( - "{}:Aborting FSM because winner is reporting a segment while it is also committing instance={} offset={} " - + "now={}", _state, instanceId, offset, now); - // Ask them to hold, just in case the committer fails for some reason.. - return abortAndReturnHold(now, instanceId, offset); + protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, StreamPartitionMsgOffset offset) { + // Common case: A different instance is reporting. + if (offset.compareTo(_winningOffset) == 0) { + // The winner has already updated the segment's ZK metadata for the committing segment. + // Additionally, a new consuming segment has been created for pauseless ingestion. + // Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment. + return keep(instanceId, offset); + } else if (offset.compareTo(_winningOffset) < 0) { + return catchup(instanceId, offset); } else { - // Common case: A different instance is reporting. - if (offset.compareTo(_winningOffset) == 0) { - // The winner has already updated the segment's ZK metadata for the committing segment. - // Additionally, a new consuming segment has been created for pauseless ingestion. - // Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment. - response = keep(instanceId, offset); - } else if (offset.compareTo(_winningOffset) < 0) { - response = catchup(instanceId, offset); - } else { - // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the - // committer fails. - response = hold(instanceId, offset); - } + // We have not yet committed, so ask the new responder to hold. They may be the new leader in case the + // committer fails. + return hold(instanceId, offset); } - return response; } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java index b3f011385f2..4e9fcac0abd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionIntegrationTest.java @@ -99,7 +99,8 @@ public void setUp() TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); // Replace stream config from indexing config to ingestion config IngestionConfig ingestionConfig = new IngestionConfig(); - ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()))); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()))); ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true); tableConfig.getIndexingConfig().setStreamConfigs(null); tableConfig.setIngestionConfig(ingestionConfig);