博客
关于我
Netty 异步任务调度与异步线程池
阅读量:789 次
发布时间:2023-02-14

本文共 5291 字,大约阅读时间需要 17 分钟。

异步任务调度

在Netty服务器的业务逻辑处理中,长时间耗时的业务操作可以交给任务队列异步处理。这种方式可以避免阻塞当前I/O线程,确保Netty服务器能够高效处理多个客户端连接。

任务队列的使用

在Netty中,任务队列是实现异步任务调度的核心机制。开发者可以通过以下方式使用任务队列:

  • 自定义任务:开发自己的任务类,并将其提交至任务队列中。这种方式适用于需要执行的简单异步操作。
  • 自定义定时延时任务:除了普通任务外,还可以指定任务的延迟执行时间。这对于需要定期执行的业务逻辑非常有用。
  • 其它线程调度任务:除了在当前NioEventLoop线程中排队执行任务外,还可以在其他线程中通过Channel通道调度任务。这种方式适用于需要跨线程执行的场景。
  • Handler处理器的同步与异步

    在Netty的服务器端Demo中,自定义的Handler处理器通常是同步操作。具体实现方式如下:

    • 处理器继承自ChannelInboundHandlerAdapter类,重写channelRead方法。
    • 或者继承自SimpleChannelInboundHandler类,重写channelRead0方法。

    在业务逻辑执行时需要注意以下几点:

    • 同步操作:如果业务逻辑只包含短时间内完成的操作,可以直接在Handler处理器中执行。
    • 异步操作:如果业务逻辑包含耗时操作(如数据库访问、网络请求等),则必须将耗时操作放入任务队列中异步执行。这样可以避免阻塞I/O线程。

    需要注意的是,ChannelInboundHandlerAdapterchannelRead方法执行时,会占用当前I/O线程。因此,不能在此方法中执行耗时操作。

    任务执行顺序

    Netty的任务队列是按顺序执行任务的。具体来说:

  • 多任务执行:如果用户连续向任务队列中提交多个任务,Netty会按照顺序执行这些任务。
  • 先后顺序:任务队列是先后执行的,先执行第一个任务,执行完毕后再取第二个任务,依此类推。任务之间是串行执行的,不是并行的。
  • 自定义任务队列

    如果需要自定义任务队列,可以按照以下步骤操作:

  • 获取通道:首先获取Channel对象。
  • 获取线程:通过Channel.eventLoop()获取对应的NioEventLoop线程。这个线程封装了任务队列TaskQueue
  • 任务入队:将异步任务Runnable提交到任务队列中,可以通过调用NioEventLoop线程的execute方法。
  • 以下是一个简单的代码示例:

    public class MyServerHandler extends ChannelInboundHandlerAdapter {    private static final Logger log = LoggerFactory.getLogger(MyServerHandler.class);    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf in = (ByteBuf) msg;        log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8));        Channel channel = ctx.channel();        EventLoop eventLoop = channel.eventLoop();        eventLoop.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(10);                    log.info("异步任务 1执行,Thread = {}", Thread.currentThread().getName());                    ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 1", CharsetUtil.UTF_8));                    log.info("异步任务 1执行完毕");                } catch (Exception ex) {                    System.out.println("发生异常" + ex.getMessage());                }            }        });        eventLoop.execute(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.SECONDS.sleep(5);                    log.info("异步任务 2执行,Thread = {}", Thread.currentThread().getName());                    ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 2", CharsetUtil.UTF_8));                    log.info("异步任务 2执行完毕");                } catch (Exception ex) {                    log.info("发生异常,message={}" + ex.getMessage());                }            }        });        log.info("channelReadComplete方法执行完毕,Thread = {}", Thread.currentThread().getName());    }}

    从打印结果可以看出:

    • 用户连续向任务队列中放入多个任务时,Netty会按照顺序执行这些任务。
    • 所有操作都是在同一个I/O线程中执行的。

    自定义延时任务队列

    如果需要实现定时异步任务,可以使用scheduleTaskQueue任务队列。使用方法类似于普通任务队列,但调度方式有所不同。

    以下是一个简单的代码示例:

    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    ByteBuf in = (ByteBuf) msg;    log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8));    ctx.channel().eventLoop().schedule(() -> {        try {            log.info("延迟异步任务执行,Thread = {}", Thread.currentThread().getName());            TimeUnit.SECONDS.sleep(5);            ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 2", CharsetUtil.UTF_8));            log.info("延迟异步任务执行完毕");        } catch (Exception ex) {            log.info("发生异常,message={}" + ex.getMessage());        }    }, 5, TimeUnit.SECONDS);    log.info("channelRead方法执行完毕,Thread = {}", Thread.currentThread().getName());}

    从打印结果可以看出,延迟任务会在指定时间后才被执行。

    任务与定时任务的区别

    自定义任务与自定义定时任务的主要区别在于调度方式和任务队列的选择:

  • 调度方式
    • 普通异步任务使用execute方法进行调度。
    • 定时异步任务使用schedule方法进行调度。
  • 任务队列
    • 普通异步任务提交到TaskQueue任务队列中。
    • 定时异步任务提交到ScheduleTaskQueue任务队列中。
  • 异步线程池

    如果需要对耗时操作(如数据库访问、网络请求等)进行异步处理,可以通过在Handler处理器中使用异步线程池来实现。这种方式可以避免占用I/O线程资源,提高Netty服务器的吞吐量。

    以下是一个简单的代码示例:

    public class MyServerHandler3 extends ChannelInboundHandlerAdapter {    private static final Logger log = LoggerFactory.getLogger(MyServerHandler3.class);    private static final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(3);    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("MyServerHandler 连接已建立...");        super.channelActive(ctx);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf in = (ByteBuf) msg;        log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8));        for (int i = 1; i <= 5; i++) {            int finalI = i;            eventExecutorGroup.submit(() -> {                try {                    TimeUnit.SECONDS.sleep(10);                    log.info("异步任务 {}执行,Thread = {}", finalI, Thread.currentThread().getName());                    ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 " + finalI, CharsetUtil.UTF_8));                    log.info("异步任务 {}执行完毕", finalI);                } catch (Exception ex) {                    System.out.println("发生异常" + ex.getMessage());                }            });        }        log.info("channelRead方法执行完毕,Thread = {}", Thread.currentThread().getName());    }}

    从打印结果可以看出,耗时任务被提交至独立的线程池中执行,避免占用I/O线程资源。

    转载地址:http://mdcfk.baihongyu.com/

    你可能感兴趣的文章
    M_Map工具箱简介及地理图形绘制
    查看>>
    m_Orchestrate learning system---二十二、html代码如何变的容易
    查看>>
    M×N 形状 numpy.ndarray 的滑动窗口
    查看>>
    m个苹果放入n个盘子问题
    查看>>
    n = 3 , while n , continue
    查看>>
    n 叉树后序遍历转换为链表问题的深入探讨
    查看>>
    N!
    查看>>
    N-Gram的基本原理
    查看>>
    n1 c语言程序,全国青少年软件编程等级考试C语言经典程序题10道七
    查看>>
    Nacos Client常用配置
    查看>>
    nacos config
    查看>>
    Nacos Config--服务配置
    查看>>
    Nacos Derby 远程命令执行漏洞(QVD-2024-26473)
    查看>>
    Nacos 与 Eureka、Zookeeper 和 Consul 等其他注册中心的区别
    查看>>
    Nacos 单机集群搭建及常用生产环境配置 | Spring Cloud 3
    查看>>
    Nacos 启动报错[db-load-error]load jdbc.properties error
    查看>>
    Nacos 报Statement cancelled due to timeout or client request
    查看>>
    Nacos 注册服务源码分析
    查看>>
    Nacos 融合 Spring Cloud,成为注册配置中心
    查看>>
    Nacos-注册中心
    查看>>