diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 9dcf29172173..9126bea9e3cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -22,11 +22,13 @@
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,12 +49,12 @@
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
 import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
@@ -74,6 +76,8 @@
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -121,9 +125,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
 
-  public static final long TIMEOUT_MINUTES = 5;
-  public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
+  public static final long DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10); // 10 minutes
   public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
+  private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = "segmentDownloadTimeoutMinutes";
 
   // TODO: Change it to BooleanSupplier
   private final Supplier<Boolean> _isServerReadyToServeQueries;
@@ -561,13 +565,22 @@ private void doAddConsumingSegment(String segmentName)
   @Override
   public File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    if (!PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
+        "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName());
+
+    // Case: The commit protocol has completed, and the segment is ready to be downloaded either
+    // from deep storage or from a peer (if peer-to-peer download is enabled).
+    if (zkMetadata.getStatus() == Status.DONE) {
       return super.downloadSegment(zkMetadata);
     }
 
+    // The segment status is COMMITTING, indicating that the segment commit process is incomplete.
+    // Attempting a waited download within the configured time limit.
+    long downloadTimeoutMilliseconds =
+        getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType));
     final long startTime = System.currentTimeMillis();
-
-    while (System.currentTimeMillis() - startTime < TIMEOUT_MS) {
+    List<URI> onlineServerURIs;
+    while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) {
       // ZK Metadata may change during segment download process; fetch it on every retry.
       zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
 
@@ -580,16 +593,20 @@ public File downloadSegment(SegmentZKMetadata zkMetadata)
       if (_peerDownloadScheme != null) {
         _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
         try {
-          return downloadSegmentFromPeers(zkMetadata);
+          onlineServerURIs = new ArrayList<>();
+          PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
+              _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme,
+              onlineServerURIs);
+          if (!onlineServerURIs.isEmpty()) {
+            return downloadSegmentFromPeers(zkMetadata);
+          }
         } catch (Exception e) {
-          // TODO :in this case we just retry as some of the other servers might be trying to build the
-          // segment
           _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e);
         }
       }
 
       long timeElapsed = System.currentTimeMillis() - startTime;
-      long timeRemaining = TIMEOUT_MS - timeElapsed;
+      long timeRemaining = downloadTimeoutMilliseconds - timeElapsed;
 
       if (timeRemaining <= 0) {
         break;
@@ -603,8 +620,40 @@ public File downloadSegment(SegmentZKMetadata zkMetadata)
     }
 
     // If we exit the loop without returning, throw an exception
-    throw new TimeoutException("Failed to download segment after " + TIMEOUT_MINUTES + " minutes of retrying. Segment: "
-        + zkMetadata.getSegmentName());
+    throw new TimeoutException(
+        "Failed to download segment after " + TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds)
+            + " minutes of retrying. Segment: " + zkMetadata.getSegmentName());
+  }
+
+  /**
+   * Resolves how long a COMMITTING segment download may be retried, in milliseconds.
+   *
+   * <p>Reads {@code segmentDownloadTimeoutMinutes} from the first stream config map of the ingestion config.
+   * A missing, non-numeric or non-positive value falls back to {@link #DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS}: a
+   * non-positive timeout would skip the retry loop in {@code downloadSegment} and fail the download immediately.
+   *
+   * @param tableConfig table config to read the override from; may be null
+   * @return the download timeout in milliseconds, always positive
+   */
+  private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) {
+    String timeoutMinutes = Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig)
+        .map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps)
+        .filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0))
+        .map(map -> map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES)).orElse(null);
+    if (timeoutMinutes == null) {
+      return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS;
+    }
+    try {
+      long minutes = Long.parseLong(timeoutMinutes.trim());
+      if (minutes > 0) {
+        return TimeUnit.MINUTES.toMillis(minutes);
+      }
+    } catch (NumberFormatException ignored) {
+      // Not a number: handled together with non-positive values below.
+    }
+    _logger.warn("Invalid {}: '{}' for table: {}, falling back to default: {}ms", SEGMENT_DOWNLOAD_TIMEOUT_MINUTES,
+        timeoutMinutes, _tableNameWithType, DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS);
+    return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS;
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
index 7f26d759352d..07181ea373e6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
@@ -76,7 +76,7 @@ public static List<URI> getPeerServerURIs(HelixManager helixManager, String tabl
     return onlineServerURIs;
   }
 
-  private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName,
+  public static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName,
       String tableNameWithType, String segmentName, String downloadScheme, List<URI> onlineServerURIs)
       throws Exception {
     ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);