From 6639be4778c78daa54b723709e015f87313d340e Mon Sep 17 00:00:00 2001 From: Christoph Koras Date: Sat, 3 Jun 2023 14:11:25 +0200 Subject: [PATCH] Fix grpc status headers --- .../headers/GRPCResponseHeadersFilter.java | 72 ++++++++++++------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java index 25a45d29ae..0899907076 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java @@ -16,6 +16,7 @@ package org.springframework.cloud.gateway.filter.headers; +import reactor.netty.http.client.HttpClientResponse; import reactor.netty.http.server.HttpServerResponse; import org.springframework.core.Ordered; @@ -26,6 +27,8 @@ import org.springframework.util.StringUtils; import org.springframework.web.server.ServerWebExchange; +import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR; + /** * @author Alberto C. RĂ­os */ @@ -37,45 +40,62 @@ public class GRPCResponseHeadersFilter implements HttpHeadersFilter, Ordered { @Override public HttpHeaders filter(HttpHeaders headers, ServerWebExchange exchange) { - ServerHttpResponse response = exchange.getResponse(); - HttpHeaders responseHeaders = response.getHeaders(); if (isGRPC(exchange)) { - String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER; - String originalTrailerHeaderValue = responseHeaders.getFirst(HttpHeaders.TRAILER); - if (originalTrailerHeaderValue != null) { - trailerHeaderValue += "," + originalTrailerHeaderValue; - } - responseHeaders.set(HttpHeaders.TRAILER, trailerHeaderValue); + ServerHttpResponse response = exchange.getResponse(); + HttpHeaders responseHeaders = response.getHeaders(); - while (response instanceof ServerHttpResponseDecorator) { - response = ((ServerHttpResponseDecorator) response).getDelegate(); + if (headers.containsKey(GRPC_STATUS_HEADER)) { + if (!"0".equals(headers.getFirst(GRPC_STATUS_HEADER))) { + response.setComplete(); // avoid empty DATA frame + } } - if (response instanceof AbstractServerHttpResponse) { - String grpcStatus = getGrpcStatus(headers); - String grpcMessage = getGrpcMessage(headers); - ((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()).trailerHeaders(h -> { - h.set(GRPC_STATUS_HEADER, grpcStatus); - h.set(GRPC_MESSAGE_HEADER, grpcMessage); + + HttpClientResponse nettyInResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); + if (nettyInResponse != null) { + nettyInResponse.trailerHeaders().subscribe(entries -> { + if (entries.contains(GRPC_STATUS_HEADER)) { + addTrailingHeader(entries, response, responseHeaders); + } }); } - } + return headers; } - private boolean isGRPC(ServerWebExchange exchange) { - String contentTypeValue = exchange.getRequest().getHeaders().getFirst(HttpHeaders.CONTENT_TYPE); - return StringUtils.startsWithIgnoreCase(contentTypeValue, "application/grpc"); + private void addTrailingHeader(io.netty.handler.codec.http.HttpHeaders sourceHeaders, ServerHttpResponse response, + HttpHeaders responseHeaders) { + String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER; + String originalTrailerHeaderValue = responseHeaders.getFirst(HttpHeaders.TRAILER); + if (originalTrailerHeaderValue != null) { + trailerHeaderValue += "," + originalTrailerHeaderValue; + } + responseHeaders.set(HttpHeaders.TRAILER, trailerHeaderValue); + + HttpServerResponse nettyOutResponse = getNettyResponse(response); + if (nettyOutResponse != null) { + String grpcStatus = sourceHeaders.get(GRPC_STATUS_HEADER, "0"); + String grpcMessage = sourceHeaders.get(GRPC_MESSAGE_HEADER, ""); + nettyOutResponse.trailerHeaders(h -> { + h.set(GRPC_STATUS_HEADER, grpcStatus); + h.set(GRPC_MESSAGE_HEADER, grpcMessage); + }); + } } - private String getGrpcStatus(HttpHeaders headers) { - final String grpcStatusValue = headers.getFirst(GRPC_STATUS_HEADER); - return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : "0"; + private HttpServerResponse getNettyResponse(ServerHttpResponse response) { + while (response instanceof ServerHttpResponseDecorator) { + response = ((ServerHttpResponseDecorator) response).getDelegate(); + } + if (response instanceof AbstractServerHttpResponse) { + return ((AbstractServerHttpResponse) response).getNativeResponse(); + } + return null; } - private String getGrpcMessage(HttpHeaders headers) { - final String grpcStatusValue = headers.getFirst(GRPC_MESSAGE_HEADER); - return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : ""; + private boolean isGRPC(ServerWebExchange exchange) { + String contentTypeValue = exchange.getRequest().getHeaders().getFirst(HttpHeaders.CONTENT_TYPE); + return StringUtils.startsWithIgnoreCase(contentTypeValue, "application/grpc"); } @Override