In short: when one of the websockets receives a message, I need to forward that message to the other connected websockets. I am using jetty-9.2.20.v20161216.
This is how I initialize the server.
    JettyWSServer websocketServer = new JettyWSServer("localhost", 8000, new MySocketHandler(), new QueuedThreadPool(128));

    public <T extends WebSocketHandler> JettyWSServer(String hostName, int port, T webscoketHandler, QueuedThreadPool threadPool) {
        this.hostName = hostName;
        this.port = port;
        this.handler = webscoketHandler;
        this.threadPool = threadPool;
        this.socket = null;

        //create server
        this.server = new Server(this.threadPool);

        //set connector
        ServerConnector connector = new ServerConnector(server);
        connector.setHost(this.hostName);
        connector.setPort(this.port);
        this.server.addConnector(connector);

        //set handler
        this.server.setHandler(this.handler);

        //set listener
        setLifecycleListener();
    }

MySocketHandler.java
    public class MySocketHandler extends WebSocketHandler {
        private final String TAG = MySocketHandler.class.getSimpleName();
        private MySocketCreator creator;

        @Override
        public void configure(WebSocketServletFactory webSocketServletFactory) {
            this.creator = new MySocketCreator();
            webSocketServletFactory.setCreator(this.creator);
        }

        public Set<ServerSocket> getSockets(){
            return this.creator.getSockets();
        }
    }

MySocketCreator.java
    public class MySocketCreator implements WebSocketCreator {
        private static final String TAG = MySocketCreator.class.getSimpleName();
        private static Log log = new Log(TAG, true);

        private Set<ServerSocket> sockets = new HashSet<>();
        private Set<Session> guests = new HashSet<>();
        private ConcurrentHashMap<ServiceUser, ArrayList<WSDeviceSessionWrapper>> users = new ConcurrentHashMap<>();

        @Override
        public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
            ServerSocket socket = new ServerSocket(statusCallback);
            sockets.add(socket);
            return socket;
        }

        private OnSessionStatusListener statusCallback = new OnSessionStatusListener() {
            @Override
            public void onGuestIn(Session session) {
                synchronized (this) {
                    guests.add(session);
                    Integer totalAgeReduce = users.values()
                            .stream()
                            .map(wsDeviceSessionWrappers -> { return 1; })
                            .reduce(0, (a, b) -> a + b);
                    log.debug("onGuestIn() " + "Guests: " + guests.size() + " Registered: " + totalAgeReduce);
                }
            }

            @Override
            public void onUserIn(Session session, ServiceUser user, Device device) {
                synchronized (this) {
                    if (guests.contains(session))
                        guests.remove(session);
                    if (!users.containsKey(user)) {
                        users.put(user, new ArrayList<WSDeviceSessionWrapper>());
                    }
                    users.get(user).add(new WSDeviceSessionWrapper(session, device));
                    log.debug("onUserIn() " + "Guests: " + guests.size() + " Registered: " + users.size());
                }
            }

            @Override
            public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
                log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
                MySocketCreator.this.users.keySet().forEach(user -> {
                    users.forEach(u -> {
                        if (user.equals(u)) {
                            ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                            new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                                wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                            });
                        }
                    });
                });
            }

            @Override
            public void sendResponse(ServiceUser user, WSResponse response, Device excludeDevice) {
                MySocketCreator.this.users.get(user).forEach(wrapper -> {
                    wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                });
            }

            @Override
            public void onExit(Session session, ServiceUser user, Device device) {
                synchronized (this) {
                    //remove from guest sessions
                    if (session != null && guests.contains(session))
                        guests.remove(session);
                    if (user != null && device != null && users.containsKey(user)) {
                        ArrayList<WSDeviceSessionWrapper> wrappers = users.get(user);
                        Iterator<WSDeviceSessionWrapper> iterator = wrappers.iterator();
                        while (iterator.hasNext()) {
                            WSDeviceSessionWrapper wrapper = iterator.next();
                            if (wrapper.getSession() == session || wrapper.getSession().equals(session) && wrapper.getDevice() == device || wrapper.getDevice().equals(device)) {
                                //remove session for current device
                                iterator.remove();
                                //if user does not have session on server
                                //remove him from current server users
                                if (wrappers.size() == 0) {
                                    users.remove(user);
                                }
                            }
                        }
                    }
                    Integer totalRegisteredDevices = users.values()
                            .stream()
                            .map(wsDeviceSessionWrappers -> { return 1; })
                            .reduce(0, (a, b) -> a + b);
                    log.debug("onExit() " + "Guests: " + guests.size() + " Registered: " + totalRegisteredDevices);
                }
            }
        };

        public Set<ServerSocket> getSockets() {
            return sockets;
        }
    }

The logic is as follows:
In the MySocketCreator class, when I create a new socket, I pass a callback to it. Then, in the socket's onOpen event, I call the callback and pass it the session. The session is saved in the MySocketCreator class, after which I associate it with the user and device.
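The flow described above can be pictured with a minimal plain-Java sketch. It does not use Jetty: `FakeSession`, `StatusListener`, `SketchSocket` and `SketchCreator` are hypothetical stand-ins for `Session`, `OnSessionStatusListener`, `ServerSocket` and `MySocketCreator`, and only the guest part of the callback is shown.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.eclipse.jetty.websocket.api.Session.
final class FakeSession {
    final String id;
    FakeSession(String id) { this.id = id; }
}

// Reduced, hypothetical version of the callback interface (guest events only).
interface StatusListener {
    void onGuestIn(FakeSession session);
    void onExit(FakeSession session);
}

// Plays the role of ServerSocket: it only knows the callback it was given.
final class SketchSocket {
    private final StatusListener callback;
    private FakeSession session;

    SketchSocket(StatusListener callback) { this.callback = callback; }

    // In the real server this would be triggered by the websocket open event.
    void onOpen(FakeSession session) {
        this.session = session;
        callback.onGuestIn(session);
    }

    void onClose() {
        if (session != null) callback.onExit(session);
    }
}

// Plays the role of MySocketCreator: owns the registry of sessions.
final class SketchCreator {
    final Set<FakeSession> guests = ConcurrentHashMap.newKeySet();

    private final StatusListener statusCallback = new StatusListener() {
        @Override public void onGuestIn(FakeSession s) { guests.add(s); }
        @Override public void onExit(FakeSession s) { guests.remove(s); }
    };

    // Every new socket receives the same callback instance.
    SketchSocket createWebSocket() { return new SketchSocket(statusCallback); }
}

class CallbackFlowDemo {
    public static void main(String[] args) {
        SketchCreator creator = new SketchCreator();
        SketchSocket socket = creator.createWebSocket();
        socket.onOpen(new FakeSession("s1"));
        System.out.println("guests after open: " + creator.guests.size());
        socket.onClose();
        System.out.println("guests after close: " + creator.guests.size());
    }
}
```

The point of the sketch is only that every socket reports into one shared registry, so any code that later iterates that registry to send messages can be reached from many socket threads at once.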
The problem is that when one of the websockets tries to send a message to all users through the callback method

    @Override
    public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
        log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
        MySocketCreator.this.users.keySet().forEach(user -> {
            users.forEach(u -> {
                if (user.equals(u)) {
                    ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                    new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                        wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                    });
                }
            });
        });
    }

the line

    wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());

blocks the thread and the server hangs. Replacing it with
    wrapper.getSession().getRemote().sendString(response.toJSON());

throws exceptions:

    java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING
    org.eclipse.jetty.websocket.api.WebSocketException: RemoteEndpoint unavailable, outgoing connection not open

So neither option works: with both, the server hangs or behaves incorrectly. And this happens with only 300 connections.
Question: how can I send messages to all users, and what could my mistake be?
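One direction that might be worth testing, assuming that overlapping sends on the same session are part of the problem (the "Blocking message pending" text hints at that, but this is not confirmed), is to push every send for a given session through its own serial queue. The sketch below is plain Java without Jetty: `BlockingSend` and `SessionSender` are hypothetical names, and `SerialExecutor` follows the style of the example in the `java.util.concurrent.Executor` Javadoc. In the real server the `BlockingSend` would presumably wrap `session.getRemote().sendString(text)`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical abstraction over one blocking send to one session.
interface BlockingSend {
    void send(String text) throws Exception;
}

// Runs tasks one at a time, in submission order, on a shared executor.
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor shared;
    private Runnable active;

    SerialExecutor(Executor shared) { this.shared = shared; }

    @Override
    public synchronized void execute(Runnable task) {
        tasks.add(() -> {
            try { task.run(); } finally { scheduleNext(); }
        });
        if (active == null) scheduleNext();
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) shared.execute(active);
    }
}

// Hypothetical per-session wrapper: all sends for one session share one queue.
final class SessionSender {
    private final BlockingSend target;
    private final Executor serial;

    SessionSender(BlockingSend target, Executor shared) {
        this.target = target;
        this.serial = new SerialExecutor(shared);
    }

    void send(String text) {
        serial.execute(() -> {
            try {
                target.send(text);
            } catch (Exception e) {
                // A closed or broken session should not stop the broadcast.
                System.err.println("send failed: " + e);
            }
        });
    }
}

class BroadcastDemo {
    // Returns {delivered, overlappingSends} for one fake session hit by several threads.
    static int[] runDemo(int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(threads * perThread);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger overlaps = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();

        // Fake endpoint that counts sends overlapping in time.
        SessionSender sender = new SessionSender(text -> {
            if (inFlight.incrementAndGet() > 1) overlaps.incrementAndGet();
            delivered.incrementAndGet();
            inFlight.decrementAndGet();
            done.countDown();
        }, pool);

        Thread[] callers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            callers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) sender.send("msg " + i);
            });
            callers[t].start();
        }
        try {
            for (Thread c : callers) c.join();
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] { delivered.get(), overlaps.get() };
    }

    public static void main(String[] args) {
        int[] r = runDemo(4, 25);
        System.out.println("delivered=" + r[0] + " overlaps=" + r[1]);
    }
}
```

With this layout the broadcasting thread only enqueues work and returns, and a slow or closed client delays only its own queue. Whether it removes the hang seen with `sendStringByFuture` is something that would still need to be verified against the real server.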