-
Notifications
You must be signed in to change notification settings - Fork 863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add weight clustering to yarp #2356
Comments
How is it going? |
@adityamandaleeka Hi thanks for adding this feature to the plan. I would like to submit a PR to implement this feature. |
@ayrloong you can start by helping define the design. This will be a complex feature and it's better to get the design right before starting a PR. E.g. what do you expect the overall behavior to be, and what APIs and config do you think are necessary to implement that? The linked items above should help. |
Thanks for the heads up, I'm sure some designs are needed to describe this feature. |
@Tratcher Hi this is my initial design for this feature. YARP weighted clustering designAdding a new configurationWeightedClusterConfig.cspublic sealed record WeightedClusterConfig
{
public string? ClusterId { get; init; }
public int? Weight { get; set; } ;
} Update the configuration to add WeightedClusters in RouteConfigRouteConfig.cs/// </summary>
public sealed record RouteConfig
{
/// <summary>
/// Gets or sets the weight clusters that requests matching this route
/// If set ClusterId there is no need to set this
/// </summary>
public IReadOnlyList<WeightedClusterConfig>? WeightedClusters { get; init; }
} Update ConfigurationConfigProviderConfigurationConfigProvider.cs private static RouteConfig CreateRoute(IConfigurationSection section)
{
return new RouteConfig
{
///.....
WeightedClusters = CreateWeightedClusters(section.GetSection(nameof(RouteConfig.WeightedClusters)))
};
}
private static IReadOnlyList<WeightedClusterConfig>? CreateWeightedClusters (IConfigurationSection section)
{
if (!section.Exists())
{
return null;
}
return section.GetChildren().Select(CreateWeightedCluster).ToArray();
}
private static WeightedClusterConfig CreateWeightedCluster(IConfigurationSection section)
{
return new WeightedClusterConfig()
{
ClusterId = section[nameof(WeightedClusterConfig.ClusterId)]!,
Weight = section.ReadInt32(nameof(WeightedClusterConfig.Weight))
};
} Adding RoutingHelperinternal static class RoutingHelper
{
public static T SelectByWeight<T>(this IEnumerable<T> endpoints, Func<T, double> weightProvider, Randomizer randomizer)
{
var accumulatedProbability = 0d;
var weightSum = endpoints.Sum(weightProvider);
var randomPercentageValue = randomizer.NextDouble(weightSum);
foreach (var endpoint in endpoints)
{
var weight = weightProvider(endpoint);
if (randomPercentageValue <= weight + accumulatedProbability)
{
return endpoint;
}
accumulatedProbability += weight;
}
throw new InvalidOperationException(
$"The item cannot be selected because the weights are not correctly calculated.");
}
}
internal class Randomizer
{
#if NET6_0_OR_GREATER
public virtual double NextDouble(double maxValue) => Random.Shared.NextDouble() * maxValue;
public virtual int NextInt(int maxValue) => Random.Shared.Next(maxValue);
#else
private static readonly System.Threading.ThreadLocal<Random> _randomInstance = new(() => new Random());
i
public virtual double NextDouble(double maxValue) => _randomInstance.Value!.NextDouble() * maxValue;
public virtual int NextInt(int maxValue) => _randomInstance.Value!.Next(maxValue);
#endif
} Adding middleware to handle request routeingCanaryMiddleware.cspublic class CanaryMiddleware(RequestDelegate next, IProxyStateLookup lookup, IRequestClusterPolicy clusterPolicy)
{
public Task InvokeAsync(HttpContext context)
{
var proxyFeature = context.GetReverseProxyFeature();
var weightedClusters = proxyFeature.Route.Config.WeightedClusters;
if (weightedClusters is null) return next(context);
var weightedCluster = clusterPolicy.PickCluster(context, weightedClusters);
if (lookup.TryGetCluster(weightedCluster?.ClusterId, out var cluster))
{
context.ReassignProxyRequest(cluster);
}
return next(context);
}
public interface IRequestClusterPolicy
{
WeightedClusterConfig? PickCluster(HttpContext context, IEnumerable<WeightedClusterConfig> clusters);
}
internal class WeightedClusterPolicy(Randomizer randomizer) : IRequestClusterPolicy
{
public WeightedClusterConfig? PickCluster(HttpContext context, IEnumerable<WeightedClusterConfig> clusters)
{
return clusters.SelectByWeight(g => g.Weight ?? 0, randomizer);
}
} |
Interesting approach. I'm a bit surprised the weights are at the route/cluster level, I expected them to be at the destination level and feed into the load balancing algorithms. That said, maybe we should consider both as separate features. Can you break down the tradeoffs of both? How about session affinity, making sure the same client returns to the same cluster? Or is that a detail of IRequestClusterPolicy? No need to show straight forward implementation details like ConfigurationConfigProvider. Schemas, APIs, and behavior are the interesting points at this stage. It might make sense to combine the CanaryMiddleware with ProxyPipelineInitializerMiddleware reverse-proxy/src/ReverseProxy/Model/ProxyPipelineInitializerMiddleware.cs Lines 49 to 59 in 4b29682
|
I think destination weight policy may be added in the future, so I separated routing/cluster. WeightedLoadBalancingPolicy.cspublic class WeightedLoadBalancingPolicy : ILoadBalancingPolicy
{
public string Name { get; }
public DestinationState? PickDestination(HttpContext context, ClusterState cluster, IReadOnlyList<DestinationState> availableDestinations)
{
// return selected destination
throw new NotImplementedException();
}
} Regarding cluster session affinity, I initially ignored this feature but after your reminder, I re-examined this feature. Combining CanaryMiddleware with ProxyPipelineInitializerMiddleware is a very good choice, which can reduce unnecessary code This is the combined code ClusterState cluster = null;
var weightedClusters = route.Config.WeightedClusters;
if (weightedClusters is not null && weightedClusters.Count != 0)
{
var weightedCluster = clusterPolicy.PickCluster(context, weightedClusters);
cluster = new ClusterState(weightedCluster.ClusterId);
}
else
{
cluster = route.Cluster;
} |
Hi @Tratcher , we implemented wrr (Weight Round Robin). appsettings.json, add Weight in destination. {
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"ReverseProxy": {
"Routes": {
"route1" : {
"ClusterId": "cluster1",
"Match": {
"Path": "{**catch-all}"
}
}
},
"Clusters": {
"cluster1": {
"Destinations": {
"destination1": {
"Address": "https://example.com/",
"Weight": 100
}
}
}
}
}
} WeightRoundRobinLoadBalancingPolicy public class WeightRoundRobinLoadBalancingPolicy: ILoadBalancingPolicy
{
private ILogger<WeightRoundRobinLoadBalancingPolicy> _logger;
public string Name => "WeightRoundRobin";
public WeightRoundRobinLoadBalancingPolicy(ILogger<WeightRoundRobinLoadBalancingPolicy> logger)
{
_logger = logger;
}
public DestinationState? PickDestination(HttpContext context, ClusterState cluster, IReadOnlyList<DestinationState> availableDestinations)
{
if (Weighting.WeightedClusterWeights.TryGetValue(cluster.ClusterId, out var weightedWeights))
{
if (weightedWeights is null)
{
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] cluster weightedWeights");
return null;
}
if (weightedWeights.DestinationIds is null)
{
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] destination, DestinationIds is null");
return null;
}
var destinationId = weightedWeights.DestinationIds[WeightingHelper.GetIndexByRandomWeight(weightedWeights.DestinationWeightedWeights, weightedWeights.DestinationWeights, weightedWeights.TotalWeights ?? 1D)];
return availableDestinations.FirstOrDefault(destination => destination.DestinationId == destinationId);
}
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] cluster");
return null;
}
} WeightConfigFilter public class WeightConfigFilter : IProxyConfigFilter
{
private ILogger<WeightConfigFilter> _logger;
public WeightConfigFilter(ILogger<WeightConfigFilter> logger)
{
_logger = logger;
}
public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, CancellationToken cancel)
{
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Started");
try
{
var weights = cluster.Destinations?.ToDictionary(destination => destination.Key, destination =>
{
if (destination.Value.Metadata?.TryGetValue("Weight", out var weight) ?? false)
return double.Parse(weight) / 100D;
else
return 1D;
});
List<string> destinationIds = new();
List<double> destinationWeights = new();
WeightedWeight? weightedWeight = null;
if (weights is not null)
{
foreach (var weight in weights)
{
destinationIds.Add(weight.Key);
destinationWeights.Add(weight.Value);
}
var weightedWeights = WeightingHelper.GetWeightedWeights(destinationWeights.ToArray());
weightedWeight = new()
{
DestinationIds = destinationIds.ToArray(),
DestinationWeights = destinationWeights.ToArray(),
DestinationWeightedWeights = weightedWeights.Weights,
TotalWeights = weightedWeights.TotalWeight
};
}
if (Weighting.ClusterWeights.ContainsKey(cluster.ClusterId))
{
Weighting.ClusterWeights[cluster.ClusterId] = weights;
Weighting.WeightedClusterWeights[cluster.ClusterId] = weightedWeight;
}
else
{
Weighting.ClusterWeights.Add(cluster.ClusterId, weights);
Weighting.WeightedClusterWeights.Add(cluster.ClusterId, weightedWeight);
}
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Set, clusterId: {cluster.ClusterId}, {JsonSerializer.Serialize(Weighting.WeightedClusterWeights[cluster.ClusterId])}");
}
catch (Exception ex)
{
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Error:{ex}");
}
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Finished");
return new ValueTask<ClusterConfig>(cluster);
}
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig? cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route);
}
} WeightingHelper public class WeightingHelper
{
public static (double[]? Weights, double? TotalWeight) GetWeightedWeights(double[] weights)
{
if (weights.Length == 0) return (null, null);
else if (weights.Length == 1) return ([.. weights], weights[0]);
var totalWeight = 0D;
Span<double> newWeights = stackalloc double[weights.Length];
for (int i = 0; i < weights.Length; i++)
{
totalWeight += weights[i];
newWeights[i] = totalWeight;
}
return ([.. newWeights], totalWeight);
}
public static int GetIndexByRandomWeight(Span<double> weightedWeights, Span<double> weights, double totalWeight)
{
// Ignore weight when only one server
if (weightedWeights.Length == 1) return 0;
var randomWeight = Random.Shared.NextDouble() * totalWeight;
var index = weightedWeights.BinarySearch(randomWeight);
if (index < 0)
index = -index - 1;
else if (index > weightedWeights.Length)
// The number of servers decreases
index = GetIndexByRandomWeight(weightedWeights, weights, totalWeight);
if (weights[index] != 0D)
return index;
else
// The weight of the server is 0
return GetIndexByRandomWeight(weightedWeights, weights, totalWeight);
}
} WeightedWeight public class WeightedWeight
{
public string[]? DestinationIds { get; set; }
public double[]? DestinationWeights { get; set; }
public double[]? DestinationWeightedWeights { get; set; }
public double? TotalWeights { get; set; }
} BTW, we provide some new APIs for dynamically updating configurations. Finally, if this is the wrr you want, I can pr. |
Many reverse proxy tools have built-in weight clustering capabilities. Although YARP provides extensions, I think this function is more commonly used, so I want to add this function to Yarp.
The text was updated successfully, but these errors were encountered: