共计 5466 个字符,预计需要花费 14 分钟才能阅读完成。
在 netty 基本组件介绍中,我们大致了解了 netty 的一些基本组件,今天我们来搭建一个基于 netty 的 Tcp 服务端程序,通过代码来了解和熟悉这些组件的功能和使用方法。
首先我们自己创建一个 Server 类,命名为 TCPServer
第一步初始化 ServerBootstrap,ServerBootstrap 是 netty 中的一个服务器引导类,对 ServerBootstrap 的实例化就是创建 netty 服务器的入口
public class TCPServer {
private Logger log = LoggerFactory.getLogger(getClass());
// 端口号
private int port=5080;
// 服务器运行状态
private volatile boolean isRunning = false;
// 处理 Accept 连接事件的线程,这里线程数设置为 1 即可,netty 处理链接事件默认为单线程,过度设置反而浪费 cpu 资源
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 处理 hadnler 的工作线程,其实也就是处理 IO 读写。线程数据默认为 CPU 核心数乘以 2
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
public void init() throws Exception{
// 创建 ServerBootstrap 实例
ServerBootstrap serverBootstrap=new ServerBootstrap();
// 初始化 ServerBootstrap 的线程模型
serverBootstrap.group(workerGroup,workerGroup);//
// 设置将要被实例化的 ServerChannel 类
serverBootstrap.channel(NioServerSocketChannel.class);//
// 在 ServerChannelInitializer 中初始化 ChannelPipeline 责任链,并添加到 serverBootstrap 中
serverBootstrap.childHandler(new ServerChannelInitializer());
// 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// 是否启用心跳保活机机制
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口后,开启监听
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
if(channelFuture.isSuccess()){
System.out.println(“TCP 服务启动 成功 —————“);
}
}
/**
* 服务启动
*/
public synchronized void startServer() {
try {
this.init();
}catch(Exception ex) {
}
}
/**
* 服务关闭
*/
public synchronized void stopServer() {
if (!this.isRunning) {
throw new IllegalStateException(this.getName() + ” 未启动 .”);
}
this.isRunning = false;
try {
Future<?> future = this.workerGroup.shutdownGracefully().await();
if (!future.isSuccess()) {
log.error(“workerGroup 无法正常停止:{}”, future.cause());
}
future = this.bossGroup.shutdownGracefully().await();
if (!future.isSuccess()) {
log.error(“bossGroup 无法正常停止:{}”, future.cause());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
this.log.info(“TCP 服务已经停止 …”);
}
private String getName() {
return “TCP-Server”;
}
}
上面的代码中主要使用到的 ServerBootstrap 类的方法有以下这些:
group:设置 SeverBootstrap 要用到的 EventLoopGroup,也就是定义 netty 服务的线程模型,处理 Acceptor 链接的主 ” 线程池 ” 以及用于 I / O 工作的从 ” 线程池 ”;
channel:设置将要被实例化的 SeverChannel 类;
option:指定要应用到新创建 SeverChannel 的 ChannelConfig 的 ChannelOption. 其实也就是服务本身的一些配置;
chidOption:子 channel 的 ChannelConfig 的 ChannelOption。也就是与客户端建立的连接的一些配置;
childHandler:设置将被添加到已被接收的子 Channel 的 ChannelPipeline 中的 ChannelHandler,其实就是让你在里面定义处理连接收发数据,需要哪些 ChannelHandler 按什么顺序去处理;
第二步接下来我们实现 ServerChannelInitializer 类,这个类继承实现自 netty 的 ChannelInitializer 抽象类,这个类的作用就是对 channel(连接)的 ChannelPipeline 进行初始化工作,说白了就是你要把处理数据的方法添加到这个任务链中去,netty 才知道每一步拿着 socket 连接和数据去做什么。
@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
public ServerChannelInitializer() throws InterruptedException {
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//IdleStateHandler 心跳机制, 如果超时触发 Handle 中 userEventTrigger() 方法
pipeline.addLast(“idleStateHandler”,
new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
// netty 基于分割符的自带解码器,根据提供的分隔符解析报文,这里是 0x7e;1024 表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
// pipeline.addLast(
// new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] {0x7e}),
// Unpooled.copiedBuffer(new byte[] {0x7e})));
// 自定义编解码器
pipeline.addLast(
new MessagePacketDecoder(),
new MessagePacketEncoder()
);
// 自定义 Hadler
pipeline.addLast(“handler”,new TCPServerHandler());
// 自定义 Hander, 可用于处理耗时操作,不阻塞 IO 处理线程
pipeline.addLast(group,”BussinessHandler”,new BussinessHandler());
}
}
这里我们注意下
pipeline.addLast(group,”BussinessHandler”,new BussinessHandler());
在这里我们可以把一些比较耗时的操作(如存储、入库)等操作放在 BussinessHandler 中进行,因为我们为它单独分配了 EventExecutorGroup 线程池执行,所以说即使这里发生阻塞,也不会影响 TCPServerHandler 中数据的接收。
最后就是各个部分的具体实现
解码器的实现:
public class MessagePacketDecoder extends ByteToMessageDecoder
{
public MessagePacketDecoder() throws Exception
{
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
{
try {
if (buffer.readableBytes() > 0) {
// 待处理的消息包
byte[] bytesReady = new byte[buffer.readableBytes()];
buffer.readBytes(bytesReady);
// 这之间可以进行报文的解析处理
out.add(bytesReady);
}
}finally {
}
}
}
编码器的实现
public class MessagePacketEncoder extends MessageToByteEncoder<Object>
{
public MessagePacketEncoder()
{
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
{
try {
// 在这之前可以实现编码工作。
out.writeBytes((byte[])msg);
}finally {
}
}
}
TCPServerHandler 的实现
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
public TCPServerHandler() {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 拿到传过来的 msg 数据,开始处理
}
// 检测到空闲连接,触发
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 这里可做一些断开连接的处理
}
}
BussinessHandler 的实���与 TCPServerHandler 基本类似,它可以处理一些相对比较耗时的操作,我们这里就不实现了。
通过以上的代码我们可以看到,一个基于 netty 的 TCP 服务的搭建基本就是三大块:
1、对引导服务器类 ServerBootstrap 的初始化;
2、对 ChannelPipeline 的定义,也就是把多个 ChannelHandler 组成一条任务链;
3、对 ChannelHandler 的具体实现,其中可以有编解码器,可以有对收发数据的业务处理逻辑;
以上代码只是在基于 netty 框架搭建一个最基本的 TCP 服务,其中包含了一些 netty 基本的特性和功能,当然这只是 netty 运用的一个简单的介绍,如有不正确的地方还望指出与海涵。
: