Fellow Travellers

netty简介

于佳鑫
字数统计: 4.3k阅读时长: 21 min
2018/12/24 Share

1、Netty简介

Netty是一个 异步 事件驱动 的网络应用程序框架 用于快速开发可维护的高性能协议服务器和客户端。

  • 异步:当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

  • 事件驱动:事件驱动模型是基于发布-订阅模式的编程模型。(下面的图rabbitMQ是不是有点像)

    20171118110955032

  • SEDA:SEDA(Staged Event-Driven Architecture)的核心思想是把一个请求处理过程分成几个Stage,不同资源消耗的Stage使用不同数量的线程来处理,Stage间使用事件驱动的异步通信模式。

Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化了TCP和UDP套接字服务器等网络编程。

“快速简便”并不意味着最终的应用程序会受到可维护性或性能问题的影响。Netty经过精心设计,具有丰富的协议,如FTP,SMTP,HTTP以及各种二进制和基于文本的传统协议。因此,Netty成功地找到了一种在不妥协的情况下实现易于开发,性能,稳定性和灵活性的方法。

2、Netty的使用

2.1 Netty实现Http服务端

TestServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * netty Http服务端
 */
public class TestServer {

    public static void main(String[] args) {

        // 创建两个线程组(死循环)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();  // 获取连接,连接获取到之后,交给下面的去处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 简化服务端启动
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 定义子处理器,当连接创建之后会自动调用初始化器
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new TestServerInitializer()); // 子处理器(对请求进行处理)

            // 绑定端口号
            //  - sync:同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();

            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 采用优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

TestServerInitializer.java

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * 初始化器
 *  当连接创建之后会自动调用初始化器
 */
public class TestServerInitializer extends ChannelInitializer {

    // 初始化管道,它是一个回调方法
    @Override
    protected void initChannel(Channel ch) throws Exception {
        // 管道,管道中可以有多个处理器
        ChannelPipeline pipeline = ch.pipeline();

        // 增加到最后
        pipeline.addLast("httpServerCodec",new HttpServerCodec());  // HttpServerCodec:将请求进行编解码(不能使用单例)
        // 将我们规格写的处理器,添加到pipline
        pipeline.addLast("testHttpServerHandler",new TestHttpServerHandler());

    }
}

TestHttpServerHandler.java

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

/**
 * 自定义处理器
 *  对于http请求的处理器,范型写HttpObject
 *
 *  Inbound:进来(进来的请求)
 *  Outbound:出来(返回的响应)
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{

    // 读取客户端请求,并向客户端发送响应的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {

            HttpRequest httpRequest = (HttpRequest) msg;
            System.out.println("请求方法名为:" + (httpRequest.method().name()));

            URI uri = new URI(httpRequest.uri());
            if ("/favicon.ico".equals(uri.getPath())){
                System.out.println("请求favicon.ico");
                return;
            }

            ByteBuf content = Unpooled.copiedBuffer("hello,world!", CharsetUtil.UTF_8);
            // 设置响应
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
            // 如果使用write()方法不会立即响应,会放进缓冲区内
            ctx.writeAndFlush(response);

//            ctx.close();
        }
    }

    // 其他事件回调方法们

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel active");
        super.channelActive(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel registered");
        super.channelRegistered(ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handler added");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel inactive");
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel unregistered");
        super.channelUnregistered(ctx);
    }
}

2.2 Netty实现socket连接

2.2.1 服务端

MyServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * netty socket服务端
 */
public class MyServer {

    public static void main(String[] args) {
        // 创建两个线程组(死循环)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();  // 获取连接,连接获取到之后,交给下面的去处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 简化服务端启动
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 定义子处理器,当连接创建之后会自动调用初始化器
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer()); // 子处理器(对请求进行处理)

            // 绑定端口号
            //  - sync:同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 采用优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

MyServerInitializer.java

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 解码器(将2进制的数据解析成真正携带的东西)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        // 编码器
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符集编解/码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 我们自己的处理器
        pipeline.addLast(new MyServerHandler());
    }
}

MyServerHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.UUID;

/**
 * 自定义处理器
 *   范型(String):客户端与服务端相互传输的数据类型
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 读取客户端请求,并向客户端发送响应的方法
     * @param ctx netty上下文
     * @param msg 客户端请求对象
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 打印ip:port
        System.out.println(ctx.channel().remoteAddress() + "," + msg);

        // 返回一个异步的对象(ChannelFuture)
        ctx.channel().writeAndFlush("from server " + UUID.randomUUID());
    }


    /**
     * 当出现异常时,做什么
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2.2.2 客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * netty socket客户端
 */
public class MyClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) // 反射创建NioSocketChannel对象
                    .handler(new MyClientInitializer());    // 客户端一般只使用headler

            // 连接服务端
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

}

MyClientInitializer.java

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 解码器(将2进制的数据解析成真正携带的东西)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        // 编码器
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符集编解/码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 我们自己的处理器
        pipeline.addLast(new MyClientHandler());
    }
}

MyClientHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.time.LocalDateTime;

public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 打印服务器端 ip:port
        System.out.println(ctx.channel().remoteAddress() + "," + msg);

        // 返回一个异步的对象(ChannelFuture)
        ctx.channel().writeAndFlush("from client " + LocalDateTime.now());
    }


    /**
     * 当出现异常时,做什么
     * @param ctx
     * @param cause 异常
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


    /**
     * 通道已经活动了,已经建立了连接,可以在这发信息
     * @param ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("client 第一次发消息");
    }
}

2.2.3 实现简单聊天室

MyChatServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * netty socket服务端(聊天室demo)
 */
public class MyChatServer {

    public static void main(String[] args) {
        // 创建两个线程组(死循环)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();  // 获取连接,连接获取到之后,交给下面的去处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 简化服务端启动
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 定义子处理器,当连接创建之后会自动调用初始化器
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyChatServerInitializer()); // 子处理器(对请求进行处理)

            // 绑定端口号
            //  - sync:同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 采用优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

MyChatServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        // 根据分隔符解码   ABC\nDEF\r\n ==> ABC | DEF
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));

        // 字符集编解/码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

        // 我们自己的处理器
        pipeline.addLast(new MyChatServerHandler());
    }
}

MyChatServerHandler

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

// 怎么实现?netty提供的各种回调
// 连接建立的时候,广播

// 接收消息的时候,广播

// 下线的时候,广播
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    // ChannelGroup继承了Set,不会重复
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.forEach(x -> {
            if (x != channel){
                x.writeAndFlush(channel.remoteAddress() + "发送的消息为:" + msg + "\n");
            } else {
                x.writeAndFlush("[自己] " + msg + "\n");
            }
        });


    }

    /**
     * 客户端连接刚刚建立时
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        // 广播到组内到所有到channel
        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "加入\n");
        // 将新连接的channel加入
        channelGroup.add(channel);
    }

    /**
     * 客户端断开连接时调用
     *  netty会自动将断开的channel从channelGroup移除
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 获取断开连接的客户端的channel对象
        Channel channel = ctx.channel();
        // 广播到组内到所有到channel
        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "离开\n");
    }


    /**
     * 连接活动的时候(可能是事件驱动模型)
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 上线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 下线");
    }

    /**
     * 出现异常时调用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

MyChatClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class MyChatClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) // 反射创建NioSocketChannel对象
                    .handler(new MyChatClientInitializer());    // 客户端一般只使用headler

            // 连接服务端
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
//            channelFuture.channel().closeFuture().sync();
            Channel channel = channelFuture.channel();
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

            // 死循环,一直监听用户输入
            while (true){
                channel.writeAndFlush(br.readLine()+"\r\n");
            }

        }catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

MyChatClientInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class MyChatClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        // 根据分隔符解码   ABC\nDEF\r\n ==> ABC | DEF
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));

        // 字符集编解/码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

        // 我们自己的处理器
        pipeline.addLast(new MyChatClientHandler());
    }
}

MyChatClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 打印服务端传来的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

2.3 心跳检测实现(具体怎么用,不知道)

集群之间(互相)检测服务器是否存活;避免网络中断的机制。

MyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MyServer {

    public static void main(String[] args) {
        // 创建两个线程组(死循环)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();  // 获取连接,连接获取到之后,交给下面的去处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 简化服务端启动
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 定义子处理器,当连接创建之后会自动调用初始化器
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)) // handler是对于bossGroup施加的
                    .childHandler(new MyServerInitializer()); // 子处理器(对请求进行处理),childHandler是对于workerGroup施加的

            // 绑定端口号
            //  - sync:同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 采用优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 空闲检测Handler,(handler们是一种 ==> 责任链模式)
        pipeline.addLast(new IdleStateHandler(5,7,10, TimeUnit.SECONDS));  // 5s没有读 --> 触发一个事件,7s没有写 --> 触发一个事件,10s没有读写 --> 触发一个事件
        // 我们自己的处理器,对空闲检测进行处理
        pipeline.addLast(new MyServerHandler());
    }
}

MyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * 自定义处理器
 *  我们继承一个适配器(SimpleChannelInboundHandler的父类)
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 触发了某一个 事件 后,会调用
     * @param ctx 上下文对象
     * @param evt 事件对象
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;

            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }

            System.out.println(ctx.channel().remoteAddress() + "的超时事件为:" + eventType);
            ctx.channel().close();
        }

        //super.userEventTriggered(ctx, evt); // 默认触发之后,会转发到下一个handler对象中进行处理
    }
}

2.4 实现websocket简易聊天室

MyWebServiceServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * websocket服务端
 */
public class MyWebServiceServer {

    public static void main(String[] args) {
        // 创建两个线程组(死循环)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();  // 获取连接,连接获取到之后,交给下面的去处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 简化服务端启动
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 定义子处理器,当连接创建之后会自动调用初始化器
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)) // handler是对于bossGroup施加的
                    .childHandler(new WebSocketChannelInitializer()); // 子处理器(对请求进行处理),childHandler是对于workerGroup施加的

            // 绑定端口号
            //  - sync:同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 采用优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketChannelInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // http处理器
        pipeline.addLast(new HttpServerCodec());
        // 以块的方式进行写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        // 对http对象进行聚合的处理器,  netty传输时,会将一个请求或响应分成段或者块
        // HttpObjectAggregator作用:将10个段  ==》 聚合成一个完整的FullHttprequest或FullHttpResponse
        // 如果超出指定长度,则会调用handleOversizedMessage
        pipeline.addLast(new HttpObjectAggregator(8192));
        // 处理websocket,这个处理程序为您运行websocket服务器做了所有繁重的工作,它负责websocket握手以及控制帧(frame)的处理(Close,Ping,Pong)。
        // 数据 帧 将传递给管道中的下一个处理程序(由您实现)进行处理。
        // websocket定义了6种帧:binery帧 二进制帧,text帧  文本帧  ping帧  发心跳  close帧   关闭
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        pipeline.addLast(new WebSocketFrameHandler());
    }
}

QQ20181210-0

WebSocketFrameHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

/**
 * 处理text帧的处理器
 */
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("收到消息:" + msg.text());

        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDateTime.now()));
    }

    /**
     * 连接建立时的回调方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

        // 获取channal的全局唯一id
        System.out.println("handlerAdded:" + ctx.channel().id().asLongText());
    }

    /**
     * 连接关闭时调用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生");
        ctx.close();    // 关闭连接
    }
}

test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客户端</title>
</head>
<body>

<script type="text/javascript">

    var socket;

    if(window.WebSocket) {
        socket = new WebSocket("ws://localhost:8899/ws");

        socket.onmessage = function(event) {
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "\n" + event.data;
        }

        socket.onopen = function(event) {
            var ta = document.getElementById("responseText");
            ta.value = "连接开启!";
        }

        socket.onclose = function(event) {
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "\n" + "连接关闭!";
        }
    } else {
        alert('浏览器不支持WebSocket!')
    }

    function send(message) {
        if(!window.WebSocket) {
            return;
        }

        if(socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接尚未开启!");
        }
    }
</script>&lt; &rt;


<form onsubmit="return false;">

    <textarea name="message" style="width: 400px; height: 200px"></textarea>

    <input type="button" value="发送数据" onclick="send(this.form.message.value)">

    <h3>服务端输出:</h3>

    <textarea id="responseText" style="width: 400px; height: 300px;"></textarea>

    <input type="button" onclick="javascript: document.getElementById('responseText').value=''" value="清空内容">

</form>



</body>
</html>

问题:

将客户端网络断掉,服务器端并不会知道。

解决办法:使用心跳(当然我还不会用)

CATALOG
  1. 1. 1、Netty简介
  2. 2. 2、Netty的使用
    1. 2.1. 2.1 Netty实现Http服务端
    2. 2.2. 2.2 Netty实现socket连接
      1. 2.2.1. 2.2.1 服务端
      2. 2.2.2. 2.2.2 客户端
      3. 2.2.3. 2.2.3 实现简单聊天室
    3. 2.3. 2.3 心跳检测实现(具体怎么用,不知道)
    4. 2.4. 2.4 实现websocket简易聊天室
      1. 2.4.1. 问题: