1、Netty简介
Netty是一个 异步 事件驱动 的网络应用程序框架 用于快速开发可维护的高性能协议服务器和客户端。
异步:当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
事件驱动:事件驱动模型是基于发布-订阅模式的编程模型。(下面的图rabbitMQ是不是有点像)
SEDA:SEDA(Staged Event-Driven Architecture)的核心思想是把一个请求处理过程分成几个Stage,不同资源消耗的Stage使用不同数量的线程来处理,Stage间使用事件驱动的异步通信模式。
Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化了TCP和UDP套接字服务器等网络编程。
“快速简便”并不意味着最终的应用程序会受到可维护性或性能问题的影响。Netty经过精心设计,具有丰富的协议,如FTP,SMTP,HTTP以及各种二进制和基于文本的传统协议。因此,Netty成功地找到了一种在不妥协的情况下实现易于开发,性能,稳定性和灵活性的方法。
2、Netty的使用
2.1 Netty实现Http服务端
TestServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty Http服务端
*/
public class TestServer {
public static void main(String[] args) {
// 创建两个线程组(死循环)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 获取连接,连接获取到之后,交给下面的去处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 简化服务端启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 定义子处理器,当连接创建之后会自动调用初始化器
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer()); // 子处理器(对请求进行处理)
// 绑定端口号
// - sync:同步
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
// 采用优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TestServerInitializer.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpServerCodec;
/**
* 初始化器
* 当连接创建之后会自动调用初始化器
*/
public class TestServerInitializer extends ChannelInitializer {
// 初始化管道,它是一个回调方法
@Override
protected void initChannel(Channel ch) throws Exception {
// 管道,管道中可以有多个处理器
ChannelPipeline pipeline = ch.pipeline();
// 增加到最后
pipeline.addLast("httpServerCodec",new HttpServerCodec()); // HttpServerCodec:将请求进行编解码(不能使用单例)
// 将我们规格写的处理器,添加到pipline
pipeline.addLast("testHttpServerHandler",new TestHttpServerHandler());
}
}
TestHttpServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
/**
* 自定义处理器
* 对于http请求的处理器,范型写HttpObject
*
* Inbound:进来(进来的请求)
* Outbound:出来(返回的响应)
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{
// 读取客户端请求,并向客户端发送响应的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("请求方法名为:" + (httpRequest.method().name()));
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())){
System.out.println("请求favicon.ico");
return;
}
ByteBuf content = Unpooled.copiedBuffer("hello,world!", CharsetUtil.UTF_8);
// 设置响应
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
// 如果使用write()方法不会立即响应,会放进缓冲区内
ctx.writeAndFlush(response);
// ctx.close();
}
}
// 其他事件回调方法们
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active");
super.channelActive(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel registered");
super.channelRegistered(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler added");
super.handlerAdded(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel inactive");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel unregistered");
super.channelUnregistered(ctx);
}
}
2.2 Netty实现socket连接
2.2.1 服务端
MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty socket服务端
*/
public class MyServer {
public static void main(String[] args) {
// 创建两个线程组(死循环)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 获取连接,连接获取到之后,交给下面的去处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 简化服务端启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 定义子处理器,当连接创建之后会自动调用初始化器
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer()); // 子处理器(对请求进行处理)
// 绑定端口号
// - sync:同步
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
// 采用优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 解码器(将2进制的数据解析成真正携带的东西)
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
// 编码器
pipeline.addLast(new LengthFieldPrepender(4));
// 字符集编解/码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 我们自己的处理器
pipeline.addLast(new MyServerHandler());
}
}
MyServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
* 自定义处理器
* 范型(String):客户端与服务端相互传输的数据类型
*/
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取客户端请求,并向客户端发送响应的方法
* @param ctx netty上下文
* @param msg 客户端请求对象
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 打印ip:port
System.out.println(ctx.channel().remoteAddress() + "," + msg);
// 返回一个异步的对象(ChannelFuture)
ctx.channel().writeAndFlush("from server " + UUID.randomUUID());
}
/**
* 当出现异常时,做什么
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2.2.2 客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* netty socket客户端
*/
public class MyClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) // 反射创建NioSocketChannel对象
.handler(new MyClientInitializer()); // 客户端一般只使用headler
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
MyClientInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 解码器(将2进制的数据解析成真正携带的东西)
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
// 编码器
pipeline.addLast(new LengthFieldPrepender(4));
// 字符集编解/码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 我们自己的处理器
pipeline.addLast(new MyClientHandler());
}
}
MyClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.time.LocalDateTime;
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 打印服务器端 ip:port
System.out.println(ctx.channel().remoteAddress() + "," + msg);
// 返回一个异步的对象(ChannelFuture)
ctx.channel().writeAndFlush("from client " + LocalDateTime.now());
}
/**
* 当出现异常时,做什么
* @param ctx
* @param cause 异常
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 通道已经活动了,已经建立了连接,可以在这发信息
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("client 第一次发消息");
}
}
2.2.3 实现简单聊天室
MyChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty socket服务端(聊天室demo)
*/
public class MyChatServer {
public static void main(String[] args) {
// 创建两个线程组(死循环)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 获取连接,连接获取到之后,交给下面的去处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 简化服务端启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 定义子处理器,当连接创建之后会自动调用初始化器
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyChatServerInitializer()); // 子处理器(对请求进行处理)
// 绑定端口号
// - sync:同步
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
// 采用优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyChatServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 根据分隔符解码 ABC\nDEF\r\n ==> ABC | DEF
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
// 字符集编解/码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 我们自己的处理器
pipeline.addLast(new MyChatServerHandler());
}
}
MyChatServerHandler
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
// 怎么实现?netty提供的各种回调
// 连接建立的时候,广播
// 接收消息的时候,广播
// 下线的时候,广播
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {
// ChannelGroup继承了Set,不会重复
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(x -> {
if (x != channel){
x.writeAndFlush(channel.remoteAddress() + "发送的消息为:" + msg + "\n");
} else {
x.writeAndFlush("[自己] " + msg + "\n");
}
});
}
/**
* 客户端连接刚刚建立时
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 广播到组内到所有到channel
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "加入\n");
// 将新连接的channel加入
channelGroup.add(channel);
}
/**
* 客户端断开连接时调用
* netty会自动将断开的channel从channelGroup移除
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 获取断开连接的客户端的channel对象
Channel channel = ctx.channel();
// 广播到组内到所有到channel
channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "离开\n");
}
/**
* 连接活动的时候(可能是事件驱动模型)
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 上线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线");
}
/**
* 出现异常时调用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
MyChatClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class MyChatClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) // 反射创建NioSocketChannel对象
.handler(new MyChatClientInitializer()); // 客户端一般只使用headler
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
// channelFuture.channel().closeFuture().sync();
Channel channel = channelFuture.channel();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
// 死循环,一直监听用户输入
while (true){
channel.writeAndFlush(br.readLine()+"\r\n");
}
}catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
MyChatClientInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 根据分隔符解码 ABC\nDEF\r\n ==> ABC | DEF
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
// 字符集编解/码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 我们自己的处理器
pipeline.addLast(new MyChatClientHandler());
}
}
MyChatClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 打印服务端传来的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
2.3 心跳检测实现(具体怎么用,不知道)
集群之间(互相)检测服务器是否存活;避免网络中断的机制。
MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class MyServer {
public static void main(String[] args) {
// 创建两个线程组(死循环)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 获取连接,连接获取到之后,交给下面的去处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 简化服务端启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 定义子处理器,当连接创建之后会自动调用初始化器
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // handler是对于bossGroup施加的
.childHandler(new MyServerInitializer()); // 子处理器(对请求进行处理),childHandler是对于workerGroup施加的
// 绑定端口号
// - sync:同步
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
// 采用优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 空闲检测Handler,(handler们是一种 ==> 责任链模式)
pipeline.addLast(new IdleStateHandler(5,7,10, TimeUnit.SECONDS)); // 5s没有读 --> 触发一个事件,7s没有写 --> 触发一个事件,10s没有读写 --> 触发一个事件
// 我们自己的处理器,对空闲检测进行处理
pipeline.addLast(new MyServerHandler());
}
}
MyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 自定义处理器
* 我们继承一个适配器(SimpleChannelInboundHandler的父类)
*/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 触发了某一个 事件 后,会调用
* @param ctx 上下文对象
* @param evt 事件对象
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "的超时事件为:" + eventType);
ctx.channel().close();
}
//super.userEventTriggered(ctx, evt); // 默认触发之后,会转发到下一个handler对象中进行处理
}
}
2.4 实现websocket简易聊天室
MyWebServiceServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* websocket服务端
*/
public class MyWebServiceServer {
public static void main(String[] args) {
// 创建两个线程组(死循环)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 获取连接,连接获取到之后,交给下面的去处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 简化服务端启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 定义子处理器,当连接创建之后会自动调用初始化器
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // handler是对于bossGroup施加的
.childHandler(new WebSocketChannelInitializer()); // 子处理器(对请求进行处理),childHandler是对于workerGroup施加的
// 绑定端口号
// - sync:同步
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
// 采用优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
WebSocketChannelInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// http处理器
pipeline.addLast(new HttpServerCodec());
// 以块的方式进行写的处理器
pipeline.addLast(new ChunkedWriteHandler());
// 对http对象进行聚合的处理器, netty传输时,会将一个请求或响应分成段或者块
// HttpObjectAggregator作用:将10个段 ==》 聚合成一个完整的FullHttprequest或FullHttpResponse
// 如果超出指定长度,则会调用handleOversizedMessage
pipeline.addLast(new HttpObjectAggregator(8192));
// 处理websocket,这个处理程序为您运行websocket服务器做了所有繁重的工作,它负责websocket握手以及控制帧(frame)的处理(Close,Ping,Pong)。
// 数据 帧 将传递给管道中的下一个处理程序(由您实现)进行处理。
// websocket定义了6种帧:binery帧 二进制帧,text帧 文本帧 ping帧 发心跳 close帧 关闭
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new WebSocketFrameHandler());
}
}
WebSocketFrameHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* 处理text帧的处理器
*/
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息:" + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDateTime.now()));
}
/**
* 连接建立时的回调方法
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取channal的全局唯一id
System.out.println("handlerAdded:" + ctx.channel().id().asLongText());
}
/**
* 连接关闭时调用
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生");
ctx.close(); // 关闭连接
}
}
test.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket客户端</title>
</head>
<body>
<script type="text/javascript">
var socket;
if(window.WebSocket) {
socket = new WebSocket("ws://localhost:8899/ws");
socket.onmessage = function(event) {
var ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + event.data;
}
socket.onopen = function(event) {
var ta = document.getElementById("responseText");
ta.value = "连接开启!";
}
socket.onclose = function(event) {
var ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + "连接关闭!";
}
} else {
alert('浏览器不支持WebSocket!')
}
function send(message) {
if(!window.WebSocket) {
return;
}
if(socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接尚未开启!");
}
}
</script>< &rt;
<form onsubmit="return false;">
<textarea name="message" style="width: 400px; height: 200px"></textarea>
<input type="button" value="发送数据" onclick="send(this.form.message.value)">
<h3>服务端输出:</h3>
<textarea id="responseText" style="width: 400px; height: 300px;"></textarea>
<input type="button" onclick="javascript: document.getElementById('responseText').value=''" value="清空内容">
</form>
</body>
</html>
问题:
将客户端网络断掉,服务器端并不会知道。
解决办法:使用心跳(当然我还不会用)