Skip to content

Commit

Permalink
Simplify HttpTrafficHandler#write (#3376)
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg authored Jul 29, 2024
1 parent aba9075 commit 18c6bb7
Showing 1 changed file with 133 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -56,6 +60,7 @@
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.isTransferEncodingChunked;
import static io.netty.handler.codec.http.HttpUtil.setKeepAlive;
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
import static reactor.netty.ReactorNetty.format;

/**
Expand Down Expand Up @@ -360,8 +365,28 @@ void doPipeline(ChannelHandlerContext ctx, Object msg) {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
Class<?> msgClass = msg.getClass();
// modify message on way out to add headers if needed
if (msg instanceof HttpResponse) {
if (msgClass == DefaultHttpResponse.class) {
handleDefaultHttpResponse((DefaultHttpResponse) msg, promise);
return;
}
else if (msgClass == DefaultFullHttpResponse.class) {
if (handleDefaultFullHttpResponse((DefaultFullHttpResponse) msg, promise)) {
return;
}
handleLastHttpContent(msg, promise);
return;
}
else if (msg == EMPTY_LAST_CONTENT || msgClass == DefaultLastHttpContent.class) {
handleLastHttpContent(msg, promise);
return;
}
else if (msgClass == DefaultHttpContent.class) {
handleDefaultHttContent((DefaultHttpContent) msg, promise);
return;
}
else if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
nonInformationalResponse = !isInformational(response);
// Assume the response writer knows if they can persist or not and sets isKeepAlive on the response
Expand All @@ -383,52 +408,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
if (msg instanceof LastHttpContent) {
finalizingResponse = true;

if (LAST_FLUSH_WHEN_NO_READ) {
needsFlush = !read;
}

if (!shouldKeepAlive()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Detected non persistent http " +
"connection, preparing to close. Pending responses count: {}"),
pendingResponses);
}
ctx.write(msg, promise.unvoid())
.addListener(ChannelFutureListener.CLOSE);
return;
}

ctx.write(msg, promise);

if (!persistentConnection) {
return;
}

if (nonInformationalResponse) {
nonInformationalResponse = false;
pendingResponses -= 1;
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Decreasing pending responses count: {}"),
pendingResponses);
}
}

if (pipelined != null && !pipelined.isEmpty()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Draining next pipelined " +
"HTTP request, pending responses count: {}, queued: {}"),
pendingResponses, pipelined.size());
}
ctx.executor()
.execute(this);
}
else {
IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout);

ctx.read();
}
handleLastHttpContent(msg, promise);
return;
}
if (persistentConnection && pendingResponses == 0) {
Expand All @@ -446,6 +426,112 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ctx.write(msg, promise);
}

@SuppressWarnings("FutureReturnValueIgnored")
boolean handleDefaultFullHttpResponse(DefaultFullHttpResponse response, ChannelPromise promise) {
nonInformationalResponse = !isInformational(response);
// Assume the response writer knows if they can persist or not and sets isKeepAlive on the response
boolean maxKeepAliveRequestsReached = maxKeepAliveRequests != -1 && HttpServerOperations.requestsCounter(ctx.channel()) == maxKeepAliveRequests;
if (maxKeepAliveRequestsReached || !isKeepAlive(response) || !isSelfDefinedMessageLength(response)) {
// No longer keep alive as the client can't tell when the message is done unless we close connection
pendingResponses = 0;
persistentConnection = false;
}
// Server might think it can keep connection alive, but we should fix response header if we know better
if (!shouldKeepAlive()) {
setKeepAlive(response, false);
}

if (response.status().equals(HttpResponseStatus.CONTINUE)) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(response, promise);
return true;
}
return false;
}

@SuppressWarnings("FutureReturnValueIgnored")
void handleDefaultHttContent(DefaultHttpContent msg, ChannelPromise promise) {
if (persistentConnection && pendingResponses == 0) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(
format(ctx.channel(), "Dropped HTTP content, since response has been sent already: {}"),
httpMessageLogFactory.debug(HttpMessageArgProviderFactory.create(msg)));
}
msg.release();
promise.setSuccess();
return;
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@SuppressWarnings("FutureReturnValueIgnored")
void handleDefaultHttpResponse(DefaultHttpResponse response, ChannelPromise promise) {
nonInformationalResponse = !isInformational(response);
// Assume the response writer knows if they can persist or not and sets isKeepAlive on the response
boolean maxKeepAliveRequestsReached = maxKeepAliveRequests != -1 && HttpServerOperations.requestsCounter(ctx.channel()) == maxKeepAliveRequests;
if (maxKeepAliveRequestsReached || !isKeepAlive(response) || !isSelfDefinedMessageLength(response)) {
// No longer keep alive as the client can't tell when the message is done unless we close connection
pendingResponses = 0;
persistentConnection = false;
}
// Server might think it can keep connection alive, but we should fix response header if we know better
if (!shouldKeepAlive()) {
setKeepAlive(response, false);
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(response, promise);
}

@SuppressWarnings("FutureReturnValueIgnored")
void handleLastHttpContent(Object msg, ChannelPromise promise) {
finalizingResponse = true;

if (LAST_FLUSH_WHEN_NO_READ) {
needsFlush = !read;
}

if (!shouldKeepAlive()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Detected non persistent http " +
"connection, preparing to close. Pending responses count: {}"),
pendingResponses);
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise.unvoid()).addListener(ChannelFutureListener.CLOSE);
return;
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);

if (!persistentConnection) {
return;
}

if (nonInformationalResponse) {
nonInformationalResponse = false;
pendingResponses -= 1;
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Decreasing pending responses count: {}"),
pendingResponses);
}
}

if (pipelined != null && !pipelined.isEmpty()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Draining next pipelined " +
"HTTP request, pending responses count: {}, queued: {}"),
pendingResponses, pipelined.size());
}
ctx.executor().execute(this);
}
else {
IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout);
ctx.read();
}
}

@Override
public void run() {
Object next;
Expand Down

0 comments on commit 18c6bb7

Please sign in to comment.