# kafka
# 是什么
Kafka 是一种分布式、高吞吐量的分布式发布订阅消息系统
# 为什么需要
- 缓冲和削峰
- 解耦重要业务流程
- 异步通信
# 集群部署
- 下载对应的 kafka 安装包
- 修改 config 下的 server.properties:
a. broker.id
b. log.dirs
c. zookeeper.connect - 在各个机器上启动 kafka:bin/kafka-sever-start.sh -daemon config/server.properties
- 配置集群启停脚本
# 交互命令
- kafka-topics.sh --bootstrap-server hadoop102:9092
a. list all topic: --list
b. create topic: --create --partitions 1 --replication-factor 3 --topic first
c. delete topic: --delete --topic first
d. describe topic: --describe --topic first
e. alter topic: --alter–topic first --partitions 3 - producer/consumer --bootstrap-server hadoop102:9092
a. send message: --topic first
b. consume message: --from-beginning --topic first
# kafka 的架构
- Producer/Consumer: 生产者 / 消费者
- Consumer Group: 消费者组,负责消费来自不同分区的数据,一个分区只能由一个消费者组消费
- Broker: 代理人,一台 kafka 服务器就是一个 broker,一个 broker 包含多个 topic
- Topic: 主题,用于存储数据
- Partition: 分区,一个 Topic 可以分为多个 Partition 分区,每个分区内数据有序
- Replica: 副本,每个 partition 分区都有一个 leader 副本和多个 follower 副本
- Zookeeper: 协调器,记录 broker 服务器的运行状态以及不同分区之间的 leader 和 follower 信息
# kafka 生产者
# 生产者发送消息原理
发送消息时,会启动 main 进程和 sender 进程:
- main 线程负责生产数据:
a. 创建 Producer 对象
b. 执行 0.send (ProducerRecord)
c. 经过 1. 拦截器进行必要处理
d. 利用 2. 序列化器对消息进行序列化
e. 利用 3. 分区器将消息划分到不同的消息队列中
f. kafka 使用双端队列(队列大小默认 32M,每批次 16k),一边塞消息一边发消息 - sender 线程负责发送数据
a. 从双端队列中拉取数据
b. 使用 NetworkClient 与不同的 broker 建立连接
c. 基于 selector 将数据发送到不同的分区上(默认只能发送 5 个未接收 ack 应答的数据,超时会发送,默认 0ms,超 batch_size 也会发送,默认 16k)
d. 并接收来自 broker 的应答,应答 ack=0 表示接收到数据不等落盘直接应答,ack=1 表示 leader 收到数据后应答,ack=-1/all 表示 leader 和 isr 队列都收到数据后应答
# 生产者发送时的分区策略
默认采用 DefaultPartitioner: 实现 Partitioner 接口
- 指明了 partition,则使用指明值
- 未指明 partition,则使用 key 的 hash% 分区值
- 未指明 partition 和 key,则使用粘性分区(随机选定一个分区一直用直到满)
自定义分区,实现 Partitioner 接口
1 | public class MyPartitioner implements Partitioner { |
# 生产者发送吞吐量提升方法
即 NetworkClient 从双端队列中拉取消息的吞吐量,如何提升?
- 提高拉取批次大小 batch.size
- 修改等待时间 linger.ms
- 修改数据压缩算法 snappy
- 修改缓存区大小 recordaccumulator
# 生产者如何保证数据可靠性(ACK 应答级别)
producer 向集群发送数据,有三种 ack 级别:
- 对于 ack=0,leader 不等待马上应答,若 leader 突然挂了,followers 都拷贝不到数据,存在丢数风险
- 对于 ack=1,leader 收到数据后应答,若 leader 突然挂了,followers 依旧无法拷贝到数据,存在丢数风险
- 对于 ack=-1,leader 和 followers 们收到数据后应答,leader 和 followers 都有数据,若此时一个 follower 突然挂了,leader 无法向发送方发送全部收到的 ack,因此:
a. kafka 设计了一个动态的 ISR(in-sync replica set),用来动态维护同步的 foller+leader 集合(isr: 0,1,2; leader: 0),ack=-1 负责与 ISR 列表中的节点进行应答确认
b. isr 中若一个 follower 长时间未与 leader 通信,则被踢出这个 ISR 列表,默认 30s
c. isr 中若所有的 follower 都挂了,则退化成 ack=1,又存在丢数风险
d. 因此,数据完全可靠条件 = ack=-1 + 分区副本数 >=2 + ISR 里最小副本数 >=2分区副本数 >=2:即 isr 中至少存在一个 leader 副本和 follower 副本
ISR 里最小副本数 >=2: 亦如此,此条件用于约束 follower 也在 isr 中
若在发送确认 ack 之前,leader 突然挂了,则需要重新选举 leader,此时新选举的 leader 应该如何处理重复数据?
# 生产者如何保证数据去重
# 数据幂等性
数据传递语义
● 至少一次:ack=-1 + 分区副本数 >=2 + ISR 应答最小副本数 >=2 => 会重复,但保证数据不丢
● 最多一次:ack=0 => 不会重复,但会丢数据
● 精确一次:ack=-1 + 幂等性 => 引入幂等性和事务,从而保证数据不丢也不重复
○ 即无论接受到多少重复数据,broker 只持久化一条,根据 key 值 <ProducerId, Partition, SeqNumber> 判断是否重复,且只能保证单分区单会话内不重复,默认开启幂等性
# 生产者事务
由于幂等性只能保证单分区单会话内不重复,因此需要引入事务,从而保证跨分区跨会话不重复,只能使用事务确保
- 每个 broker 上都有 事务协调器, 事务信息特殊主题, 分区副本 leader
- 在使用事务前,Producer 先自定义一个 transaction.id
- 事务划分根据 transaction.id 的 hashcode%50 计算出事务使用哪儿个分区,对应分区的 leader 副本所在的 broker 的事务协调器就是此次事务的话事人
- 提交事务时,producer 向协调器请求 ProducerId,并将数据发送到分区副本 leader 对应的 topic 中,并向协调器发送 commit 请求
- commit 请求的消息会持久化到事务信息特殊主题中,成功之后会给 topic 发送 commit 请求,判断数据真正写入到 topic 中
1 | // 3. 创建 Kafka 生产者(配置事务支持所需的属性) |
# kafka broker
在 kafka 中如何存储数据的,以及如何和 zk 沟通的
# kafka broker 工作流程
# zk 中存储的 kafka 信息
- broker_ids [0,1,2] 有哪儿些服务器
- leader_id {“leader”:1,“isr”:[0,1,2]} 谁是 leader
- controller {“候选 leader”: 0} 在 leader 挂了之后,成为接任 leader 候选之一
# broker 与 zk 协调的总体工作流程
- 三个节点向 zk 注册
- 哪儿台 broker 的 controller 先向 zk 注册,哪儿台 broker 的 controller 说了算
- controller 按照选举规则选举出 leader(以 isr 节点存活为前提,优先选举 ar 中排在前面的),并将信息同步到 zk 中
- 其他节点的 controller 从 zk 同步相关信息
- 假设选举出来的 leader 挂了,zk 的 controller 会监听到变化,执行重新选举流程
# 节点服役与退役
# broker 服役与退役
其实服役与退役的本质就是副本的迁移,若服役则将已有副本迁移到新节点,若退役则将已有副本迁移到其他节点
1 | 服役 hadoop105 |
1 | 退役 hadoop105 |
# 章节小结
章节小结
# kafka 副本
kafka 所有的副本叫做 AR(Assigned Replicas),AR = ISR + OSR,从 ISR 中踢出去的就放到 OSR 里面
# leader 选举流程(演示)
# follower 故障处理(演示)
follower 故障:
follower 归队:(向高水位靠齐)
# leader 故障处理(演示)
leader 挂了:
新 leader 上位:(所有 follower 向 leader 低水位靠齐)
# kafka 文件存储
# 文件存储机制
kafka 的索引是稀疏索引,每存储 4kb 的文件,才会往 index 中记录一条索引
# 文件清理
# kafka 如何实现高效读写
- kafka 本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘,日志一直追加写到文件末端
- 采用页缓存 + 零拷贝技术 (封装了 OS 底层的页缓存技术,消息无需落盘至应用层就被拷贝走了,即零拷贝)
# kafka 消费者
# 消费方式
# 消费总体流程
offset 维护在系统主题中保存,老版本是维护在 zk 中,但是由于消费者需要与 zk 大量通信,新版本迁移到 broker 上的特殊主题当中
# 消费者组原理
# 消费者组初始化流程
- 首先确认消费者组的 coordinator,即哪个 broker 上的 coordinator 负责管理该消费者组,由 groupid 的 hashcode%50 计算得出
- 消费者组中的消费者向 coordinator 发送 joinGroup 请求,coordinator 选举消费者组的一个 cosumer 作为消费 leader
- coordinator 将消费的 topic 概况发送给 consumer leader
- consumer leader 负责制定当前消费者组的消费方案并上报给 coordinator
- coordinator 将消费方案下发给各个消费者
- 消费者组中的消费者与 coordinator 进行 rebalance,若超时消费或者心跳丢失就触发消费者组之间的再平衡
# 消费者组详细消费流程
consumer 启动 consumerNetworkClient,并按照批次大小或时间间隔向 broker 发送拉取请求,并基于回调函数拉取数据,然后经过反序列化、拦截器、最终处理数据
# 消费者分区分配再平衡
-
range 分配策略针对一个 topic 而言
-
roundrobin 分配策略针对所有 topic 而言
-
sticky 分配策略: 尽量均匀又随机
# offset 位置
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic + 分区号,value 就是当前 offset 的值。
每隔一段时间,kafka 内部会对这个 topic 进行 compact 压缩存储,也就是每个 group.id+topic + 分区号就保留最新数据。
1 | bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm atter" --from-beginning |
为了使我们能够专注于自己的业务逻辑,kafka 提供了自动提交 offset 的功能。
手动提交:
指定 offset 位置:
1 |
|
指定时间消费:将时间转化成对应 offset
1 | Set<TopicPartition> assignment = new HashSet<>(); |
# 消费者事务
为解决重复消费和漏消费的问题,采用消费者事务
# 章节小结
章节小结