Kafka-核心组件(五)

胜利只是暂时的,你该警醒,敌人在黑暗中嘲笑自己。 ——真弓·麒麟

3 协调器

  • Kafka 提供了消费者协调器(ConsumerCoordinator)、组协调器(GroupCoordinator)和任务管理协调器(WorkCoordinator)3种协调器(coordinator)。
  • Kafka 的高级消费者即通过 ZooKeeperConsumerConnector 实现的消费者是强依赖于 ZooKeeper 的,每一个消费者启动时都会在 ZooKeeper 的/consumers/${group.id}/ids上注册消费者的客户端 id,即${client.id},会在该路径以及 /brokers/ids 路径下注册监听器,用于当代理或是消费者发生变化时,消费者进行平衡操作。
  • 由于这种方式是每一个消费者对 ZooKeeper 路径分别进行监听,当发生平衡操作时,一个消费组下的所有消费者同时会触发平衡操作,而消费者之间并不知道其他消费者平衡操作的结果,这样就可能导致 Kafka 工作在一个不正确的状态。同时这种方式完全依赖于 ZooKeeper,以监听的方式来管理消费者,存在以下两个缺陷。
    • 羊群效应(herd effect):任何代理或是消费者的增、减都会触发所有的消费者同时进行平衡操作,每个消费者都对 ZooKeeper 同一个路径进行操作,这样就有可能发生类似死锁的情况,从而导致平衡操作失败。
    • 脑裂问题(split brain):消费者进行平衡操作时每个消费者都与 ZooKeeper 进行通信,以判断消费者或是代理变化情况,由于 ZooKeeper 本身的特性可能导致在同一时候各消费者所获取的状态不一致,这样就会导致 Kafka 运行在一个不正确状态之下。
  • 为了解决消费者依赖 ZooKeeper 所带来的问题,Kafka 在服务端引入了组协调器(GroupCoordinator),每个 KafkaServer 启动时都会创建一个 GroupCoordinator 实例,用于管理部分消费组和该消费组下每个消费者的消费偏移量。
  • 同时在客户端引入了消费者协调器(ConsumerCoordinator),每个 KafkaConsumer 实例化时会实例化一个 ConsumerCoordinator 对象,消费者协调器负责同一个消费组下各消费者与服务端组协调器之间的通信。

3.1 消费者协调器

  • 消费者协调器(ConsumerCoordinator)是 KafkaConsumer 的一个成员变量,该 KafkaConsumer 通过消费者协调器与服务端的组协调器进行通信。
  • 由于消费者协调器是 KafkaConsumer 私有的,因此消费者协调器中存储的信息也只有与之对应的消费者可见,不同消费者之间是看不到彼此的消费者协调器中的信息的。
    • 其实我们可以简单理解为消费者协调器是消费者执行代理类,它对消费者相关信息进行了封装,同时提供相应方法供消费者调用,消费者很多操作是通过调用消费者协调器相应方法来完成的。
    • 然而这并不等同代理类,ConsumerCoordinator 继承 AbstractCoordinator 类,AbstractCoordinator 实现了组管理的协议,消费者协调器是消费组管理相关请求的发起者。
  • 消费者协调器负责处理更新消费者缓存的 Metadata 请求,负责向组协调器发起加入消费组的请求,负责对本消费者加入消费组前、后相应的处理,负责请求离开消费组(如当消费者取消订阅时),还负责向组协调器发送提交消费偏移量的请求。
    • 并通过一个心跳检测定时任务来检测组协调器的运行状况,或是让组协调器感知自己的运行状况。
    • 同时,Leader 消费者的消费者协调器还负责执行分区的分配,当消费者协调器向组协调器请求加入消费组后,组协调器会为同一个组下的消费者选出一个 Leader,成为 Leader 的消费者其 ConsumerCoordinator 收到的信息与其他消费者有所不同。
    • Leader 消费者的 ConsumerCoordinator 负责消费者与分区的分配,会在请求 SyncGroupRequest 时将分配结果发送给 GroupCoordinator,而非 Leader 消费者(这里我们将其简称为 Follower 消费者),Follower 消费者向 GroupCoordinator 发送 SyncGroupRequest 请求时分区分配结果参数为空,GroupCoordinator 会将 Leader 副本发送过来的分区分配结果再返回给 Follower 消费者的 ConsumerCoodinator。这种处理方式,将分区分配的职责交由客户端自己处理,从而减轻了服务端的负担。

总之,消费者协调器负责消费者与组协调器通信。ConsumerCoordinator 底层实现所依赖的组件如图3-11所示。

消费者协调器底层实现所依赖的主要类,各类作用说明如下。

  • Heartbeat 类是消费者与组协调器之间心跳检测信息的封装。
  • ConsumerNetworkClient 类是消费者网络层处理封装类。
  • HeartbeatThread 是用于消费者与组协调器心跳检测的定时任务线程,在消费者与组协调器连接上后就会以守护线程的方式启动该线程。
  • MemberState 是一个枚举类,定义了消费者在消费组中的 3 种状态:UNJOINED 状态表示该消费者还没有加入消费组,REBALANCING 状态表示客户端即消费者正在进行平衡操作,STABLE 状态表示消费者已成功加入消费组中,并且正在稳定地进行心跳探测,也就是心跳探测到消费者处于稳定的运行状态。
  • RequstFuture 类用于异步接收请求的结果,消费者协调器实现时需要两类 RequestFuture,一个是等待查找组协调器的请求返回结果,另一个为消费者请求加入消费组的请求返回结果。
  • PartitionAssignor 是一个接口,定义了分区与消费者的分配策略的方法,Kafka 实现了一个基于轮询分配策略的 RoundRobinAssignor 和一个基于跨度分配策略的 RangAssignor。每个消费者的消费者协调器在向组协调器请求加入消费组时都会将自己支持的分区分配策略上传给组协调器,组协调器选出该消费组下所有消费者都支持的分区分配策略返回给 Leader 消费者,Leader 消费者根据分区分配策略进行分区分配,然后再将分配结果在 SyncGroupRequest 请求时发送给组协调器。
  • ConsumerInterceptors 类底层维护了一组该消费者所对应的拦截器 ConsumerInterceptor,用于在消息返回给客户端和提交消费偏移量后进行相应的处理,在创建一个消费者时可以为其指定多个拦截器,拦截器会被顺序执行。
  • OffsetCommitCallback 是一个接口,用于在消费偏移量提交后回调处理,在消费者协调器底层维护了一个消费偏移量提交后回调队列。
  • MetadataSnapshot 类是元数据信息的快照。

消费者协调器的相关内容就介绍到这里,它主要负责消费者与组协调器之间的通信,向组协调器提交加入消费组、离开消费组以及提交消费偏移量等请求,并进行相应的处理。

3.2 组协调器

组协调器(GroupCoordinator)负责对其管理的组员提交的相关请求进行处理,这里的组员即消费者。它负责管理与消费者之间建立连接,并从与之连接的消费者之中选出一个消费者作为 Leader 消费者,Leader 消费者负责消费者分区的分配,在 SyncGroupRequest 请求时发送给组协调器,组协调器会在请求处理后返回响应时下发给其管理的所有消费者。同时,组协调器还管理与之连接的消费者的消费偏移量的提交,将每个消费者消费偏移量保存到 Kafka 的内部主题当中,并通过心跳检测来检测消费者与自己的连接状态。

1.组协调器依赖的组件

每一个 KafkaServer 启动时都会实例化并启动一个组协调器,每个组协调器负责一部分消费组的管理。从组协调器实例化过程可以看出它依赖的主要组件如图3-12所示。

下面简要介绍组协调器所依赖的组件在 GroupCoordinator 管理中的具体作用。

  • KafkaConfig:用于实例化 OffsetConfig 和 GroupConfig。

  • ZkUtils:用于为消费者分配组协调器时从 ZooKeeper 获取内部主题的分区元数据信息。

  • ReplicaManager:GroupMetadataManager 需要将消费组元数据信息以及消费者提交的已消费的偏移量信息写入 Kafka 内部主题中,对内部主题的操作与对其他主题操作一样,通过 ReplicaManager 将消息写入 Leader 副本,由 ReplicaManager 负责 Leader 副本和 Follower 副本的管理。

  • GroupMetadataManager:负责管理 GroupMetadata 以及消费者提交的偏移量,并提供了一系列组管理的方法供组协调器调用。GroupMetadataManager 不仅将 GroupMetadata 信息发送到 Kafka 内部主题,而且同时维护一个 Pool 类型的 groupMetadataCache 对象,用于在内存中缓存一份 GroupMetadata。其中 GroupMetadata 包括了组员的元数据信息,当然这里的组员即为消费者,组协调器分配给消费者的 memberId 以及 leaderId、分区分配关系,同时包括状态元数据,这里状态是指组协调器对其管理的消费者管理定义的状态。组协调器定义了5种状态,如表3-6所示。

表3-6 消费组状态说明

状  态 状 态 说 明
PreparingRebalance 消费组准备进行平衡操作
AwaitingSync 等待 Leader 消费者将分区分配关系发送给组协调器
Stable 消费组正常运行状态,心跳检测正常
Dead 处于该状态的消费组已没有任何消费者成员,且 Metadata 信息也已被删除
Empty 处于该状态的消费组已没有任何消费者成员,但相应的 Metadata 没有被删除,直到所有消费者对应的消费偏移量元数据信息(OffsetAndMetadata)过期。若消费组只用于提交偏移量,则也会处于该状态

消费组状态机各状态转换关系如图3-13所示。对各状态转换的前置条件本书不展开讲解。

  • DelayedJoin:延迟操作类,在消费组进行平衡操作时,监视该消费组所有消费者都请求加入到消费组。
  • DelayedHeartbeat:延迟操作类,用于监视处理所有消费组成员与组协调器心跳超时控制。
  • GroupConfig:定义了组成员与组协调器之间 Session 超时时间配置。

enter image description here

图3-13 消费组状态转换图

以上只是对组协调器依赖的主要组件类进行了简单分析,以便于读者对组协调器的主要功能有一个大致了解,但对于相应的实现细节没有展开讲解。其实,在组协调器启动时也会创建一个定时任务,该定时任务在实例化 GroupMetadataManager 时被创建,用于定时清理过期的消费组元数据信息及过期的消费偏移量信息。

2.消费者入组过程

现在,简要介绍一个新的消费者加入消费组的主要步骤。消费者被创建后通过消费者协调器选择一个负载最小的节点,然后向该节点发送查找组协调器的请求,KafkaApis 会对请求进行处理,调用该节点对应的组协调器的 partitionFor() 方法,GroupCoordinator.partitionFor() 方法最终调用 GroupMetadataManager.partitionFor() 方法,通过请求时指定的 groupId,取其 hashcode 值与 Kafka 内部主题分区总数取模定位一个分区,该分区的 Leader 副本所在的节点即为该消费组的组协调器,该消费组的元数据信息以及消费者提交的消费偏移量就会像普通消息一样存储在该分区中。Kafka 内部主题默认有 50 个分区,每个分区有3个副本。

消费者找到组协调器之后就可以申请加入该消费组,即发送 JoinGroupRequest 请求,KafkaApis 最终会调用 Group.handleJoinGroup() 处理。在 Group.handleJoinGroup() 方法中依然是调用 GroupMetadataManager 相应方法完成消费者加入消费组的处理,在对 JoinGroupRequest 处理时会将该消费者注册到消费组。首先根据 groupId 信息获取或构造该消费组的 GroupMetadata 信息,然后将消费者的 clientId 值与一个 UUID 值拼接成一个字符串作为该消费者在消费组的 memberId 值,并构造 MemberMetadata 信息,再将该 MemberMetadata 信息注册到 GroupMetadata 中。GroupMetadata 对象中维护了一个 Map 用于保存当前消费组的成员元数据信息。在消费者注册到消费组时,若该消费组的 Leader 不存在,则将当前消费者选作当前消费组的 Leader 消费者,并将该消费者的 memberId 作为 leaderId,同时消费组中的各个消费者通过投票选出各消费者都支持的协议,这里的协议指分区分配策略。最后构造 JoinGroupResult 对象,回调 responseCallback 返回给消费者。由此分析可知,JoinGroupRequest 处理主要职责是为组成员分配 memberId,并将第一个加入组的消费者选为 Leader,同时选出一个各消费者都支持的分区分配策略。

在选出 Leader 消费者后,消费组各成员继续发送 SyncGroupRequest 请求。Leader 消费者会根据同组消费者都支持的分区分配策略,为消费者分配分区,在构造 SyncGroupRequest 请求时会上传分区分配结果,而 Follower 消费者在构造 SyncGroupRequest 请求时该参数为空。组协调器收到请求后一直等 Leader 消费者的请求处理完毕后再进行回调处理,向该消费组的所有消费者做出响应,在返回响应时会将分区分配结果发送给各消费者。最后将消费者与分区的对应关系写入 Kafka 内部主题中。

消费者加入消费组与组协调器之间的通信过程简要介绍至此,以上介绍消费者加入消费组的过程省去了相应条件的判断及消费组不同状态的处理。图3-14描述了两个消费者申请入组的过程。需要说明的是,图3-14假设左边的消费者在处理过程中会被选为 Leader 消费者,同时图3-14并没有反映 GroupCoordinator 分配的过程。

![enter image description here](../../../../../../../../images/2018/10/GroupCoordinator 分配的过程.png)

图3-14 消费者加入消费组的过程

3.消费偏移量管理

新版的 KafkaConsumer 将消费偏移量保存到 Kafka 一个内部主题中,当消费者正常运行或者进行平衡操作时都要向组协调器提交当前的消费偏移量。组协调器负责消费组的管理及消费偏移量的管理,但客户端可以仅选择让组协调器管理消费偏移量,例如,当客户端通过 assign() 方法订阅指定的分区时,就不用 Kafka 负责分区的分配。当组协调器收到 OffsetCommitRequest 请求时,会进行相应的检查判断,若满足偏移量处理的条件时,就会调用 GroupCoordinator. doCommitOffsets() 方法进行处理。这里所说的偏移量处理的条件有两种情况:一种是该消费组的成员提交的消费偏移量,另一种是仅选择让组协调器负责消费偏移量的管理的消费者提交的请求。若不满足偏移量提交条件就会调用回调函数返回相应的错误码。

在调用 GroupCoordinator.doCommitOffsets() 方法进行处理时,若是第一种情况,由于需要组协调器管理消费组,所以相比第二种情况多了一步调用 GroupCoordinator.completeAnd ScheduleNextHeartbeatExpiration() 方法的操作,该方法让延迟的心跳检测执行完成的,更新收到心跳的时间戳,同时再创建一个 DelayedHeartbeat,交由 DelayedOperationPurgatory 管理继续监视下一次心跳。之后两种情况处理逻辑相同,都是调用 GroupMetadataManager.prepareStoreOffsets() 进行处理,GroupMetadataManager.prepareStoreOffsets() 方法主要职责是构造消费者消费偏移量相关的消息,以及封装一个在偏移量对应的消息成功追加到Kafka内部主题之后回调的方法 putCacheCallback()。偏移量对应的消息以 groupId、主题名、分区编号构成的 Struct 作为消息的 Key,以 OffsetAndMetadata 相应字段构成消息的 Value。经过 GroupMetadataManager.prepareStoreOffsets() 方法处理后返回一个 DelayedStore 对象,该对象交由 GroupMetadataManager.store() 方法处理,在 store() 方法中调用 replicaManager.appendMessages() 方法将偏移量追加到 Kafka 内部主题中,在消息追加成功后会回调 putCacheCallback(), 在该回调函数中会更新缓存中记录的分区与 OffsetAndMetadata 的映射信息,并回调 responseCallback() 方法。

4 网络通信服务

在 KafkaServer 启动时,初始化并启动了一个 SocketServer 服务,用于接受客户端的连接、处理客户端请求、发送响应等,同时创建一个 KafkaRequestHandlerPool 用于管理 KafkaRequestHandler。SocketServer 是基于 Java NIO 实现的网络通信组件,其线程模型为:一个 Acceptor 线程负责接受客户端所有的连接;N(${num.network.threads})个 Processor 线程,每个 Processor 有多个 Selector,负责从每个连接中读取请求;M(${num.io.threads})个 Handler(KafkaRequestHandler) 线程处理请求,并将产生的请求返回给 Processor 线程。而 Handler 是由 KafkaRequestHandlerPool 管理,在 Processor 和 Handler 之间通过 RequestChannel 来缓冲请求,每个 Handler 从 RequestChannel.requestQueue 接受 RequestChannel.Request,并把 Request 交由 KafkaApis 的 handle() 方法处理,经处理后把对应的 Response 存进 RequestChannel.responseQueues 队列。Kafka 网络层线程模型如图3-15所示。

![enter image description here](../../../../../../../../images/2018/10/Kafka 网络层线程模型.png)

图3-15 Kafka 网络层线程模型

了解 Kafka 网络通信层基本模型之后,现在我们对 Kafka 网络通信各组件的具体职责进行简要介绍。

4.1 Acceptor

Acceptor 的主要职责是监听并接受客户端(统指请求发起方)的请求,建立和客户端的数据传输通道 ServerSocketChannel,然后为客户端指定一个 Processor。

Acceptor 是一个继承 AbstractServerThread 类的线程类,AbstractServerThread 是一个抽象线程类,实现了 Runnable 接口,提供和定义了对 Kafka 通信层各组件操作的辅助方法。Acceptor 对 Java NIO Selector 相关操作进行了封装,在 Acceptor 实例化时会通过 NSelector.open() 创建一个 Selector,创建和打开一个 ServerSocketChannel,同时启动与之对应的所有 Processor 线程(Acceptor 构造方法入参接收的 Processor 对象数组)。在 Acceptor 线程的 run() 方法中,首先为 ServerSocketChannel 在 Selector 上注册SelectionKey.OP_ACCEPT事件,然后以轮询方式查询并等待所关注的事件发生。若所关注的事件发生,则调用 Acceptor.accept() 方法对OP_ACCEPT事件进行处理。Acceptor.accept() 方法第二参数为 Processor 对象,Kafka 采用轮询(round-robin)的方式从 Acceptor 对应的 Processor 对象数组中取出一个 Processor。

Acceptor.accept() 方法的作用是对OP_ACCEPT事件进行处理,其实现逻辑如下。

(1)通过 SelectionKey 获取与之对应的 ServerSocketChannel,并调用 ServerSocketChannel. accept() 方法建立与客户端的连接 SocketChannel。

(2)由于建立了新的连接,因此调用 ConnectionQuotas.inc() 方法增加统计的连接数。

(3)将步骤1创建的 SocketChannel 交由 Processor.accept() 方法处理。将 SocketChannel 加入 Processor 的 newConnections 队列中,然后唤醒 Processor 线程开始处理 newConnections 队列。可见 newConnections 队列会被 Acceptor 线程和 Processor 线程并发操作,因此 newConnections 是一个 ConcurrentLinkedQueue 对象,用于保存新连接的 SocketChannel。

当 Accepor.accept() 处理完后,以轮询方式计算下一个连接所对应的 Processor 对象,在 Acceptor 线程处理运行状态时继续等待新的OP_ACCEPT事件,按以上步骤进行处理。

由以上分析可知,Acceptor 的作用就是接受客户端新的连接,创建 SocketChannel,以轮询的方式交由 Processor 处理,添加到 Processor 的 newConnections 队列并唤醒 Processor 线程。这样就建立起了客户端与 KafkaServer 之间通信的通道。Acceptor 在 Kafka 网络层连接中的地位如图3-16所示。

![enter image description here](../../../../../../../../images/2018/10/Acceptor 在 Kafka 网络层连接中的地位.png)

图3-16 Acceptor 在 Kafka 网络层连接中的地位

4.2 Processor

Processor 也是一个线程类,继承 AbstractServerThread 类,主要用于从客户端读取请求数据和将相应的响应结果返回给客户端。Processor 定义了一个 ConcurrentLinkedQueue[SocketChannel] 类型的 newConnections 队列,该队列用来保存新连接的交由本 Processor 处理的 SocketChannel;定义了一个 Map[String, RequestChannel.Response] 类型的 inflightResponses 集合,用来记录还未发送的响应;定义了一个管理网络连接的 KSelector 类型的 selector 字段。同时,Processor 构造方法还接受一个由调用者传入的 RequestChannel 对象,RequestChannel 是 Processor 与 Handler 线程之间交换数据的队列,用于暂存通信的 Request 和 Response。RequestChannel 将在下一小节进行详细介绍。

由于 Processor 是一个线程类,因此我们首先来分析其 run() 方法执行逻辑。该线程 run() 方法执行逻辑如下。

首先调用父类提供的 startupComplete() 方法。该方法通过 CountDownLatch 来实现 Processor 启动完成的信号标识,以此来唤醒由于等待 Processor 启动完成而被阻塞的线程。

然后检测 Processor 线程是否处于正常运行状态,即运行标志位 isRunning 是否为 true。若 Processor 线程处于正常运行状态时执行以下逻辑。

(1)处理 newConnections 队列中的 SocketChannel。迭代取出队列中的每个 SocketChannel,调用 KSelector.register() 方法为每个 SocketChannel 在 nioSelector 上注册OP_READ事件,该方法的第一个参数是由 SocketChannel 基本属性构造的一个与之对应的唯一 connectionId 字符串。KSelector 是 Kafka 实现的对 Java NIO Selector 的封装类。KSelector 是为了与 Java.nio.channels.Selector 区分开,简称为 KSelector,其实对应的是 org.apache.kafka.common.network.Selector,而 nioSelector 是指 Java.nio.channels.Selector。

(2)处理 Response。从 RequestChannel 的 responseQueues 数组中取出与当前 Processor 对应的用于保存 Response 的队列,并通过 Queue.poll() 方法从 Response 队列的头部取一个 Response(实质是 remove 操作,若队列头部元素为空则返回空),若 Response 不为空,则更新 Requst.Response 从 Response 队列出队列的时间,然后根据 ResponseAction 做相应处理。

  • 若为 NoOpAction,表示该连接对应的请求暂无响应需要发送给客户端,则调用 KSelector.unmute() 方法为 KafkaChannel 注册 OP_READ事件,允许其继续接受请求。通过 mute() 与 unmute() 操作以保证一个连接上只会有一个请求被处理。
  • 若为 SendAction,则表示该 Response 需要发送给客户端。那么首先查找该 Response 对应的 KafkaChannel,通过调用 KSelector.send() 方法为该 KafkaChannel 注册OP_WRITE事件(这里涉及 NIO 相关机制。关于 NIO 机制本书不做扩展讲述,请读者自行了解),并将该 Reponse 从 responseQueue 队列中移除添加到 inflightResponses 中。
  • 若为 CloseConnectionAction,表示是要将该连接关闭,则先减少 Processor 线程维护的连接数据,然后调用 KSelector.close() 方法关闭本连接。

(3)调用 KSelector.poll() 方法进行处理。该方法的职责与 Java nioSelector.select() 方法相对应,其实 KSelector.poll() 方法底层就是调用 nioSelector.select() 方法进行处理。poll() 方法会将已接受完成的数据包、发送成功的请求、断开的连接添加至 KSelector 维护的相应队列当中。经由 poll() 方法处理之后,接下来就是要对 IO 完成的进度进行检查并做相应处理。

(4)处理已接受完成的数据包队列 completedReceives。遍历该队列的每个 NetworkReceive 对象,与当前的 Processor 信息构造一个 RequestChannel.Request 对象,调用 requestChannel.sendRequest() 方法将 RequestChannel.Request 对象添加至 requestChannel 的 requestQueue 队列中,等待 Handler 进行处理。然后调用 KSelector.mute() 方法取消与该请求对应的 KafkaChannel 注册的OP_READ事件,即在发送响应给客户端之前该连接不能再读取任何请求数据。

(5)处理已发送完成的队列 completedSends。当已完成将 Repsonse 发送给客户端,则将该 Response 从 inflightResponses 集合中移除,同时调用 KSelector.unmute() 为对应的连接通道 KafkaChannel 重新注册OP_READ事件,以恢复该通道能够重新读取请求数据。

(6)处理断开连接的队列 disconnected。当一个连接已断开时,无法向该连接返回相应的 Response,则将该 Response 从 inflightResponses 集合中移除,然后将 SocketServer 维护的连接数减 1。

若 isRunning为false,则关闭该 Processor 管理的全部连接。通过 CountDownLatch.Countdown() 操作标识关闭操作已完成,唤醒由于等待该 Processor 结束而被阻塞的线程。

至此,Processor 线程的 run() 方法执行逻辑已介绍完毕,该方法执行流程如图3-17所示。

通过对 Processor.run() 方法实现逻辑分析可知:Processor 主要职责是负责从客户端读取数据并将处理后的 Response 返回给客户端。Processor 通过调用 KSelector 的 mute() 方法和 unmute() 方法分别对与之对应的连接通道 KafkaChannel 注册相应的事件,以保证请求与响应顺序一致。

4.3 RequestChannel

在4.2节我们提到了 RequestChannel,RequestChannel 是为了给 Processor 线程与 Handler 线程之间通信提供数据缓冲,是通信过程中 Request 与 Response 缓存的通道,是 Processor 线程与 Handler 线程交换数据的地方。

首先简单了解 RequestChannel 底层基本数据结构。RequestChannel 维护了一个 ArrayBlockingQueue 类型的队列用于缓存 Processor 添加的 Request 队列 requestQueue,一个用于保存 Repsonse 的 Array[BlockingQueue[RequestChannel.Response]] 类型的队列 responseQueues,每个 Processor 对应一个 BlockingQueue 类型的 Response 队列。同时,RequestChannel 还定义了一个List[(Int) => Unit]类型的 responseListeners 列表,用于记录当 Handler 线程向 responseQueues 添加 Response 时指定所要唤醒的 Processor 线程编号。在 SocketServer 初始化时,会调用 RequestChannel.addResponseListener() 方法为每个 Processor 线程映射一个唤醒该 Processor 线程的 id。

RequestChannel 提供了对这些集合添加和删除元素的方法,如 sendRequest() 方法用于将 Request 添加至 requestQueue 队列中,addResponseListener() 方法用于为每个 Processor 线程添加一个唤醒该线程的 id,sendResponse() 方法用于将 Response 添加到 responseQueues 队列中。在将 Response 添加至 responseQueues 队列的时候会触发 responseListeners 唤醒对应的 Processor 线程。

通过 RequestChannel 的底层数据结构及相应方法可以看出,RequestChannel 的作用就是在通信中起到缓冲队列的作用。Processor 线程将读取到请求添加到 RequestChannel.requestQueue 队列中;Handler 线程从 reqeustQueue 中取出请求进行处理,待处理完成后,Handler 线程将处理结果 Reponse 添加至 RequestChannel.responseQueues 队列中,在添加 Response 至 responseQueues 队列时会通过 responseListeners 唤醒对应的 Processor 线程,Processor 线程从 responseQueues 队列中取出与自己对应的 responseQueue 进行处理,最终将结果返回给客户端。该处理过程基本逻辑示意图如图3-18所示。

enter image description here

图3-18 RequestChannel 缓冲处理逻辑

4.4 SocketServer 启动过程

在对 Kafka 网络服务相关组件进行了简要讲解之后,现在我们再分析 SocketServer 启动过程。在启动一个 Kafka 代理时会实例化并启动一个 SocketServer 服务,首先分析 SocketServer 实例化过程。

首先,将${listeners}配置的每组协议分别映射成 Kafka 封装的一个用于保存协议信息的类 EndPoint,构造一个 EndPoint 对象的集合 endpoints。listeners 配置格式为 protocol://host:port 或者 protocol://[ipv6 host]:port,其中 protocol 为代理之间通信的安全协议类型。当前版本的 Kafka 支持的协议类型有 PLAINTEXT、SSL、SASL_PLAINTEXTSASL_SSL、TRACE。host 可以为代理主机的 IP、hostname 值或者对应的域名,也可以不指定;port 为指定的可用端口。listeners 可以配置多组协议类型,每组之间以逗号隔开,如 listeners=PLAINTEXT://myhost:9092,TRACE://:9091, PLAINTEXTSASL://0.0.0.0:9093,默认 listeners 为 listeners=PLAINTEXT://:9092,listeners 配置项是 SocketServe 所监听的地址。

然后,根据listeners.size* ${num.network.threads }之积计算需要创建的 Processor 总线程数,读者可以根据自己业务需要调整该参数。

最后,创建一个 RequestChannel 对象 requestChannel,一个保存 Processor 的数组对象 processors,用于保存 EndPoint 与 Acceptor 对应关系的 Map 类型 acceptors,并声明记录连接数的 connectionQuotas 对象。同时根据 processors 数组大小,为数组中的每个位置添加一个用于唤醒该位置对应的 Processor 线程的 ResponseListener。

在 SocketServer 实例化完成后,调用 SocketServer.startup() 方法启动 SocketServer,startup() 方法执行逻辑如下:首先根据配置的连接数限制实例化 ConnectionQuotas 对象,然后遍历端点集合,为每个 EndPoint 创建一组 Processor 线程和一个 Acceptor 线程。在实例化 Acceptor 时会启动相应的 Processor 线程,在启动 Acceptor 线程时会阻塞主线程直到 Acceptor 线程启动完成。

SocketServer 启动后就可以通过 Acceptor 接受客户端的请求,交由 Acceptor 对应的 Processor 处理,Processor 线程将请求添加到 RequestChannel.requestQueue 队列中,Handler 从 RequestChannel.requestQueue 队列中取出请求分发处理,然后将处理结果 Response 存入 RequestChannel.responseQueues 队列中,添加 Respoonse 时会唤醒与之对应的 Processor,Processor 从 RequestChannel.responseQueues 队列中取出与自己对应的 responseQueue 队列根据 ResponseAction 进行相应处理。

5 日志管理器

日志管理器(LogManager)是 Kafka 用来管理所有日志的,也称为日志管理子系统(Log Management Subsystem)。它负责管理日志的创建与删除、日志检索、日志加载和恢复、检查点及日志文件刷写磁盘以及日志清理等。本节将对日志管理器的部分功能进行详细讲解,在讲解日志管理器之前,首先介绍一下日志结构。

5.1 Kafka 日志结构

Kafka 消息是以主题为基本单位进行组织的,各个主题之间相互独立。每个主题在逻辑结构上又由一个或多个分区构成,分区数可以在创建主题时指定,也可以在主题创建后再修改。可以通过 Kafka 自带的用于主题管理操作的脚本 kafka-topics.sh 来修改某个主题的分区数,但只能增加一个主题的分区数而不能减少其分区数。每个分区可以有一个或多个副本,从副本中会选出一个副本作为 Leader,Leader 负责与客户端进行读写操作,其他副本作为 Follower。生产者将消息发送到 Leader 副本的代理节点,而 Follower 副本从 Leader 副本同步数据。

在存储结构上分区的每个副本在逻辑上对应一个 Log 对象,每个 Log 又划分为多个 LogSegment,每个 LogSegment 包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。Log 负责对 LogSegment 的管理,在 Log 对象中维护了一个 ConcurrentSkipListMap,其底层是一个跳跃表,保存该主题所有分区对应的所有 LogSegment。Kafka 将日志文件封装为一个 FileMessageSet 对象,将两个索引文件封装为 OffsetIndex 和 TimeIndex 对象。Log 和 LogSegment 都是逻辑上的概念,Log 是对副本在代理上存储文件的逻辑抽象,LogSegmnent 是对副本存储文件下每个日志片段的抽象,日志文件和索引文件才与磁盘上的物理存储相对应。假设有一个名为“log-format”的主题,该主题有3个分区,每个分区对应一个副本,则在存储结构中各对象映射关系如图3-19所示。

![enter image description here](../../../../../../../../images/2018/10/Kafka 日志存储结构中的映射关系.png)****

图3-19 Kafka 日志存储结构中的映射关系

如图3-19所示,在存储结构上每个分区副本对应一个目录,每个分区副本由一个或多个日志段(LogSegment)组成。每个日志段在物理结构上对应一个以“.index”为文件名后缀的偏移量索引文件、一个以“.timeindex”为文件名后缀的时间戳索引文件以及一个以“.log”为文件名后缀的消息集文件(FileMessageSet),消息集文件即日志文件或数据文件。需要说明的是,时间戳索引文件是在0.10.1.1版本新增加的索引文件,在这之前的版本只有偏移量索引文件。数据文件的大小由配置项 log.segment.bytes 指定,默认为 1 GB(1 073 741 824字节),同时 Kafka 提供了根据时间来切分日志段的机制,即若数据文件大小没有达到 log.segment.bytes 设置的阈值,但达到了 log.roll.ms 或是 log.roll.hours 设置的阈值,同样会创建新的日志段,在磁盘上创建一个数据文件和两个索引文件。接收消息追加(append)操作的日志段也称为活跃段(activeSegment)。

由图3-19也可以看出,分区所对应目录的命名规则为:主题名-分区编号,分区编号从0开始,顺序递增,分区编号最大值为分区总数减1,例如,对“log-format”主题,其分区目录依次为 log-format-0、log-format-1 和 log-format-2。数据文件命名规则为:由数据文件的第一条消息偏移量,也称为基准偏移量(BaseOffset),左补0构成20位数字字符组成,每个分区第一个数据文件的基准偏移量为 0,因此每个分区第一个数据文件对应的日志文件为 0000000000000000000.log,两个索引文件分别为 0000000000000000000.index和 0000000000000000000.timeindex。后续每个数据文件的基准偏移量为上一个数据文件最后一条消息对应的偏移量(log end offset,LEO)值加1。

图3-19所示的示意图只是从逻辑上进行描述,为了更直观地认识 Kafka 存储结构,现在我们在一个有3个代理的 Kafka 集群上创建 log-format 主题做更进一步分析,这里我们指定该主题有3个分区、2个副本,同时为了讲解日志结构需要,我们将该主题数据文件大小为设置为1 KB,索引文件跨度为设置为100字节,这样便于演示日志分段(有可能部分读者习惯称之为日志切片)。

创建主题,命令如下:

kafka-topics.sh --create --zookeeper  server-1:2181,server-2:2181,server-3:2181  
--replication-factor 2 --partitions 3 --topic log-format 

修改段大小及索引跨度配置,命令如下:

kafka-topics.sh –zookeeper server-1:2181,server-2:2181,server-3:2181 --alter 
--topic log-format --config segment.bytes=1024 --config index.interval.bytes=100 

分区副本在3台代理上的分布如图3-20所示。

登录 ZooKeeper 客户端查看分区及副本元数据信息如下:

get /brokers/topics/log-format 

输出信息如下:

{"version":1,"partitions":{"2":[3,1],"1":[2,3],"0":[1,2]}}

在 ZooKeeper 中元数据信息以 JSON 串的格式存储,其中 version 表示版本标识,固定值为 1,partitions 之后的 JSON 字符串表示每个分区对应的 ISR 列表,格式为“”分区编号”:AR”,多个分区信息满足 JSON 格式。例如,”2”:[1,3]表示分区编号为2的分区其副本分布在 brokerId 为1和3的两个节点上,与图3-20描述一致。在 AR 信息中,第一个副本称为优先副本,通常情况下优先副本即为分区的 Leader,若希望查看某个分区的副本 Leader 节点有以下两种方式。

(1)在 ZooKeeper 客户端查看分区状态信息。命令格式为:get /brokers/topics/<topicName>/ partitions/<partitionId>/state。例如,查看“log-format”编号为2的分区状态(state)元数据信息,执行命令如下:

get /brokers/topics/topic-format/partitions/2/state 

分区状态元数据信息输出如下:

{"controller_epoch":10,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}

其中分区状态信息表达式中,字段controller_epoch表示集群的控制器选举次数,初始值为0,当一个代理当选为控制器后,该字段值加1,每次控制器变更该字段值都增1。该字段值与/controller_epoch节点存储的控制器变化次数值一致;字段 leader 表示该分区的 Leader 副本所在代理的唯一编号 brokerId;字段 version 表示版本编号,默认值为1;leader_epoch表示该分区 Leader 选举次数,初始值为0;isr 即为同步副本的代理编号列表。

(2)通过 Kafka 自带的 kafka-topics.sh 脚本查看主题分区及副本分布信息。例如,查看 log-format 主题分区及状态分布信息,命令如下:

kafka-topics.sh --describe --zookeeper server-1:2181,server-2:2181,server-3:2181 
--topic log-format

该命令执行结果输出信息如下:

Topic:log-format    PartitionCount:3    ReplicationFactor:2
    Configs:index.interval.bytes=100,segment.bytes=1024
Topic: log-format    Partition: 0    Leader: 1    Replicas: 1,2    Isr: 1,2
Topic: log-format    Partition: 1    Leader: 2    Replicas: 2,3    Isr: 2,3
Topic: log-format    Partition: 2    Leader: 3    Replicas: 3,1    Isr: 3,1

该方式按序罗列出主题所有分区对应的副本及 ISR 列表信息,副本 Leader 节点及该主题被修改的配置信息。Leader 指定该分区副本 Leader,Replicas 指定该分区副本所在代理的编号,Isr 指定同步中的副本代理列表。

数据文件用来存储消息,每条消息由一个固定长度的消息头和一个可变长度(N 字节)的净荷(payload)组成。同时若开启了消息 Key,即设置parser.key=true,在发送消息时需要指定消息的 Key,则每条消息数据包括一个可变长度的消息 Key 实体和消息实际数据 payload,payload 也称为消息体,这里强行将消息 Key 与消息 payload 分为两部分描述,是为了下文更清晰地介绍消息各部分内容的字节大小。在本书中,为了与消息的 Key 配合进行介绍,消息体也称为消息 Value。消息 Key 可以为空,默认情况下消息的 Key 与消息体之间以制表符分隔。消息结构如图3-21所示。

enter image description here

图3-21 消息结构

消息结构各部分说明如表3-7所示。

表3-7 消息结构各字段说明

消 息 字 段 字 段 说 明
CRC32 CRC32 校验和
magic Kafka 服务程序协议版本号,用来作兼容,当前版本的 Kafka 该值为1
attributes 该字段占1字节,其中低两位用来表示压缩方式,第三位表示时间戳类型,高4位为预留位置,暂无实际意义
timestamp 消息时间戳,当 magic 值大于0时消息头必须包括该字段
key-length 消息 Key 的长度
key 消息 Key 实际数据
payload-length 消息实际数据长度
payload 消息实际数据

通过表3-7说明可知,由于当前版本 Kafka 的 magic 取值为1,因此消息头必须包括时间戳(timestamp)字段。当前版本的 Kafka 支持消息创建时间(CreateTime)及消息追加时间(LogAppendTime)两种时间戳类型,时间戳的类型由消息头中 attributes 字段的第三位指定,该位取值0表示创建时间,取值1表示消息追加时间。

在实际存储时一条消息总长度还包括12字节额外的开销(LogOverhead),这12字节包括两部分。其中一部分用8字节长度记录消息的偏移量,每条消息的偏移量是相对该分区下第一个数据文件的基准偏移量而言,它唯一确定一条消息在分区下的逻辑位置,同一个分区下的消息偏移量按序递增,若与数据库类比,消息偏移量即为消息的 Id,即自增的主键。另外4字节表示消息总长度。因此当前版本 Kafka 一条消息的固定长度为34字节。

1.数据文件

在数据文件中我们会看到相邻两条消息的 position 值之差减去34,即为上一条消息实际长度。若parser.key=true,则相邻两条消息 position 之差减去34为消息 Key 和消息体的总长度。

启动一个生产者向主题 log-format 中发送一批消息。执行以下命令将二进制分段日志文件转储为字符类型的文件。

kafka-run-class.sh kafka.tools.DumpLogSegments  --files  /opt/data/kafka-logs/log- format-0/00000000000000000000.log  --print-data-log 

该数据文件部分内容如下:

Starting offset: 0
offset: 0 position: 0 CreateTime: 1487165556824 isvalid: true payloadsize: 1 magic: 1 compresscodec: NoCompressionCodec crc: 1269164006 payload: c
offset: 1 position: 35 CreateTime: 1487165689780 isvalid: true payloadsize: 1 magic: 1 compresscodec: NoCompressionCodec crc: 3541136778 payload: f

可以看到,offset=1 与 offset=0 的消息的 position 之差为35,用35减去1(payloadSize)即为消息固定长度34。通过转储后的消息内容,能够更直观地了解消息的存储结构。其中第一行标识该数据文件消息起始的偏移量,由于是第一个数据文件,因此起始偏移量为0,即第一条消息的偏移量为0。在转储后的文件中,Kafka 用字段 position 表示该条消息在数据文件中的实际位置,每个数据文件第一条消息的 position 为0,之后每条消息的 position 为前一条消息的 postion 与消息固定长度、消息总长度之和。

  • CreateTime 表示该条消息时间类型为消息创建时间,
  • isValid 表示消息 CRC 校验是否合法。
  • payloadSize 表示消息体实际长度。
  • Compresscodec 表示消息压缩方式。
  • crc 表示消息的 crc32 校验和。
  • payload 表示消息体实际内容。

再通过以下命令启动一个生产者,开启生产消息时指定消息的 Key:

kafka-console-producer.sh --broker-list  server-1:9092,server-2:9092,server-3:9092  --topic log-format -property parse.key=true  

查看其中某个分区下的一个数据文件信息如下:

offset: 53 position: 723 CreateTime: 1487163989090 isvalid: true payloadsize: 1 magic: 1 compresscodec: NoCompressionCodec crc: 2783563669 keysize: 1 key: a payload: a
offset: 54 position: 759 CreateTime: 1487163998112 isvalid: true payloadsize: 1 magic: 1 compresscodec: NoCompressionCodec crc: 740192312 keysize: 1 key: b payload: b

offset 为53和54的两条消息,在展示格式上打印出了 keysize 和 key 两个字段,分别表示消息 Key 的长度和 Key 的实际内容。同样,position:759 与 position:723 之差为 offset:53 这条消息的消息固定长度(34字节)与消息总长度(keySize 与 payloadSize 之和)之和。

2.偏移量索引文件
  • Kafka 将消息分段保存在不同的文件中,同时每条消息都有唯一标识的偏移量,数据文件以该文件基准偏移量左补0命名,并将每个日志段以基准偏移量为 Key 保存到 ConcurrentSkipListMap 集合中。
  • 这样查找指定偏移量的消息时,用二分查找算法就能够快速定位到消息所在的段文件。
  • 为了进一步提高查找效率,Kafka 为每个数据文件创建了一个基于偏移量的索引文件,该索引文件的文件名与数据文件相同,文件名后缀为 .index
  • 为了与另一个基于时间戳的索引区分开,我们在这里将基于偏移量的索引文件称为偏移量索引文件。
  • 偏移量索引文件存储了若干个索引条目(IndexEntry),索引条目用来将逻辑偏移量映射成消息在数据文件中的物理位置,每个索引条目由 offset 和 position 组成,每个索引条目唯一确定数据文件中的一条消息。
  • 索引条目的 offset 表示与之对应的数据文件中某条消息的 offset,position 为与之对应的数据文件中某条消息的 position,例如,数据文件中某条消息的 offset 和 position 分别为 offset:8 和 position:0,若为该条消息创建了索引,索引文件中索引值为 offset:8 和 position:0。并不是每条消息都对应有索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引,我们可以通过 index.interval.bytes 设置索引跨度。

每次写消息到数据文件时会检查是否要向索引文件写入索引条目,创建一个新索引条目的条件为:距离前一次写索引后累计消息字节数大于${index.interval.bytes}。具体实现是 LogSegment 维持一个 int 类型的变量 bytesSinceLastIndexEntry,初始值为0,每次写消息时先判断该值是否大于索引跨度。若小于索引跨度,则将该条消息的字节长度累加到变量 bytesSinceLastIndexEntry 中;否则会为该条消息创建一个索引条目写入索引文件,然后将 bytesSinceLastIndexEntry 重置为 0。

索引文件与数据文件对应关系如图 3-22 所示,其中偏移量值用 sn 表示,消息实际位置 position 值用 pn 表示,n 为数字,sn 之间是顺序按步长1递增,而 pn 之间并不表示按步长1顺序递增之意,仅用来区分不同的 position。

enter image description here

图3-22 索引文件与数据文件的映射关系

  • 通过索引文件,我们就能够根据指定的偏移量快速地定位到消息物理位置。
  • 首先根据指定的偏移量,通过二分查找,查询出该偏移量对应消息所在的数据文件和索引文件,
  • 然后在索引文件中通过二分查找,查找值小于等于指定偏移量的最大偏移量,
  • 最后从查找出的最大偏移量处开始顺序扫描数据文件,直至在数据文件中查询到偏移量与指定偏移量相等的消息。
3.时间戳索引文件
  • Kafka 从 0.10.1.1 版本开始引入了一个基于时间戳的索引文件,即每个日志段在物理上还对应一个时间戳索引文件,该索引文件文件名和与之对应的数据文件文件名相同,但以 .timeindex 为文件名后缀,我们称之为时间戳索引文件。
  • 该索引文件包括一个8字节长度的时间戳字段和一个4字节的偏移量字段,其中时间戳记录的是该日志段目前为止最大时间戳,偏移量则记录的是插入新的索引条目时,当前消息的偏移量。
  • 该索引文件索引条目之间的跨度由配置项 index.interval.bytes 设置的阈值决定,但同时必须保证新创建的索引条目的时间戳大于上一个索引的时间戳。
  • 时间戳索引文件中的时间戳对应的类型可以是消息创建时间(CreateTime),也可以是消息写入数据文件的时间(LogAppendTime)。
  • 时间戳索引文件中的时间戳类型与数据文件中的时间戳类型一致,索引条目对应的时间戳的值及偏移量与数据文件中相应消息的这两个字段的值相同。
  • Kafka 也提供了通过时间戳索引来访问消息的方法,在第6章6.3节有相关介绍。时间戳索引也采用了稀疏存储的方式,在记录偏移量索引条目时会判断是否需要同时写时间戳索引。

文章作者: HoldDie
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 HoldDie !
评论
 上一篇
Mac工具篇(二) Mac工具篇(二)
我们害怕的不是宿敌,而是未知。 ——烈酒不醉 基本操作1、设置iCloud链接ln -s ~/Library/Mobile\ Documents/com~apple~CloudDocs iCloud 2、安装 lrzsz2.1、安装br
2018-10-16
下一篇 
Kafka-核心组件(五) Kafka-核心组件(五)
当捡起最后一片落叶,我就能看到整个春天。 ——秋水 5.2 日志管理器启动过程在对日志结构进行简要介绍之后,现在开始分析日志管理器的实现原理。首先分析日志管理器的启动过程。 当代理启动时首先实例化并启动一个日志管理器。实例化日志管理器时
2018-10-15
  目录