Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Support to receive multiple work items in a single StreamingGetWorkResponseChunk #33512

Merged
7 commits merged on Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
void setUseSeparateWindmillHeartbeatStreams(Boolean value);

@Description("If true, GetWorkStreams will request multiple work items in a response chunk.")
boolean getWindmillMultipleItemsInGetWorkResponse();
boolean getWindmillRequestBatchedGetWorkResponse();

void setWindmillMultipleItemsInGetWorkResponse(boolean value);
void setWindmillRequestBatchedGetWorkResponse(boolean value);

@Description("The number of streams to use for GetData requests.")
@Default.Integer(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory
!options.isEnableStreamingEngine()
|| DataflowRunner.hasExperiment(
options, "streaming_engine_disable_new_heartbeat_requests"))
.setMultipleItemsInGetWorkResponse(options.getWindmillMultipleItemsInGetWorkResponse());
.setBatchedGetWorkResponse(options.getWindmillRequestBatchedGetWorkResponse());
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}

private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ final class GrpcDirectGetWorkStream
implements GetWorkStream {
private static final Logger LOG = LoggerFactory.getLogger(GrpcDirectGetWorkStream.class);

/**
 * Request sent by {@code sendHealthCheck()}: a request extension whose item and byte budgets are
 * both zero. Built once as a constant and reused for every health check.
 *
 * <p>NOTE(review): presumably a zero-budget extension asks the backend for no additional work and
 * only exercises the stream; confirm against the Windmill service contract.
 */
private static final StreamingGetWorkRequest HEALTH_CHECK_REQUEST =
StreamingGetWorkRequest.newBuilder()
.setRequestExtension(
Windmill.StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(0)
.setMaxBytes(0)
.build())
.build();

private final GetWorkBudgetTracker budgetTracker;
private final GetWorkRequest requestHeader;
private final WorkItemScheduler workItemScheduler;
Expand All @@ -80,7 +89,7 @@ final class GrpcDirectGetWorkStream
*/
private final ConcurrentMap<Long, GetWorkResponseChunkAssembler> workItemAssemblers;

private final boolean multipleItemsInGetWorkResponse;
private final boolean batchedGetWorkResponse;
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved

private GrpcDirectGetWorkStream(
String backendWorkerToken,
Expand All @@ -93,7 +102,7 @@ private GrpcDirectGetWorkStream(
StreamObserverFactory streamObserverFactory,
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
boolean multipleItemsInGetWorkResponse,
boolean batchedGetWorkResponse,
ThrottleTimer getWorkThrottleTimer,
HeartbeatSender heartbeatSender,
GetDataClient getDataClient,
Expand Down Expand Up @@ -122,7 +131,7 @@ private GrpcDirectGetWorkStream(
.setItems(requestHeader.getMaxItems())
.setBytes(requestHeader.getMaxBytes())
.build());
this.multipleItemsInGetWorkResponse = multipleItemsInGetWorkResponse;
this.batchedGetWorkResponse = batchedGetWorkResponse;
}

static GrpcDirectGetWorkStream create(
Expand Down Expand Up @@ -182,7 +191,6 @@ private void maybeSendRequestExtension(GetWorkBudget extension) {
Windmill.StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(extension.items())
.setMaxBytes(extension.bytes()))
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.build();
lastRequest.set(request);
budgetTracker.recordBudgetRequested(extension);
Expand All @@ -208,7 +216,7 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
.setMaxItems(initialGetWorkBudget.items())
.setMaxBytes(initialGetWorkBudget.bytes())
.build())
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.setSupportsMultipleWorkItemsInChunk(batchedGetWorkResponse)
.build();
lastRequest.set(request);
budgetTracker.recordBudgetRequested(initialGetWorkBudget);
Expand All @@ -231,15 +239,7 @@ public void appendSpecificHtml(PrintWriter writer) {

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
trySend(
StreamingGetWorkRequest.newBuilder()
.setRequestExtension(
Windmill.StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(0)
.setMaxBytes(0)
.build())
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.build());
trySend(HEALTH_CHECK_REQUEST);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,19 @@ final class GrpcGetWorkStream
implements GetWorkStream {

private static final Logger LOG = LoggerFactory.getLogger(GrpcGetWorkStream.class);
/**
 * Request sent by {@code sendHealthCheck()}: a request extension whose item and byte budgets are
 * both zero. Built once as a constant and reused for every health check.
 *
 * <p>NOTE(review): presumably a zero-budget extension asks the backend for no additional work and
 * only exercises the stream; confirm against the Windmill service contract.
 */
private static final StreamingGetWorkRequest HEALTH_CHECK =
StreamingGetWorkRequest.newBuilder()
.setRequestExtension(
StreamingGetWorkRequestExtension.newBuilder().setMaxItems(0).setMaxBytes(0).build())
.build();

private final GetWorkRequest request;
private final WorkItemReceiver receiver;
private final ThrottleTimer getWorkThrottleTimer;
private final Map<Long, GetWorkResponseChunkAssembler> workItemAssemblers;
private final AtomicLong inflightMessages;
private final AtomicLong inflightBytes;
private final boolean multipleItemsInGetWorkResponse;
private final boolean batchedGetWorkResponse;
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved

private GrpcGetWorkStream(
String backendWorkerToken,
Expand All @@ -65,7 +70,7 @@ private GrpcGetWorkStream(
StreamObserverFactory streamObserverFactory,
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
boolean multipleItemsInGetWorkResponse,
boolean batchedGetWorkResponse,
ThrottleTimer getWorkThrottleTimer,
WorkItemReceiver receiver) {
super(
Expand All @@ -83,7 +88,7 @@ private GrpcGetWorkStream(
this.workItemAssemblers = new ConcurrentHashMap<>();
this.inflightMessages = new AtomicLong();
this.inflightBytes = new AtomicLong();
this.multipleItemsInGetWorkResponse = multipleItemsInGetWorkResponse;
this.batchedGetWorkResponse = batchedGetWorkResponse;
}

public static GrpcGetWorkStream create(
Expand Down Expand Up @@ -120,7 +125,6 @@ private void sendRequestExtension(long moreItems, long moreBytes) {
StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(moreItems)
.setMaxBytes(moreBytes))
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.build();

executeSafely(
Expand All @@ -140,7 +144,7 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
inflightBytes.set(request.getMaxBytes());
trySend(
StreamingGetWorkRequest.newBuilder()
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.setSupportsMultipleWorkItemsInChunk(batchedGetWorkResponse)
.setRequest(request)
.build());
}
Expand All @@ -163,12 +167,7 @@ public void appendSpecificHtml(PrintWriter writer) {

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
trySend(
StreamingGetWorkRequest.newBuilder()
.setRequestExtension(
StreamingGetWorkRequestExtension.newBuilder().setMaxItems(0).setMaxBytes(0).build())
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
.build());
trySend(HEALTH_CHECK);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ static GrpcWindmillServer newTestInstance(
.setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
.setHealthCheckIntervalMillis(
testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.setMultipleItemsInGetWorkResponse(
testOptions.getWindmillMultipleItemsInGetWorkResponse())
.setBatchedGetWorkResponse(testOptions.getWindmillRequestBatchedGetWorkResponse())
.build();

return new GrpcWindmillServer(testOptions, windmillStreamFactory, dispatcherClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class GrpcWindmillStreamFactory implements StatusDataProvider {
// If true, then active work refreshes will be sent as KeyedGetDataRequests. Otherwise, use the
// newer ComputationHeartbeatRequests.
private final boolean sendKeyedGetDataRequests;
private final boolean multipleItemsInGetWorkResponse;
private final boolean batchedGetWorkResponse;
private final Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses;

private GrpcWindmillStreamFactory(
Expand All @@ -100,7 +100,7 @@ private GrpcWindmillStreamFactory(
int streamingRpcBatchLimit,
int windmillMessagesBetweenIsReadyChecks,
boolean sendKeyedGetDataRequests,
boolean multipleItemsInGetWorkResponse,
boolean batchedGetWorkResponse,
Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
Supplier<Duration> maxBackOffSupplier) {
this.jobHeader = jobHeader;
Expand All @@ -117,7 +117,7 @@ private GrpcWindmillStreamFactory(
.backoff());
this.streamRegistry = ConcurrentHashMap.newKeySet();
this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
this.multipleItemsInGetWorkResponse = multipleItemsInGetWorkResponse;
this.batchedGetWorkResponse = batchedGetWorkResponse;
this.processHeartbeatResponses = processHeartbeatResponses;
this.streamIdGenerator = new AtomicLong();
}
Expand All @@ -129,7 +129,7 @@ static GrpcWindmillStreamFactory create(
int streamingRpcBatchLimit,
int windmillMessagesBetweenIsReadyChecks,
boolean sendKeyedGetDataRequests,
boolean multipleItemsInGetWorkResponse,
boolean batchedGetWorkResponse,
Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
Supplier<Duration> maxBackOffSupplier,
int healthCheckIntervalMillis) {
Expand All @@ -140,7 +140,7 @@ static GrpcWindmillStreamFactory create(
streamingRpcBatchLimit,
windmillMessagesBetweenIsReadyChecks,
sendKeyedGetDataRequests,
multipleItemsInGetWorkResponse,
batchedGetWorkResponse,
processHeartbeatResponses,
maxBackOffSupplier);

Expand Down Expand Up @@ -179,7 +179,7 @@ public static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) {
.setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT)
.setHealthCheckIntervalMillis(NO_HEALTH_CHECKS)
.setSendKeyedGetDataRequests(true)
.setMultipleItemsInGetWorkResponse(false)
.setBatchedGetWorkResponse(false)
.setProcessHeartbeatResponses(ignored -> {});
}

Expand Down Expand Up @@ -215,7 +215,7 @@ public GetWorkStream createGetWorkStream(
newStreamObserverFactory(),
streamRegistry,
logEveryNStreamFailures,
multipleItemsInGetWorkResponse,
batchedGetWorkResponse,
getWorkThrottleTimer,
processWorkItem);
}
Expand All @@ -236,7 +236,7 @@ public GetWorkStream createDirectGetWorkStream(
newStreamObserverFactory(),
streamRegistry,
logEveryNStreamFailures,
multipleItemsInGetWorkResponse,
batchedGetWorkResponse,
getWorkThrottleTimer,
heartbeatSender,
getDataClient,
Expand Down Expand Up @@ -364,7 +364,7 @@ Builder setProcessHeartbeatResponses(

Builder setHealthCheckIntervalMillis(int healthCheckIntervalMillis);

Builder setMultipleItemsInGetWorkResponse(boolean enabled);
Builder setBatchedGetWorkResponse(boolean enabled);

GrpcWindmillStreamFactory build();
}
Expand Down
Loading
Loading