netty
服务器端实现
EchoServer.java
package cn.mldn.http.server.server;
import cn.mldn.http.server.server.handler.EchoServerHandler;
import cn.mldn.info.HostInfo;
import cn.mldn.http.server.serious.JSONDecoder;
import cn.mldn.http.server.serious.JSONEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 实现了基础的线程池与网络连接的配置项
*/
public class EchoServer {
public void run() throws Exception {
// 进行服务器端的启动处理
// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
// 创建接收线程池
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
// 创建工作线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20);
System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
try {
// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
// 服务器端
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置要使用的线程池以及当前的Channel类型
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
// 接收到信息之后需要进行处理,于是定义子处理器
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
socketChannel.pipeline().addLast(new JSONDecoder()) ;
socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ;
socketChannel.pipeline().addLast(new JSONEncoder()) ;
// 追加了处理器
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
// 可以直接利用常亮进行TCP协议的相关配置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// ChannelFuture描述的时异步回调的处理操作
ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
// 等待Socket被关闭
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully() ;
bossGroup.shutdownGracefully() ;
}
}
}
EchoServerHandler.java
package cn.mldn.http.server.server.handler;
import cn.mldn.vo.Member;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理
* Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。
*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
System.out.println(msg.getClass() + " **************");
Member member = (Member) msg ;
System.err.println("{服务器}" + member);
member.setName("【ECHO】" + member.getName());
// 回应的输出操作
ctx.writeAndFlush(member);
} finally {
// 释放缓存
ReferenceCountUtil.release(msg) ;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close() ;
}
}
EchoServerMain.java
package cn.mldn.http.server.server.main;
import cn.mldn.http.server.server.EchoServer;
public class EchoServerMain {
public static void main(String[] args) throws Exception {
new EchoServer().run();
}
}
客户端实现
EchoClient.java
package cn.mldn.http.server.client;
import cn.mldn.http.server.client.handler.EchoClientHandler;
import cn.mldn.info.HostInfo;
import cn.mldn.http.server.serious.JSONDecoder;
import cn.mldn.http.server.serious.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class EchoClient {
public void run() throws Exception {
// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
// 创建一个线程池
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建客户端处理程序
Bootstrap client = new Bootstrap();
client.group(group).channel(NioSocketChannel.class)
// 允许接收大块的返回数据
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
socketChannel.pipeline().addLast(new JSONDecoder());
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
socketChannel.pipeline().addLast(new JSONEncoder());
// 追加了处理器
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
// 关闭连接
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
EchoClientHandler.java
package cn.mldn.http.server.client.handler;
import cn.mldn.vo.Member;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 需要进行数据的读取操作,服务器端处理完成的数据信息会进行读取
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private static final int REPEAT = 500;// 消息重复发送次数
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 现在直接进行对象的发送
Member member = new Member();
member.setMid("xiaoli");
member.setName("小李老师");
member.setAge(16);
member.setSalary(1.1);
// 直接进行对象实例发送
ctx.writeAndFlush(member) ;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 只要服务器端发送完成信息之后,都会执行此方法进行内容的输出操作
try {
Member member = (Member) msg ;
// 输出服务器端的响应内容
System.out.println(member);
} finally {
// 释放缓存
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoClientMain.java
package cn.mldn.http.server.client.main;
import cn.mldn.http.server.client.EchoClient;
public class EchoClientMain {
public static void main(String[] args) throws Exception {
new EchoClient().run();
}
}
编码解码
JSONDecoder.java
package cn.mldn.http.server.serious;
import cn.mldn.vo.Member;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
public class JSONDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> list) throws Exception {
// 可以用的数据长度
int len = msg.readableBytes();
byte data[] = new byte[len];
msg.getBytes(msg.readerIndex(), data, 0, len);
list.add(JSON.parseObject(new String(data)).toJavaObject(Member.class)) ;
}
}
JSONEncoder.java
package cn.mldn.http.server.serious;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class JSONEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf out) throws Exception {
byte data [] = JSONObject.toJSONString(msg).getBytes() ;
out.writeBytes(data) ;
}
}
源码
echo-master.zip
上一篇 网络编程(四)AIO