Skip to content

Commit 17bf53e

Browse files
UNDERTOW-224 Provide a means to get "peer" connections to a WebSocketChannel
1 parent f13f5a8 commit 17bf53e

File tree

16 files changed

+106
-50
lines changed

16 files changed

+106
-50
lines changed

core/src/main/java/io/undertow/websockets/WebSocketProtocolHandshakeHandler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import org.xnio.StreamConnection;
3434

3535
import java.util.Collection;
36+
import java.util.Collections;
3637
import java.util.HashSet;
3738
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
3840

3941
/**
4042
* {@link HttpHandler} which will process the {@link HttpServerExchange} and do the actual handshake/upgrade
@@ -52,6 +54,8 @@ public class WebSocketProtocolHandshakeHandler implements HttpHandler {
5254

5355
private final WebSocketConnectionCallback callback;
5456

57+
private final Set<WebSocketChannel> peerConnections = Collections.newSetFromMap(new ConcurrentHashMap<WebSocketChannel, Boolean>());
58+
5559
/**
5660
* The handler that is invoked if there are no web socket headers
5761
*/
@@ -169,7 +173,7 @@ public void handleRequest(final HttpServerExchange exchange) throws Exception {
169173
next.handleRequest(exchange);
170174
return;
171175
}
172-
final AsyncWebSocketHttpServerExchange facade = new AsyncWebSocketHttpServerExchange(exchange);
176+
final AsyncWebSocketHttpServerExchange facade = new AsyncWebSocketHttpServerExchange(exchange, peerConnections);
173177
Handshake handshaker = null;
174178
for (Handshake method : handshakes) {
175179
if (method.matches(facade)) {
@@ -187,6 +191,7 @@ public void handleRequest(final HttpServerExchange exchange) throws Exception {
187191
@Override
188192
public void handleUpgrade(StreamConnection streamConnection, HttpServerExchange exchange) {
189193
WebSocketChannel channel = selected.createChannel(facade, streamConnection, facade.getBufferPool());
194+
peerConnections.add(channel);
190195
callback.onConnect(facade, channel);
191196
}
192197
});
@@ -196,4 +201,8 @@ public void handleUpgrade(StreamConnection streamConnection, HttpServerExchange
196201
handshaker.handshake(facade);
197202
}
198203
}
204+
205+
public Set<WebSocketChannel> getPeerConnections() {
206+
return peerConnections;
207+
}
199208
}

core/src/main/java/io/undertow/websockets/client/WebSocket13ClientHandshake.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.security.SecureRandom;
3939
import java.util.Collections;
4040
import java.util.HashMap;
41+
import java.util.HashSet;
4142
import java.util.Iterator;
4243
import java.util.List;
4344
import java.util.Locale;
@@ -63,7 +64,7 @@ public WebSocket13ClientHandshake(final URI url) {
6364

6465
@Override
6566
public WebSocketChannel createChannel(final StreamConnection channel, final String wsUri, final Pool<ByteBuffer> bufferPool) {
66-
return new WebSocket13Channel(channel, bufferPool, wsUri, negotiation != null ? negotiation.getSelectedSubProtocol() : "", true, false);
67+
return new WebSocket13Channel(channel, bufferPool, wsUri, negotiation != null ? negotiation.getSelectedSubProtocol() : "", true, false, new HashSet<WebSocketChannel>());
6768
}
6869

6970

core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.undertow.server.protocol.framed.AbstractFramedChannel;
2121
import io.undertow.server.protocol.framed.FrameHeaderData;
2222
import org.xnio.ChannelExceptionHandler;
23+
import org.xnio.ChannelListener;
2324
import org.xnio.ChannelListeners;
2425
import org.xnio.IoUtils;
2526
import org.xnio.Pool;
@@ -62,24 +63,37 @@ public abstract class WebSocketChannel extends AbstractFramedChannel<WebSocketCh
6263

6364
protected StreamSourceFrameChannel fragmentedChannel;
6465

66+
/**
67+
* Represents all web socket channels that are attached to the same endpoint.
68+
*/
69+
private final Set<WebSocketChannel> peerConnections;
70+
6571
/**
6672
* Create a new {@link WebSocketChannel}
6773
* 8
6874
*
6975
* @param connectedStreamChannel The {@link org.xnio.channels.ConnectedStreamChannel} over which the WebSocket Frames should get send and received.
7076
* Be aware that it already must be "upgraded".
7177
* @param bufferPool The {@link org.xnio.Pool} which will be used to acquire {@link java.nio.ByteBuffer}'s from.
72-
* @param version The {@link io.undertow.websockets.core.WebSocketVersion} of the {@link io.undertow.websockets.core.WebSocketChannel}
78+
* @param version The {@link WebSocketVersion} of the {@link WebSocketChannel}
7379
* @param wsUrl The url for which the channel was created.
7480
* @param client
81+
* @param peerConnections The concurrent set that is used to track open connections associtated with an endpoint
7582
*/
76-
protected WebSocketChannel(final StreamConnection connectedStreamChannel, Pool<ByteBuffer> bufferPool, WebSocketVersion version, String wsUrl, String subProtocol, final boolean client, boolean extensionsSupported) {
83+
protected WebSocketChannel(final StreamConnection connectedStreamChannel, Pool<ByteBuffer> bufferPool, WebSocketVersion version, String wsUrl, String subProtocol, final boolean client, boolean extensionsSupported, Set<WebSocketChannel> peerConnections) {
7784
super(connectedStreamChannel, bufferPool, new WebSocketFramePriority(), null);
7885
this.client = client;
7986
this.version = version;
8087
this.wsUrl = wsUrl;
8188
this.extensionsSupported = extensionsSupported;
8289
this.subProtocol = subProtocol;
90+
this.peerConnections = peerConnections;
91+
addCloseTask(new ChannelListener<WebSocketChannel>() {
92+
@Override
93+
public void handleEvent(WebSocketChannel channel) {
94+
WebSocketChannel.this.peerConnections.remove(WebSocketChannel.this);
95+
}
96+
});
8397
}
8498

8599
@Override
@@ -343,6 +357,15 @@ protected WebSocketFramePriority getFramePriority() {
343357
return (WebSocketFramePriority) super.getFramePriority();
344358
}
345359

360+
/**
361+
* Returns all 'peer' web socket connections that were created from the same endpoint.
362+
*
363+
*
364+
* @return all 'peer' web socket connections
365+
*/
366+
public Set<WebSocketChannel> getPeerConnections() {
367+
return Collections.unmodifiableSet(peerConnections);
368+
}
346369

347370
/**
348371
* Interface that represents a frame channel that is in the process of being created

core/src/main/java/io/undertow/websockets/core/protocol/version07/Hybi07Handshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,6 @@ protected final String solve(final String nonceBase64) throws NoSuchAlgorithmExc
100100

101101
@Override
102102
public WebSocketChannel createChannel(WebSocketHttpExchange exchange, final StreamConnection channel, final Pool<ByteBuffer> pool) {
103-
return new WebSocket07Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions);
103+
return new WebSocket07Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions, exchange.getPeerConnections());
104104
}
105105
}

core/src/main/java/io/undertow/websockets/core/protocol/version07/WebSocket07Channel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.xnio.StreamConnection;
3434

3535
import java.nio.ByteBuffer;
36+
import java.util.Set;
3637

3738

3839
/**
@@ -83,8 +84,8 @@ private enum State {
8384
* @param wsUrl The url for which the {@link WebSocket07Channel} was created.
8485
*/
8586
public WebSocket07Channel(StreamConnection channel, Pool<ByteBuffer> bufferPool,
86-
String wsUrl, String subProtocol, final boolean client, boolean allowExtensions) {
87-
super(channel, bufferPool, WebSocketVersion.V08, wsUrl, subProtocol, client, allowExtensions);
87+
String wsUrl, String subProtocol, final boolean client, boolean allowExtensions, Set<WebSocketChannel> openConnections) {
88+
super(channel, bufferPool, WebSocketVersion.V08, wsUrl, subProtocol, client, allowExtensions, openConnections);
8889
}
8990

9091
@Override

core/src/main/java/io/undertow/websockets/core/protocol/version08/Hybi08Handshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Hybi08Handshake(Set<String> subprotocols, boolean allowExtensions) {
4747

4848
@Override
4949
public WebSocketChannel createChannel(final WebSocketHttpExchange exchange, final StreamConnection channel, final Pool<ByteBuffer> pool) {
50-
return new WebSocket08Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions);
50+
return new WebSocket08Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions, exchange.getPeerConnections());
5151

5252
}
5353
}

core/src/main/java/io/undertow/websockets/core/protocol/version08/WebSocket08Channel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
*/
1818
package io.undertow.websockets.core.protocol.version08;
1919

20+
import io.undertow.websockets.core.WebSocketChannel;
2021
import io.undertow.websockets.core.WebSocketVersion;
2122
import io.undertow.websockets.core.protocol.version07.WebSocket07Channel;
2223
import org.xnio.Pool;
2324
import org.xnio.StreamConnection;
2425

2526
import java.nio.ByteBuffer;
27+
import java.util.Set;
2628

2729

2830
/**
@@ -31,8 +33,8 @@
3133
* @author <a href="mailto:[email protected]">Norman Maurer</a>
3234
*/
3335
public class WebSocket08Channel extends WebSocket07Channel {
34-
public WebSocket08Channel(StreamConnection channel, Pool<ByteBuffer> bufferPool, String wsUrl, String subProtocols, final boolean client, boolean allowExtensions) {
35-
super(channel, bufferPool, wsUrl, subProtocols, client, allowExtensions);
36+
public WebSocket08Channel(StreamConnection channel, Pool<ByteBuffer> bufferPool, String wsUrl, String subProtocols, final boolean client, boolean allowExtensions, Set<WebSocketChannel> openConnections) {
37+
super(channel, bufferPool, wsUrl, subProtocols, client, allowExtensions, openConnections);
3638
}
3739

3840
@Override

core/src/main/java/io/undertow/websockets/core/protocol/version13/Hybi13Handshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,6 @@ protected void handshakeInternal(final WebSocketHttpExchange exchange) {
7070

7171
@Override
7272
public WebSocketChannel createChannel(WebSocketHttpExchange exchange, final StreamConnection channel, final Pool<ByteBuffer> pool) {
73-
return new WebSocket13Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions);
73+
return new WebSocket13Channel(channel, pool, getWebSocketLocation(exchange), exchange.getResponseHeader(Headers.SEC_WEB_SOCKET_PROTOCOL_STRING), false, allowExtensions, exchange.getPeerConnections());
7474
}
7575
}

core/src/main/java/io/undertow/websockets/core/protocol/version13/WebSocket13Channel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
*/
1818
package io.undertow.websockets.core.protocol.version13;
1919

20+
import io.undertow.websockets.core.WebSocketChannel;
2021
import io.undertow.websockets.core.WebSocketVersion;
2122
import io.undertow.websockets.core.protocol.version07.WebSocket07Channel;
2223
import org.xnio.Pool;
2324
import org.xnio.StreamConnection;
2425

2526
import java.nio.ByteBuffer;
27+
import java.util.Set;
2628

2729
/**
2830
*
@@ -31,8 +33,8 @@
3133
* @author <a href="mailto:[email protected]">Norman Maurer</a>
3234
*/
3335
public class WebSocket13Channel extends WebSocket07Channel {
34-
public WebSocket13Channel(StreamConnection channel, Pool<ByteBuffer> bufferPool, String wsUrl, String subProtocols, final boolean client, boolean allowExtensions) {
35-
super(channel, bufferPool, wsUrl, subProtocols, client, allowExtensions);
36+
public WebSocket13Channel(StreamConnection channel, Pool<ByteBuffer> bufferPool, String wsUrl, String subProtocols, final boolean client, boolean allowExtensions, Set<WebSocketChannel> openConnections) {
37+
super(channel, bufferPool, wsUrl, subProtocols, client, allowExtensions, openConnections);
3638
}
3739

3840
@Override

core/src/main/java/io/undertow/websockets/spi/AsyncWebSocketHttpServerExchange.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.undertow.util.AttachmentKey;
2929
import io.undertow.util.HeaderMap;
3030
import io.undertow.util.HttpString;
31+
import io.undertow.websockets.core.WebSocketChannel;
3132
import org.xnio.ChannelListener;
3233
import org.xnio.FinishedIoFuture;
3334
import org.xnio.FutureResult;
@@ -47,6 +48,7 @@
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
51+
import java.util.Set;
5052

5153
/**
5254
* @author Stuart Douglas
@@ -55,9 +57,11 @@ public class AsyncWebSocketHttpServerExchange implements WebSocketHttpExchange {
5557

5658
private final HttpServerExchange exchange;
5759
private Sender sender;
60+
private final Set<WebSocketChannel> peerConnections;
5861

59-
public AsyncWebSocketHttpServerExchange(final HttpServerExchange exchange) {
62+
public AsyncWebSocketHttpServerExchange(final HttpServerExchange exchange, Set<WebSocketChannel> peerConnections) {
6063
this.exchange = exchange;
64+
this.peerConnections = peerConnections;
6165
}
6266

6367

@@ -273,4 +277,9 @@ public boolean isUserInRole(String role) {
273277
}
274278
return authenticatedAccount.getRoles().contains(role);
275279
}
280+
281+
@Override
282+
public Set<WebSocketChannel> getPeerConnections() {
283+
return peerConnections;
284+
}
276285
}

0 commit comments

Comments
 (0)