Good day! I am new to programming, mastering java and netty 4. I am writing a small client-server application using netty. The point is simple - every 5 seconds a sample is formed from the database of the turnstile along the aisles of people. If there is a person in this sample that we need, a corresponding message is sent to the client. If the sample did not contain the right person, send "nothing", which also serves as a ping to verify that the connection is active. That is, the server sends messages to the client every 5 seconds, to which the client sends the current user's login to the server. And so in a circle. However, it happens that the client catches ArrayIndexOutOfBoundsException: 1, loses connection with the server. In this case, the logs show that the message sent by the server was divided into two messages, and each of them worked channelReadComplete

Server:

public class StartServer { private static final Logger LOGGER = LogManager.getLogger(); private int port = Integer.parseInt(ServerFunctions.loadProperties("Port")); public StartServer() { } public void startThatServer() throws Exception { LOGGER.info("Запуск сервера..."); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(8,8,8, TimeUnit.SECONDS)); ch.pipeline().addLast(new ServerHandler()); } }) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) ; ChannelFuture f = b.bind().sync(); LOGGER.info(Server.class.getName() + " запущен и слушает порт: " + f.channel().localAddress()); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } //end of startserver() } 

ServerHandler:

 public class ServerHandler extends ChannelInboundHandlerAdapter { Watch nothingWatch = new Watch("nothing"); Message nothingMessage = new Message(nothingWatch); private static final Logger LOGGER = LogManager.getLogger(); private boolean noError = true; private List<String> clientTabels = new ArrayList<>(); // получаем список табельных, за которыми наблюдает клиент private ArrayList<Message> watchToSend = new ArrayList<>(); // данные, которые будут отправлены клиенту, когда он назовет logonName @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress(); InetAddress inetAddress = socketAddress.getAddress(); String clientHostName = socketAddress.getHostName(); String clientIpAdress = inetAddress.getHostAddress(); ByteBuf in = (ByteBuf) msg; String logonName = in.toString(CharsetUtil.UTF_8); try { //получили logonName, делаем немного магии и отправляем нужные данные назад clientTabels.clear(); watchToSend.clear(); // заносим в clientTabels все табельные номера из WatchAсtual-таблицы clientTabels.addAll(ServerFunctions.convertLogonNametoTabelNumbers(dbPostgre, logonName)); // ищем каждое значение из clientTabels в выборке viborkaForPeriod, если находим - помещаем его в watchToSend for (String tabel: clientTabels) { for (Watch watch: GenerateViborka.viborkaForPeriod) { if(tabel.equals(watch.getTabel())) { Message result = new Message(watch); watchToSend.add(result); } } } // если отправить нечего, заносим в watchToSend служебное сообщение nothingMessage if(watchToSend.size() == 0) { watchToSend.add(nothingMessage); } for(Message m: watchToSend) { // отправляем ответ клиенту в виде строки ctx.write(Unpooled.copiedBuffer(m.toString(), CharsetUtil.UTF_8)); } ctx.flush(); try { Thread.sleep(Integer.parseInt(ServerFunctions.loadProperties("CheckPeriodDelay"))*1000); } catch (InterruptedException e) { LOGGER.error("Ошибка во время Thread.sleep в потоке channelRead", e); } } catch(Exception e){ noError = false; LOGGER.error("ChannelHandlerContext exception", e); } } 

Customer:

 public class Client extends Application { public static void main(String[] args) { launch(); } @ Override public void start(final Stage stage) { createBootstrap(new Bootstrap(), loop); } public Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) { if (bootstrap != null) { final ClientHandler clientHandler = new ClientHandler(this); bootstrap.group(eventLoop); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1); bootstrap.handler(new ChannelInitializer < SocketChannel > () {@ Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 10, 10, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(clientHandler); } }); bootstrap.remoteAddress(ClientAppFunctions.loadProperties("Server"), Integer.parseInt(ClientAppFunctions.loadProperties("Port"))); bootstrap.connect().addListener(new ClientConnectionListener(this)); } return bootstrap; } 

ClientHandler:

 public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final Logger LOGGER = LogManager.getLogger(); private static boolean isConnected; private Client client; public ClientHandler(Client client) { this.client = client; } @Override public void channelActive(ChannelHandlerContext ctx) { LOGGER.info("Подключились к " + ctx.channel().remoteAddress()); isConnected = true; ClientAppFunctions.setApplicationTrayIcon(isConnected); // отправляем логин пользователя ctx.writeAndFlush(Unpooled.copiedBuffer(Client.LOGONNAME, CharsetUtil.UTF_8)); } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { // получаем ответ сервера и выводим String inputMessage = in.toString(CharsetUtil.UTF_8); System.out.print(new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime()) + " From server: " + inputMessage + "\n"); ArrayList<String> dataToNotification = new ArrayList<>(); // список для хранения строки из data без времени события if(!inputMessage.equals("nothing")) { List<String> data = Arrays.asList(inputMessage.split("\\^")); // список для хранения разделенных строк из принятого сообщения for(String s: data) { Client.globalEventsList.add(ClientAppFunctions.splitStringAndGetFirstPart(s) + ClientAppFunctions.splitStringAndGetSecondPart(s)); dataToNotification.add(ClientAppFunctions.splitStringAndGetSecondPart(s)); } } for(String d: dataToNotification) { if(Math.round(Double.parseDouble(ClientAppFunctions.loadProperties("NotificationDuration"))) != 0) { new JFXPanel(); ClientAppFunctions.showMessage("Новое событие", d); } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { System.out.println("channelReadComplete " + new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime())); // отправляем логин пользователя ctx.writeAndFlush(Unpooled.copiedBuffer(Client.LOGONNAME, CharsetUtil.UTF_8)); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if(!(evt instanceof IdleStateEvent)) { return; } IdleStateEvent e = (IdleStateEvent) evt; if(e.state() == IdleState.ALL_IDLE) { // если с соединением все хорошо, но нет траффика за заданный период ctx.close(); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { isConnected = false; ClientAppFunctions.setApplicationTrayIcon(isConnected); // выводим сообщение о потере соединения LOGGER.error("Соединение с сервером " + ctx.channel().remoteAddress() + " отсутствует"); // при потере соединения - реконнект final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { LOGGER.info("Попытка подключиться..."); client.createBootstrap(new Bootstrap(), eventLoop); } }, Long.parseLong(ClientAppFunctions.loadProperties("ReconnectTime")), TimeUnit.SECONDS); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable aCause) throws Exception { if (aCause instanceof IOException) { LOGGER.error("IO exception ", aCause); } else { LOGGER.error("other exception " + aCause); } ctx.close(); } } 

Example of a "broken message" (break at 15:24:05):

 channelReadComplete 15:23:50 15:23:55 From server: nothing channelReadComplete 15:23:55 15:24:00 From server: nothing channelReadComplete 15:24:00 15:24:05 From server: 15:23:58 - $Иванов Иван Иванович вышел channelReadComplete 15:24:05 15:24:05 From server: из здания^ 15:24:05.339 [nioEventLoopGroup-2-1] ERROR watcher2.client.ClientHandler - java.lang.ArrayIndexOutOfBoundsException: 1 channelReadComplete 15:24:05 15:24:05.352 [nioEventLoopGroup-2-1] ERROR watcher2.client.ClientHandler - Соединение с сервером watchsrv/10.7.1.43:6067 отсутствует 15:24:15.354 [nioEventLoopGroup-2-1] INFO watcher2.client.ClientHandler - Попытка подключиться... 15:24:15.359 [nioEventLoopGroup-2-1] INFO watcher2.client.ClientHandler - Подключились к watchsrv/10.7.1.43:6067 15:24:15 From server: 15:24:10 - $Иванов Иван Иванович вышел из здания^ channelReadComplete 15:24:15 15:24:20 From server: 15:24:14 - $Иванов Иван Иванович вошел в здание^ channelReadComplete 15:24:20 15:24:25 From server: 15:24:23 - $Иванов Иван Иванович вошел в здание^ channelReadComplete 15:24:25 15:24:30 From server: 15:24:26 - $Иванов Иван Иванович вышел из здания^ channelReadComplete 15:24:30 
  • Use the "Fragment of the code" button only for the code that can actually be executed in the browser. For pieces of code not in JS / HTML / CSS, you should use blocks of code that are formatted using a space of 4 spaces (Ctrl + K). - Mikhail Vaysman
  • Thanks, corrected - Joe Doe
  • How many times have they told the world: TCP does not preserve the boundaries of messages! - Pavel Mayorov
  • Here, for information: stackoverflow.com/a/453385/178779 - Pavel Mayorov

1 answer 1

Thank you very much for the tip. As a result, instead of a bytebuff, I began to send and receive a string using:

 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new ClientHandler()); 

without forgetting to add "\ r \ n" with each write to the channel. Used as an example: https://stackoverflow.com/questions/13923032/netty-client-to-server-message