Using JDK Http Client - 2
Working with SSE (Server-Sent Events)
The JDK HTTP client does not come with first-class support for SSE, but we can add it easily by implementing a listener callback ourselves on top of HttpResponse.BodyHandler.
This implementation is a proof of concept only; it does not handle every edge case and is not tuned for performance.
// define the SSE callback interface
/**
 * Callbacks through which a consumer observes a server-sent-event stream.
 *
 * <p>The methods are listed in the order they are normally expected to fire:
 * open, zero or more messages, then either an error or a close.
 */
interface SseListener {

    /**
     * Reports the HTTP status code of the response carrying the stream.
     *
     * @param status HTTP status code of the response
     */
    void onOpen(int status);

    /**
     * Delivers one event read from the stream.
     *
     * @param data      payload of the event
     * @param eventName value of the event's {@code event} field
     * @param id        value of the event's {@code id} field
     */
    void onMessage(String data, String eventName, String id);

    /** Signals that the stream failed. */
    void onError();

    /** Signals that the stream ended. */
    void onClose();
}
// implement that with the HttpResponse.BodyHandler<Void>
/**
 * {@link HttpResponse.BodyHandler} that parses a {@code text/event-stream} body incrementally
 * and forwards every event to an {@link SseListener}.
 *
 * <p>The HTTP client hands the body over in arbitrarily sized chunks, so an event may be split
 * across several {@link ByteBuffer}s and one buffer may hold several events. The parser
 * therefore works line by line on the raw bytes and only decodes complete lines, which also
 * keeps multi-byte UTF-8 characters intact when they straddle a chunk boundary.
 *
 * <p>Parsing follows the server-sent-events stream format: lines end with LF, CR or CRLF,
 * lines starting with {@code :} are comments, a single space after the colon is optional,
 * several {@code data} lines are joined with {@code \n}, and a blank line dispatches the event.
 * A block without any {@code data} field is not dispatched. {@code eventName} and {@code id}
 * are passed as {@code null} when the event did not carry the field.
 */
class SseBodyHandler implements HttpResponse.BodyHandler<Void> {

    private final SseListener listener;

    /**
     * @param listener receiver of the stream callbacks
     */
    SseBodyHandler(SseListener listener) {
        this.listener = listener;
    }

    /**
     * Reports the response status to the listener and returns a subscriber that parses the body.
     *
     * @param responseInfo status line and headers of the response
     * @return a body subscriber that produces no body value; events go to the listener
     */
    @Override
    public HttpResponse.BodySubscriber<Void> apply(HttpResponse.ResponseInfo responseInfo) {
        listener.onOpen(responseInfo.statusCode());
        return HttpResponse.BodySubscribers.fromSubscriber(new EventStreamSubscriber(listener));
    }

    /** Stateful line parser fed by the HTTP client one chunk list at a time. */
    private static final class EventStreamSubscriber implements Flow.Subscriber<List<ByteBuffer>> {

        private final SseListener listener;
        /** Bytes of the line currently being read; survives across chunks. */
        private final java.io.ByteArrayOutputStream lineBytes = new java.io.ByteArrayOutputStream();
        /** Accumulated {@code data} lines of the current event, each followed by '\n'. */
        private final StringBuilder data = new StringBuilder();
        private Flow.Subscription subscription;
        /** True when the last byte seen was CR, so that the LF of a CRLF pair is skipped. */
        private boolean previousByteWasCr;
        private boolean firstLine = true;
        private String eventName;
        private String id;

        EventStreamSubscriber(SseListener listener) {
            this.listener = listener;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            for (ByteBuffer item : items) {
                consume(item);
            }
            // Ask for the next chunk only after this one is fully processed (back-pressure).
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            listener.onError();
        }

        @Override
        public void onComplete() {
            // An event that was not terminated by a blank line is incomplete and is dropped.
            listener.onClose();
        }

        /** Splits the chunk into lines on CR, LF or CRLF; CR/LF bytes never occur inside a UTF-8 sequence. */
        private void consume(ByteBuffer chunk) {
            while (chunk.hasRemaining()) {
                byte b = chunk.get();
                if (b == '\n') {
                    if (previousByteWasCr) {
                        // Second half of a CRLF pair; the line was already ended by the CR.
                        previousByteWasCr = false;
                    } else {
                        endOfLine();
                    }
                } else if (b == '\r') {
                    previousByteWasCr = true;
                    endOfLine();
                } else {
                    previousByteWasCr = false;
                    lineBytes.write(b);
                }
            }
        }

        /** Decodes the completed line and applies it to the event being built. */
        private void endOfLine() {
            String line = new String(lineBytes.toByteArray(), StandardCharsets.UTF_8);
            lineBytes.reset();
            if (firstLine) {
                firstLine = false;
                // A single leading byte-order mark is not part of the stream content.
                if (!line.isEmpty() && line.charAt(0) == 0xFEFF) {
                    line = line.substring(1);
                }
            }
            if (line.isEmpty()) {
                dispatch();
                return;
            }
            if (line.charAt(0) == ':') {
                return; // comment line, typically a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // exactly one optional space after the colon
            }
            switch (field) {
                case "data":
                    data.append(value).append('\n');
                    break;
                case "event":
                    eventName = value;
                    break;
                case "id":
                    // An id containing NUL must be ignored.
                    if (value.indexOf('\0') < 0) {
                        id = value;
                    }
                    break;
                default:
                    // "retry" and unknown fields have no counterpart in SseListener.
                    break;
            }
        }

        /** Emits the current event if it has data, then resets the per-event state. */
        private void dispatch() {
            if (data.length() > 0) {
                data.setLength(data.length() - 1); // drop the newline added after the last data line
                listener.onMessage(data.toString(), eventName, id);
            }
            data.setLength(0);
            eventName = null;
            id = null;
        }
    }
}
We can then use it like this:
/**
 * Starts a server that publishes three SSE events and verifies that {@code SseBodyHandler}
 * reports the response status and delivers the events, in order, to the listener.
 */
@Test
void testSse() throws InterruptedException {
    MuServer server = MuServerBuilder.httpsServer()
        .addHandler(Method.GET, "/something", (request, response, map) -> {
            SsePublisher publisher = SsePublisher.start(request, response);
            new Thread(() -> {
                for (int i = 0; i < 3; i++) {
                    try {
                        publisher.send("Number" + i, "eventname", String.valueOf(i));
                        Thread.sleep(100);
                    } catch (Exception e) {
                        // The user has probably disconnected so stopping
                        break;
                    }
                }
                publisher.close();
            }).start();
        })
        .start();
    // Always release the server's port and threads, even when an assertion fails.
    try {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(server.uri().resolve("/something"))
            .header("user-agent", "jdk-httpclient")
            .build();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger status = new AtomicInteger(0);
        List<String> messages = new ArrayList<>();
        HttpResponse.BodyHandler<Void> bodyHandler = new SseBodyHandler(new SseListener() {
            @Override
            public void onMessage(String data, String eventName, String id) {
                messages.add(String.join("_", data, eventName, id));
            }

            @Override
            public void onOpen(int statusCode) {
                status.set(statusCode);
            }

            @Override
            public void onClose() {
                latch.countDown();
            }

            @Override
            public void onError() {
                latch.countDown();
            }
        });
        // If the exchange fails before the body handler is applied, no listener callback fires;
        // releasing the latch here makes the test fail immediately instead of waiting a minute.
        client.sendAsync(request, bodyHandler)
            .whenComplete((response, failure) -> latch.countDown());
        // A timeout must fail the test rather than fall through to the assertions below.
        assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
        assertThat(status.get()).isEqualTo(200);
        assertThat(messages).isEqualTo(List.of(
            "Number0_eventname_0",
            "Number1_eventname_1",
            "Number2_eventname_2"
        ));
    } finally {
        server.stop();
    }
}
Working with websocket
Basically, all we need to do is implement a WebSocket.Listener
and pass it in when opening the WebSocket connection. Again, remember to call webSocket.request(1)
in the listener; otherwise no further messages are delivered and the client appears to hang.
/**
 * Starts an echo WebSocket server, sends five text messages through the JDK client and
 * verifies that the five echoed replies arrive in order.
 */
@Test
void testWebsocket() throws InterruptedException {
    MuServer server = MuServerBuilder.httpsServer()
        .addHandler(
            WebSocketHandlerBuilder.webSocketHandler()
                .withPath("/echo-socket")
                .withWebSocketFactory((request, responseHeaders) -> new BaseWebSocket() {
                    @Override
                    public void onText(String message, boolean isLast, DoneCallback onComplete) {
                        session().sendText("Received " + message, onComplete);
                    }
                })
        )
        .start();
    // Always release the server's port and threads, even when an assertion fails.
    try {
        CountDownLatch latch = new CountDownLatch(1);
        List<String> received = new ArrayList<>();
        // construct websocket listener
        WebSocket.Listener listener = new WebSocket.Listener() {
            /** Collects the fragments of a message until the final one arrives. */
            private final StringBuilder partial = new StringBuilder();

            @Override
            public void onOpen(WebSocket webSocket) {
                webSocket.request(1);
                // Only one text send may be pending at a time, so each send (and the final
                // close) is started only once the previous one has completed.
                java.util.concurrent.CompletableFuture<WebSocket> sends =
                    java.util.concurrent.CompletableFuture.completedFuture(webSocket);
                for (int i = 0; i < 5; i++) {
                    String message = "message " + i;
                    sends = sends.thenCompose(ws -> ws.sendText(message, true));
                }
                sends.thenCompose(ws -> ws.sendClose(WebSocket.NORMAL_CLOSURE, "normal close"))
                    .whenComplete((ws, failure) -> {
                        if (failure != null) {
                            // A failed send means no close handshake will follow; stop waiting.
                            latch.countDown();
                        }
                    });
            }

            @Override
            public CompletionStage<?> onText(
                    WebSocket webSocket,
                    CharSequence data,
                    boolean last) {
                partial.append(data);
                if (last) {
                    received.add(partial.toString());
                    partial.setLength(0);
                }
                webSocket.request(1);
                return null;
            }

            @Override
            public CompletionStage<?> onClose(
                    WebSocket webSocket,
                    int statusCode,
                    String reason) {
                latch.countDown();
                return null;
            }

            @Override
            public void onError(WebSocket webSocket, Throwable error) {
                // No onClose follows an error, so release the waiting test thread here.
                latch.countDown();
            }
        };
        // use the listener as callback for receiving message and sending message
        // Rewrite only the scheme (http -> ws, https -> wss), not other occurrences of "http".
        URI resolve = URI.create(server.uri().toString().replaceFirst("^http", "ws"))
            .resolve("/echo-socket");
        System.out.println(resolve);
        client.newWebSocketBuilder()
            .connectTimeout(Duration.ofMillis(5000))
            .buildAsync(resolve, listener)
            .whenComplete((webSocket, failure) -> {
                if (failure != null) {
                    // The handshake failed, so the listener will never be invoked.
                    latch.countDown();
                }
            });
        // A timeout must fail the test rather than fall through to the assertion below.
        assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
        assertThat(received).isEqualTo(List.of(
            "Received message 0",
            "Received message 1",
            "Received message 2",
            "Received message 3",
            "Received message 4"
        ));
    } finally {
        server.stop();
    }
}
Summary
This is a quick guide to using the JDK HTTP client with SSE and WebSocket. Please feel free to read my previous article, which covers sending simple GET/POST requests as well as more complex cases such as streaming a request body and receiving a response body as a stream.
Reference
https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpClient.html