Skip to content

Commit

Permalink
add back publish original cluster for symlink cluster (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang authored Jan 3, 2024
1 parent 76fb4b6 commit 12375bf
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 35 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.49.2] - 2024-01-02
- add back publish original cluster for symlink cluster

## [29.49.1] - 2023-12-21
- Use a separate indis warmup executor service

Expand Down Expand Up @@ -5597,7 +5600,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.2...master
[29.49.2]: https://github.com/linkedin/rest.li/compare/v29.49.1...v29.49.2
[29.49.1]: https://github.com/linkedin/rest.li/compare/v29.49.0...v29.49.1
[29.49.0]: https://github.com/linkedin/rest.li/compare/v29.48.9...v29.49.0
[29.48.9]: https://github.com/linkedin/rest.li/compare/v29.48.8...v29.48.9
Expand Down
62 changes: 43 additions & 19 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,24 @@ public void onChanged(XdsClient.NodeUpdate update)
try
{
ClusterProperties clusterProperties = toClusterProperties(update.getNodeData());
// For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks instead of the actual node in event bus,
// so we need to publish under the symlink names.
// For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks ($FooClusterMaster) instead of
// the original cluster (FooCluster-prod-ltx1) in event bus, so we need to publish under the symlink names.
// Also, rarely but possibly, calls can be made directly to the colo-suffixed service (FooService-prod-ltx1) under
// the original cluster (FooCluster-prod-ltx1) via curli, hard-coded custom code, etc, so there could be direct
// subscribers to the original cluster, thus we need to publish under the original cluster too.
//
// For other clusters, publish under its original name. Note that these clusters could be either:
// 1) regular clusters requested normally.
// 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters.
// For case #2: the symlinkAndActualNode map will no longer has an entry for this cluster (removed in
// D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name
// (like "FooCluster-prod-ltx1"), which has no symlink subscribers anyway, so no harm to publish.
String publishName = StringUtils.defaultIfEmpty(getSymlink(clusterName), clusterName);
_clusterEventBus.publishInitialize(publishName, clusterProperties);

if (_dualReadStateManager != null)
// (like "FooCluster-prod-ltx1") in case there are direct subscribers.
String symlinkName = getSymlink(clusterName);
if (symlinkName != null)
{
_dualReadStateManager.reportData(publishName, clusterProperties, true);
publishClusterData(symlinkName, clusterProperties);
}
publishClusterData(clusterName, clusterProperties);
}
catch (PropertySerializationException e)
{
Expand All @@ -274,6 +277,15 @@ public void onChanged(XdsClient.NodeUpdate update)
}
}

private void publishClusterData(String clusterName, ClusterProperties properties)
{
_clusterEventBus.publishInitialize(clusterName, properties);
if (_dualReadStateManager != null)
{
_dualReadStateManager.reportData(clusterName, properties, true);
}
}

@Override
public void onError(Status error)
{
Expand Down Expand Up @@ -433,23 +445,24 @@ public void onChanged(XdsClient.D2URIMapUpdate update)
}
_currentData = updates;

// For symlink clusters, UriLoadBalancerSubscriber subscribed to the symlinks instead of the actual node in event bus,
// so we need to publish under the symlink names.
// For symlink clusters, UriLoadBalancerSubscriber subscribed to the symlinks ($FooClusterMaster) instead of
// the original cluster (FooCluster-prod-ltx1) in event bus, so we need to publish under the symlink names.
// Also, rarely but possibly, calls can be made directly to the colo-suffixed service (FooService-prod-ltx1) under
// the original cluster (FooCluster-prod-ltx1) via curli, hard-coded custom code, etc, so there could be direct
// subscribers to the original cluster, thus we need to publish under the original cluster too.
//
// For other clusters, publish under its original name. Note that these clusters could be either:
// 1) regular clusters requested normally.
// 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters.
// For case #2: the actualResourceToSymlink map will no longer has an entry for this cluster (removed in
// For case #2: the symlinkAndActualNode map will no longer has an entry for this cluster (removed in
// D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name
// (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish. Yet, we still emit the tracking
// events about receiving uri updates of this cluster for measuring update propagation latencies.
String publishName = StringUtils.defaultIfEmpty(getSymlink(_clusterName), _clusterName);
UriProperties mergedUriProperties = _uriPropertiesMerger.merge(publishName, _currentData.values());
_uriEventBus.publishInitialize(publishName, mergedUriProperties);

if (_dualReadStateManager != null)
// (like "FooCluster-prod-ltx1") in case there are direct subscribers.
String symlinkName = getSymlink(_clusterName);
if (symlinkName != null)
{
_dualReadStateManager.reportData(publishName, mergedUriProperties, true);
mergeAndPublishUris(symlinkName); // under symlink name, merge data and publish it
}
mergeAndPublishUris(_clusterName); // under original cluster name, merge data and publish it
}
catch (PropertySerializationException e)
{
Expand All @@ -458,6 +471,17 @@ public void onChanged(XdsClient.D2URIMapUpdate update)
}
}

private void mergeAndPublishUris(String clusterName)
{
UriProperties mergedUriProperties = _uriPropertiesMerger.merge(clusterName, _currentData.values());
_uriEventBus.publishInitialize(clusterName, mergedUriProperties);

if (_dualReadStateManager != null)
{
_dualReadStateManager.reportData(clusterName, mergedUriProperties, true);
}
}

@Override
public void onError(Status error)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ public void testListenToClusterSymlink()
(XdsClient.NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue();
clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME));

// verify cluster data is published under symlink name
// verify cluster data is published under symlink name and actual cluster name
verify(fixture._clusterEventBus).publishInitialize(SYMLINK_NAME, PRIMARY_CLUSTER_PROPERTIES);
verify(fixture._clusterEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, PRIMARY_CLUSTER_PROPERTIES);

// test update symlink to a new primary cluster
String primaryClusterResourceName2 = CLUSTER_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
Expand All @@ -130,8 +131,8 @@ public void testListenToClusterSymlink()
clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME_2));

verify(fixture._clusterEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, primaryClusterProperties2);
verify(fixture._clusterEventBus, times(1)) // verify symlink is published just once
.publishInitialize(SYMLINK_NAME, primaryClusterProperties2);
// verify symlink is published just once
verify(fixture._clusterEventBus).publishInitialize(SYMLINK_NAME, primaryClusterProperties2);
}

@Test
Expand All @@ -158,17 +159,18 @@ public void testListenToUriSymlink() throws PropertySerializationException
(XdsClient.SymlinkNodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue();
symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));

// verify actual cluster is watched
// verify actual cluster of the uris is watched
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_URI_MAP), any());

// update uri data
XdsClient.D2URIMapResourceWatcher watcher =
(XdsClient.D2URIMapResourceWatcher) fixture._uriWatcherArgumentCaptor.getValue();
watcher.onChanged(DUMMY_NODE_MAP_UPDATE);

// verify uri data is merged and published under symlink name
UriProperties uriProps = new UriProperties(SYMLINK_NAME, Collections.emptyMap(), Collections.emptyMap());
verify(fixture._uriEventBus).publishInitialize(SYMLINK_NAME, uriProps);
// verify uri data is merged and published under symlink name and the actual cluster name
UriProperties uriProps = getDefaultUriProperties(PRIMARY_CLUSTER_NAME);
verify(fixture._uriEventBus).publishInitialize(SYMLINK_NAME, getDefaultUriProperties(SYMLINK_NAME));
verify(fixture._uriEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, uriProps);

// test update symlink to a new primary cluster
String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
Expand Down Expand Up @@ -244,23 +246,35 @@ private static XdsClient.NodeUpdate getClusterNodeUpdate(String clusterName)
private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName,
ClusterStoreProperties expectedPublishProp)
{
String publishName = symlinkName != null ? symlinkName : clusterName;
XdsClient.NodeResourceWatcher watcher = (XdsClient.NodeResourceWatcher)
fixture._clusterWatcherArgumentCaptor.getValue();
watcher.onChanged(getClusterNodeUpdate(clusterName));
verify(fixture._clusterEventBus).publishInitialize(publishName, expectedPublishProp);
verify(fixture._clusterEventBus).publishInitialize(clusterName, expectedPublishProp);
if (symlinkName != null)
{
verify(fixture._clusterEventBus).publishInitialize(symlinkName, expectedPublishProp);
}
}

private void verifyUriUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName)
throws PropertySerializationException {
String publishName = symlinkName != null ? symlinkName : clusterName;

throws PropertySerializationException
{
XdsClient.D2URIMapResourceWatcher watcher = (XdsClient.D2URIMapResourceWatcher)
fixture._uriWatcherArgumentCaptor.getValue();
watcher.onChanged(new XdsClient.D2URIMapUpdate("",
Collections.singletonMap("ltx1-dummyhost123", getD2URI(clusterName))));
verify(fixture._uriEventBus).publishInitialize(publishName,
new UriPropertiesJsonSerializer().fromProto(getD2URI(publishName)));
verify(fixture._uriEventBus).publishInitialize(clusterName,
new UriPropertiesJsonSerializer().fromProto(getD2URI(clusterName)));
if (symlinkName != null)
{
verify(fixture._uriEventBus).publishInitialize(symlinkName,
new UriPropertiesJsonSerializer().fromProto(getD2URI(symlinkName)));
}
}

private UriProperties getDefaultUriProperties(String clusterName)
{
return new UriProperties(clusterName, Collections.emptyMap(), Collections.emptyMap(), -1);
}

private static class XdsToD2PropertiesAdaptorFixture
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.49.1
version=29.49.2
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 12375bf

Please sign in to comment.