SocketChannel socketChannel = SocketChannel.open(); 1. 配置SocketChannel socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); 2. 连接node channel.connect(address); 3. 注册事件 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);//构造自定义附属信息 key.attach(channel);//附属key信息
int numReadyKeys = this.nioSelector.select(timeoutMs); if (numReadyKeys > 0){ Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys(); for (SelectionKey key:readyKeys){ KafkaChannel channel = (KafkaChannel) key.attachment(); if (channel.ready()){ while ((networkReceive = channel.read()) != null) { addToStagedReceives(channel, networkReceive); } } } }