本文共 5291 字,大约阅读时间需要 17 分钟。
在Netty服务器的业务逻辑处理中,长时间耗时的业务操作可以交给任务队列异步处理。这种方式可以避免阻塞当前I/O线程,确保Netty服务器能够高效处理多个客户端连接。
在Netty中,任务队列是实现异步任务调度的核心机制。开发者可以通过以下方式使用任务队列:
在Netty的服务器端Demo中,自定义的Handler处理器通常是同步操作。具体实现方式如下:
ChannelInboundHandlerAdapter类,重写channelRead方法。SimpleChannelInboundHandler类,重写channelRead0方法。在业务逻辑执行时需要注意以下几点:
需要注意的是,ChannelInboundHandlerAdapter的channelRead方法执行时,会占用当前I/O线程。因此,不能在此方法中执行耗时操作。
Netty的任务队列是按顺序执行任务的。具体来说:
如果需要自定义任务队列,可以按照以下步骤操作:
Channel对象。Channel.eventLoop()获取对应的NioEventLoop线程。这个线程封装了任务队列TaskQueue。Runnable提交到任务队列中,可以通过调用NioEventLoop线程的execute方法。以下是一个简单的代码示例:
public class MyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(MyServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8)); Channel channel = ctx.channel(); EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(10); log.info("异步任务 1执行,Thread = {}", Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 1", CharsetUtil.UTF_8)); log.info("异步任务 1执行完毕"); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } } }); eventLoop.execute(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(5); log.info("异步任务 2执行,Thread = {}", Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 2", CharsetUtil.UTF_8)); log.info("异步任务 2执行完毕"); } catch (Exception ex) { log.info("发生异常,message={}" + ex.getMessage()); } } }); log.info("channelReadComplete方法执行完毕,Thread = {}", Thread.currentThread().getName()); }} 从打印结果可以看出:
如果需要实现定时异步任务,可以使用scheduleTaskQueue任务队列。使用方法类似于普通任务队列,但调度方式有所不同。
以下是一个简单的代码示例:
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8)); ctx.channel().eventLoop().schedule(() -> { try { log.info("延迟异步任务执行,Thread = {}", Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(5); ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 2", CharsetUtil.UTF_8)); log.info("延迟异步任务执行完毕"); } catch (Exception ex) { log.info("发生异常,message={}" + ex.getMessage()); } }, 5, TimeUnit.SECONDS); log.info("channelRead方法执行完毕,Thread = {}", Thread.currentThread().getName());} 从打印结果可以看出,延迟任务会在指定时间后才被执行。
自定义任务与自定义定时任务的主要区别在于调度方式和任务队列的选择:
execute方法进行调度。schedule方法进行调度。TaskQueue任务队列中。ScheduleTaskQueue任务队列中。如果需要对耗时操作(如数据库访问、网络请求等)进行异步处理,可以通过在Handler处理器中使用异步线程池来实现。这种方式可以避免占用I/O线程资源,提高Netty服务器的吞吐量。
以下是一个简单的代码示例:
public class MyServerHandler3 extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(MyServerHandler3.class); private static final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(3); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyServerHandler 连接已建立..."); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("Server Accept Client Context (" + ctx.channel().remoteAddress() + ")消息 -》" + in.toString(CharsetUtil.UTF_8)); for (int i = 1; i <= 5; i++) { int finalI = i; eventExecutorGroup.submit(() -> { try { TimeUnit.SECONDS.sleep(10); log.info("异步任务 {}执行,Thread = {}", finalI, Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("异步任务 " + finalI, CharsetUtil.UTF_8)); log.info("异步任务 {}执行完毕", finalI); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } }); } log.info("channelRead方法执行完毕,Thread = {}", Thread.currentThread().getName()); }} 从打印结果可以看出,耗时任务被提交至独立的线程池中执行,避免占用I/O线程资源。
转载地址:http://mdcfk.baihongyu.com/