Skip to content

Eliminate unnecessary flushes #5943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -129,6 +129,8 @@ private void clientCloseTest(int readTimeout) throws IOException {
inputStream.close();
// trigger sending another 'A' to the stream; it should fail
// (indicating that the streaming has been terminated on the server)
// But only the second flush causes the Exception
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
assertEquals(0, counter.get());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -17,6 +17,7 @@
package org.glassfish.jersey.io.spi;

import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -27,10 +28,9 @@
* That way, {@link #flush()} method is not called twice.
*
* <p>
* Usable by {@link javax.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}.
* Usable by {@link javax.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}.
* Usable by {@link jakarta.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}.
* Usable by {@link jakarta.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}.
* </p>
*
* <p>
* This marker interface can be useful for the customer OutputStream to know the {@code flush} did not come from
* Jersey before close. By default, when the entity stream is to be closed by Jersey, {@code flush} is called first.
Expand All @@ -52,4 +52,14 @@ public interface FlushedCloseable extends Flushable, Closeable {
* @throws IOException if an I/O error occurs
*/
public void close() throws IOException;


/**
* Determine if the stream {@link OutputStream#flush() flushes} on {@link OutputStream#close()}.
* @param stream the provided {@link OutputStream}
* @return {@code true} if the stream ensures to call {@link OutputStream#flush()} on {@link OutputStream#close()}.
*/
public static boolean flushOnClose(OutputStream stream) {
return FilterOutputStream.class.isInstance(stream) || FlushedCloseable.class.isInstance(stream);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2010, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -26,6 +26,7 @@
import org.glassfish.jersey.innate.VirtualThreadSupport;
import org.glassfish.jersey.internal.LocalizationMessages;
import org.glassfish.jersey.internal.guava.Preconditions;
import org.glassfish.jersey.io.spi.FlushedCloseable;

/**
* A committing output stream with optional serialized entity buffering functionality
Expand Down Expand Up @@ -155,6 +156,12 @@ void enableBuffering() {
enableBuffering(DEFAULT_BUFFER_SIZE);
}

/* package */ void flushOnClose() throws IOException {
if (!FlushedCloseable.flushOnClose(adaptedOutput)) {
flush();
}
}

/**
* Determine whether the stream was already committed or not.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -562,7 +562,9 @@ public void close() {
if (hasEntity()) {
try {
final OutputStream es = getEntityStream();
if (!FlushedCloseable.class.isInstance(es)) {
if (CommittingOutputStream.class.isInstance(es)) {
((CommittingOutputStream) es).flushOnClose();
} else if (!FlushedCloseable.flushOnClose(es)) {
es.flush();
}
es.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public final class ReaderWriter {
public static final int BUFFER_SIZE = getBufferSize();

/**
* Whether {@linkplain BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
* Whether {@linkplain #BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
*/
public static final boolean AUTOSIZE_BUFFER = getAutosizeBuffer();

Expand Down Expand Up @@ -263,9 +263,7 @@ private static byte[] readAllBytes(InputStream inputStream) throws IOException {
* @throws IOException in case of a write failure.
*/
public static void writeToAsString(String s, OutputStream out, MediaType type) throws IOException {
Writer osw = new OutputStreamWriter(out, getCharset(type));
osw.write(s);
osw.flush();
out.write(s.getBytes(getCharset(type)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.server;


import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.glassfish.jersey.internal.MapPropertiesDelegate;
import org.glassfish.jersey.io.spi.FlushedCloseable;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.server.RequestContextBuilder.TestContainerRequest;
import org.glassfish.jersey.server.spi.ContainerResponseWriter;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ContainerResponseWriterNoFlushTest {
private static final String RESPONSE = "RESPONSE";
private static AtomicInteger flushCounter = new AtomicInteger(0);
private static class TestResponseOutputStream extends ByteArrayOutputStream implements FlushedCloseable {
private boolean closed = false;
@Override
public void close() throws IOException {
if (!closed) {
closed = true;
flush();
super.close();
}
}

@Override
public void flush() throws IOException {
flushCounter.incrementAndGet();
}
}

private static class TestContainerWriter implements ContainerResponseWriter {
TestResponseOutputStream outputStream;
private final boolean buffering;

private TestContainerWriter(boolean buffering) {
this.buffering = buffering;
}

@Override
public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext)
throws ContainerException {
outputStream = new TestResponseOutputStream();
return outputStream;
}

@Override
public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) {
return false;
}

@Override
public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
}

@Override
public void commit() {
}

@Override
public void failure(Throwable error) {
throw new RuntimeException(error);
}

@Override
public boolean enableResponseBuffering() {
return buffering;
}
}

@Path("/test")
public static class StreamResource {

@GET
@Path(value = "/stream")
@Produces(MediaType.TEXT_PLAIN)
public Response stream() {

StreamingOutput stream = output -> {
output.write(RESPONSE.getBytes(StandardCharsets.UTF_8));
};
return Response.ok(stream).build();
}
}

@Test
public void testWriterBuffering() {
TestContainerWriter writer = new TestContainerWriter(true);
testWriter(writer);
}

@Test
public void testWriterNoBuffering() {
TestContainerWriter writer = new TestContainerWriter(false);
testWriter(writer);
}

private void testWriter(TestContainerWriter writer) {
flushCounter.set(0);
RequestContextBuilder rcb = RequestContextBuilder.from("/test/stream", "GET");

TestContainerRequest request = rcb.new TestContainerRequest(
null, URI.create("/test/stream"), "GET", null, new MapPropertiesDelegate()) {
@Override
public void setWorkers(MessageBodyWorkers workers) {
if (workers != null) {
setWriter(writer);
}
super.setWorkers(workers);
}
};

ApplicationHandler applicationHandler = new ApplicationHandler(new ResourceConfig(StreamResource.class));
Future<ContainerResponse> future = applicationHandler.apply(request);
MatcherAssert.assertThat(writer.outputStream.toString(), Matchers.is(RESPONSE));
MatcherAssert.assertThat(flushCounter.get(), Matchers.is(1));
}
}
Loading