Skip to content

Commit

Permalink
Improving the download segment for realtime tables by:
Browse files Browse the repository at this point in the history
1. Relying on segment status to make the call on whether to use waited download instead of tableConfig.
   - This is needed for pauseless tables as the pauseless can be disabled by changing the table config
   - Segment status is a better decision criteria.
2. Checking presence of an ONLINE server before attempting peer download to prevent waiting for exponential backoff.
  • Loading branch information
9aman committed Jan 8, 2025
1 parent 2495730 commit f4b1420
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -47,12 +49,12 @@
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
Expand All @@ -74,6 +76,8 @@
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -121,9 +125,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);

public static final long TIMEOUT_MINUTES = 5;
public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
public static final long DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10); // 10 minutes
public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep interval
private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = "segmentDownloadTimeoutMinutes";

// TODO: Change it to BooleanSupplier
private final Supplier<Boolean> _isServerReadyToServeQueries;
Expand Down Expand Up @@ -561,13 +565,22 @@ private void doAddConsumingSegment(String segmentName)
@Override
public File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
if (!PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
"Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName());

// Case: The commit protocol has completed, and the segment is ready to be downloaded either
// from deep storage or from a peer (if peer-to-peer download is enabled).
if (zkMetadata.getStatus() == Status.DONE) {
return super.downloadSegment(zkMetadata);
}

// The segment status is COMMITTING, indicating that the segment commit process is incomplete.
// Attempting a waited download within the configured time limit.
long downloadTimeoutMilliseconds =
getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType));
final long startTime = System.currentTimeMillis();

while (System.currentTimeMillis() - startTime < TIMEOUT_MS) {
List<URI> onlineServerURIs;
while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) {
// ZK Metadata may change during segment download process; fetch it on every retry.
zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());

Expand All @@ -580,16 +593,20 @@ public File downloadSegment(SegmentZKMetadata zkMetadata)
if (_peerDownloadScheme != null) {
_logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName());
try {
return downloadSegmentFromPeers(zkMetadata);
onlineServerURIs = new ArrayList<>();
PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme,
onlineServerURIs);
if (!onlineServerURIs.isEmpty()) {
return downloadSegmentFromPeers(zkMetadata);
}
} catch (Exception e) {
// TODO :in this case we just retry as some of the other servers might be trying to build the
// segment
_logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e);
}
}

long timeElapsed = System.currentTimeMillis() - startTime;
long timeRemaining = TIMEOUT_MS - timeElapsed;
long timeRemaining = downloadTimeoutMilliseconds - timeElapsed;

if (timeRemaining <= 0) {
break;
Expand All @@ -603,8 +620,22 @@ public File downloadSegment(SegmentZKMetadata zkMetadata)
}

// If we exit the loop without returning, throw an exception
throw new TimeoutException("Failed to download segment after " + TIMEOUT_MINUTES + " minutes of retrying. Segment: "
+ zkMetadata.getSegmentName());
throw new TimeoutException(
"Failed to download segment after " + TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds)
+ " minutes of retrying. Segment: " + zkMetadata.getSegmentName());
}

private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) {
return Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig)
.map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps)
.filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0)).map(map -> map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES))
.map(timeoutStr -> {
try {
return TimeUnit.MINUTES.toMillis(Long.parseLong(timeoutStr));
} catch (NumberFormatException e) {
return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS;
}
}).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static List<URI> getPeerServerURIs(HelixManager helixManager, String tabl
return onlineServerURIs;
}

private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName,
public static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName,
String tableNameWithType, String segmentName, String downloadScheme, List<URI> onlineServerURIs)
throws Exception {
ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
Expand Down

0 comments on commit f4b1420

Please sign in to comment.