网络编程(五)netty

netty

服务器端实现

EchoServer.java
package cn.mldn.http.server.server;

import cn.mldn.http.server.server.handler.EchoServerHandler;
import cn.mldn.info.HostInfo;
import cn.mldn.http.server.serious.JSONDecoder;
import cn.mldn.http.server.serious.JSONEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
 * 实现了基础的线程池与网络连接的配置项
 */
public class EchoServer {
    public void run() throws Exception {    
         // 进行服务器端的启动处理
        // 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
        // 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
        // 创建接收线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup(10);
        // 创建工作线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup(20);
        System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
        try {
            // 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
            // 服务器端
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 设置要使用的线程池以及当前的Channel类型
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
            // 接收到信息之后需要进行处理,于是定义子处理器
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
                    socketChannel.pipeline().addLast(new JSONDecoder()) ;
                    socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ;
                    socketChannel.pipeline().addLast(new JSONEncoder()) ;
                    // 追加了处理器
                    socketChannel.pipeline().addLast(new EchoServerHandler());
                }
            });
            // 可以直接利用常亮进行TCP协议的相关配置
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // ChannelFuture描述的时异步回调的处理操作
            ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
            // 等待Socket被关闭
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully() ;
            bossGroup.shutdownGracefully() ;
        }
    }
}

EchoServerHandler.java
package cn.mldn.http.server.server.handler;

import cn.mldn.vo.Member;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理
 * Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println(msg.getClass() + " **************");
            Member member = (Member) msg ;
            System.err.println("{服务器}" + member);
            member.setName("【ECHO】" + member.getName());
            // 回应的输出操作
            ctx.writeAndFlush(member);
        } finally {
            // 释放缓存
            ReferenceCountUtil.release(msg) ;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close() ;
    }
}

EchoServerMain.java
package cn.mldn.http.server.server.main;

import cn.mldn.http.server.server.EchoServer;

public class EchoServerMain {
    public static void main(String[] args) throws Exception {
        new EchoServer().run();
    }
}

客户端实现

EchoClient.java
package cn.mldn.http.server.client;

import cn.mldn.http.server.client.handler.EchoClientHandler;
import cn.mldn.info.HostInfo;
import cn.mldn.http.server.serious.JSONDecoder;
import cn.mldn.http.server.serious.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

public class EchoClient {
    public void run() throws Exception {
        // 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
        // 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
        // 创建一个线程池
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建客户端处理程序
            Bootstrap client = new Bootstrap();
            client.group(group).channel(NioSocketChannel.class)
                    // 允许接收大块的返回数据
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                            socketChannel.pipeline().addLast(new JSONDecoder());
                            socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
                            socketChannel.pipeline().addLast(new JSONEncoder());
                            // 追加了处理器
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
            // 关闭连接
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

EchoClientHandler.java
package cn.mldn.http.server.client.handler;

import cn.mldn.vo.Member;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * 需要进行数据的读取操作,服务器端处理完成的数据信息会进行读取
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private static final int REPEAT = 500;// 消息重复发送次数

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 现在直接进行对象的发送
        Member member = new Member();
        member.setMid("xiaoli");
        member.setName("小李老师");
        member.setAge(16);
        member.setSalary(1.1);
        // 直接进行对象实例发送
        ctx.writeAndFlush(member) ;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 只要服务器端发送完成信息之后,都会执行此方法进行内容的输出操作
        try {
            Member member = (Member) msg ;
            // 输出服务器端的响应内容
            System.out.println(member);
        } finally {
            // 释放缓存
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClientMain.java
package cn.mldn.http.server.client.main;

import cn.mldn.http.server.client.EchoClient;

public class EchoClientMain {
    public static void main(String[] args) throws Exception {
        new EchoClient().run();
    }
}

编码解码

JSONDecoder.java
package cn.mldn.http.server.serious;

import cn.mldn.vo.Member;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class JSONDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> list) throws Exception {
        // 可以用的数据长度
        int len = msg.readableBytes();
        byte data[] = new byte[len];
        msg.getBytes(msg.readerIndex(), data, 0, len);
        list.add(JSON.parseObject(new String(data)).toJavaObject(Member.class)) ;
    }
}

JSONEncoder.java
package cn.mldn.http.server.serious;

import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class JSONEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf out) throws Exception {
        byte data [] = JSONObject.toJSONString(msg).getBytes() ;
        out.writeBytes(data) ;
    }
}

源码

echo-master.zip

上一篇 网络编程(四)AIO