netty5 | 1初识

netty5  | 1初识

看完netty4 ,因为好奇看了一下netty 5,虽然没出正式版,但是netty 5的新特性的,比如重构了HTTP多部分的编解码,ByteBuf被重写了...,然后带着好奇看了看netty 5的教程 http://netty.io/wiki/user-guide-for-5.x.html 发现netty 4和netty 5入门编码上的变化倒不是很大。于是吧之前netty 4的例子改造一下换成netty 5,然后跑起来了。有人说

netty5 只支持jdk7,在这里备注一下,笔者亲测netty 5在jdk 7和jdk 8上都能跑。


先看服务端:

public class NServer {private static final Logger logger = LogManager.getLogger(NServer.class);private final static String host = "127.0.0.1";private final static Integer port = 8898;@Testpublic void testServer() {/*** * ·NioEventLoopGroup 实际上是个连接池,NioEventLoopGroup在后台启动了n个NioEventLoop * 来处理Channel事件,每个NioEventLoop负责m个Channel * ·NioEventLoopGroup从NioEventLoop数组集中挨个取出NioEventLoop用以处理Channel */EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {// NIO服务器端的辅助启动类ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);// 设置 nio 类型的 channelserverBootstrap.channel(NioServerSocketChannel.class);/*** * 此处在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。 */serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);/*** * option() 是提供NioServerSocketChannel用来接收进来的连接。 childOption() * 是提供父管道ServerChannel接收到的 连接(此例是 NioServerSocketChannel)。 */serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);/*** * 绑定I/O事件的处理类 处理网络IO事件 */serverBootstrap.childHandler(new NServerInitializer());/*** * 服务器启动后 绑定监听端口 同步等待成功 ,异步操作的通知回调 回调处理用的 ChildChannelHandler *  */ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();logger.debug("NServer启动");// 监听服务器关闭监听(应用程序等待直到channel关闭)channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 关闭EventLoopGroup释放资源包bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();logger.debug("服务端释放了线程资源...");}}}
服务端的事件处理,收到客户端请求,返回“ok”:


/*** * server端网络IO事件处理 *  * @author shiky * */public class NServerHandler extends ChannelHandlerAdapter {private static final Logger logger = LogManager.getLogger(NServerHandler.class);/*** * 读取客户消息 */@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("服务器读取到客户端请求...");if (null != msg) {try {StringBuffer sbf = new StringBuffer("收到客户端->");sbf.append(ctx.channel().remoteAddress());sbf.append("的消息:");sbf.append(msg);logger.debug(sbf);System.out.println(sbf);// 服务端响应消息ctx.writeAndFlush("ok");ctx.close();} finally {ReferenceCountUtil.release(msg);}}}/*** * 服务端监听到客户端活动 */@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.debug("channelActive>>>> Client:" + ctx.channel().remoteAddress() + "在线");ctx.fireChannelActive();}/**** * 响应处理 */@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {logger.debug("服务端readComplete 响应完成");ctx.fireChannelReadComplete();}/*** * 监听客户端掉线 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// Client:"+ctx.channel().remoteAddress()+"掉线");super.channelInactive(ctx);}/*** * 异常信息 (根据需要,选择是否关闭) */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.debug("服务端异常" + cause.getMessage());ctx.fireExceptionCaught(cause);// ctx.close();}}

服务端对于编码和粘包拆包的处理:

/*** *  * @author: shiky *  *  */public class NServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();/*** * 个地方的 必须和服务端对应上。否则无法正常解码和编码 * 设置包长,解决,粘包问题 */pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));pipeline.addLast(new LengthFieldPrepender(8));// 当 read 的时候pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));// 当 send 的时候pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 服务端逻辑pipeline.addLast(new NServerHandler());}}

客户端:

import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.junit.Test;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;/**** *  * @author shiky * @Describe 客户端 */public class NClient {private static final Logger logger = LogManager.getLogger(NClient.class);private final static String host = "127.0.0.1";private final static Integer port = 8898;@Testpublic void testClient() {// 配置客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {// 客户端辅助启动类 对客户端配置Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.option(ChannelOption.TCP_NODELAY, true);bootstrap.handler(new NClinetlInitializer());// 异步链接服务器 同步等待链接Channel ch = bootstrap.connect(host, port).sync().channel();ch.writeAndFlush("发送一条指令:我的小鱼你醒了,还认识早晨吗?" + Thread.currentThread().getId());ChannelFuture channelFuture = ch.closeFuture().sync();// 监听服务器关闭监听(应用程序等待直到channel关闭)channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();logger.debug("客户端的释放了线程资源...");}}}

客户端事件处理:

/*** *  * @author: shiky *  */public class NClientHandler extends ChannelHandlerAdapter {private static final Logger logger = LogManager.getLogger(NClientHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx);// logger.debug("客户端 active");}/*** * 处理服务端响应数据 */@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg) {try {logger.debug("客户端收到服务器响应数据:" + msg);} finally {ReferenceCountUtil.release(msg);}}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();logger.debug("客户端收到服务器响应数据处理完成");}/*** * 处理异常,根据需要选择要不要关闭连接 */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// downstream:"+cause.getMessage());// ctx.close();logger.warn("客户端异常:" + cause.getMessage());}}

客户端编码解码处理:

/*** *  * @author shiky * */public class NClinetlInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();/**** * 个地方的 必须和服务端对应上。否则无法正常解码和编码 * 设置包长,解决,粘包问题 */pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));pipeline.addLast("frameEncoder", new LengthFieldPrepender(8));// 当 read 的时候pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));// 当 send 的时候pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));// 客户端的逻辑pipeline.addLast("handler", new NClientHandler());}}

跑一下,先启动服务端,然后启动客户端,服务端打印:

客户端打印:

免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部