Zookeeper Session源码

发布时间 2023-04-05 12:00:10作者: Dazzling!

我们说客户端与服务端建立连接交互的时候会创建一个 Session 与之对应,那假设客户端请求来了,服务端是如何处理的?Session 又是如何创建出来的?

我们先来看第一个问题:服务端如何处理客户端发来的请求?

一、如何处理请求

所谓的请求全称是网络请求,涉及到网络就少不了 Socket 通信,ZooKeeper 采取的是 NIO 的方式,提供了一个 NIOServerCnxn 实例来维护每一个客户端的连接,也就是说客户端与服务端的通信都是靠 NIOServerCnxn 这个类来处理的,无非就干两件事:接收客户端的请求以及处理请求(将请求体从网络 I/O 中读出来做处理)。这个处理类的核心方法是doIO(),也就是如下:

public class NIOServerCnxn extends ServerCnxn {
    void doIO(SelectionKey k) throws InterruptedException {
        // ...
    }
}

核心处理类结构已经清晰了,那我们接下来就分析 doIO() 这个方法就好了,我上面说这个方法无非就干两件事:接收请求、处理请求。那如何接收请求呢?请求又分为两种:创建连接的请求和发送数据的请求。

我们逐个分析:

  • 何为创建连接的请求?其实很好理解,就是第一次客户端和服务端通信的时候要建立连接,建立完连接才能发送数据进行真正请求。
  • 何为发送数据的请求?上面创建完连接了才能真正发送数据给服务端。

一言以蔽之就是:客户端要想和服务端通信必须先建立连接才能发送数据,且连接只需要建立一次即可。所以我们会有一个变量: initialized 代表是否初始化完成,也就是代表是否已经建立过连接了。代码如下:

public void 接收请求() {
    if (!initialized) {
        // 还没初始化,那就建立连接
        readConnectRequest();
    } else {
        // 初始化完了,那就发送数据
        readRequest();
    }
}

我们继续分析:readConnectRequest(),也就是如何创建连接?首先我们能明确一点的是创建完连接后肯定要把 initialized 状态变为 true,代表已经建立完成。我们先把这块代码写下:

private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
    // 省略真正建立连接代码
    
    initialized = true;
}

在开始真正建立连接之前,我们肯定都知道一点:客户端会传输一些数据给服务端,但是网络传输都是靠字节数组,所以服务端接收到数据后第一件事就是拿到字节数组进行反序列化,反序列化成一个对象,我们叫这个对象为:ConnectRequest。我们继续完善下代码:

public void 建立连接() {
    // 拿到客户端发来的字节数组
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    // 反序列化成ConnectRequest对象
    connReq.deserialize(bia, "connect");
}

我们在上一 Session 原理篇的时候说过 Session 是有生命周期的,带时效性的,也就是有过期时间的。那这个过期时间肯定是客户端和服务端建立连接的时候通过客户端发过去的,所以我们反序列化出来的对象里还会有 sessionTimeout 字段,如下:

// 基本验证
int sessionTimeout = connReq.getTimeOut();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
}

很简单的一些验证,就好比我们业务代码中分页一样,都会判断不能小于 1、不能大于最大条数等验证。这个过期时间传给 Server 后,并不是真正的过期时间,因为我们在上一篇中也讲解过了,真实过期时间会被计算为 tickTime 的倍数

有了上面这些参数后我们就可以真正创建 Session 了:

long id = createSession(cnxn, passwd, sessionTimeout);

在开始如何创建 Session 之前,我们先画个流程图:

session-12.png

现在创建 Session 的时机和前置条件都搞懂了,那创建 Session 都需要经过哪些步骤呢?

二、如何创建 Session

其实创建 Session 的原理我们在上一篇都讲解得很清楚了,按照大步骤来划分的话无非就是下面这四步:

  • 按照一定规则为客户端生成 SessionId;
  • Session 的创建以及过期机制;
  • 每次正常 CRUD 以及定时心跳 Ping 都会重新刷新 Session 的过期时间,我们称这个过程为 Session 的激活;
  • Session 到期后如何回收。

本篇不打算详细剖析上面这四步,那样会太占用篇幅,所以我把这四点放到下篇单独讲解,本篇讲解整个脉络,也可以称之为“框架”。比如我们上面讲解了服务端是如何处理请求的,然后现在我们假设 Session 已经创建完成了,“框架”如下:

public void proces () {
    // 1. 接收请求
    // 2. 处理请求
    // 3. 创建Session
    //   3.1 生成SessionId
    //   3.2 Session过期机制
    //   3.3 Session激活
    //   3.4 Session回收
}

上面已经为我们提供好了如何激活 Session 的方法,但是什么时候触发这个方法呢?我们也说了是在正常 CRUD 或者心跳 Ping 的时候会进行激活,在剖析这块代码之前我们先时光倒流,回忆一下最开始我们讲如何处理请求的流程:

public void 接收请求() {
    if (!initialized) {
        // 还没初始化,那就建立连接
        readConnectRequest();
    } else {
        // 初始化完了,那就发送数据
        readRequest();
    }
}

如果没建立过连接,那么就建立连接readConnectRequest();如果之前建立过连接,那就是正常地发送数据(可能是正常的 CRUD,也可能是心跳 Ping 请求)。所以大家肯定突然明白一个事情,那就是我们调用 Session 激活方法的入口就是这里——readRequest(),接下来我们一起看下是如何调用的吧~