f3;

Map f4;

Byte f5;

Short f6;

Integer f7;

Long f8;

Float f9;

Double f10;

short

List f12;

}

public

static

createObject

()

new

true

byte

1

2

new

obj.f1 = obj1;

"abc"

"abc"

"abc"

byte

1

2

obj.f5 = Byte.MAX_VALUE;

obj.f6 = Short.MAX_VALUE;

obj.f7 = Integer.MAX_VALUE;

obj.f8 = Long.MAX_VALUE;

1.0f

2

1

3.0

new

short

short

1

short

2

short

1

short

4

return

}

}

纯Java序列化:

public

class

CustomObjectExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.CustomObjectExample"

public

static

void

main

String[] args

// Fury应该在多个对象序列化之间复用,不要每次创建新的Fury实例

Fury fury = Fury.builder().withLanguage(Language.JAVA)

false

false

.build();

byte

out

}

}

跨语言序列化:

public

class

CustomObjectExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.CustomObjectExample"

public

static

void

main

String[] args

// Fury应该在多个对象序列化之间复用,不要每次创建新的Fury实例

Fury fury = Fury.builder().withLanguage(Language.XLANG)

false

"example.SomeClass1"

"example.SomeClass2"

byte

// bytes can be data serialized by other languages.

out

}

}

Python序列化示例

from

dataclasses import dataclass

from

typing import List, Dict

import

pyfury

@dataclass

class

SomeClass2:

f1

Any = None

f2

str = None

f3

List[str] = None

f4

Dict[pyfury.Int8Type, pyfury.Int32Type] = None

f5

pyfury.Int8Type = None

f6

pyfury.Int16Type = None

f7

pyfury.Int32Type = None

# int类型默认会按照long类型进行序列化,如果对端是更加narrow的类型,

# 需要使用pyfury.Int32Type等进行标注

f8

int = None # 也可以使用pyfury.Int64Type进行标注

f9

pyfury.Float32Type = None

f10

float = None # 也可以使用pyfury.Float64Type进行标注

f11

pyfury.Int16ArrayType = None

f12

List[pyfury.Int16Type] = None

@dataclass

class

SomeClass1:

f1

Any

f2

Dict[pyfury.Int8Type, pyfury.Int32Type]

if

__name__ == "__main__":

fury_

pyfury.Fury(reference_tracking=False)

fury_.register_class(SomeClass1,

"example.SomeClass1")

fury_.register_class(SomeClass2,

"example.SomeClass2")

obj2

SomeClass2(f1=True, f2={-1: 2})

obj1

SomeClass1(

f1

obj2,

f2

"abc",

f3

["abc", "abc"],

f4

{1: 2},

f5

2 ** 7 - 1,

f6

2 ** 15 - 1,

f7

2 ** 31 - 1,

f8

2 ** 63 - 1,

f9

1.0 / 2,

f10

1 / 3.0,

f11

array.array("h", [1, 2]),

f12

[-1, 4],

)

data

fury_.serialize(obj)

# bytes can be data serialized by other languages.

print(fury_.deserialize(data))

GoLang序列化示例

package

import

"code.alipay.com/ray-project/fury/go/fury"

import

"fmt"

func

main

()

type

struct

interface

string

interface

map

int8

int32

int8

int16

int32

int64

float32

float64

int16

F12 fury.Int16Slice

}

type

struct

interface

map

int8

int32

}

false

if

"example.SomeClass1"

nil

panic

}

if

"example.SomeClass2"

nil

panic

}

obj2 := &SomeClass2{}

true

map

int8

int32

-1

2

obj := &SomeClass1{}

obj.F1 = obj2

"abc"

interface

"abc"

"abc"

map

int8

int32

1

2

obj.F4 = f4

obj.F5 = fury.MaxInt8

obj.F6 = fury.MaxInt16

obj.F7 = fury.MaxInt32

obj.F8 = fury.MaxInt64

1.0

2

1

3.0

int16

1

2

int16

-1

4

bytes, err := fury_.Marshal(value)

if

nil

}

var

interface

// bytes can be data serialized by other languages.

if

nil

panic

}

fmt.Println(newValue)

}

序列化共享&循环引用

手动实现

大量冗长复杂易出

跨语言序列化框架支持循环引用

极大简化这些复杂场景的序列化,加速业务迭代效率。

Java序列化示例

import

import

import

class

ReferenceExample

static

class

SomeClass

SomeClass f1;

Map

String

String

Map

String

String

}

static

Object

new

obj.f1 = obj;

"k1"

"v1"

"k2"

"v2"

obj.f3 = obj.f2;

return

}

}

Java序列化:

public

class

ReferenceExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.ReferenceExample"

public

static

void

main

String[] args

// Fury应该在多个对象序列化之间复用,不要每次创建新的Fury实例

Fury fury = Fury.builder().withLanguage(Language.JAVA)

true

false

.build();

byte

out

}

}

跨语言序列化:

public

class

ReferenceExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.ReferenceExample"

public

static

void

main

String[] args

// Fury应该在多个对象序列化之间复用,不要每次创建新的Fury实例

Fury fury = Fury.builder().withLanguage(Language.XLANG)

true

"example.SomeClass"

byte

// bytes can be data serialized by other languages.

out

}

}

Python序列化示例

from

import

import

class

SomeClass

"SomeClass"

f2: Dict[str, str]

f3: Dict[str, str]

if

"__main__"

True

"example.SomeClass"

obj = SomeClass()

"k1"

"v1"

"k2"

"v2"

obj.f1, obj.f3 = obj, obj.f2

data = fury_.serialize(obj)

# bytes can be data serialized by other languages.

print(fury_.deserialize(data))

Golang序列化示例

package

import

"code.alipay.com/ray-project/fury/go/fury"

import

"fmt"

func

main

()

type

struct

F1 *SomeClass

map

string

string

map

string

string

}

true

if

"example.SomeClass"

nil

panic

}

map

string

string

"k1"

"v1"

"k2"

"v2"

value.F3 = value.F2

value.F1 = value

bytes, err := fury_.Marshal(value)

if

nil

}

var

interface

// bytes can be data serialized by other languages.

if

nil

panic

}

fmt.Println(newValue)

}

Zero-Copy序列化

对于大规模数据传输场景,内存拷贝有时会成为整个系统的瓶颈。为此各种语言和框架做了大量优化,比如Java提供了NIO能力,避免了内存在用户态和内核态之间的来回拷贝;Kafka使用Java的NIO来实现零拷贝;Python Pickle5提供了

Out-Of-Band Buffer[7]

序列化能力来避免额外拷贝。

对于高性能跨语言数据传输,序列化框架也需要能够支持Zero-Copy,避免数据Buffer的额外拷贝。下面是一个Fury序列化多个基本类型数组组成的对象树的示例,分别对应到Java基本类型数组、Python Numpy数组、Golang 基本类型slice。对于ByteBuffer零拷贝,在本文的性能测试部分也给出了部分介绍。

Java序列化示例

Java序列化

import

import

import

import

import

public

class

ZeroCopyExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.ZeroCopyExample"

public

static

void

main

(String[] args)

// Fury应该在多个对象序列化之间复用,不要每次创建新的Fury实例

Fury fury = Fury.builder()

.withLanguage(Language.JAVA)

false

.build();

list

"str"

new

1000

new

int

100

new

double

100

new

list

List buffers =

map

System.out.println(fury.deserialize(bytes, buffers));

}

}

跨语言序列化:

import

import

import

import

import

public

class

ZeroCopyExample

// mvn exec:java -Dexec.mainClass="io.fury.examples.ZeroCopyExample"

public

static

void

main

(String[] args)

Fury fury = Fury.builder().withLanguage(Language.XLANG).build();

list

"str"

new

1000

new

int

100

new

double

100

new

list

// bytes can be data serialized by other languages.

List buffers =

map

System.out.println(fury.deserialize(bytes, buffers));

}

}

Python序列化示例

import

import

import

as

if

"__main__"

fury_ = pyfury.Fury()

"str"

1000

"i"

100

100

0.0

serialized_objects = []

data

for

in

data

by

data

Golang序列化示例

package

import

"code.alipay.com/ray-project/fury/go/fury"

import

"fmt"

func

main

()

true

// Golang版本暂不支持其他基本类型slice的zero-copy

interface

"str"

make

byte

1000

nil

var

func

(o fury.SerializedObject)

bool

append

return

false

})

var

interface

var

for

range

append

}

err := fury.Deserialize(buf, &newList, buffers)

fmt.Println(newList)

Drop-in替换Kryo/Hession

高性能的通用Java序列化框架

Drop-in替换

Kryo 20倍

以上

100倍

200倍

下面是一个序列化自定义类型的示例:

import io.fury.Fury;

import java.util.List;

import java.util.Arrays;

public

class

Example

public

static

void

main

String[] args

object

new

// Fury实例应该在序列化多个对象之间复用,不要每次创建新的实例

{

Fury fury = Fury.builder()

.withLanguage(Language.JAVA)

// 设置为true可以避免反序列化未注册的非内置类型,

// 避免安全漏洞

false

true

// 注册类型可以减少classname的序列化,不是强制要求

// fury.register(SomeClass.class);

byte

object

out

}

{

ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)

true

false

.buildThreadSafeFury();

byte

object

out

}

{

new

Fury fury = Fury.builder()

.withLanguage(Language.JAVA)

false

true

// 注册类型可以减少classname的序列化

fury.register(SomeClass.class);

return

});

byte

object

out

}

}

}

通过Fury Format避免序列化

对于有极致性能要求的场景,如果用户只需要读取部分数据,或者在Serving场景根据对象树某个字段进行过滤和转发,可以使用Fury Format来避免其它字段的序列化。Fury Row Format是参考SQL行存和Arrow列存实现的一套可以随机访问的二进制行存结构。目前实现了Java/Python/C++版本,Python版本通过Cython绑定到C++实现。

该格式是自包含的,可以根据schema直接计算出任意字段的offset

减少Java GC overhead

。由于避免了反序列化,因此不会创建对象,从而避免了GC问题。

避免Python反序列化

_getattr__/__getitem__/slice/

special methods

python pojo/list/object

缓存友好,数据密集存储。

Python示例

这里给出一个读取部分数据的样例以及性能测试结果。在下面这个序列化场景中,需要读取第二个数组字段的第10万个元素,Fury耗时几乎为0,而pickler需要8秒。

@dataclass

class

Bar

f1: str

f2: List[pa.int64]

@dataclass

class

Foo

f1: pa.int32

f2: List[pa.int32]

f3: Dict[str, pa.int32]

f4: List[Bar]

encoder = pyfury.encoder(Foo)

10

1000

{i}

for

in

1000

{i}

10

for

in

1000

binary: bytes = encoder.to_row(foo).to_bytes()

{datetime.datetime.now()}

foo_row = pyfury.RowData(encoder.schema, binary)

100000

100000

200000

5

{datetime.datetime.now()}

binary = pickle.dumps(foo)

{datetime.datetime.now()}

new_foo = pickle.loads(binary)

100000

100000

200000

5

{datetime.datetime.now()}

Java示例

public

class

Bar

String f1;

List

}

public

class

Foo

int f1;

List

Map f3;

List

}

Encoder encoder = Encoders.rowEncoder(Foo.class);

// 该数据可以被Python零拷贝解析

// 可以是来自python序列化的数据

1

// 零拷贝读取List f2字段

4

// 零拷贝读取List f4字段

10

// 零拷贝读取读取List f4第11个元素数据

// 零拷贝读取读取List f4第11个元素数据的f2字段的第6个元素

1

5

Encoder barEncoder = Encoders.rowEncoder(Bar.class);

// 部分反序列化对象

Bar newBar = barEncoder.fromRow(barStruct);

20

// 对象创建示例:

// Foo foo = new Foo();

// foo.f1 = 10;

// foo.f2 = IntStream.range(0, 1000000).boxed().collect(Collectors.toList());

// foo.f3 = IntStream.range(0, 1000000).boxed().collect(Collectors.toMap(i -> "k"+i, i->i));

// List bars = new ArrayList<>(1000000);

// for (int i = 0; i < 1000000; i++) {

// Bar bar = new Bar();

// bar.f1 = "s"+i;

// bar.f2 = LongStream.range(0, 10).boxed().collect(Collectors.toList());

// bars.add(bar);

// }

// foo.f4 = bars;

自动转换Arrow

Fury Format支持自动与Arrow列存互转。

Python示例:

import pyfury

encoder = pyfury.encoder(Foo)

encoder.to_arrow_record_batch([foo] * 10000)

encoder.to_arrow_table([foo] * 10000)

C++示例:

std::shared_ptr arrow_writer;

EXPECT_TRUE(

ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)

.ok());

for

EXPECT_TRUE(arrow_writer->Write(row).ok());

}

std::shared_ptr<::arrow::RecordBatch> record_batch;

EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());

EXPECT_TRUE(record_batch->Validate().ok());

EXPECT_EQ(record_batch->num_columns(), schema->num_fields());

EXPECT_EQ(record_batch->num_rows(), row_nums);

Java示例:

Schema

schema = TypeInference.inferSchema(BeanA.class);

ArrowWriter

arrowWriter = ArrowUtils.createArrowWriter(schema);

Encoder

encoder = Encoders.rowEncoder(BeanA.class);

for

(int i = 0; i < 10; i++) {

BeanA

beanA = BeanA.createBeanA(2);

arrowWriter.write(encoder.toRow(beanA));

}

return

arrowWriter.finishAsRecordBatch();

对比其它序列化框架

跟其它框架的对比将分为功能、性能和易用性三个维度,每个维度上Fury都有比较显著的优势。

功能比较

这里从10个维度将Fury跟别的框架进行对比,每个维度的含义分别为:

多语言/跨语言:是否支持多种语言以及是否支持跨语言序列化

自动序列化:是否需要写大量序列化代码,还是可以完全自动话

是否需要schema编译:是否需要编写schema IDL文件,并编译schema生成代码

自定义类型:是否支持自定义类型,即POJO/DataClass/Struct等

非自定义类型:是否支持非自定义类型,即是否支持直接序列化基本类型、数组、List、Map等,还是需要将这些类型包装到自定义类型内部才能进行序列化

引用/循环引用:对于指向同一个对象的两份引用,是否只会序列化数据一次;对于循环引用,是否能够进行序列化而不是出现递归报错

多态子类型:对于List/Map的多个子类型如ArrayList/LinkedList/ImmutableList,HashMap/LinkedHashMap等,反序列化是否能够得到相同的类型,还是会变成ArrayList和HashMap

反序列化是否需要传入类型:即是否需要在反序列化时需要提前知道数据对应的类型。如果需要的话则灵活性和易用性会受到限制,而且传入的类型不正确的话反序列化可能会crash

部分反序列化/随机读写:反序列化是否可以只读取部分字段或者嵌套的部分字段,对于大对象这可以节省大量序列化开销

堆外内存读写:即是否支持直接读写native内存

数值类型可空:是否支持基本类型为null,比如Java的Integer等装箱类型以及python的int/float可能为null。

性能比较(数值越小越好)

这里给出在纯Java序列化场景对比其它框架的性能测试结果。其它语言的性能测试将在后续文章当中发布。

测试环境:

操作系统:4.9.151-015.ali3000.alios7.x86_64

CPU型号:Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz

Byte Order:Little Endian

L1d cache:32K

L1i cache:32K

L2 cache:1024K

L3 cache:33792K

测试原则:

自定义类型序列化测试数据使用的是

kryo-benchmark[8]

的数据,保证测试结果对Fury没有任何偏向性。尽管Kryo测试数据里面有大量基本类型数组,为了保证测试的公平性我们并没有开启Fury的Out-Of-Band零拷贝序列化能力。然后使用我们自己创建的对象单独准备了一组零拷贝测试用例。

测试工具:

为了避免JVM JIT给测试带来的影响,我们使用

JMH[9]

工具进行测试,每组测试在五个子进程依次进行,避免受到进程CPU调度的影响,同时每个进程里面执行三组Warmup和5组正式测试,避免受到偶然的环境波动影响。

下面是我们使用JMH测试fury/kryo/fst/hession/protostuff/jdk序列化框架在序列化到堆内存和堆外内存时的性能(数值越小越好)。

自定义类型性能对比

Struct

20倍

public

class

Struct

implements

Serializable

int

long

float

double

...

int

long

float

double

}

序列化:

反序列化:

Sample

Sample类型主要由基本类型、装箱类型、字符串和数组等类型字段组成,对于这种类型的对象,Fury的性能可以达到Kryo的6~7倍。没有更快的原因是因为这里的多个基本类型数组需要进行拷贝,这块占用一定的耗时。如果使用Fury的Out-Of-Band序列化的话。这些额外的拷贝就可以完全避免掉,但这样比较不太公平,因此这里没有开启。

public

final

class

Sample

implements

Serializable

public

int

public

long

public

float

public

double

public

short

public

char

public

boolean

public

public

public

public

public

public

public

public

int

public

long

public

float

public

double

public

short

public

char

public

boolean

public

// Can be null.

public

// Can be null.

public

Sample

()

public

populate

boolean

123

1230000

12.345f

1.234567

12345

'!'

true

321

3210000L

54.321f

7.654321

32100

'$'

BooleanValue = Boolean.FALSE;

new

int

1234

123

12

1

0

1

12

123

1234

new

long

123400

12300

1200

100

0

100

1200

12300

123400

new

float

12.34f

12.3f

12

1

0

1

12

12.3f

12.34f

new

double

1.234

1.23

12

1

0

1

12

1.23

1.234

new

short

1234

123

12

1

0

1

12

123

1234

"asdfASDF"

new

boolean

true

false

false

true

"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

if

this

}

return

this

}

}

序列化耗时:

反序列化耗时:

MediaContent

4~5倍

String序列化开销比较大

Fury的String零拷贝序列化协议

直接把String内部的Buffer抽取出来,然后直接放到Out-Of-Band buffer里面,完全避免掉String序列化的开销

public

final

class

Media

implements

java

io

Serializable

public

public

// Can be null.

public

int

public

int

public

public

long

public

long

public

int

public

boolean

public

public

public

// Can be null.

public

Media

()

public

enum

JAVA,

FLASH;

}

}

public

final

class

MediaContent

implements

java

io

Serializable

public

public

public

MediaContent

()

public

MediaContent

(Media media, List images)

this

this

}

public

populate

boolean

new

"http://javaone.com/keynote.ogg"

641

481

"video/theora\u1234"

18000001

58982401

new

"Bill Gates, Jr."

"Steven Jobs"

media.player = Media.Player.FLASH;

"Copyright (c) 2009, Scooby Dooby Doo"

new

this

null

images.add(

new

"http://javaone.com/keynote_huge.jpg"

"Javaone Keynote\u1234"

32000

24000

Image.Size.LARGE,

media));

images.add(

new

"http://javaone.com/keynote_large.jpg"

null

1024

768

images.add(

new

"http://javaone.com/keynote_small.jpg"

null

320

240

return

this

}

}

序列化耗时:

反序列化耗时:

Buffer零拷贝性能对比

基本类型数组

对于基本类型可以看到Fury序列化几乎耗时为0,而别的框架耗时随着数组大小线性增加。

反序列时Fury耗时也会线性增加是因为需要把Buffer拷贝到Java基本类型数组里面。

public

class

ArraysData

implements

Serializable

public

boolean

public

byte

public

int

public

long

public

double

public

ArraysData

()

public

ArraysData

int

new

boolean

new

byte

new

int

new

long

new

double

new

random.nextBytes(bytes);

for

int

0

booleans[i] = random.nextBoolean();

ints[i] = random.nextInt();

longs[i] = random.nextLong();

doubles[i] = random.nextDouble();

}

}

}

序列化耗时:

反序列耗时:

堆外Buffer

除了基本类型数组,我们也测试了Java ByteBuffer的序列化性能。由于Kryo和Fst并不支持ByteBuffer序列化,同时并没有提供直接读写ByteBuffer的接口,因此我们使用了byte array来模拟内存拷贝。可以看到对于堆外Buffer,Fury的序列化和反序列化耗时都是一个常量,不随Buffer大小而增加。

序列化耗时:

反序列化耗时:

易用性比较

这里以一个自定义类型为例对比易用性,该类型包含常见基本类型字段以及集合类型字段,最终需要序列化的对象是一个Bar的实例:

class

Foo

String

Map

String

}

class

Bar

Foo f1;

String

List f3;

Map

Integer f5;

Long f6;

Float f7;

Double f8;

short[] f9;

List f10;

}

Fury序列化

Fury序列化只需一行代码,且无任何学习成本。

Fury fury = Fury.builder().withLanguage(Language.XLANG).build();

data

data

data

对比Protobuf

首先需要

安装protoc编译器[10]

,注意protoc的版本不能高于proto依赖库的版本

然后定义针对需要序列化的对象的schema:

syntax

"proto3";

package

protobuf;

option

java_package = "io.ray.fury.benchmark.state.generated";

option

java_outer_classname = "ProtoMessage";

message

Foo {

optional

string f1 = 1;

map

int32> f2 = 2;

}

message

Bar {

optional

Foo f1 = 1;

optional

string f2 = 2;

repeated

Foo f3 = 3;

map

Foo> f4 = 4;

optional

int32 f5 = 5;

optional

int64 f6 = 6;

optional

float f7 = 7;

optional

double f8 = 8;

repeated

int32 f9 = 9; // proto不支持int16

repeated

int64 f10 = 10;

}

然后通过protoc编译schema生成Java/Python/GoLang代码文件。

java: protoc --experimental_allow_proto3_optional -I=src/main/java/io/ray/fury/benchmark/state --java_out=src/main/java/ bench.proto

bench.proto

生成Python/GoLang代码

构建工具集成成本

protobuf-maven-plugin[11]

依然需要用户在机器安装protoc,而不是自动下载protoc。

用户已经有了自定义类型和基本类型以及组合类型构成的对象(树)

开发成本,且每种需要都需要写一遍,代码冗长且易出错难维护

数据转换和拷贝开销

public

static

byte

return

}

public

static

build

(Bar bar)

ProtoMessage.Bar.Builder barBuilder = ProtoMessage.Bar.newBuilder();

if

null

barBuilder.clearF1();

else

barBuilder.setF1(buildFoo(bar.f1));

}

if

null

barBuilder.clearF2();

else

barBuilder.setF2(bar.f2);

}

if

null

barBuilder.clearF3();

else

for

barBuilder.addF3(buildFoo(foo));

}

}

if

null

barBuilder.clearF4();

else

bar.f4.forEach(

(k, v) -> {

ProtoMessage.Foo.Builder fooBuilder1 = ProtoMessage.Foo.newBuilder();

fooBuilder1.setF1(v.f1);

v.f2.forEach(fooBuilder1::putF2);

barBuilder.putF4(k, fooBuilder1.build());

});

}

if

null

barBuilder.clearF5();

else

barBuilder.setF5(bar.f5);

}

if

null

barBuilder.clearF6();

else

barBuilder.setF6(bar.f6);

}

if

null

barBuilder.clearF7();

else

barBuilder.setF7(bar.f7);

}

if

null

barBuilder.clearF8();

else

barBuilder.setF8(bar.f8);

}

if

null

barBuilder.clearF9();

else

for

short

barBuilder.addF9(i);

}

}

if

null

barBuilder.clearF10();

else

barBuilder.addAllF10(bar.f10);

}

return

}

public

static

buildFoo

(Foo foo)

ProtoMessage.Foo.Builder builder = ProtoMessage.Foo.newBuilder();

if

null

builder.clearF1();

else

builder.setF1(foo.f1);

}

if

null

builder.clearF2();

else

foo.f2.forEach(builder::putF2);

}

return

}

public

static

fromFooBuilder

(ProtoMessage.Foo.Builder builder)

new

if

foo.f1 = builder.getF1();

}

foo.f2 = builder.getF2Map();

return

}

public

static

deserializeBar

byte

throws

new

ProtoMessage.Bar.Builder barBuilder = ProtoMessage.Bar.newBuilder();

barBuilder.mergeFrom(bytes);

if

bar.f1 = fromFooBuilder(barBuilder.getF1Builder());

}

if

bar.f2 = barBuilder.getF2();

}

bar.f3 =

barBuilder.getF3BuilderList().stream()

.map(ProtoState::fromFooBuilder)

.collect(Collectors.toList());

new

barBuilder.getF4Map().forEach((k, v) -> bar.f4.put(k, fromFooBuilder(v.toBuilder())));

if

bar.f5 = barBuilder.getF5();

}

if

bar.f6 = barBuilder.getF6();

}

if

bar.f7 = barBuilder.getF7();

}

if

bar.f8 = barBuilder.getF8();

}

new

short

for

int

0

short

}

bar.f10 = barBuilder.getF10List();

return

}

Python序列化代码:大概130~150行

GoLang序列化代码:大概130~150行

即使之前没有针对该数据的自定义类型,也无法将protobuf生成的class直接用在业务代码里面。因为protobuf生成的class

并不符合面向对象设计[12]

,无法给生成的class添加行为。这时候就需要定义额外的wrapper,如果自动内部有其它自定义类型,还需要将这些类型转换成对应的wrapper,这进一步限制了使用的灵活性。

对比Flatbuffer

Flatbuffer与protobuf一样,也需要大量的学习成本和开发成本:

安装

flatc编译器[13]

,对于Linux环境,可能还需要进行源码编译安装flatc。

定义Schema

namespace

io.ray.fury.benchmark.state.generated;

table

FBSFoo {

string

string;

f2_key

[string]; // flatbuffers不支持map

f2_value

[int];

}

table

FBSBar {

f1

FBSFoo;

f2

string;

f3

[FBSFoo];

f4_key

[int]; // flatbuffers不支持map

f4_value

[FBSFoo];

f5

int;

f6

long;

f7

float;

f8

double;

f9

[short];

f10

[long];

//

由于fbs不支持基本类型nullable,因此还需要单独一组字段或者一个vector标识这些值是否为null

}

root_type

FBSBar;

然后通过flatc编译schema生成Java/Python/GoLang代码文件。

java: flatc -I=src/main/java/io/ray/fury/benchmark/state -o=src/main/java/ bar.fbs

生成Python/GoLang代码

为了避免把生成的代码提交到代码仓库,需要将proto跟构建工具进行集成,目前似乎只有bazel构建工具有比较好的集成,别的构建工具如maven/gradle等似乎都没有比较好的集成方式。

代码冗长易出错难维护

代码不灵活、难写且易出错

序列化对象树时需要先深度优先和先序遍历整颗对象树,并手动保存每个变长字段的offset到临时状态,然后再序列化所有字段偏移或者内联标量值,这块代码写起来非常繁琐,一旦offset存储出现错误,序列化将会出现

等报错,较难排查

list元素需要按照反向顺序进行序列化不符合直觉。由于buffer是从后往前构建,因此对于list,需要将元素逆向依次进行序列化。

不支持map类型,需要将map序列化为两个list或者序列化为一个table,进一步带来了额外的开发成本。

下面是Java的序列化代码,大概需要100~150行;处理每个字段是否为null,大概还需要100行左右代码。因此Java序列化大概需要200~250行代码:

public

static

byte

serialize

Bar bar

return

}

public

static

buildBar

Bar bar

// 这里忽略了空值处理的代码

new

int

int

new

int

for

int

0

get

}

int

int

int

{

int

new

int

int

new

int

int

0

for

keys[i] = entry.getKey();

valueOffsets[i] = buildFoo(builder, entry.getValue());

i++;

}

f4_key_offset = FBSBar.createF4KeyVector(builder, keys);

f4_value_offset = FBSBar.createF4ValueVector(builder, valueOffsets);

}

int

int

FBSBar.startFBSBar(builder);

FBSBar.addF1(builder, buildFoo(builder, bar.f1));

FBSBar.addF2(builder, f2_offset);

FBSBar.addF3(builder, f3_offset);

FBSBar.addF4Key(builder, f4_key_offset);

FBSBar.addF4Value(builder, f4_value_offset);

FBSBar.addF5(builder, bar.f5);

FBSBar.addF6(builder, bar.f6);

FBSBar.addF7(builder, bar.f7);

FBSBar.addF8(builder, bar.f8);

FBSBar.addF9(builder, f9_offset);

FBSBar.addF10(builder, f10_offset);

builder.finish(FBSBar.endFBSBar(builder));

return

}

public

static

int

buildFoo

FlatBufferBuilder builder, Foo foo

int

int

new

int

int

new

int

int

0

for

keyOffsets[i] = builder.createString(entry.getKey());

values[i] = entry.getValue();

i++;

}

int

int

return

}

public

static

deserializeBar

ByteBuffer buffer

new

FBSBar fbsBar = FBSBar.getRootAsFBSBar(buffer);

bar.f1 = deserializeFoo(fbsBar.f1());

bar.f2 = fbsBar.f2();

{

new

for

int

0

add

}

bar.f3 = f3List;

}

{

new

for

int

0

f4.put(fbsBar.f4Key(i), deserializeFoo(fbsBar.f4Value(i)));

}

bar.f4 = f4;

}

bar.f5 = fbsBar.f5();

bar.f6 = fbsBar.f6();

bar.f7 = fbsBar.f7();

bar.f8 = fbsBar.f8();

{

short

new

short

for

int

0

f9[i] = fbsBar.f9(i);

}

bar.f9 = f9;

}

{

new

for

int

0

add

}

bar.f10 = f10;

}

return

}

public

static

deserializeFoo

FBSFoo fbsFoo

new

string

new

foo.f2 = map;

for

int

0

map.put(fbsFoo.f2Key(i), fbsFoo.f2Value(i));

}

return

}

Python序列化代码:大概200~250行

GoLang序列化代码:大概200~250行

将Flatbuffer生成类型包装到其它符合面向对象设计的类里面:由于Flatbuffer序列化过程需要保存大量中间offset,且需要先把所有可变长度对象写入buffer,因此通过wrapper修改flatbuffer数据会比较复杂,使得包装Flatbuffer生成类型只适合反序列化读数据过程,导致添加wrapper也变得很困难。

对比Msgpack

Msgpack Java和Python并不支持自定义类型序列化,需要用户增加扩展类型手动进行序列化,因此这里省略。

总结

Fury最早是我在2019年开发,当时是为了支持

分布式计算框架Ray[14]

的跨语言序列化以及蚂蚁在线学习场景样本流的跨语言传输问题。经过蚂蚁丰富业务场景的打磨,目前已经在蚂蚁在线学习、运筹优化、Serving等多个计算场景稳定运行多年。

总体来看Fury主要优势主要是:

跨语言原生序列化

,大幅提高了跨语言序列化的易用性,降低研发成本;

通过JIT技术来优化序列化性能

。这里也可以看到通过把数据库和大数据领域的代码生成思想用在序列化上面是一个很好的思路,可以取得非常显著的性能提升;

Zero-Copy序列化

,避免所有不必要的内存拷贝;

多语言行存支持避免序列化和元数据开销

未来我们会在协议、框架和生态三个方面继续优化:

协议层面

JIT代码生成支持数据压缩模式

进一步通过SIMD向量化指令进行大规模数据压缩

框架层面

更多Java序列化代码JIT化;

完善C++支持,通过使用Macro、模板和编译时反射在编译时注册捕获Fury需要的类型信息,实现自动C++序列化;

通过Golang-ASM支持基于JIT的Golang序列化实现;

通过将更多Python代码Cython化来进一步加速Python序列化;

支持JavaScript,打通NodeJS生态;

支持Rust;

生态层面

与RPC框架SOFA、Dubbo、Akka等集成

与分布式计算框架Spark和Flink等集成

多语言的支持与生态建设是一项复杂的工作,接下来我们会尽快开源Fury,吸引感兴趣

的同学一起参与进来。如果有开源使

用场景或者合作意向,欢迎通过邮箱chaokun.yck@antgroup.com 交流。

参考链接:

[1]https://github.com/EsotericSoftware/kryo

[2]https://spark.apache.org/docs/latest/index.html

[3]https://flink.apache.org/

[4]https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

[5]https://arrow.apache.org/

[6]https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization

[7]https://peps.python.org/pep-0574

[8]https://github.com/EsotericSoftware/kryo/tree/master/benchmarks

[9]https://openjdk.org/projects/code-tools/jmh/

[10]https://developers.google.com/protocol-buffers/docs/downloads

[11]https://www.xolstice.org/protobuf-maven-plugin/usage.html

[12]https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization

[13]https://github.com/google/flatbuffers/releases

[14]https://github.com/ray-project/ray

重磅来袭!2022上半年阿里云社区最热电子书榜单!

千万阅读量、百万下载量、上百本电子书,近200位阿里专家参与编写。多元化选择、全领域覆盖,汇聚阿里巴巴技术实践精华,读、学、练一键三连。开发者藏经阁,开发者的工作伴侣~

点击阅读原文查看详情。