Java教程

05、Netty学习笔记—(案例:聊天业务)

本文主要是介绍05、Netty学习笔记—(案例:聊天业务),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

文章目录

  • 坑点说明
    • 1、自己实现SimpleChannelInboundHandler的子类不添加@Sharable注解导致第二个客户端连接不上
  • 具体业务草稿
  • 具体业务实现
    • ①登陆业务
    • ②客户端根据命令实现业务发送
    • ③单聊业务(send [username] [content])
    • ④群聊建群拉人处理(gcreate [group name] [m1,m2,m3...])
    • ⑤群聊消息发送(gsend [group name] [content])
    • ⑥获取群成员信息(gmembers [group name])
    • ⑦加入群聊(gjoin [group name])
    • ⑧退出群聊(gquit [group name])
    • ⑨退出登陆
    • ⑩空闲检测(发送心跳)
  • 扩展
    • ①指定用户下线(quitmember [username])

netty笔记汇总:Netty学习指南(资料、文章汇总)

根据黑马程序员netty视频教程学习所做笔记,部分内容图例来源黑马笔记

笔记demo案例仓库地址: Github-【netty-learn】

坑点说明

1、自己实现SimpleChannelInboundHandler的子类不添加@Sharable注解导致第二个客户端连接不上

针对于自己写继承SimpleChannelInboundHandler的类若是不加上@ChannelHandler.Sharable注解,当第二个客户端连接时就会立刻执行INACTIVEUNREGISTERED事件,直接就会连接失败!

image-20220113202036830

image-20220113202100705

出现的问题展示:第二个客户端直接连不上

image-20220113202146247

解决方案:在自定义handler上加上@Sharable注解即可!



具体业务草稿

客户端、服务端定义好指定的传输协议,之后根据指定的传输协议来进行传输数据。

首先是登陆业务:客户端在启动时创建一个线程来构造出指定的消息对象并进行发送数据。(该过程会按照指定的方式来进行对数据写协议,最终发送出去);服务端按照指定的协议规则进行处理取到发过来的协议数据并将其转为指定的对象。

①登陆-线程通信:客户端接收到服务端响应过来的数据后,如何进行两个线程见的交互处理。(一个是客户端active事件执行时创建的线程来发送给服务端数据的,另一个是客户端在执行channelread读取到服务端的数据(该线程是nio中),如何让这两个线程进行通信?)

  • 使用一个countdownlatch工具类进行线程通信确定是否登陆接收到信息,对于登陆是否成功的状态则使用一个并发变量AtomicBoolean来表示。

②业务消息发送(客户端):登陆成功之后,使用一个线程来不断取得控制台的命令信息,根据不同的命令来进行发送指定的消息对象。

③单聊消息业务:在服务器端维护一个session集合,若是登陆成功就加入到该集合中(username,channel)。对于单聊业务消息主要有三部分内容:发送方,目的方,内容。服务端收到单聊消息时,从session集合中根据目的方取到指定channel,再使用该channel向目的方方发送数据。

④群聊建群拉人处理:create 群名 人1,人2,人3,使用一个map集合来临时存储用户名及对应channel的关系,在创建的过程中会向另外几个人的channel写提示数据,表示已经拉入群。

⑤群聊消息发送:gsend 群名 信息,根据群名称,先从sessionGroup中取到所有的用户名,接着根据每个用户取到sessionFactory中的所有channel,来依次发送群聊消息。

⑥获取群成员信息:gmembers 群民 信息,发送之后根据群名取到所有在群中的channel,接着依次向各个channel来进行写数据。

⑦加入群聊:gjoin [group name],直接添加到指定群集合中。

⑧退出群聊:gquit [group name],对相关联的集合进行解绑。

⑨退出登陆:客户端进行断开连接,服务端的话要进行一系列解绑动作!正常退出、异常退出:实现一个handleradapter即可,重写其中的channelInactive、exceptionActive方法

⑩空闲检测(发送心跳):设置IdleStateHandler来指定读、写、读+写监控时间描述,若是指定秒数后依旧没有事件发生,那么就会触发IdleState的事件,可以使用ChannelDuplexHandler重写其中的userEventTriggered来进行捕获。一般服务端对读进行设置5s,客户端进行写监控3秒(服务端一半时间)来进行发送心跳,表示当前还在连接中!

扩展

①指定用户下线:quitmember [username]

  • 客户端:解析命令来发送给服务端进行用户下线操作,指定某个用户名。
    • 使用一个并发布尔变量俩进行表示下线,重写channelInactive、exceptionCaught事件来进行EXIT变量的设置,并且对于客户端三个读取控制台的阻塞事件下对EXIT变量进行判断提示已断开连接!
  • 服务端:类似相同的对指定对象感兴趣handler,来进行业务操作,取到指定username的channel进行close()操作!


具体业务实现

①登陆业务

客户端:

  • ①发送:自定义线程在channelActive事件中运行一个线程来主要与我们控制台进行交互,登陆业务同样也是如此,首先需要输入用户名密码,接着将其包装成预先设置好的LoginRequestMessage对象由channel发送出去。
  • ②接收:eventloop中的线程接收到经过自定义协议解码取到的对象,将其转为LoginResponseMessage对象,判断其是否登陆成功。
    • 核心:对于如何让eventloop中线程来进行通知主线程登陆成功,我们可以使用一个countdownlatch+AtomicBoolean,前者用于通知主线程拿到登陆结果,后者用于表示登陆的状态成功与否!

服务端:编写一个实现SimpleChannelInboundHandler的子类,指定接收LoginRequestMessage对象,接着来编写对应的channelRead()方法来进行业务操作,最终根据实际情况来向客户端返回一个LoginResponseMessage

客户端

client:

//登陆消息通知计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
//成功状态变量
AtomicBoolean LOGIN = new AtomicBoolean(false);


//负责接收服务器的响应数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("msg: {}", msg);
    //单独处理登陆的响应结果,其他结果直接输出消息内容
    if (msg instanceof LoginResponseMessage) {
        LoginResponseMessage response = (LoginResponseMessage) msg;
        if (response.isSuccess()){
            LOGIN.set(true);//设置登陆状态为true
        }
        WAIT_FOR_LOGIN.countDown();//计数-1,若是为0,则会通知使用该计数器阻塞等待的线程
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx){
    //负责接收用户在控制台上的输入,负责向服务器发送数据
    new Thread(()->{
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入用户名:");
        String username = scanner.nextLine();
        System.out.println("请输入密码:");
        String password = scanner.nextLine();
        //构造登陆消息对象发送给服务端
        Message message = new LoginRequestMessage(username, password);
        ctx.channel().writeAndFlush(message);
        try {
            //等待其他线程进行计数为0,此时才会唤醒向下执行
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ...

    }, "system in").start();
}

服务端

import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description 聊天请求对象处理器:针对于ChatRequestMessage
 */
//该处理器表示对解码得到的ChatRequestMessage对象感兴趣(根据自定义协议解码时读取到的对象决定,实际就是与客户端发送来的对象类型有关!)  可以看到这个ChatRequestMessage对象与在客户端封装的ChatRequestMessage一致
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //若是对方已下线,告知发送方消息
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "对方用户不存在或已下线!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}

server:

//handler
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();

 @Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());//定长解码器(自己做的实现,已经定义好了解码规则)
    ch.pipeline().addLast(LOGGING_HANDLER);//日志处理器
    ch.pipeline().addLast(MESSAGE_CODEC);//协议解码器
    ch.pipeline().addLast(LOGIN_HANDLER);//【登陆处理handler】
}

效果

服务端:

image-20220113205350066

客户端

image-20220113205310304


②客户端根据命令实现业务发送

需求:登陆成功之后,显示指定的提示信息来进行处理用户输入的命令;登陆失败,关闭连接。

客户端:同样在发送登陆请求的线程中执行。

@Override
public void channelActive(ChannelHandlerContext ctx){
    //负责接收用户在控制台上的输入,负责向服务器发送数据
    new Thread(()->{
        //...
        try {
            //接收响应后计数-1,停止阻塞继续向下执行
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获取登陆状态变量
        if (!LOGIN.get()) {
            System.out.println("登陆失败!");
            ctx.channel().close();
            return;
        }
        System.out.println("登陆成功!");
        while (true) {
            System.out.println("==================================");
            System.out.println("send [username] [content]");
            System.out.println("gsend [group name] [content]");
            System.out.println("gcreate [group name] [m1,m2,m3...]");
            System.out.println("gmembers [group name]");
            System.out.println("gjoin [group name]");
            System.out.println("gquit [group name]");
            System.out.println("quit");
            System.out.println("==================================");
            String command = scanner.nextLine();
            String[] split = command.split(" ");
            switch (split[0]){
                case "send" :
                    ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gsend" :
                    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gcreate" :
                    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
                    ctx.writeAndFlush(new GroupCreateRequestMessage(split[0],users));
                    break;
                case "gmembers" :
                    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
                    break;
                case "gjoin" :
                    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
                    break;
                case "gquit" :
                    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
                    break;
                case "quit" :
                    ctx.channel().close();
                    break;
            }
        }

    }, "system in").start();
}

image-20220114135934166


③单聊业务(send [username] [content])

首先看一下基于内存的session实现:

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionMemoryImpl implements Session {

    //保存用户名与channel映射的map集合
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    //保存channel与用户名映射的map集合
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    //用于保存channel与绑定具体属性的map集合
    private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();

    //登陆成功:将对应的映射保存到三个map即可中
    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username, channel);
        channelUsernameMap.put(channel, username);
        channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }

    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        usernameChannelMap.remove(username);
        channelAttributesMap.remove(channel);
    }

    @Override
    public Object getAttribute(Channel channel, String name) {
        return channelAttributesMap.get(channel).get(name);
    }

    @Override
    public void setAttribute(Channel channel, String name, Object value) {
        channelAttributesMap.get(channel).put(name, value);
    }

    //根据username获取指定的channel
    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }

    @Override
    public String toString() {
        return usernameChannelMap.toString();
    }
}

client:读取到控制台输入的命令信息,封装成一个ChatRequestMessage发送出去。

case "send" :
ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
break;

server:订阅经过自定义协议解码得到ChatRequestMessage对象,并对其来急性业务处理。

  • 具体业务操作:①取到目的方名称,从session对象中取出指定的channel。②判断channel是否为空,若是为空表示该用户不在线,那么回发一条提示数据;若是不为空,则直接将数据内容由取到的channel发出。
  • 新增channelUnregistered重写事件:解除绑定登陆的session channel。
import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description 聊天请求对象处理器:针对于ChatRequestMessage
 */
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //若是对方已下线,告知发送方消息(取到null说明对方压根没有登陆上线过)
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "对方用户不存在或已下线!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}

服务端:

//该handler是可共享的,线程安全的
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
           //...
           ch.pipeline().addLast(CHAT_HANDLER);//聊天请求处理handler
    }
});

效果:服务端先启动,之后启动两个client,并依次登陆lisi、wangwu。

登陆好之后lisi client执行命令send wangwu 123,此时服务器端收到消息,并使用wangwu的channel进行发送数据:

image-20220114142827792

只登陆lisi client,接着直接执行命令send wangwu hello,此时lisi client收到消息,wangwu client还未上线

image-20220114144400237


④群聊建群拉人处理(gcreate [group name] [m1,m2,m3…])

客户端:解析命令,读取到群名以及成员,邀请成员+自己组成一个set集合,最终包装成GroupCreateRequestMessage发送出去。

case "gcreate" :
    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
    users.add(username);//将自己也添加到群聊中
    ctx.writeAndFlush(new GroupCreateRequestMessage(split[1],users));
    break;

服务端:编写一个对GroupCreateRequestMessage感兴趣的handler,来进行业务处理,核心本质就是向HashMap中添加群组、set成员集合。最终根据是否添加成功来进行相应的处理。

import com.changlu.message.GroupCreateRequestMessage;
import com.changlu.message.GroupCreateResponseMessage;
import com.changlu.server.session.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;
import java.util.Set;

/**
 * @ClassName GroupCreateRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 14:46
 * @Description 新建群聊处理:创建群聊,并且拉入指定成员
 */
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler  extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        Set<String> members = msg.getMembers();
        //群组管理器
        final GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        //若是返回为null,表示原先没有,当前插入成功
        if (group == null){
            //响应一:创建成功,向原始客户端发送一条创建成功消息
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(true, "群组创建成功!"));
            List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
            //响应二:向所有被拉入群聊的客户端发送一条被拉入群聊消息
            for (Channel channel : membersChannel) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "你已经被拉取群聊:" + groupName));
            }
        } else {
            //响应三:创建失败,向源客户端发送提示信息
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(false, "群组已存在!"));
        }

    }
}
//处理GroupCreateRequestMessage的handler处理器
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ...
    ch.pipeline().addLast(GROUP_CREATE_HANDLER);//创建群聊拉人处理handler
}

效果:创建三个客户端lisi、wangwu、zhaoliu,记着lisi client执行命令gcreate DreamGroup wangwu,zhaoliu

1、群聊创建成功,lisi client首先收到创建群聊成功消息,接着lisi自己以及新拉入群聊的wangwu、zhaoliu都收到被拉入群聊消息

image-20220114151216538

其他人收到被拉入信息这里就不展示了

2、群聊创建失败,lisi client收到创建群聊失败消息。

image-20220114151407871


⑤群聊消息发送(gsend [group name] [content])

客户端:解析命令,封装成GroupChatRequestMessage对象发送出去。

case "gsend" :
    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
    break;

服务端:编写对GroupChatRequestMessage感兴趣的handler,紧接着根据群名获取到所有的channel,接着依次根据channel向外发送出去数据。

import com.changlu.message.GroupChatRequestMessage;
import com.changlu.message.GroupChatResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;

/**
 * @ClassName GroupChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:26
 * @Description 群组聊天:向群组发送一条消息。【gcreate [group name] [m1,m2,m3...]】
 */
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String content = msg.getContent();
        //根据群名取出所有的channel来进行发送数据
        List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        GroupChatResponseMessage responseMessage = new GroupChatResponseMessage(msg.getFrom(), content);
        responseMessage.setSuccess(true);
        for (Channel channel : membersChannel) {
            channel.writeAndFlush(responseMessage);
        }
    }
}
//指定的执行器
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ...
    ch.pipeline().addLast(GROUP_CHAT_HANDLER);//向群聊发送消息处理handler
}

效果:在执行命令之前,首先需要执行gcreate命令,先创建群聊以及拉人,创建成功之后才能够进行群聊消息命令的发送执行

image-20220114155702122

image-20220114155732781



⑥获取群成员信息(gmembers [group name])

客户端:解析命令,封装成GroupMembersRequestMessage对象发送出去。

System.out.println("gmembers [group name]");

case "gmembers" :
    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
    break;

服务端:编写对GroupMembersRequestMessage感兴趣的handler,接着来执行业务操作,向源channel发送群成员信息。

import com.changlu.message.GroupMembersRequestMessage;
import com.changlu.message.GroupMembersResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Set;

/**
 * @ClassName GroupMembersRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:59
 * @Description 查看群成员信息:根据群名获取所有群成员信息。【gmembers [group name]】
 */
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
        final String groupName = msg.getGroupName();
        final Set<String> members = GroupSessionFactory.getGroupSession().getMembers(groupName);
        ctx.channel().writeAndFlush(new GroupMembersResponseMessage(members));
    }
}
GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();

ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);//获取指定群聊所有群成员处理handler

效果

image-20220114162912161


⑦加入群聊(gjoin [group name])

客户端:解析命令,封装成GroupJoinRequestMessage发送出去。

System.out.println("gjoin [group name]");

case "gjoin" :
    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
    break;

服务端:获取到群名、用户名,来执行业务操作。

import com.changlu.message.GroupJoinRequestMessage;
import com.changlu.message.GroupJoinResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName GroupJoinRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:04
 * @Description 加入群聊处理器:【gjoin [group name]】
 */
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String username = msg.getUsername();
        Group group = GroupSessionFactory.getGroupSession().joinMember(groupName, username);
        if (group != null){
            ctx.writeAndFlush(new GroupJoinResponseMessage(true, "群聊【"+ groupName +"】加入成功!"));
        }else{
            ctx.writeAndFlush(new GroupJoinResponseMessage(false, "当前无该群聊!"));
        }
    }
}
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();

ch.pipeline().addLast(GROUP_JOIN_HANDLER);//加入群聊处理handler

效果

image-20220114163329699

image-20220114163424980


⑧退出群聊(gquit [group name])

客户端:解析命令,封装成GroupQuitRequestMessage发送出去。

System.out.println("gquit [group name]");

case "gquit" :
    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
    break;

服务端:获取到群名、用户名,来执行业务操作。

import com.changlu.message.GroupQuitRequestMessage;
import com.changlu.message.GroupQuitResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName GroupQuitRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:09
 * @Description 退出群聊处理器:【gquit [group name]】
 */
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
        final String username = msg.getUsername();
        final String groupName = msg.getGroupName();
        final Group group = GroupSessionFactory.getGroupSession().removeMember(groupName, username);
        if (group != null) {
            ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出群聊成功!"));
        }else{
            ctx.writeAndFlush(new GroupQuitResponseMessage(true, "该群聊不存在!"));
        }
    }
}
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();

ch.pipeline().addLast(GROUP_QUIT_HANDLER);//退出群聊处理handler

效果

image-20220114163829443


⑨退出登陆

客户端:解析命令,直接客户端关闭连接即可。

System.out.println("quit");

case "quit" :
    ctx.channel().close();
    break;

服务端:若是客户端自己主动断开连接,那么服务端也会触发指定的inactive操作,那么此时取消绑定的channel集合!

import com.changlu.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @ClassName QuitHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:16
 * @Description 退出连接执行器
 */
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {

    // 当连接断开时触发inactive事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 连接已断开!", ctx.channel());
    }

    // 当出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
    }
}
QuitHandler QUIT_HANDLER = new QuitHandler();

ch.pipeline().addLast(QUIT_HANDLER);//退出及处理异常handler

效果

image-20220114164138910


⑩空闲检测(发送心跳)

知识点说明

网络编程中容易出现的问题:连接假死。对于服务器高负荷的时候就是一个很大的问题。netty提供了这中假死的方式,就是空闲检测器。(就是一个handler,IdleStateHandler),对于服务器来讲若是很长时间没有收到客户端数据,就可能认为该连接有问题或者很长一段时间写出的数据没有写出去。只要去监测你是不是读或写空闲时间太长了就行了或者读和写加在一起空闲的长度。

  • IdleStateHandler·:三个参数构造,参数1检测读的空闲时间超过了某秒,参数2检测写的空闲时间超过了多少秒,参数3检测读写都空闲的时间上线。单位秒。
  • 若是指定秒数中没有收到channel发来数据,那么就会触发事件(read or write …),可以编写ChannelDuplexHandler重写其中的userEventTriggered来进行判断触发了什么事件。

心跳:针对于连接假死,人没有发数据,但可以让客户端定时的发送数据,为了向服务器证明或者就像服务器发送心跳包,那么在服务端、客户端各自设置自动检测,服务端检测读事件,客户端检测写事件,通常客户端的检测时间是服务端1/2时间。

代码

server:IdleStateHandler来指定检测事件的秒数来触发事件,ChannelDuplexHandler中用于进行指定事件捕获来关闭连接

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());
    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    //在服务端,主要用来判断是不是读空闲时间过长
    // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器
        //用来捕获IdleStateHandler的事件
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent event = (IdleStateEvent) evt;
            //触发读事件
            if (event.state() == IdleState.READER_IDLE) {
                log.debug("已经5s没有读到数据了!");
                //关闭channel连接
                ctx.channel().close();
            }
        }
    });
    //...其他handler
}

client:对于服务端这种检测机制,客户端需要每3秒(服务端监测的一半)自动发送一个心跳消息来表示当前一直在连接着

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ProcotolFrameDecoder());
        //                    ch.pipeline().addLast(LOGGING_HANDLER);
        ch.pipeline().addLast(MESSAGE_CODEC);
        //客户端,主要用来判断是不是写空闲时间过长,来进行发送心跳,表示当前用户并没有断开
        // 3s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
        ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
        ch.pipeline().addLast(new ChannelDuplexHandler(){ // 使用一个双向处理器ChannelDuplexHandler 可以同时作为入站和出站处理器
            //用来捕获IdleStateHandler的事件
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                IdleStateEvent event = (IdleStateEvent) evt;
                //触发读事件
                if (event.state() == IdleState.WRITER_IDLE) {
                    log.debug("已经3s没有读到数据了!");
                    ctx.channel().writeAndFlush(new PingMessage());
                }
            }
        });
    }
}

效果:当服务端一直收到消息,那么就一直不会触发指定的事件,自然也不会关闭channel

image-20220114214048336


扩展

①指定用户下线(quitmember [username])

客户端:解析命令quitmember [username],封装成QuitMemberRequestMessage对象发出。

 System.out.println("quitmember [username]");

case "quitmember":
    ctx.writeAndFlush(new QuitMemberRequestMessage(split[1]));
    break;
  • 针对于断开连接,需要额外定义如下两个事情:
    1. 设置一个布尔变量EXIT(AtomicBoolean),用于进行线程通信表示断开连接,在三个控制台等待输入阻塞事件下分别来对其EXIT判断是否断开连接!
    2. 重写channelInactive、exceptionCaught事件,一个是断开连接会触发的事件,另一个是出现异常捕捉到的事件。一旦捕获就设置EXIT为true!
AtomicBoolean EXIT = new AtomicBoolean(false);//检测断开连接变量

ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter(){
    // 在连接断开时触发
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("连接已断开,请按任意键退出...");
        EXIT.set(true);
    }

    // 在出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.debug("连接已断开,请按任意键退出...,异常信息:{}",cause.getMessage());
        EXIT.set(true);
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        //负责接收用户在控制台上的输入,负责向服务器发送数据
        new Thread(()->{
            //...
            System.out.println("请输入用户名:");
            String username = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            System.out.println("请输入密码:");
            String password = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            
            //while(true)中等待输入命令下添加判断
            String command = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            //...
    }
}

服务端:编写针对于对QuitMemberRequestMessage感兴趣的handler,来取出指定下线的username,接着从sessionFactory中取到channel,进行手动断开即可!

import com.changlu.message.QuitMemberRequestMessage;
import com.changlu.message.QuitMemberResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName QuitMemberRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 21:48
 * @Description 指定用户下线执行器:【quitmember [username]】
 */
@ChannelHandler.Sharable
public class QuitMemberRequestMessageHandler extends SimpleChannelInboundHandler<QuitMemberRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, QuitMemberRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        Channel channel = SessionFactory.getSession().getChannel(username);
        if (channel == null) {
            ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户不在线!"));
        } else {
            //取消绑定以及强制关闭
            SessionFactory.getSession().unbind(channel);
            channel.close();
            ctx.writeAndFlush(new QuitMemberResponseMessage(true, "该用户已强制下线!"));
        }
    }
}
QuitMemberRequestMessageHandler QUIT_MEMBER_HANDLER = new QuitMemberRequestMessageHandler();
ch.pipeline().addLast(QUIT_MEMBER_HANDLER);//强制下线用户handler

效果

image-20220114220434319

image-20220114220532851


整理者:长路 时间:2021.1.12-1.14

我是长路,感谢你的耐心阅读。如有问题请指出,我会积极采纳!
欢迎关注我的公众号【长路Java】,分享Java学习文章及相关资料
Q群:851968786 我们可以一起探讨学习
注明:转载可,需要附带上文章链接

这篇关于05、Netty学习笔记—(案例:聊天业务)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!