Netty学习(Netty入门)

概述

Netty是什么

在这里插入图片描述

Netty的地位

在这里插入图片描述

Netty的优势

在这里插入图片描述

HelloWorld

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
            // 2. 添加 EventLoop
            .group(new NioEventLoopGroup())
            // 3. 选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            // 4. 添加处理器
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override // 在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            // 5. 连接到服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()
            .channel()
            // 6. 向服务器发送数据
            .writeAndFlush("hello, world");
    }
}


public class HelloServer {
    public static void main(String[] args) {
        // 1. 启动器,负责组装 netty 组件,启动服务器
        new ServerBootstrap()
            // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
            .group(new NioEventLoopGroup())
            // 3. 选择 服务器的 ServerSocketChannel 实现
            .channel(NioServerSocketChannel.class) // OIO BIO
            // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
            .childHandler(
                    // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 6. 添加具体 handler
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
                        @Override // 读事件
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            System.out.println(msg); // 打印上一步转换好的字符串
                        }
                    });
                }
            })
            // 7. 绑定监听端口
            .bind(8080);
    }
}

流程分析

在这里插入图片描述

正确理解

正确理解 Netty中各个组件的功能和职责
在这里插入图片描述

组件

EventLoop

在这里插入图片描述

普通任务和定时任务


@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        // 1. 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
//        EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
        // 2. 获取下一个事件循环对象
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        // 3. 执行普通任务
        group.next().execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        // 4. 执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ojbk");
        }, 0, 1, TimeUnit.SECONDS);

        log.debug("main");
    }
}

IO任务

Netty客户端是多线程程序,idea debug 默认断点模式为ALL,即会停止主线程以及守护线程,所以当客户端断点自定义Evaluate发送数据时,守护线程的发送数据Channel也被断点停止,所以无法发送数据

选择Thread只停止当前线程,守护线程仍然可以运行
在这里插入图片描述

一个客户端的NIO线程跟Channel建立链接就会建立一个绑定关系,后续客户端的Channel上的IO事件都由一个EventLoop处理,
客户端-Channel-EventLoop绑定关系

在这里插入图片描述

EventLoop的分工细化

第一次细分,Netty建议将EventLoop职责细分,分为boss和worker
group中传入两个EventLoop,那么boss只负责accept事件,worker负责read事件

在这里插入图片描述

上诉优化,worker中的NIOEventLoopGroup除了要负责SocketChannel的NIO连接操作还要负责连接后的读写操作,如果读写较长较重,那么会阻塞影响到worker其他的连接或读写操作,所以,
再次细分,EventLoop有两种实现,NIOEventLoopGroup 能处理IO事件普通任务和定时任务,DefaultEventLoopGroup只能处理普通任务和定时任务,将读写操作交给它去处理耗时较长的读写操作。

在这里插入图片描述

作为对比,第一个没有指定group,默认使用了worker的NIOEventLoopGroup来处理读写操作,而第二则使用了DefaultEventLoop来处理读写操作

切换线程

在这里插入图片描述

Channel

在这里插入图片描述

正确的链接建立:ChannelFuture

处理异步连接

由于连接的建立是耗时的,所以Channel必须等到连接建立完成再执行获取,否则是无效的

在这里插入图片描述
connect方法返回的ChannelFuture若没有阻塞等待连接,那么接下来获取到的Channel是没有建立好连接的Channel

在这里插入图片描述
如上两种方法异步等待NIO线程建立完毕

谁发起的调用谁等待链接结果

正确的链接关闭:CloseFuture的关闭

在这里插入图片描述

不能直接在主线程或其他线程中直接处理关闭操作,因为nioEventLoopGroup-2-1属于异步线程,此处close方法非阻塞,有可能在关闭操作还未完成就执行了关闭后操作

解决方法:
使用阻塞关闭方法,只有当channel真的关闭了才执行后面的方法

在这里插入图片描述
优雅的关闭:等待还未执行完的操作执行完后再关闭
在这里插入图片描述

为什么Netty是异步设计

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

Future & Promise

概述

在这里插入图片描述
在这里插入图片描述

Future

jdk中的Future

Future就是在线程之间传递结果的一个容器,是被动的获取结果,由执行完任务的线程给予的结果,没有暴露主动赋予结果的方法


@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.创建线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Object> future = service.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        //3.祝线程通过future获取结果,get是阻塞等待方法
        log.debug("等待结果");
        log.debug("结果{}",future.get());

    }
}

Netty 中的 Future

与jdk中的差不多,继承至jdk的Future,做了增强


@Slf4j
public class TestNettyFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        //提交任务
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                System.out.println("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        //通过future获取结果,get是阻塞等待方法
        log.debug("等待结果");
        log.debug("结果{}",future.get());

        //异步方式获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                //getNow非阻塞等待 立即获取结果
                log.debug("结果{}",future.getNow());
            }
        });

    }
}

Promise

Promise又继承至Netty的Future,功能更强大,可以主动填充结果,对于网络通信非常有用

@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 准备 EventLoop 对象
        EventLoop eventLoop = new NioEventLoopGroup().next();
        // 2. 可以主动创建 promise, 结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(() -> {
            // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
            log.debug("开始计算...");
            try {
                int i = 1 / 0;
                Thread.sleep(1000);
                promise.setSuccess(80);
            } catch (Exception e) {
                e.printStackTrace();
                promise.setFailure(e);
            }

        }).start();
        // 4. 接收结果的线程
        log.debug("等待结果...");
        log.debug("结果是: {}", promise.get());
    }

}

Handler & Pipeline

Pipeline

在这里插入图片描述

Inbound

在这里插入图片描述
入栈是按入栈顺序出,出栈是按入栈顺序返出

channelRead 是一个调用链,如果中间没调用,那么后面的handler 则调用不到

Outbound

注意ctx.writeAndFlushch.writeAndFlush
ctx.writeAndFlush 是从当前调用的 handler 往后寻找 OutboundHandler,若之前没有执行到OutboundHandler 那么找不到OutboundHandler执行

ch.writeAndFlush则是从整个调用链的最前端 tail 处理开始往后寻找OutboundHandler
而且先执行调用链中的InboundHandler输入,中间的 OutboundHandler 被跳过不影响正常输入执行
在这里插入图片描述
如图ch.writeAndFlush 的调用执行流程
在这里插入图片描述

ByteBuffer

nettyByteBuf容量动态扩容,nettyByteBuffer 固定容量
在这里插入图片描述
netty 中 ByteBuffer 默认使用直接内存(系统内存、内存条)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

例如 扩容 2 的整数倍 2^9=512 扩容至 2^10 =1024

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
tail 只能处理原始 ByteBuf 如果中途 ByteBuf 被转换成其他数据类型,则 tail 无法自动release

零拷贝 slice

slice 是 netty 中对于零拷贝的体现之一
在这里插入图片描述
在这里插入图片描述

切片后生成的对象,实际上还是操作原始bytebuf 的内容
在这里插入图片描述

使用习惯,切片自己增加引用计数,避免被其他调用者释放
在这里插入图片描述
在这里插入图片描述

component 组合零拷贝

writeBytes 会发生真正的数据复制,每次writeBytes都会发生数据复制
addComponents 是使逻辑上连续,没有发生复制

在这里插入图片描述

在这里插入图片描述

双向通信

实现一个 echo server

编写 server

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 建议使用 ctx.alloc() 创建 ByteBuf
                    ByteBuf response = ctx.alloc().buffer();
                    response.writeBytes(buffer);
                    ctx.writeAndFlush(response);

                    // 思考:需要释放 buffer 吗
                    // 思考:需要释放 response 吗
                }
            });
        }
    }).bind(8080);

编写 client

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 思考:需要释放 buffer 吗
                }
            });
        }
    }).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();

💡 读和写的误解

我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

例如

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/774764.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python绘制领域矩形

问题描述&#xff1a; 使用python书写代码实现以下功能&#xff1a;给定四个点的坐标&#xff0c;调用一个函数&#xff0c;可以使原来的四个点分别向四周上下左右移动15距离&#xff0c;分别记录下移动后的坐标&#xff0c;然后画出内侧矩形和外侧矩形 代码&#xff1a; im…

配置并调试后端程序(sql)

1.环境准备 安装VS Code和Node.js插件&#xff1a;确保你已经安装了VS Code和Node.js插件。创建launch.json文件&#xff1a;在你的项目中创建一个.vscode文件夹&#xff0c;并在其中创建launch.json文件。添加以下内容&#xff1a; {"version": "0.2.0"…

【C语言】五子棋(c语言实现)

这里写目录标题 最终效果菜单打印函数棋盘的初始化和打印人人对战落子判空函数悔棋函数判胜负函数人人对战 人机对战一是将直接调用rand生成随机值&#xff0c;这就不可控二是根据棋子赢面来判断哪里落子最好 如果选择退出程序直接exit就行主函数调用逻辑源代码 最终效果 五子棋…

The Sandbox 人物化身每月奖励: 七月版来了!

人物化身的持有者可以从 The Sandbox 领取自己的队服&#xff01; 视频&#xff1a;https://youtu.be/tSo5FPL7DhE 我们又推出了人物化身所有者月度奖励&#xff01;在七月&#xff0c;我们将通过 The Sandbox 队服来弘扬体育竞技精神。穿上这些时尚的元宇宙队服&#xff0c;代…

深度报告 | 百度安全携手极越安全发布《整车安全渗透测试白皮书》

注重点&#xff0c;如何确保车辆全生命周期的安全已成为整个行业亟待解决的问题。对于车企而言&#xff0c;通过渗透测试尽量多地发现安全威胁&#xff0c;是确保车辆信息系统的稳定运行、保障用户安全驾驶至关重要的措施。然而&#xff0c;传统的渗透测试方法已无法满足智能网…

Linux miniconda 安装tensorflow-gpu遇到找不到GPU问题

背景&#xff1a; Linux Miniconda python3.9 安装步骤 1、 pip install tensorflow-gpu2.8.0 -i https://pypi.tuna.tsinghua.edu.cn/simple 2、报错如下&#xff1a; 更换镜像源&#xff0c;单独安装 pip install tf-estimator-nightly2.8.0.dev2021122109 -i https:/…

使用 docker buildx 构建跨平台镜像

buildx是Docker官方提供的一个构建工具&#xff0c;它可以帮助用户快速、高效地构建Docker镜像&#xff0c;并支持多种平台的构建。使用buildx&#xff0c;用户可以在单个命令中构建多种架构的镜像&#xff0c;例如x86和arm架构&#xff0c;而无需手工操作多个构建命令。此外bu…

【docker】容器内配置环境变量

背景&#xff1a; 我要把下面的环境变量写到bash脚本里&#xff0c;起名叫environment_start.sh。 目的&#xff1a; 用于每次进入容器dev_into.sh的时候&#xff0c;让系统获取到环境变量。 操作步骤&#xff1a; 先在容器外找个合适的位置写环境变量bash脚本&#xff0c…

bmob Harmony鸿蒙快速开发搜索功能

搜索功能是很多应用都需要的功能。在很多平台上&#xff0c;要开发一个兼容性较好的搜索功能都还是需要添加比较多的视图代码的。 为了解决这个问题&#xff0c;鸿蒙ArkUI提供了一个快速添加搜索功能的视图组件给我们&#xff0c;结合Bmob Harmony鸿蒙SDK的搜索能力&#xff0…

四款主流电脑监控软件(电脑监控软件主要优势)

在现代企业环境中&#xff0c;确保员工的工作效率和企业信息的安全成为了管理者的重要任务。电脑监控软件作为一种有效的管理工具&#xff0c;能够帮助企业实现这些目标。固信电脑监控软件在这方面表现尤为出色&#xff0c;本文将详细介绍固信电脑监控软件的优势及其主要功能&a…

【C++】 解决 C++ 语言报错:Use of Uninitialized Variable

文章目录 引言 使用未初始化的变量&#xff08;Use of Uninitialized Variable&#xff09;是 C 编程中常见且危险的错误之一。它通常在程序试图使用尚未赋值的变量时发生&#xff0c;导致程序行为不可预测&#xff0c;可能引发运行时错误、数据损坏&#xff0c;甚至安全漏洞。…

基于芯片CSU8RP1382开发的咖啡秤方案

咖啡电子秤芯片方案精确值可做到分度值0.1g的精准称重,并带有过载提示、自动归零、去皮称重、压低报警等功能&#xff0c;工作电压在2.4V~3.6V之间&#xff0c;满足于咖啡电子秤的电压使用。同时咖啡电子秤PCBA设计可支持四个单位显示&#xff0c;分别为&#xff1a;g、lb、oz、…

云仓酒庄天津分公司:深化业务常态化运营

标题&#xff1a;云仓酒庄天津分公司&#xff1a;深化业务常态化运营&#xff0c;以沙龙为纽带&#xff0c;构建价值叠加的酒业新生态 在当今复杂多变的经济环境中&#xff0c;传统酒业面临着前所未有的挑战与机遇。随着数字化转型的加速和消费者偏好的日益多元化&#xff0c;…

JAVA--JSON转换工具类

JSON转换工具类 import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackso…

2024年保安员职业资格考试题库大数据揭秘,冲刺高分!

186.安全技术防范是一种由探测、&#xff08;&#xff09;、快速反应相结合的安全防范体系。 A.保安 B.出警 C.延迟 D.监控 答案&#xff1a;C 187.安全技术防范是以&#xff08;&#xff09;和预防犯罪为目的的一项社会公共安全业务。 A.预防灾害 B.预防损失 C.预防失…

StreamSets: 数据采集工具详解

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; 欢迎关注微信公众号&#xff1a;野老杂谈 ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题. ⭐️ AIGC时代的创新与未来&a…

【EI会议/稳定检索】2024年应用数学、化学研究与物理工程国际会议(AMPE 2024)

2024 International Conference on Applied Mathematics, Chemical Research, and Physical Engineering 2024年应用数学、化学研究与物理工程国际会议(AMPE 2024) 【会议信息】 会议简称&#xff1a;AMPE 2024 大会时间&#xff1a;点击查看 截稿时间&#xff1a;官网查看 大…

【T+】畅捷通T+产品,将原财务报表中的模板转换到财务报表菜单下。

【问题描述】 畅捷通T3产品中账套使用行业性质是【新会计准测制度】升级到畅捷通T+产品, 行业性质默认为【2001年企业会计制度】, 但是升级成功后,账套的财务报表下没有对应报表模板,需要手工编辑,太费劲了。 并且在T+产品中新建账套,行业性质选择【2001年企业会计制度】…

verilog行为建模(一):基本概念

目录 1.行为描述2.过程(procedural)块3.过程赋值(procedural assignment)4.过程时序控制5.简单延时6.边沿敏感时序7.wait语句 微信公众号获取更多FPGA相关源码&#xff1a; 1.行为描述 行为级描述是对系统的高抽象级描述。在这个级别&#xff0c;表达的是输入和输出之间转换…

比 PIP 快 100 倍的安装工具

uv 是一个由 Rust 开发的 pip 工具&#xff0c;比 pip 快 100 倍&#xff0c;难以置信&#xff0c;不过真的是快太多了。 安装 在 Mac 上直接通过 brew install uv 安装即可。 venv 创建运行环境&#xff0c;默认当前目录下 .venv uv venv 依赖安装 uv pip install -r re…