前言
Redisson 官网:https://redisson.pro/
BitSetSetMultimapSortedSetMapListQueueBlockingQueueDequeBlockingDequeSemaphoreLockAtomicLongCountDownLatchPublish / SubscribeBloom filterRemote serviceSpring cacheExecutor serviceLive Object serviceScheduler service
以下是Redisson的结构:
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。
客户端初始化
createBootstrap
org.redisson.client.RedisClient#createBootstrap
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
//1.指定配置中的IO类型
.channel(config.getSocketChannelClass())
//2.指定配置中的线程模型
.group(config.getGroup());
//3.IO处理逻辑
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
//4. 指定bootstrap配置选项
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
config.getNettyHook().afterBoostrapInitialization(bootstrap);
return bootstrap;
}
Bootstrap
NioSocketChannel
二. 接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析
RedisChannelInitializer
org.redisson.client.handler.RedisChannelInitializer
@Override
protected void initChannel(Channel ch) throws Exception {
// 开启SSL终端识别能力
initSsl(config, ch);
if (type == Type.PLAIN) {
//Redis正常连接处理类
ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
} else {
//Redis订阅发布处理类
ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
}
ch.pipeline().addLast(
//链路检测狗
connectionWatchdog,
//Redis协议命令编码器
CommandEncoder.INSTANCE,
//Redis协议命令批量编码器
CommandBatchEncoder.INSTANCE,
//Redis命令队列
new CommandsQueue());
if (pingConnectionHandler != null) {
//心跳包连接处理类
ch.pipeline().addLast(pingConnectionHandler);
}
if (type == Type.PLAIN) {
//Redis协议命令解码器
ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
} else {
//Redis订阅发布解码器
ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
}
config.getNettyHook().afterChannelInitialization(ch);
}
图1 Redisson 链路处理图
Redisson的处理链
CommandEncoderCommandDecoder
失败重连
org.redisson.client.handler.ConnectionWatchdog#reconnect 重连机制
private void reconnect(final RedisConnection connection, final int attempts){
//重试时间越来越久
int timeout = 2 << attempts;
if (bootstrap.config().group().isShuttingDown()) {
return;
}
try {
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
} catch (IllegalStateException e) {
// skip
}
}
netty中的Timer管理,使用了的Hashed time Wheel的模式,Time Wheel翻译为时间轮,是用于实现定时器timer的经典算法。
这个方法的声明是这样的:
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
*
* @return a handle which is associated with the specified task
*
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
* @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
* can cause instability in the system.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位。
Redis协议命令编码器
Redis 的作者认为数据库系统的瓶颈一般不在于网络流量,而是数据库自身内部逻辑处理上。所以即使 Redis 使用了浪费流量的文本协议,依然可以取得极高的访问性能。Redis 将所有数据都放在内存,用一个单线程对外提供服务,单个节点在跑满一个 CPU 核心的情况下可以达到了 10w/s 的超高 QPS。
RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现异常简单,解析性能极好。
\r\n
+$:-*
单行字符串 hello world
+hello world\r\n
多行字符串 hello world
$11\r\nhello world\r\n
多行字符串当然也可以表示单行字符串。
整数 1024
:1024\r\n
错误 参数类型错误
-WRONGTYPE Operation against a key holding the wrong kind of value\r\n
数组 [1,2,3]
*3\r\n:1\r\n:2\r\n:3\r\n
NULL 用多行字符串表示,不过长度要写成-1。
$-1\r\n
空串 用多行字符串表示,长度填 0。
$0\r\n\r\n
\r\n\r\n
org.redisson.client.handler.CommandEncoder#encode()
private static final char ARGS_PREFIX = '*';
private static final char BYTES_PREFIX = '$';
private static final byte[] CRLF = "\r\n".getBytes();
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
try {
//redis命令前缀
out.writeByte(ARGS_PREFIX);
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
len++;
}
out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
out.writeBytes(CRLF);
writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
}
......
} catch (Exception e) {
msg.tryFailure(e);
throw e;
}
}
private void writeArgument(ByteBuf out, ByteBuf arg) {
out.writeByte(BYTES_PREFIX);
out.writeCharSequence(Long.toString(arg.readableBytes()), CharsetUtil.US_ASCII);
out.writeBytes(CRLF);
out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
out.writeBytes(CRLF);
}
Redis协议命令解码器
org.redisson.client.handler.CommandDecoder#readBytes
private static final char CR = '\r';
private static final char LF = '\n';
private static final char ZERO = '0';
private ByteBuf readBytes(ByteBuf is) throws IOException {
long l = readLong(is);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException(
"Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
}
int size = (int) l;
if (size == -1) {
return null;
}
ByteBuf buffer = is.readSlice(size);
int cr = is.readByte();
int lf = is.readByte();
//判断是否以\r\n开头
if (cr != CR || lf != LF) {
throw new IOException("Improper line ending: " + cr + ", " + lf);
}
return buffer;
}
数据序列化
Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:
org.redisson.codec.JsonJacksonCodecorg.redisson.codec.AvroJacksonCodecorg.redisson.codec.SmileJacksonCodecorg.redisson.codec.CborJacksonCodecorg.redisson.codec.MsgPackJacksonCodecorg.redisson.codec.IonJacksonCodecorg.redisson.codec.KryoCodecorg.redisson.codec.SerializationCodecorg.redisson.codec.FstCodecorg.redisson.codec.LZ4Codecorg.redisson.codec.SnappyCodecorg.redisson.client.codec.JsonJacksonMapCodecbyte[]org.redisson.client.codec.StringCodecorg.redisson.client.codec.LongCodecorg.redisson.client.codec.ByteArrayCodecorg.redisson.codec.CompositeCodec
Codec
public interface Codec {
//返回用于HMAP Redis结构中哈希映射值的对象解码器
Decoder<Object> getMapValueDecoder();
//返回用于HMAP Redis结构中哈希映射值的对象编码器
Encoder getMapValueEncoder();
//返回用于HMAP Redis结构中哈希映射键的对象解码器
Decoder<Object> getMapKeyDecoder();
//返回用于HMAP Redis结构中哈希映射键的对象编码器
Encoder getMapKeyEncoder();
//返回用于除HMAP之外的任何存储Redis结构的对象解码器
Decoder<Object> getValueDecoder();
//返回用于除HMAP之外的任何存储Redis结构的对象编码器
Encoder getValueEncoder();
//返回用于加载解码过程中使用的类的类加载器对象
ClassLoader getClassLoader();
}
BaseCodec
org.redisson.client.codec.BaseCodec
//返回用于除HMAP之外的任何存储Redis结构的对象解码器
Decoder<Object> getValueDecoder();
//返回用于除HMAP之外的任何存储Redis结构的对象编码器
Encoder getValueEncoder();
SerializationCodec
org.redisson.codec.SerializationCodec
Decoder
Encoder