Skip to content

Commit

Permalink
Update initial mapping apps of service discovery MappingListener
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj committed Nov 3, 2024
1 parent 9741de3 commit 13b1c61
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,12 @@ public void doSubscribe(URL url, NotifyListener listener) {
mappingLock.lock();
mappingByUrl = serviceNameMapping.getMapping(url);
try {
MappingListener mappingListener = new DefaultMappingListener(url, mappingByUrl, listener);
DefaultMappingListener mappingListener = new DefaultMappingListener(url, mappingByUrl, listener);
mappingByUrl = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
// update the initial mapping apps we started to listen, to make sure it reflects the real value
// used do subscription before any event.
// it's protected by the mapping lock, so it won't override the event value.
mappingListener.updateInitialApps(mappingByUrl);
synchronized (mappingListeners) {
mappingListeners
.computeIfAbsent(url.getProtocolServiceKey(), (k) -> new ConcurrentHashSet<>())
Expand Down Expand Up @@ -399,8 +403,8 @@ public Map<String, ServiceInstancesChangedListener> getServiceListeners() {
private class DefaultMappingListener implements MappingListener {
private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DefaultMappingListener.class);
private final URL url;
private Set<String> oldApps;
private NotifyListener listener;
private final NotifyListener listener;
private volatile Set<String> oldApps;
private volatile boolean stopped;

public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) {
Expand All @@ -424,16 +428,15 @@ public synchronized void onEvent(MappingChangedEvent event) {
Set<String> newApps = event.getApps();
Set<String> tempOldApps = oldApps;

if (CollectionUtils.isEmpty(newApps) || CollectionUtils.equals(newApps, tempOldApps)) {
return;
}

logger.info(
"Mapping of service " + event.getServiceKey() + "changed from " + tempOldApps + " to " + newApps);

Lock mappingLock = serviceNameMapping.getMappingLock(event.getServiceKey());
try {
mappingLock.lock();
if (CollectionUtils.isEmpty(newApps) || CollectionUtils.equals(newApps, tempOldApps)) {
return;
}
logger.info("Mapping of service " + event.getServiceKey() + "changed from " + tempOldApps + " to "
+ newApps);

if (CollectionUtils.isEmpty(tempOldApps) && !newApps.isEmpty()) {
serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
subscribeURLs(url, listener, newApps);
Expand Down Expand Up @@ -478,6 +481,14 @@ protected NotifyListener getListener() {
return listener;
}

// writing of oldApps is protected by mapping lock to guarantee sequence consistency.
public void updateInitialApps(Set<String> oldApps) {
if (oldApps != null && !CollectionUtils.equals(oldApps, this.oldApps)) {
this.oldApps = oldApps;
logger.info("Update initial mapping apps from " + this.oldApps + " to " + oldApps);
}
}

@Override
public void stop() {
stopped = true;
Expand Down

0 comments on commit 13b1c61

Please sign in to comment.