Netty 通信框架 笔记01

Netty 笔记

简介

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty是一个基于NIO的客户、服务器端编程框架,使用Netty可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用 。

BIO、NIO、AIO

  • 阻塞与非阻塞:

主要指的是访问 IO的线程是否会阻塞 (或者说是等待)线程访问资源 , 该资源是否准备就绪的一种处理方式 。

  • 同步和异步

主要是指的数据的请求方式

同步和异步是指访问数据的一种机制

  • BIO

同步阻塞 IO , Block IO , IO操作时会阻塞线程 , 并发处理能力低 。

Socket编程就是 BIO ,一个 socket连接一个处理线程(这个线程负责这个 Socket连接的一系列数据传 输 操 作 )。 阻 塞 的 原 因 在 于 : 操 作 系 统 允 许 的 线 程 数 量 是 有 限 的 , 多 个socket申请与服务端建立连接时,服务端不能提供相应数量的处理线程,没有分配到处理线程的连接就会阻塞等待或被拒绝。

BIO.png

  • NIO

同步非阻塞 IO , None-Block IO。 NIO是对 BIO的 改 进 ,基 于 Reactor模 型 。一 个 socket连接只有在特点时候才会发生数据传输 IO操作,大部分时间这个“ 数据通道”是空闲的,但还是占用着线程。 NIO作 出 的 改 进 就 是 “ 一 个 请 求 一 个 线 程 ”, 在 连 接 到
服务端的众多 socket 中,只有需要进行 IO操作的才能获取服务端的处理线程进行 IO 。这样就不会因为线程不够用而限制了 socket 的接入。

NIO.png

  • AIO (NIO 2.0)

异步非阻塞 IO ,这种 IO模型是由操作系统先完成了客户端请求处理再通知服务器去启动线程进行处理 。 AIO也称 NIO2.0 , 在 JDK7开始支持

Netty Reactor 模型 - 单线程模型、多线程模型、主从多线程模型

单线程模型

用户发起 IO请求到 Reactor线程,Ractor线程将 用户的 IO请求放入到通道 , 然后再进行后续处理

处理完成后 ,Reactor线程重新获得控制权 , 继续其他客户端的处理

这种模型一个时间点只有一个任务在执行 , 这个任务执行完了 , 再去执行下一个任务 。

单线程模型.png

  • 特征

    • 但单线程的 Reactor模型每一个用户事件都在一个线程中执行:
    • 性能有极限,不能处理成百上千的事件
    • 当负荷达到一定程度时,性能将会下降
    • 某一个事件处理器 发生 故障,不能继续处理其他事件

多线程模型

Reactor多线程模型是由一组 NIO 线程来处理 IO操作 ( 之 前 是 单 个 线 程 ), 所 以 在 请 求 处 理 上 会 比 上 一 中 模 型 效 率更高,可以处理更多的客户端请求。这种模式 使用 多个线程执行多个任务 , 任务可以同时执行

多线程模型.png

  • 不足

    • 但是如果并发仍然很大 , Reactor 仍然无法处理大量的客户端请求

主从多线程模型

这种线程模型是 Netty 推荐使用的线程模型

这种模型适用于 高并发场景 , 一组线程池接收请求 , 一组线程池处理 IO 。

  • 主从线程池模型.png

Let's do it ! -- websocket简单聊天室

  • pom.xml依赖

     <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.25.Final</version>
     </dependency>
  • Netty_server类

    public class WebSocketNettyServer {
        public static void main(String[] args) {
            // 创建两个线程池
            NioEventLoopGroup mainGrp = new NioEventLoopGroup(); // 主线程池
            NioEventLoopGroup subGrp = new NioEventLoopGroup(); // 从线程池
    
            try {
                // 创建Netty服务器启动对象
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                // 初始化服务器启动对象
                serverBootstrap
                        // 指定使用上面创建的两个线程池
                        .group(mainGrp, subGrp)
                        // 指定Netty通道类型
                        .channel(NioServerSocketChannel.class)
                        // 指定通道初始化器用来加载当Channel收到事件消息后,
                        // 如何进行业务处理
                        .childHandler(new WebSocketChannelInitializer());
    
                // 绑定服务器端口,以同步的方式启动服务器
                ChannelFuture future = serverBootstrap.bind(9090).sync();
                // 等待服务器关闭
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 优雅关闭服务器
                mainGrp.shutdownGracefully();
                subGrp.shutdownGracefully();
            }
    
        }
    }
  • 通道初始化器

    /**
     * 通道初始化器
     * 用来加载通道处理器(ChannelHandler)
     */
    public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        // 初始化通道
        // 在这个方法中去加载对应的ChannelHandler
        protected void initChannel(SocketChannel ch) throws Exception {
            // 获取管道,将一个一个的ChannelHandler添加到管道中
            ChannelPipeline pipeline = ch.pipeline();
    
            // 添加一个http的编解码器
            pipeline.addLast(new HttpServerCodec());
            // 添加一个用于支持大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
    
            // 需要指定接收请求的路由
            // 必须使用以ws后缀结尾的url才能访问
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    
            // 添加自定义的Handler
            pipeline.addLast(new ChatHandler());
        }
    }
  • 处理消息的Handler

    public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
        // 用来保存所有的客户端连接
        private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:MM");
    
        // 当Channel中有新的事件消息会自动调用
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            // 当接收到数据后会自动调用
    
            // 获取客户端发送过来的文本消息
            String text = msg.text();
            System.out.println("接收到消息数据为:" + text);
    
            for (Channel client : clients) {
                // 将消息发送到所有的客户端
                client.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date()) + ":" + text));
            }
        }
    
        // 当有新的客户端连接服务器之后,会自动调用这个方法
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // 将新的通道加入到clients
            clients.add(ctx.channel());
        }
    }
  • 前端简单页面

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>在线聊天室</title>
    </head>
    <body>
        <input type="text" id="message">
        <input type="button" value="发送消息" onclick="sendMsg()">
    
        接收到的消息:
        <p id="server_message" style="background-color: #AAAAAA"></p>
    
        <script>
    
            var websocket = null;
    
            // 判断当前浏览器是否支持websocket
            if(window.WebSocket) {
                websocket = new WebSocket("ws://127.0.0.1:9090/ws");
    
                websocket.onopen = function() {
                    console.log("建立连接.");
                }
                websocket.onclose = function() {
                    console.log("断开连接");
                }
                websocket.onmessage = function(e) {
                    console.log("接收到服务器消息:" + e.data);
                    var server_message = document.getElementById("server_message");
                    server_message.innerHTML += e.data + "<br/>";
                }
            }
            else {
                alert("当前浏览器不支持web socket");
            }
    
            function sendMsg() {
                var message = document.getElementById("message");
                websocket.send(message.value);
            }
        </script>
    </body>
    </html>
  • 运行 Server类即可

基于Netty 开发的 一款简单版聊天APP (安卓端)

待续...

Last modification:December 2nd, 2019 at 04:05 pm
如果觉得我的文章对你有用,请随意赞赏

Leave a Comment