From b5d28024369991f1e45cce7975caf993be5e2fd2 Mon Sep 17 00:00:00 2001 From: Zizhong Zhang Date: Thu, 29 Apr 2021 10:25:13 -0700 Subject: [PATCH] setShared --- .../linkedin/r2/filter/TimedRestFilter.java | 14 +++++++-- .../linkedin/r2/filter/TimedStreamFilter.java | 16 +++++++--- .../filter/transport/FilterChainClient.java | 29 ++++--------------- .../http/client/HttpClientFactory.java | 9 +++++- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/r2-core/src/main/java/com/linkedin/r2/filter/TimedRestFilter.java b/r2-core/src/main/java/com/linkedin/r2/filter/TimedRestFilter.java index 1a39272aa9..ee7cecfe1d 100644 --- a/r2-core/src/main/java/com/linkedin/r2/filter/TimedRestFilter.java +++ b/r2-core/src/main/java/com/linkedin/r2/filter/TimedRestFilter.java @@ -43,6 +43,7 @@ public class TimedRestFilter implements RestFilter private final TimingKey _onRequestTimingKey; private final TimingKey _onResponseTimingKey; private final TimingKey _onErrorTimingKey; + private boolean _shared; /** * Registers {@link TimingKey}s for {@link com.linkedin.r2.message.timing.TimingNameConstants#TIMED_REST_FILTER}. @@ -63,6 +64,7 @@ public TimedRestFilter(RestFilter restFilter) _restFilter.getClass().getSimpleName(), TimingImportance.LOW); _onErrorTimingKey = TimingKey.registerNewKey(timingKeyPrefix + ON_ERROR_SUFFIX + timingKeyPostfix, _restFilter.getClass().getSimpleName(), TimingImportance.LOW); + _shared = false; } @Override @@ -94,7 +96,15 @@ public void onRestError(Throwable ex, _restFilter.onRestError(ex, requestContext, wireAttrs, new TimedNextFilter<>(_onErrorTimingKey, nextFilter)); } - public List getTimingKeyList() { - return Arrays.asList(_onErrorTimingKey, _onRequestTimingKey, _onResponseTimingKey); + public void setShared() { + _shared = true; + } + + public void onShutdown() { + if (!_shared) { + TimingKey.unregisterKey(_onErrorTimingKey); + TimingKey.unregisterKey(_onRequestTimingKey); + TimingKey.unregisterKey(_onResponseTimingKey); + } } } diff --git a/r2-core/src/main/java/com/linkedin/r2/filter/TimedStreamFilter.java b/r2-core/src/main/java/com/linkedin/r2/filter/TimedStreamFilter.java index b661bef842..ec6a29112b 100644 --- a/r2-core/src/main/java/com/linkedin/r2/filter/TimedStreamFilter.java +++ b/r2-core/src/main/java/com/linkedin/r2/filter/TimedStreamFilter.java @@ -23,8 +23,6 @@ import com.linkedin.r2.message.stream.StreamRequest; import com.linkedin.r2.message.stream.StreamResponse; import com.linkedin.r2.message.timing.TimingImportance; -import java.util.Arrays; -import java.util.List; import java.util.Map; import static com.linkedin.r2.filter.TimedRestFilter.ON_ERROR_SUFFIX; @@ -42,6 +40,7 @@ public class TimedStreamFilter implements StreamFilter private final TimingKey _onRequestTimingKey; private final TimingKey _onResponseTimingKey; private final TimingKey _onErrorTimingKey; + private boolean _shared; /** * Registers {@link TimingKey}s for {@link com.linkedin.r2.message.timing.TimingNameConstants#TIMED_STREAM_FILTER}. @@ -62,6 +61,7 @@ public TimedStreamFilter(StreamFilter streamFilter) filterClassName, TimingImportance.LOW); _onErrorTimingKey = TimingKey.registerNewKey(timingKeyPrefix + ON_ERROR_SUFFIX + timingKeyPostfix, filterClassName, TimingImportance.LOW); + _shared = false; } @Override @@ -94,7 +94,15 @@ public void onStreamError(Throwable ex, _streamFilter.onStreamError(ex, requestContext, wireAttrs, new TimedNextFilter<>(_onErrorTimingKey, nextFilter)); } - public List getTimingKeyList() { - return Arrays.asList(_onErrorTimingKey, _onRequestTimingKey, _onResponseTimingKey); + public void setShared() { + _shared = true; + } + + public void onShutdown() { + if (!_shared) { + TimingKey.unregisterKey(_onErrorTimingKey); + TimingKey.unregisterKey(_onRequestTimingKey); + TimingKey.unregisterKey(_onResponseTimingKey); + } } } diff --git a/r2-core/src/main/java/com/linkedin/r2/filter/transport/FilterChainClient.java b/r2-core/src/main/java/com/linkedin/r2/filter/transport/FilterChainClient.java index 685f483756..e2fe4913c2 100644 --- a/r2-core/src/main/java/com/linkedin/r2/filter/transport/FilterChainClient.java +++ b/r2-core/src/main/java/com/linkedin/r2/filter/transport/FilterChainClient.java @@ -52,7 +52,6 @@ public class FilterChainClient implements TransportClient { private final TransportClient _client; private final FilterChain _filters; - private final FilterChain _sharedFilters; /** * Construct a new instance by composing the specified {@link TransportClient} @@ -62,7 +61,7 @@ public class FilterChainClient implements TransportClient * @param filters the {@link FilterChain} to be composed. * @param sharedFilters the {@link FilterChain} can be used by other clients. */ - public FilterChainClient(TransportClient client, FilterChain filters, FilterChain sharedFilters) + public FilterChainClient(TransportClient client, FilterChain filters) { _client = client; @@ -74,7 +73,6 @@ public FilterChainClient(TransportClient client, FilterChain filters, FilterChai .addLastRest(requestFilter) .addFirst(responseFilter) .addLast(requestFilter); - _sharedFilters = sharedFilters; } @Override @@ -104,26 +102,11 @@ public void shutdown(Callback callback) { _client.shutdown(callback); - List streamFilters = _filters.getStreamFilters(); - List restFilters = _filters.getRestFilters(); - List sharedStreamFilters = _sharedFilters.getStreamFilters(); - List sharedRestFilters = _sharedFilters.getRestFilters(); - - streamFilters.stream() - .filter(filter -> !sharedStreamFilters.contains(filter)) - .filter(TimedStreamFilter.class::isInstance) - .map(TimedStreamFilter.class::cast) - .map(TimedStreamFilter::getTimingKeyList) - .flatMap(Collection::stream) - .forEach(TimingKey::unregisterKey); - - restFilters.stream() - .filter(filter -> !sharedRestFilters.contains(filter)) - .filter(TimedRestFilter.class::isInstance) - .map(TimedRestFilter.class::cast) - .map(TimedRestFilter::getTimingKeyList) - .flatMap(Collection::stream) - .forEach(TimingKey::unregisterKey); + _filters.getStreamFilters().stream().filter(TimedStreamFilter.class::isInstance) + .map(TimedStreamFilter.class::cast).forEach(TimedStreamFilter::onShutdown); + + _filters.getRestFilters().stream().filter(TimedRestFilter.class::isInstance) + .map(TimedRestFilter.class::cast).forEach(TimedRestFilter::onShutdown); } /** diff --git a/r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpClientFactory.java b/r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpClientFactory.java index 6ed7b0b372..6b1de01551 100644 --- a/r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpClientFactory.java +++ b/r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpClientFactory.java @@ -26,6 +26,8 @@ import com.linkedin.r2.filter.CompressionConfig; import com.linkedin.r2.filter.FilterChain; import com.linkedin.r2.filter.FilterChains; +import com.linkedin.r2.filter.TimedRestFilter; +import com.linkedin.r2.filter.TimedStreamFilter; import com.linkedin.r2.filter.compression.ClientCompressionFilter; import com.linkedin.r2.filter.compression.ClientCompressionHelper; import com.linkedin.r2.filter.compression.ClientStreamCompressionFilter; @@ -685,6 +687,11 @@ private HttpClientFactory(FilterChain filters, { _channelPoolManagerFactory = new ConnectionSharingChannelPoolManagerFactory(_channelPoolManagerFactory); } + + _filters.getStreamFilters().stream().filter(TimedStreamFilter.class::isInstance) + .map(TimedStreamFilter.class::cast).forEach(TimedStreamFilter::setShared); + _filters.getRestFilters().stream().filter(TimedRestFilter.class::isInstance) + .map(TimedRestFilter.class::cast).forEach(TimedRestFilter::setShared); } public static class Builder @@ -1120,7 +1127,7 @@ private TransportClient getClient(Map properties, filters = filters.addLastRest(disruptFilter); filters = filters.addLast(disruptFilter); - client = new FilterChainClient(client, filters, _filters); + client = new FilterChainClient(client, filters); client = new FactoryClient(client); synchronized (_mutex) {