Skip to content

Using JDK Http Client - 2

Working with SSE (server sent event)

JDK Http client did not come with first class support for SSE, but we can achieve that easily by implementing the listener callback by ourself with HttpResponse.BodyHandler. This implementation is POC only, it didn't cater about some edge cases and performance.

java

// define the SSE callback interface
interface SseListener {
    void onMessage(String data, String eventName, String id);

    void onOpen(int status);

    void onClose();

    void onError();
}

// implement that with the HttpResponse.BodyHandler<Void>
class SseBodyHandler implements HttpResponse.BodyHandler<Void> {

    private final SseListener listener;

    SseBodyHandler(SseListener listener) {
        this.listener = listener;
    }

    @Override
    public HttpResponse.BodySubscriber<Void> apply(HttpResponse.ResponseInfo responseInfo) {
        listener.onOpen(responseInfo.statusCode());
        return HttpResponse.BodySubscribers.fromSubscriber(new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(List<ByteBuffer> items) {
                for (ByteBuffer item : items) {
                    String decode = StandardCharsets.UTF_8.decode(item).toString();
                    if (decode.endsWith("\n\n")) {
                        decode = decode.substring(0, decode.length() - 2);
                    }

                    String[] lines = decode.split("\n");
                    String id = null;
                    String event = null;
                    StringBuilder dataBuilder = new StringBuilder();
                    for (String line : lines) {
                        if (line.startsWith("id: ")) {
                            id = line.substring(4);
                        } else if (line.startsWith("event: ")) {
                            event = line.substring(7);
                        } else if (line.startsWith("data: ")) {
                            dataBuilder.append(line.substring(6));
                        }
                    }
                    listener.onMessage(dataBuilder.toString(), event, id);
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                listener.onError();
            }

            @Override
            public void onComplete() {
                listener.onClose();
            }
        });

    }
}

And then we can use it in this way

java
@Test
void testSse() throws InterruptedException {

    MuServer server = MuServerBuilder.httpsServer()
            .addHandler(Method.GET, "/something", (request, response, map) -> {
                SsePublisher publisher = SsePublisher.start(request, response);
                new Thread(() -> {
                    for (int i = 0; i < 3; i++) {
                        try {
                            publisher.send("Number" + i, "eventname", String.valueOf(i));
                            Thread.sleep(100);
                        } catch (Exception e) {
                            // The user has probably disconnected so stopping
                            break;
                        }
                    }
                    publisher.close();
                }).start();
            })
            .start();

    HttpRequest request = HttpRequest.newBuilder()
            .uri(server.uri().resolve("/something"))
            .header("user-agent", "jdk-httpclient")
            .build();

    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger status = new AtomicInteger(0);
    List<String> messages = new ArrayList<>();

    HttpResponse.BodyHandler<Void> bodyHandler = new SseBodyHandler(new SseListener() {
        @Override
        public void onMessage(String data, String eventName, String id) {
            messages.add(String.join("_", data, eventName, id));
        }

        @Override
        public void onOpen(int statusCode) {
            status.set(statusCode);
        }

        @Override
        public void onClose() {
            latch.countDown();
        }

        @Override
        public void onError() {
            latch.countDown();
        }
    });

    client.sendAsync(request, bodyHandler);

    latch.await(1, TimeUnit.MINUTES);
    assertThat(status.get()).isEqualTo(200);
    assertThat(messages).isEqualTo(List.of(
            "Number0_eventname_0",
            "Number1_eventname_1",
            "Number2_eventname_2"
    ));
}

Working with websocket

Basically what we need is to construct our WebSocket.Listener and use it as param on sending websocket requests. Again, do remember to call webSocket.request(1) in the listener, otherwise it will get hang.

java
@Test
void testWebsocket() throws InterruptedException {

    MuServer server = MuServerBuilder.httpsServer()
        .addHandler(
            WebSocketHandlerBuilder.webSocketHandler()
                .withPath("/echo-socket")
                .withWebSocketFactory((request, responseHeaders) -> new BaseWebSocket() {
                    @Override
                    public void onText(String message, boolean isLast, DoneCallback onComplete) {
                        session().sendText("Received " + message, onComplete);
                    }
                })
        )
        .start();

    CountDownLatch latch = new CountDownLatch(1);
    List<String> received = new ArrayList<>();

    // construct websocket listener
    WebSocket.Listener listener = new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            for (int i = 0; i < 5; i++) {
                webSocket.sendText("message " + i, true);
            }
            webSocket.request(1);
            webSocket.sendClose(1000, "normal close");
        }

        @Override
        public CompletionStage<?> onText(
                WebSocket webSocket, 
                CharSequence data, 
                boolean last) {
            received.add(data.toString());
            webSocket.request(1);
            return null;
        }

        @Override
        public CompletionStage<?> onClose(
                WebSocket webSocket, 
                int statusCode, 
                String reason) {
            latch.countDown();
            return null;
        }
    };

    // use the listener as callback for receiving message and sending message
    URI resolve = URI.create(server.uri().toString().replace("http", "ws"))
            .resolve("/echo-socket");
    System.out.println(resolve);
    client.newWebSocketBuilder()
            .connectTimeout(Duration.ofMillis(5000))
            .buildAsync(resolve, listener);

    latch.await(1, TimeUnit.MINUTES);
    assertThat(received).isEqualTo(List.of(
            "Received message 0",
            "Received message 1",
            "Received message 2",
            "Received message 3",
            "Received message 4"
    ));
}

Summary

This is a quick guide for using JDK http client working with SSE and websocket. Please feel free to read my previous article for sending simple GET/POST request, and also some complex case for streaming request body, stream receiving response body.

Reference

https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpClient.html