dubbo系列之-序列化

技术

一款更有价值的序列化协议,dubbo也是很早就支持到了,就是大名鼎鼎的 "protobuf"

一、协议内容

protobuf 协议需要有.proto 文件和转换工具支持 (https://github.com/protocolbuffers/protobuf/releases),我们这里为了简单采用protostuff进行测试,他们两者生成的二进制数据结构格式完全相同的,可以说protostuff是一个基于Protobuf的序列化工具,protostuff通过schema的形式简化了复杂的自定义过程。

protobuf采用T-L-V (Tag-Length-Value)作为存储方式,既压缩后的字节流为如下形式。

picture.image

tag的计算公式为:

变量索引 << 3 | wire_type

01、wire_type

那么分析下Tag,tag代表数据类型wire_type和变量索引index,基础数据类型总共有如下几种:

picture.image

  • 对应的java中Integer

则为 int32 编码方式为Varint ,wireType = 0

  • 对应的java中Long

则为 int64 编码方式为Varint ,wireType = 0

  • 对应的java中String

则为 string 编码方式为length-delimi ,wireType = 2

  • 对应的java中Double

则为 double 编码方式为64-bit ,wireType = 1

02、变量索引

这个索引就是指我们类变量的顺序规则(仔细分析下,通过索引来定义属性位置,这样我们就不需像json一样每次都需要传递key参数,而只需要传递必须的value,但是这样带来一个明显的问题就是顺序和类型依赖很强)

03、length&value

length和value指的是后面变长内容的长度和序列化之后的字节内容。

04、编码规则

protobuf 有一套高效的编码方式,这里解释其中编码方式varint和zigzag和定长编码:

  • varint:

将二进制从右到左边7位一次计算,直到读取最后有效位置,7位有效位如果非最后7位则前面补1进行编码。

  • zigzag(如果为负数的情况):

(n << 1) ^ (n >> 31)

  • 定长编码:

像字符串"abc",这种压缩则直接为ascii编码

思考:为什么负数和正数会不一样?

二、案例分析

协议是固定的,先不去质疑,我们运行如下案例,看看能否反推下,加深对协议的理解。

Java要支持protostuff需要引入如下pom。

<dependency>                 
    <groupId>io.protostuff</groupId>        
    <artifactId>protostuff-runtime</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>       
    <groupId>io.protostuff</groupId>        
    <artifactId>protostuff-core</artifactId>
    <version>1.7.2</version>
</dependency>

定义一个需要序列化的对象

public class WishRequest implements Serializable {                 
    private Integer age;                   
    private Long money;
    private String msg;

用protostuff 工具类进行压缩

public class ProtostuffTest {                 
    public static void main(String[] args) {
        Schema<WishRequest> schema = RuntimeSchema.getSchema(WishRequest.class);
        WishRequest wishRequest = new WishRequest();
        wishRequest.setAge(18);      
        wishRequest.setMoney(1314L);
        wishRequest.setMsg("happy new year");    
        LinkedBuffer buffer = LinkedBuffer.allocate(1024);
        byte[] data = ProtobufIOUtil.toByteArray(wishRequest, schema, buffer);
        System.out.println(Arrays.toString(data));            
        System.out.println(data.length);
=============================================================================
[8, 18, 16, -94, 10, 26, 14, 104, 97, 112, 112, 121, 32, 
110, 101, 119, 32, 121, 101, 97, 114]======>输出数组
                    
[8(第一位), 18, 16(第3位), -94, 10, 26(第6位), 14, 104, 97, 112, 112, 121, 32,                  
110, 101, 119, 32, 121, 101, 97, 114]======>输出数组
21                  
//16进制输出                  
[08 12 10 a2 0a 1a 0e 68 61 70 70 79 20 6e 65 77 20 79 65 61 72]             

我们的对象输出了一个长度为 21 字节的数组,这里差个番外篇大家可以用hession、json进行同样压缩对比下输出的字节看看长度和内容上有什么区别。

压缩分析

回到tag-length-value,我们试着将数组拆分开进行分析。

第一个参数为age(Integer),从表格中得到wire_type = 0,变量索引顺序为第一个 = 1,那么tag = (1 << 3 | 0) = 1000 = 8;很巧和数组第一位吻合,age赋值为18(00000000 00000000 00000000 00010010),length可选长度默认是不需要的,直接看value它的有效位为(10010) 长度为5,按照7位取一次进行编码刚好一次可以取完,所以第二个字节为(000 10010)=18;回看输出数组第二位也很吻合都是18。

第二个参数为money,一样wire_type = 0,变量索引顺序为第二个 = 2,tag= (2<< 3 | 0) = 10 000 = 16;数组第三位也为16很吻合,age赋值为1314(00000000 00000000 00000101 00100010),有效长度为(101 00100010) 超过7位了,我们先取第一个七位 0100010 因为不是最后一位前面补1,最后字节为10100010(-94,负数的计算为取反+1=>01011101+1 => 01011110=94),我们回看也和数组第四位吻合,那么接下去再取第二个七位 000101 0(10) 该七位为最后一个七位不需要补1,直接输出,同样吻合输出数组第5位。

第三个参数为msg,wire_type=2;索引变量顺序3,tag= (3<< 3 | 2) = 11010=26,吻合数组第六位,wire_type对应为定长字符串“happy new year” 长度为14,那么length = 14,吻合数组第七位,下面最后剩下的字节(104, 97, 112, 112, 121, 32, 110, 101, 119, 32, 121, 101, 97, 114)可以用ascii翻译下,刚好就是happy new year。

三、在DUBBO中的实现

原来的dubbo demo工程中扩展:

//api 接口                     
public interface HelloService {               
    String sayHappyNewYear(WishRequest wish);
// 生产消费中的protocol xml 加上 protostuff 
<dubbo:protocol serialization="protostuff" name="dubbo" port="20880"/>
//消费类
public class ConsumerApplication {                 
    public static void main(String[] args) throws IOException, InterruptedException {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("dubbo-consumer.xml");
        ctx.start();         
        WishRequest wishRequest = new WishRequest();    
        wishRequest.setAge(18);          
        wishRequest.setMsg("happy new year");
        wishRequest.setMoney(1314L);
        HelloService bean = ctx.getBean(HelloService.class);
        String jack = bean.sayHappyNewYear(wishRequest);
        System.out.println(jack);

采用wireshark 抓包结果如下:

picture.image *大家的输出可能和文章有些偏差,应该是类包名字和我不一致导致的。

开头是dabb这是dubbo的标志象征(这里不是dubbo要注意区分开因为16进制没有u和o),我们在灰色背景中寻找有没有我们刚才压缩打印的长度为21的数组 [08 12 10 a2 0a 1a 0e 68 61 70 70 79 20 6e 65 77 20 79 65 61 72 ],图中红线框的确是有的,我们思考下,为啥除了21长度的数组还会多出几百个字节的内容呢?

源码分析如下(留意代码旁边的注释):

//org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#encodeRequest                     
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {   
    //....省略           
    if (req.isEvent()) {           
        //☆这次是接口请求不走上面心跳
        encodeEventData(channel, out, req.getData());
    } else {         
        //走这里进去            
        encodeRequestData(channel, out, req.getData(), req.getVersion());      
    }
    //....省略                   
    buffer.writerIndex(savedWriteIndex);           
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}   
//..rpc.protocol.dubbo.DubboCodec#encodeRequestData()                       
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) { 
    RpcInvocation inv = (RpcInvocation) data;
    out.writeUTF(version);//输出dubbo版本2.0.2
    out.writeUTF(inv.getAttachment(PATH_KEY)); //输出路径
    out.writeUTF(inv.getAttachment(VERSION_KEY));//输出方法版本0.0.0                 
    out.writeUTF(inv.getMethodName());//输出方法名           
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); //输出参数
    //前面几个string算下来有小200的字节,  
    Object[] args = inv.getArguments();
    if (args != null) {
        for (int i = 0; i < args.length; i++) { 
            //out 走的是 ProtostuffObjectOutput 类实现,我们进去看看
            out.writeObject(encodeInvocationArgument(channel, inv, i));         
        }             
    }               
    out.writeObject(inv.getAttachments()); //输出hashmap附带信息这表也有小200字节                   
}
//apache.dubbo.common.serialize.protostuff.ProtostuffObjectOutput#writeObject                       
public void writeObject(Object obj) throws IOException {
    byte[] bytes;           
    byte[] classNameBytes;       
    //如果是非对象,或者是基础参数类型(比如map,list..)会进行包装
    if (obj == null || WrapperUtils.needWrapper(obj)) {                 
        Schema<Wrapper> schema = RuntimeSchema.getSchema(Wrapper.class);
        Wrapper wrapper = new Wrapper(obj);
        bytes = GraphIOUtil.toByteArray(wrapper, schema, buffer);
        classNameBytes = Wrapper.class.getName().getBytes();        
    } else {
 //寻找类对应的 Schema ,这里的作用相当于是.proto 文件,使用的是反射,并且会有缓存,            
 //这里返回的是RuntimeSchema 
        Schema schema = RuntimeSchema.getSchema(obj.getClass());
        //压缩obj=WishRequest
        bytes = GraphIOUtil.toByteArray(obj, schema, buffer);
        classNameBytes = obj.getClass().getName().getBytes();        
    }
    dos.writeInt(classNameBytes.length);                   
    dos.writeInt(bytes.length);
    dos.write(classNameBytes);
    dos.write(bytes);
}   
//io.protostuff.GraphIOUtil#toByteArray                       
public static <T> byte[] toByteArray(T message, Schema<T> schema, LinkedBuffer buffer)
{
    final ProtostuffOutput output = new ProtostuffOutput(buffer);                       
    final GraphProtostuffOutput graphOutput = new GraphProtostuffOutput(output);
    schema.writeTo(graphOutput, message);
    return output.toByteArray();
}
//io.protostuff.runtime.RuntimeSchema#writeTo                       
@Override
public final void writeTo(Output output, T message){               
    //getFields() 中会返回对象所有的属性有age,money,msg,               
    //fields是在对象创建的时候通过策略模式找到指定的wire_type类型压缩实现类                   
    /**   
final Field<T> field = RuntimeFieldFactory.getFieldFactory(                 
        f.getType(), strategy).create(fieldMapping, name, f,
        strategy);
fields.add(field);      
   Field 的实现有很多,我们下面 RuntimeUnsafeFieldFactory.java类图,有每一种wireType          
   对应的压缩方式
    **/
    for (Field<T> f : getFields())                 
        f.writeTo(output, message);
}
//我们第一个对象是age Integer 类型的他对应的压缩方式实现应该是INT32 这个变量,我们进入到代码块
public static final RuntimeFieldFactory<Integer> INT32 = new RuntimeFieldFactory<Integer>(5) {
    public <T> Field<T> create(int number, String name, java.lang.reflect.Field f, IdStrategy strategy) {
        final boolean primitive = f.getType().isPrimitive();             
        final long offset = RuntimeUnsafeFieldFactory.us.objectFieldOffset(f);
        return new Field<T>(FieldType.INT32, number, name, (Tag)f.getAnnotation(Tag.class)) {                       
            public void mergeFrom(Input input, T message) throws IOException {}
            public void writeTo(Output output, T message) throws IOException {
                if (primitive) {
                    output.writeInt32(this.number, RuntimeUnsafeFieldFactory.us.getInt(message, offset), false);
                } else {        
                //将值转换为Integer
                    Integer value = (Integer)RuntimeUnsafeFieldFactory.us.getObject(message, offset);
                    if (value != null) {        
                    //☆ 继续跟进去
                        output.writeInt32(this.number, value, false);
                    }
                }   
            }       
            public void transfer(Pipe pipe, Input input, Output output, boolean repeated) throws IOException {}           
        };          
}              
//io.protostuff.ProtostuffOutput#writeInt32                       
@Override
public void writeInt32(int fieldNumber, int value, boolean repeated) {               
    if (value < 0){
        //...            
    }else{    
    //value 为 18 进入此分支,我们先瞄一眼 makeTag方法              
        tail = sink.writeVarInt32(value,this,
                sink.writeVarInt32(
makeTag(fieldNumber, WIRETYPE_VARINT),
                        this,
                        tail));
    }
}                   
#重点                       
//io.protostuff.WireFormat#makeTag                    
public static int makeTag(final int fieldNumber, final int wireType)
{//和协议介绍的一样字段索引左移3位 与 上wire_type     
    return (fieldNumber << 3) | wireType;
}
#重点                       
//io.protostuff.WriteSink#writeVarInt32                    
public LinkedBuffer writeVarInt32(int value, WriteSession session, LinkedBuffer lb) throws IOException {
    while(true) {           
        ++session.size;       
        if (lb.offset == lb.buffer.length) { 
            lb = new LinkedBuffer(session.nextBufferSize, lb);
        }
        //这里&上-128 判断从右往左7位取一次是否到最后,如果到最后就返回流               
        if ((value & -128) == 0) {         
            lb.buffer[lb.offset++] = (byte)value;
            return lb;
        }  
        lb.buffer[lb.offset++] = (byte)(value & 127 | 128);               
        value >>>= 7;//七位取一次,也和协议说明一样验证成功             
    }                     
}               

picture.image *RuntimeUnsafeFieldFactory.java 图

这样整个压缩的编码的过程我们就分析完了,可以看到对象传输占用了很少的内容,更多的是dubbo自己的包名和类信息还有参数信息,将包名缩短,变量定义减少,减少不必要传递的信息,或者不需要每次都传,那么的性能应该能提升很多。

四、总结

学习框架或者源码这种越是底层的东西,采用猜想验证的思路去一层层掀开实现往往更让人难以忘记。最底层的01序列化我们也啃完了,让我们一起期待下一个新篇章吧。

*文|小可爱

0
0
0
0
关于作者
相关资源
在火山引擎云搜索服务上构建混合搜索的设计与实现
本次演讲将重点介绍字节跳动在混合搜索领域的探索,并探讨如何在多模态数据场景下进行海量数据搜索。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论