[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency

1. Using the maximum parallelism within the window instead of the latest parallelism when scaling down
2. Never scale down when (currentTime - triggerTime) < scale-down.interval
1996fanrui committed Dec 3, 2024
1 parent 9bab028 commit d9e8cce
Showing 9 changed files with 245 additions and 161 deletions.
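
As a rough sketch of the new behavior, combining the two points in the commit message (illustration only; the helper below is hypothetical and not part of this commit, whose real logic lives in JobVertexScaler#applyScaleDownInterval): a scale-down request inside the scale-down interval is only recorded, and once the interval has elapsed the vertex is scaled down to the maximum parallelism recommended within that window.

import java.time.Duration;
import java.time.Instant;

/** Illustration only; names are hypothetical and not part of this commit. */
class ScaleDownDecisionSketch {

    /** Returns the parallelism to apply, or -1 while the scale down is still delayed. */
    static int decide(
            Instant now,
            Instant firstTriggerTime,
            Duration scaleDownInterval,
            int maxRecommendedParallelismInWindow) {
        if (now.isBefore(firstTriggerTime.plus(scaleDownInterval))) {
            // Rule 2: never scale down while (currentTime - triggerTime) < scale-down.interval.
            return -1;
        }
        // Rule 1: after the interval has elapsed, scale down to the maximum parallelism
        // recommended within the window instead of the latest recommendation.
        return maxRecommendedParallelismInWindow;
    }
}
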
@@ -237,7 +237,7 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
try {
return deserializeDelayedScaleDown(delayedScaleDown.get());
} catch (JacksonException e) {
LOG.error(
LOG.warn(
"Could not deserialize delayed scale down, possibly the format changed. Discarding...",
e);
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), DELAYED_SCALE_DOWN);
@@ -330,13 +330,11 @@ private static ConfigChanges deserializeConfigOverrides(String configOverrides)

private static String serializeDelayedScaleDown(DelayedScaleDown delayedScaleDown)
throws JacksonException {
return YAML_MAPPER.writeValueAsString(delayedScaleDown.getFirstTriggerTime());
return YAML_MAPPER.writeValueAsString(delayedScaleDown);
}

private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleDown)
throws JacksonException {
Map<JobVertexID, Instant> firstTriggerTime =
YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
return new DelayedScaleDown(firstTriggerTime);
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
}
}
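
For example, state written before this change contains only the firstTriggerTime map, so deserializeDelayedScaleDown now fails with a JacksonException and the getDelayedScaleDown path above logs a warning and removes the stale entry; state written after this change carries the whole DelayedScaleDown object, i.e. per vertex both the first trigger time and the maximum recommended parallelism observed so far.
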
@@ -19,51 +19,81 @@

import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Data;
import lombok.Getter;

import javax.annotation.Nonnull;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** All delayed scale down requests. */
public class DelayedScaleDown {

@Getter private final Map<JobVertexID, Instant> firstTriggerTime;
/** The delayed scale down info for vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
private int maxRecommendedParallelism;

@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
this.firstTriggerTime = firstTriggerTime;
this.maxRecommendedParallelism = maxRecommendedParallelism;
}
}

@Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> delayedVertices;

// Have any scale down request been updated? It doesn't need to be stored, it is only used to
// determine whether DelayedScaleDown needs to be stored.
@Getter private boolean isUpdated = false;
@JsonIgnore @Getter private boolean updated = false;

public DelayedScaleDown() {
this.firstTriggerTime = new HashMap<>();
}

public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
this.firstTriggerTime = firstTriggerTime;
this.delayedVertices = new HashMap<>();
}

Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
return Optional.ofNullable(firstTriggerTime.get(vertex));
}
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
@Nonnull
public VertexDelayedScaleDownInfo triggerScaleDown(
JobVertexID vertex, Instant triggerTime, int parallelism) {
var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
if (vertexDelayedScaleDownInfo == null) {
// It's the first trigger
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
updated = true;
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
updated = true;
}

void updateTriggerTime(JobVertexID vertex, Instant instant) {
firstTriggerTime.put(vertex, instant);
isUpdated = true;
return vertexDelayedScaleDownInfo;
}

// Clear the delayed scale down for corresponding vertex when the recommended parallelism is
// greater than or equal to the currentParallelism.
void clearVertex(JobVertexID vertex) {
Instant removed = firstTriggerTime.remove(vertex);
VertexDelayedScaleDownInfo removed = delayedVertices.remove(vertex);
if (removed != null) {
isUpdated = true;
updated = true;
}
}

// Clear all delayed scale down when rescale happens.
void clearAll() {
if (firstTriggerTime.isEmpty()) {
if (delayedVertices.isEmpty()) {
return;
}
firstTriggerTime.clear();
isUpdated = true;
delayedVertices.clear();
updated = true;
}
}
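
A rough usage sketch of the API above (illustration only, not a test from this commit; the timestamps and the sketch class are hypothetical): repeated triggers within the window keep the first trigger time and only raise the recorded maximum recommended parallelism.

import java.time.Instant;

import org.apache.flink.runtime.jobgraph.JobVertexID;

/** Illustration only. Assumes this sketch lives next to DelayedScaleDown, so no extra import is needed. */
class DelayedScaleDownUsageSketch {
    public static void main(String[] args) {
        var delayedScaleDown = new DelayedScaleDown();
        var vertex = new JobVertexID();

        // First trigger: records the trigger time and the recommended parallelism.
        var info = delayedScaleDown.triggerScaleDown(vertex, Instant.parse("2024-12-03T10:00:00Z"), 4);

        // A higher recommendation later in the window raises maxRecommendedParallelism only;
        // firstTriggerTime stays at 10:00.
        info = delayedScaleDown.triggerScaleDown(vertex, Instant.parse("2024-12-03T10:20:00Z"), 6);

        // A lower recommendation leaves the recorded maximum untouched.
        info = delayedScaleDown.triggerScaleDown(vertex, Instant.parse("2024-12-03T10:40:00Z"), 3);

        System.out.println(info.getFirstTriggerTime());          // 2024-12-03T10:00:00Z
        System.out.println(info.getMaxRecommendedParallelism()); // 6
    }
}
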
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
this.autoScalerEventHandler = autoScalerEventHandler;
}

/** The parallelism change type of {@link ParallelismChange}. */
public enum ParallelismChangeType {
NO_CHANGE,
REQUIRED_CHANGE,
OPTIONAL_CHANGE;
}

/**
* The rescaling will be triggered if any vertex's ParallelismChange is required. This means
* that if all vertices' ParallelismChange is optional, rescaling will be ignored.
*/
/** The rescaling will be triggered if any vertex's {@link ParallelismChange} is changed. */
@Getter
public static class ParallelismChange {

private static final ParallelismChange NO_CHANGE =
new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);
private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);

private final ParallelismChangeType changeType;
private final int newParallelism;

private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
this.changeType = changeType;
private ParallelismChange(int newParallelism) {
this.newParallelism = newParallelism;
}

public boolean isNoChange() {
return this == NO_CHANGE;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -122,30 +113,24 @@ public boolean equals(Object o) {
return false;
}
ParallelismChange that = (ParallelismChange) o;
return changeType == that.changeType && newParallelism == that.newParallelism;
return newParallelism == that.newParallelism;
}

@Override
public int hashCode() {
return Objects.hash(changeType, newParallelism);
return Objects.hash(newParallelism);
}

@Override
public String toString() {
return "ParallelismChange{"
+ "changeType="
+ changeType
+ ", newParallelism="
+ newParallelism
+ '}';
return isNoChange()
? "NoParallelismChange"
: "ParallelismChange{newParallelism=" + newParallelism + '}';
}

public static ParallelismChange required(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
}

public static ParallelismChange optional(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
public static ParallelismChange build(int newParallelism) {
checkArgument(newParallelism > 0, "The parallelism should be greater than 0.");
return new ParallelismChange(newParallelism);
}

public static ParallelismChange noChange() {
@@ -263,7 +248,7 @@ private ParallelismChange detectBlockScaling(

// If we don't have past scaling actions for this vertex, don't block scale up.
if (history.isEmpty()) {
return ParallelismChange.required(newParallelism);
return ParallelismChange.build(newParallelism);
}

var lastSummary = history.get(history.lastKey());
@@ -275,7 +260,7 @@ && detectIneffectiveScaleUp(
return ParallelismChange.noChange();
}

return ParallelismChange.required(newParallelism);
return ParallelismChange.build(newParallelism);
} else {
return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
}
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disabled, so don't block scaling.
return ParallelismChange.required(newParallelism);
}

var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
if (firstTriggerTime.isEmpty()) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
delayedScaleDown.updateTriggerTime(vertex, clock.instant());
return ParallelismChange.optional(newParallelism);
return ParallelismChange.build(newParallelism);
}

if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
return ParallelismChange.optional(newParallelism);
var now = clock.instant();
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);

// Never scale down within scale down interval
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
} else {
LOG.debug(
"Try to skip immediate scale down within scale-down interval for {}",
vertex);
}
return ParallelismChange.noChange();
} else {
return ParallelismChange.required(newParallelism);
// Using the maximum parallelism within the scale down interval window instead of the
// latest parallelism when scaling down
return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());
}
}

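As a worked example (with a hypothetical scale-down.interval of 1 hour): recommendations of parallelism 4 at 10:00, 6 at 10:20, and 3 at 11:05 for the same vertex yield noChange() for the first two calls, because they fall inside the interval that started at 10:00, while the 11:05 call is past 10:00 plus one hour and returns ParallelismChange.build(6), i.e. the maximum recommendation recorded in the window rather than the latest value 3.
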
@@ -45,13 +45,10 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
}

@VisibleForTesting
static boolean allRequiredVerticesWithinUtilizationTarget(
static boolean allChangedVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Set<JobVertexID> requiredVertices) {
// All vertices' ParallelismChange is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
Set<JobVertexID> changedVertices) {
// No vertices with changed parallelism.
if (changedVertices.isEmpty()) {
return true;
}

for (JobVertexID vertex : requiredVertices) {
for (JobVertexID vertex : changedVertices) {
var metrics = evaluatedMetrics.get(vertex);

double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -234,7 +231,6 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}

var out = new HashMap<JobVertexID, ScalingSummary>();
var requiredVertices = new HashSet<JobVertexID>();

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
@@ -260,10 +256,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
v, Collections.emptySortedMap()),
restartTime,
delayedScaleDown);
if (NO_CHANGE == parallelismChange.getChangeType()) {
if (parallelismChange.isNoChange()) {
return;
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
}
out.put(
v,
@@ -274,10 +268,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}
});

// If the Utilization of all required tasks is within range, we can skip scaling.
// It means that if only optional tasks are out of scope, we still need to ignore scale.
if (allRequiredVerticesWithinUtilizationTarget(
evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
// If the Utilization of all tasks is within range, we can skip scaling.
if (allChangedVerticesWithinUtilizationTarget(
evaluatedMetrics.getVertexMetrics(), out.keySet())) {
return Map.of();
}

(Diffs for the remaining changed files are not shown.)
