Skip to content

Commit 8be791c

Browse files
committed
Add reactive WebSocketClient and RxNetty implementation
Issue: SPR-14527
1 parent bcf6f6e commit 8be791c

File tree

4 files changed

+192
-22
lines changed

4 files changed

+192
-22
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.client;
17+
18+
import java.net.URI;
19+
import java.security.NoSuchAlgorithmException;
20+
import java.util.function.Function;
21+
22+
import javax.net.ssl.SSLContext;
23+
import javax.net.ssl.SSLEngine;
24+
25+
import io.netty.buffer.ByteBuf;
26+
import io.netty.buffer.ByteBufAllocator;
27+
import io.reactivex.netty.protocol.http.client.HttpClient;
28+
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
29+
import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
30+
import reactor.core.publisher.Mono;
31+
import reactor.util.function.Tuples;
32+
import rx.Observable;
33+
import rx.RxReactiveStreams;
34+
35+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
36+
import org.springframework.http.HttpHeaders;
37+
import org.springframework.web.reactive.socket.WebSocketSession;
38+
import org.springframework.web.reactive.socket.adapter.HandshakeInfo;
39+
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
40+
41+
/**
42+
* A {@link WebSocketClient} based on RxNetty.
43+
*
44+
* @author Rossen Stoyanchev
45+
* @since 5.0
46+
*/
47+
public class RxNettyWebSocketClient implements WebSocketClient {
48+
49+
private final Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientFactory;
50+
51+
52+
/**
53+
* Default constructor that uses {@link HttpClient#newClient(String, int)}
54+
* to create HTTP client instances when connecting.
55+
*/
56+
public RxNettyWebSocketClient() {
57+
this(RxNettyWebSocketClient::createDefaultHttpClient);
58+
}
59+
60+
/**
61+
* Constructor with a function to create {@link HttpClient} instances.
62+
* @param httpClientFactory factory to create clients
63+
*/
64+
public RxNettyWebSocketClient(Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientFactory) {
65+
this.httpClientFactory = httpClientFactory;
66+
}
67+
68+
private static HttpClient<ByteBuf, ByteBuf> createDefaultHttpClient(URI url) {
69+
boolean secure = "wss".equals(url.getScheme());
70+
int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80;
71+
HttpClient<ByteBuf, ByteBuf> httpClient = HttpClient.newClient(url.getHost(), port);
72+
if (secure) {
73+
try {
74+
SSLContext context = SSLContext.getDefault();
75+
SSLEngine engine = context.createSSLEngine(url.getHost(), port);
76+
engine.setUseClientMode(true);
77+
httpClient.secure(engine);
78+
}
79+
catch (NoSuchAlgorithmException ex) {
80+
throw new IllegalStateException("Failed to create HttpClient for " + url, ex);
81+
}
82+
}
83+
return httpClient;
84+
}
85+
86+
87+
@Override
88+
public Mono<WebSocketSession> connect(URI url) {
89+
return connect(url, new HttpHeaders());
90+
}
91+
92+
@Override
93+
public Mono<WebSocketSession> connect(URI url, HttpHeaders headers) {
94+
HandshakeInfo info = new HandshakeInfo(url, headers, Mono.empty());
95+
Observable<WebSocketSession> observable = connectInternal(info);
96+
return Mono.from(RxReactiveStreams.toPublisher(observable));
97+
}
98+
99+
private Observable<WebSocketSession> connectInternal(HandshakeInfo info) {
100+
return createWebSocketRequest(info.getUri())
101+
.flatMap(response -> {
102+
ByteBufAllocator allocator = response.unsafeNettyChannel().alloc();
103+
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(allocator);
104+
Observable<WebSocketConnection> conn = response.getWebSocketConnection();
105+
return Observable.zip(conn, Observable.just(bufferFactory), Tuples::of);
106+
})
107+
.map(tuple -> {
108+
WebSocketConnection conn = tuple.getT1();
109+
NettyDataBufferFactory bufferFactory = tuple.getT2();
110+
return new RxNettyWebSocketSession(conn, info, bufferFactory);
111+
});
112+
}
113+
114+
private WebSocketRequest<ByteBuf> createWebSocketRequest(URI url) {
115+
String query = url.getRawQuery();
116+
return this.httpClientFactory.apply(url)
117+
.createGet(url.getRawPath() + (query != null ? "?" + query : ""))
118+
.requestWebSocketUpgrade();
119+
}
120+
121+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.client;
17+
18+
import java.net.URI;
19+
20+
import reactor.core.publisher.Mono;
21+
22+
import org.springframework.http.HttpHeaders;
23+
import org.springframework.web.reactive.socket.WebSocketSession;
24+
25+
/**
26+
* Contract for starting a WebSocket interaction.
27+
*
28+
* @author Rossen Stoyanchev
29+
* @since 5.0
30+
*/
31+
public interface WebSocketClient {
32+
33+
/**
34+
* Start a WebSocket interaction to the given url.
35+
* @param url the handshake url
36+
* @return the session for the WebSocket interaction
37+
*/
38+
Mono<WebSocketSession> connect(URI url);
39+
40+
/**
41+
* Start a WebSocket interaction to the given url.
42+
* @param url the handshake url
43+
* @param headers headers for the handshake request
44+
* @return the session for the WebSocket interaction
45+
*/
46+
Mono<WebSocketSession> connect(URI url, HttpHeaders headers);
47+
48+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Client support for WebSocket interactions.
3+
*/
4+
package org.springframework.web.reactive.socket.client;
Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,22 @@
1515
*/
1616
package org.springframework.web.reactive.socket.server;
1717

18-
import java.nio.charset.StandardCharsets;
18+
import java.net.URI;
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22-
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
23-
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
24-
import io.reactivex.netty.protocol.http.client.HttpClient;
25-
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
2622
import org.junit.Test;
23+
import reactor.core.publisher.Flux;
2724
import reactor.core.publisher.Mono;
28-
import rx.Observable;
2925

3026
import org.springframework.context.annotation.Bean;
3127
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.core.io.buffer.DataBufferUtils;
3229
import org.springframework.web.reactive.HandlerMapping;
3330
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
3431
import org.springframework.web.reactive.socket.WebSocketHandler;
3532
import org.springframework.web.reactive.socket.WebSocketSession;
33+
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
3634

3735
import static org.junit.Assert.assertEquals;
3836

@@ -42,7 +40,7 @@
4240
* @author Rossen Stoyanchev
4341
*/
4442
@SuppressWarnings({"unused", "WeakerAccess"})
45-
public class ServerWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
43+
public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
4644

4745

4846
@Override
@@ -54,21 +52,20 @@ protected Class<?> getWebConfigClass() {
5452
@Test
5553
public void echo() throws Exception {
5654
int count = 100;
57-
Observable<String> input = Observable.range(1, count).map(index -> "msg-" + index);
58-
Observable<String> output = HttpClient.newClient("localhost", this.port)
59-
.createGet("/echo")
60-
.requestWebSocketUpgrade()
61-
.flatMap(WebSocketResponse::getWebSocketConnection)
62-
.flatMap(conn -> conn
63-
.write(input.map(TextWebSocketFrame::new)).cast(WebSocketFrame.class)
64-
.mergeWith(conn.getInput())
65-
.take(count)
66-
.map(frame -> {
67-
String text = frame.content().toString(StandardCharsets.UTF_8);
68-
frame.release();
69-
return text;
70-
}));
71-
assertEquals(input.toList().toBlocking().first(), output.toList().toBlocking().first());
55+
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
56+
Flux<String> output = new RxNettyWebSocketClient()
57+
.connect(new URI("ws://localhost:" + this.port + "/echo"))
58+
.flatMap(session -> session
59+
.send(input.map(session::textMessage))
60+
.thenMany(session.receive()
61+
.take(count)
62+
.map(message -> {
63+
String text = message.getPayloadAsText();
64+
DataBufferUtils.release(message.getPayload());
65+
return text;
66+
})
67+
));
68+
assertEquals(input.collectList().blockMillis(5000), output.collectList().blockMillis(5000));
7269
}
7370

7471

0 commit comments

Comments
 (0)