netty网络框架四

发布时间 2023-03-28 11:15:44作者: 开源遗迹

一、netty编解码器机制

在Netty中,编解码器(Codec)是一种非常重要的机制。它们可以将二进制数据转换成Java对象,或者将Java对象转换成二进制数据,从而方便网络通信的实现。

Netty提供了多种编解码器,包括ByteToMessageDecoder、MessageToByteEncoder、ReplayingDecoder等。开发人员可以选择不同的编解码器来处理不同类型的消息。

以下是一个简单的示例,展示如何使用自定义编解码器实现消息的编解码:

public class Message {
    private int length;
    private String content;

    // 构造函数、getter和setter省略
}

public class MessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 先判断是否足够读取一个int类型的长度信息
        if (in.readableBytes() < 4) {
            return;
        }

        // 读取长度信息并根据长度解析出消息内容
        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        byte[] contentBytes = new byte[length];
        in.readBytes(contentBytes);
        String content = new String(contentBytes, Charset.forName("UTF-8"));

        // 将解析出的消息封装成Message对象,并输出日志
        Message message = new Message(length, content);
        System.out.println("Decode message: " + message.toString());

        // 将解析出的消息对象添加到List<Object>中,用于通知业务处理器
        out.add(message);
    }
}

public class MessageEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
        // 先写入消息的长度信息
        out.writeInt(message.getLength());

        // 再写入消息内容的字节数组
        out.writeBytes(message.getContent().getBytes(Charset.forName("UTF-8")));

        System.out.println("Encode message: " + message.toString());
    }
}

在上述代码中,我们定义了一个Message类,包含了消息的长度和内容。同时,我们实现了一个MessageDecoder编解码器,用于将二进制数据转换成Message对象;以及一个MessageEncoder编解码器,用于将Message对象转换成二进制数据。

具体来说,MessageDecoder继承了ByteToMessageDecoder,并重写了其中的decode方法,该方法会在接收到数据时被调用。在该方法中,我们先读取四个字节的长度信息,然后根据长度信息解析出消息的内容,将其封装成Message对象,并输出日志。最后,将Message对象添加到List<Object>中,用于通知业务处理器。

而MessageEncoder则继承了MessageToByteEncoder,并重写了其中的encode方法,该方法会在需要发送消息时被调用。在该方法中,我们先写入四个字节的长度信息,再写入消息内容的字节数组,并输出日志。

通过自定义编解码器的方式,我们可以很方便地实现消息的编解码,并且将其应用于Netty中,用于网络通信。

二、ProtoBuf机制

在Netty框架中,可以使用ProtoBuf作为序列化机制来进行网络通信。Netty提供了ProtoBuf的编解码器(ProtobufEncoder和ProtobufDecoder),使得将ProtoBuf消息和二进制数据进行相互转换变得方便。 使用ProtoBuf作为消息格式,需要先定义消息格式和消息类型,然后通过ProtoBuf提供的编译器生成相应的Java类。示例代码如下: 定义消息格式(Message)和消息类型(Message Type):

protobuf
syntax = "proto3"; message Person { string name = 1; int32 age = 2; repeated string hobbies = 3; }

使用ProtoBuf编译器生成Java类:

plaintext
protoc --java_out=. Person.proto

在Netty中使用ProtoBuf编解码器对消息进行编解码:

java
// 定义消息格式 Person person = Person.newBuilder() .setName("John") .setAge(30) .addHobbies("reading") .addHobbies("swimming") .build(); // 编码器 ProtobufEncoder encoder = new ProtobufEncoder(); // 将消息转换为二进制数据 ByteBuf byteBuf = Unpooled.buffer(); encoder.encode(null, person, byteBuf); // 解码器 ProtobufDecoder decoder = new ProtobufDecoder(Person.getDefaultInstance()); // 将二进制数据转换为消息对象 Person decodedPerson = (Person) decoder.decode(null, byteBuf);

以上代码演示了如何使用ProtoBuf编解码器对消息进行编解码。在实际应用中,通常需要在Netty的ChannelPipeline中添加ProtoBuf编解码器,以实现自动编解码功能。示例代码如下:

java
public class ProtoBufServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加ProtoBuf编解码器 pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new ProtobufDecoder(Person.getDefaultInstance())); // 添加业务逻辑处理器 pipeline.addLast(new ProtoBufServerHandler()); } }

在上述代码中,ProtoBuf编解码器被添加到了Netty的ChannelPipeline中,并且指定了要解码的消息类型为Person。这样,当有数据需要进行编解码时,Netty会自动调用编解码器对数据进行相应的处理。

三、netty入站与出站机制

Netty中的入站和出站机制是指数据在网络传输过程中进入和离开应用程序的过程。入站数据是从网络中接收到的数据,出站数据是要发送到网络的数据。 Netty的入站和出站机制是通过ChannelPipeline实现的,ChannelPipeline本质上是一个事件处理器的链表。当有数据进入或离开Channel时,ChannelPipeline会按照链表中的顺序依次调用每个事件处理器的方法,完成数据的处理和转换。 入站数据的处理流程:

  1. 数据从Channel读取到ByteBuf缓冲区中。
  2. 编解码器(如ProtobufDecoder)对ByteBuf中的数据进行解码,转换为业务对象。
  3. 业务逻辑处理器(如ServerHandler)对业务对象进行处理和转换,并生成需要发送的数据。
  4. 编解码器(如ProtobufEncoder)对需要发送的数据进行编码,转换为ByteBuf缓冲区中的数据。
  5. Channel将ByteBuf缓冲区中的数据写入到网络中。 出站数据的处理流程:
  6. 应用程序生成需要发送的数据,并将数据封装到ByteBuf缓冲区中。
  7. 编解码器(如ProtobufEncoder)对ByteBuf中的数据进行编码,转换为网络中的二进制数据。
  8. Channel将网络中的二进制数据发送到远程节点。 在Netty中,可以通过继承SimpleChannelInboundHandler和ChannelOutboundHandlerAdapter来实现入站和出站数据的处理逻辑。SimpleChannelInboundHandler是一个泛型类,表示处理入站数据的处理器,可以指定要处理的数据类型。ChannelOutboundHandlerAdapter是一个抽象类,表示处理出站数据的处理器。 示例代码如下:
java
public class ServerHandler extends SimpleChannelInboundHandler<Person> { @Override protected void channelRead0(ChannelHandlerContext ctx, Person person) throws Exception { // 处理业务逻辑,生成需要发送的数据 Message message = convertToMessage(person); // 将数据发送到网络中 ctx.writeAndFlush(message); } } public class ClientHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 封装数据到ByteBuf中 ByteBuf byteBuf = convertToByteBuf(msg); // 将ByteBuf中的数据发送到网络中 ctx.writeAndFlush(byteBuf); } }

以上代码演示了如何实现入站和出站数据的处理逻辑。在入站数据处理器中,继承了SimpleChannelInboundHandler,并重写了channelRead0方法,该方法表示处理接收到的数据的逻辑。在出站数据处理器中,继承了ChannelOutboundHandlerAdapter,并重写了write方法,该方法表示将要发送的数据封装成ByteBuf并发送到网络中的逻辑。在实际应用中,可以根据具体的业务需求,编写自己的入站和出站数据处理器。

 

四、handler链调用机制

在Netty中,ChannelPipeline中的每个处理器都是一个ChannelHandler,ChannelHandler在ChannelPipeline中按顺序组成了一个处理器链。当有数据进入或离开Channel时,ChannelPipeline会按照链表中的顺序依次调用每个ChannelHandler的方法,完成数据的处理和转换。 ChannelHandler的方法有两种类型:入站方法和出站方法。入站方法是用于处理从网络中接收到的数据,包括channelRegistered、channelUnregistered、channelActive、channelInactive、channelRead、channelReadComplete等方法。出站方法是用于处理将数据发送到网络中,包括write、flush等方法。 在ChannelPipeline中,每个ChannelHandler都可以处理入站和出站事件,即可以实现入站方法和出站方法。当有数据进入或离开Channel时,ChannelPipeline会按照链表中的顺序依次调用每个ChannelHandler的方法,完成数据的处理和转换。 对于入站事件,ChannelPipeline会从链表的头部开始调用ChannelHandler的入站方法,直到找到第一个处理器可以处理该事件为止。如果所有的处理器都不能处理该事件,则会调用ChannelPipeline中的exceptionCaught方法,将异常信息传递给最后一个处理器。 对于出站事件,ChannelPipeline会从链表的尾部开始调用ChannelHandler的出站方法,直到找到第一个处理器可以处理该事件为止。如果所有的处理器都不能处理该事件,则会抛出异常。 需要注意的是,当ChannelHandler的方法被调用时,需要调用下一个ChannelHandler的方法,可以通过调用ChannelHandlerContext的相应方法来实现。例如,在入站方法中,可以调用ctx.fireChannelRead(msg)方法来将数据传递给下一个ChannelHandler进行处理。 总之,Netty的handler链调用机制是通过ChannelPipeline实现的,当有数据进入或离开Channel时,ChannelPipeline会按顺序调用每个ChannelHandler的方法,完成数据的处理和转换。

 

五、netty自定义协议解决Tcp粘包拆包

在使用TCP协议传输数据时,由于TCP是基于流的协议,数据可能会被切割成多个包发送,或多个包的数据可能会被拼接在一起发送,这就是TCP粘包和拆包问题。Netty提供了多种解决TCP粘包和拆包问题的方法,其中之一是自定义协议。 自定义协议的实现步骤如下:

  1. 定义解码器和编码器:自定义协议需要实现自己的解码器和编码器。解码器用于将接收到的数据解码成业务对象,编码器用于将业务对象编码成字节流发送。
java
public class MyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 判断是否有足够的字节可以读取 if (in.readableBytes() < 4) { return; } // 标记读取位置 in.markReaderIndex(); // 读取数据长度 int length = in.readInt(); // 判断是否有足够的字节可以读取 if (in.readableBytes() < length) { // 重置读取位置 in.resetReaderIndex(); return; } // 读取数据 byte[] data = new byte[length]; in.readBytes(data); // 将字节数组转化为业务对象 Object obj = deserialize(data); out.add(obj); } } public class MyEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { // 将业务对象转化为字节数组 byte[] data = serialize(msg); // 写入数据长度和数据 out.writeInt(data.length); out.writeBytes(data); } }

解码器的作用是将接收到的数据解码成业务对象,并添加到List out中。在解码器中,首先判断是否有足够的字节可以读取,如果没有,则返回空,等待下次数据到达;如果有足够的字节可以读取,则读取数据长度和数据,并将数据转化为业务对象。 编码器的作用是将业务对象编码成字节流,并写入ByteBuf中。在编码器中,首先将业务对象转化为字节数组,然后将数据长度和数据写入ByteBuf中。 2. 实现自定义协议:自定义协议需要定义消息类型、消息体等信息,具体实现方式根据业务需求而定。

 

java
public class MyMessage { private int messageType; private int messageLength; private byte[] messageData; // 省略getter和setter方法 }

在自定义协议中,可以定义消息类型、消息长度和消息体等信息。在示例中,定义了一个MyMessage类,包含了消息类型、消息长度和消息体。 3. 在ChannelPipeline中添加解码器和编码器:在Netty的ChannelPipeline中添加自定义的解码器和编码器,以实现自定义协议的解码和编码。

java
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加解码器和编码器 pipeline.addLast(new MyDecoder()); pipeline.addLast(new MyEncoder()); // 添加业务处理器 pipeline.addLast(new MyServerHandler()); } }

在示例中,将MyDecoder和MyEncoder添加到ChannelPipeline中,以实现自定义协议的解码和编码。 4. 在业务处理器中处理消息:在业务处理器中,可以处理接收到的业务对象,以完成业务逻辑。

java
public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof MyMessage) { MyMessage message = (MyMessage) msg; // 处理业务对象 ... } } }

在示例中,判断接收到的业务对象是否为MyMessage类型,如果是,则将其转化为MyMessage对象,并处理相关业务逻辑。 通过以上步骤,就可以实现自定义协议,解决TCP粘包和拆包问题。自定义协议可以根据具体业务需求进行设计和实现,提高数据传输的安全性和可靠性。

六、netty服务器启动流程

 

七、netty接收请求流程

 

八、Pipeline调用ChannelHandler流程

 

九、任务加入异步线程池流程

 

十、RPC调用流程