Netty 群聊案例
ChatServer
java
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加解码器、编码器和自定义处理器
pipeline.addLast("decoder",new StringDecoder())
.addLast("encoder",new StringEncoder())
.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatServer(7000).run();
}
}
ChatServerHandler
java
/**
 * Server-side handler of the group chat: tracks connected channels in a shared group,
 * announces joins and leaves, and relays every received message to all members.
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    /** Channel group shared by all handler instances; holds every connected channel. */
    private static final ChannelGroup channels =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Announces the new channel to the members already in the group, then adds it.
     * Broadcasting before adding means the newcomer does not receive its own join notice.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.writeAndFlush("[客户端]" + channel.remoteAddress()
                + "加入聊天" + "\n");
        channels.add(channel);
    }

    /**
     * Announces to the group that this channel has left.
     *
     * <p>No explicit {@code channels.remove(...)} is done here; per Netty's
     * {@code ChannelGroup} documentation a closed channel is removed from the group
     * automatically.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.writeAndFlush("[客户端]" + channel.remoteAddress()
                + "离开聊天" + "\n");
    }

    /** Logs on the server console that the channel became active (came online). */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线了");
    }

    /** Logs on the server console that the channel became inactive (went offline). */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "下线了");
    }

    /**
     * Relays a received message to every channel in the group. The sender gets the
     * message echoed back with a "self" prefix; everyone else sees the sender's address.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded inbound text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg)
            throws Exception {
        Channel channel = ctx.channel();
        channels.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[客户端]" + channel.remoteAddress()
                        + "发送了消息:" + msg + "\n");
            } else {
                ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
            }
        });
    }

    /**
     * Logs the failure and closes the affected connection. Without this override an
     * exception raised in the pipeline would only reach Netty's tail handler and the
     * connection would be left open.
     *
     * @param ctx context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println(ctx.channel().remoteAddress() + "发生异常:" + cause);
        ctx.close();
    }
}
ChatClient
java
/**
 * Console client of the group chat: connects to a server and sends every line typed on
 * standard input as one newline-terminated message.
 */
public class GroupChatClient {

    /** Server host name or address. */
    private final String host;

    /** Server TCP port. */
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, forwards standard input line by line until it is exhausted,
     * then shuts the event loop group down.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    public void run() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Per-connection pipeline: string decoder, string encoder, then the chat handler.
            ChannelInitializer<SocketChannel> initializer =
                    new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast("decoder", new StringDecoder())
                                    .addLast("encoder", new StringEncoder())
                                    .addLast(new GroupChatClientHandler());
                        }
                    };

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(initializer);

            // Block until the connection attempt has completed, then start reading input.
            Channel channel = bootstrap.connect(host, port).sync().channel();
            forwardConsoleInput(channel);
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Sends each line read from standard input to the channel, terminated by a newline. */
    private void forwardConsoleInput(Channel channel) {
        Scanner console = new Scanner(System.in);
        while (console.hasNextLine()) {
            channel.writeAndFlush(console.nextLine() + "\n");
        }
    }

    /** Connects to a server on 127.0.0.1:7000. */
    public static void main(String[] args) throws InterruptedException {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
ChatClientHandler
java
/** Client-side handler that prints every message received from the server. */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Writes the received text to standard output with leading and trailing whitespace
     * (including the message's trailing newline) trimmed off.
     *
     * @param ctx context of the channel the message arrived on
     * @param s decoded inbound text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        String text = s.trim();
        System.out.println(text);
    }
}
结果验证
- 客户端1
- 客户端2
- 客户端3