# kafka

# 是什么

Kafka 是一种分布式、高吞吐量的分布式发布订阅消息系统

# 为什么需要

  1. 缓冲和削峰
  2. 解耦重要业务流程
  3. 异步通信

# 集群部署

  1. 下载对应的 kafka 安装包
  2. 修改 config 下的 server.properties:
    a. broker.id
    b. log.dirs
    c. zookeeper.connect
  3. 在各个机器上启动 kafka:bin/kafka-sever-start.sh -daemon config/server.properties
  4. 配置集群启停脚本

# 交互命令

  1. kafka-topics.sh --bootstrap-server hadoop102:9092
    a. list all topic: --list
    b. create topic: --create --partitions 1 --replication-factor 3 --topic first
    c. delete topic: --delete --topic first
    d. describe topic: --describe --topic first
    e. alter topic: --alter–topic first --partitions 3
  2. producer/consumer --bootstrap-server hadoop102:9092
    a. send message: --topic first
    b. consume message: --from-beginning --topic first

# kafka 的架构

20250206184652

  1. Producer/Consumer: 生产者 / 消费者
  2. Consumer Group: 消费者组,负责消费来自不同分区的数据,一个分区只能由一个消费者组消费
  3. Broker: 代理人,一台 kafka 服务器就是一个 broker,一个 broker 包含多个 topic
  4. Topic: 主题,用于存储数据
  5. Partition: 分区,一个 Topic 可以分为多个 Partition 分区,每个分区内数据有序
  6. Replica: 副本,每个 partition 分区都有一个 leader 副本和多个 follower 副本
  7. Zookeeper: 协调器,记录 broker 服务器的运行状态以及不同分区之间的 leader 和 follower 信息

# kafka 生产者

# 生产者发送消息原理

20250207012841

发送消息时,会启动 main 进程和 sender 进程:

  1. main 线程负责生产数据:
    a. 创建 Producer 对象
    b. 执行 0.send (ProducerRecord)
    c. 经过 1. 拦截器进行必要处理
    d. 利用 2. 序列化器对消息进行序列化
    e. 利用 3. 分区器将消息划分到不同的消息队列中
    f. kafka 使用双端队列(队列大小默认 32M,每批次 16k),一边塞消息一边发消息
  2. sender 线程负责发送数据
    a. 从双端队列中拉取数据
    b. 使用 NetworkClient 与不同的 broker 建立连接
    c. 基于 selector 将数据发送到不同的分区上(默认只能发送 5 个未接收 ack 应答的数据,超时会发送,默认 0ms,超 batch_size 也会发送,默认 16k)
    d. 并接收来自 broker 的应答,应答 ack=0 表示接收到数据不等落盘直接应答,ack=1 表示 leader 收到数据后应答,ack=-1/all 表示 leader 和 isr 队列都收到数据后应答

# 生产者发送时的分区策略

20250207094842
默认采用 DefaultPartitioner: 实现 Partitioner 接口

  1. 指明了 partition,则使用指明值
  2. 未指明 partition,则使用 key 的 hash% 分区值
  3. 未指明 partition 和 key,则使用粘性分区(随机选定一个分区一直用直到满)

自定义分区,实现 Partitioner 接口

1
2
3
4
5
6
7
8
9
public class MyPartitioner implements Partitioner {
public int partition(...) {
String msgValue = value.toString();
int partition;
if (msgValue.contains("atguigu")) partition = 0;
else partition = 1;
return partition;
}
}

# 生产者发送吞吐量提升方法

即 NetworkClient 从双端队列中拉取消息的吞吐量,如何提升?

  1. 提高拉取批次大小 batch.size
  2. 修改等待时间 linger.ms
  3. 修改数据压缩算法 snappy
  4. 修改缓存区大小 recordaccumulator

# 生产者如何保证数据可靠性(ACK 应答级别)

20250207094948

producer 向集群发送数据,有三种 ack 级别:

  1. 对于 ack=0,leader 不等待马上应答,若 leader 突然挂了,followers 都拷贝不到数据,存在丢数风险
  2. 对于 ack=1,leader 收到数据后应答,若 leader 突然挂了,followers 依旧无法拷贝到数据,存在丢数风险
  3. 对于 ack=-1,leader 和 followers 们收到数据后应答,leader 和 followers 都有数据,若此时一个 follower 突然挂了,leader 无法向发送方发送全部收到的 ack,因此:
    a. kafka 设计了一个动态的 ISR(in-sync replica set),用来动态维护同步的 foller+leader 集合(isr: 0,1,2; leader: 0),ack=-1 负责与 ISR 列表中的节点进行应答确认
    b. isr 中若一个 follower 长时间未与 leader 通信,则被踢出这个 ISR 列表,默认 30s
    c. isr 中若所有的 follower 都挂了,则退化成 ack=1,又存在丢数风险
    d. 因此,数据完全可靠条件 = ack=-1 + 分区副本数 >=2 + ISR 里最小副本数 >=2

    分区副本数 >=2:即 isr 中至少存在一个 leader 副本和 follower 副本
    ISR 里最小副本数 >=2: 亦如此,此条件用于约束 follower 也在 isr 中
    若在发送确认 ack 之前,leader 突然挂了,则需要重新选举 leader,此时新选举的 leader 应该如何处理重复数据?

# 生产者如何保证数据去重

# 数据幂等性

数据传递语义
● 至少一次:ack=-1 + 分区副本数 >=2 + ISR 应答最小副本数 >=2 => 会重复,但保证数据不丢
● 最多一次:ack=0 => 不会重复,但会丢数据
● 精确一次:ack=-1 + 幂等性 => 引入幂等性和事务,从而保证数据不丢也不重复
○ 即无论接受到多少重复数据,broker 只持久化一条,根据 key 值 <ProducerId, Partition, SeqNumber> 判断是否重复,且只能保证单分区单会话内不重复,默认开启幂等性

# 生产者事务

由于幂等性只能保证单分区单会话内不重复,因此需要引入事务,从而保证跨分区跨会话不重复,只能使用事务确保

20250207145445

  1. 每个 broker 上都有 事务协调器, 事务信息特殊主题, 分区副本 leader
  2. 在使用事务前,Producer 先自定义一个 transaction.id
  3. 事务划分根据 transaction.id 的 hashcode%50 计算出事务使用哪儿个分区,对应分区的 leader 副本所在的 broker 的事务协调器就是此次事务的话事人
  4. 提交事务时,producer 向协调器请求 ProducerId,并将数据发送到分区副本 leader 对应的 topic 中,并向协调器发送 commit 请求
  5. commit 请求的消息会持久化到事务信息特殊主题中,成功之后会给 topic 发送 commit 请求,判断数据真正写入到 topic 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 3. 创建 Kafka 生产者(配置事务支持所需的属性)
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 设置事务 id(必须),事务 id 任意起名
try {
kafkaProducer.initTransactions(); // 初始化事务(必须先于任何事务操作调用)
kafkaProducer.beginTransaction(); // 开启一个新事务
for (int i = 0; i < 5; i++) { // 4. 批量发送消息(在事务中)
// 构建消息记录(主题,消息内容)
ProducerRecord<String, String> record = new ProducerRecord<>("first", "atguigu " + i);
kafkaProducer.send(record); // 异步发送消息(可添加回调函数处理发送结果)
}
// int i = 1 / 0; // 模拟异常(测试事务回滚时可取消注释)
kafkaProducer.commitTransaction(); // 提交事务(只有当所有消息成功发送时才会执行)
} catch (Exception e) {
kafkaProducer.abortTransaction();
System.err.println("Transaction aborted: " + e.getMessage()); // 发生异常时终止事务(自动回滚未提交的消息)
} finally {
kafkaProducer.close(); // 5. 确保最终关闭生产者(会等待所有未完成操作完成)
}

# kafka broker

在 kafka 中如何存储数据的,以及如何和 zk 沟通的

# kafka broker 工作流程

# zk 中存储的 kafka 信息

20250207162920

  1. broker_ids [0,1,2] 有哪儿些服务器
  2. leader_id {“leader”:1,“isr”:[0,1,2]} 谁是 leader
  3. controller {“候选 leader”: 0} 在 leader 挂了之后,成为接任 leader 候选之一

# broker 与 zk 协调的总体工作流程

20250207163609

  1. 三个节点向 zk 注册
  2. 哪儿台 broker 的 controller 先向 zk 注册,哪儿台 broker 的 controller 说了算
  3. controller 按照选举规则选举出 leader(以 isr 节点存活为前提,优先选举 ar 中排在前面的),并将信息同步到 zk 中
  4. 其他节点的 controller 从 zk 同步相关信息
  5. 假设选举出来的 leader 挂了,zk 的 controller 会监听到变化,执行重新选举流程

# 节点服役与退役

# broker 服役与退役

其实服役与退役的本质就是副本的迁移,若服役则将已有副本迁移到新节点,若退役则将已有副本迁移到其他节点

20250207164554

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 服役 hadoop105
bin/kafka-server-start.sh -daemon ./config/server.properties # hadoop105

vim topics-to-move.json # hadoop102
{"topics": [{"topic": "first"}]}

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate # hadoop102

vim increase-replication-factor.json # hadoop102,将上面返回的json内容Proposed partition reassignment configuration填入
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}

bin/kafka-reassign-partitions.sh --reassignment-json-file increase-replication-factor.json --execute # hadoop102,执行迁移

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute # hadoop102,验证
1
2
3
4
5
6
7
8
9
10
11
12
13
# 退役 hadoop105
vim topics-to-move.json # hadoop105
{"topics": [{"topic": "first"}], "version": 1}

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate # hadoop102

vim increase-replication-factor.json # hadoop102,同上,将返回的json内容填入

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute # hadoop102,执行迁移

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify # hadoop102,验证

bin/kafka-server-stop.sh # hadoop105,退役

# 章节小结

章节小结

# kafka 副本

kafka 所有的副本叫做 AR(Assigned Replicas),AR = ISR + OSR,从 ISR 中踢出去的就放到 OSR 里面

# leader 选举流程(演示)

20250207171519

20250207171913

# follower 故障处理(演示)

20250207172116

follower 故障:
20250207172245

follower 归队:(向高水位靠齐)
20250207172349

# leader 故障处理(演示)

leader 挂了:
20250207172445

新 leader 上位:(所有 follower 向 leader 低水位靠齐)
20250207172550

# kafka 文件存储

# 文件存储机制

20250207181456

20250207181925

kafka 的索引是稀疏索引,每存储 4kb 的文件,才会往 index 中记录一条索引

# 文件清理

20250207182451

20250207182505

20250207182515

# kafka 如何实现高效读写

  1. kafka 本身是分布式集群,可以采用分区技术,并行度高
  2. 读数据采用稀疏索引,可以快速定位要消费的数据
  3. 顺序写磁盘,日志一直追加写到文件末端
  4. 采用页缓存 + 零拷贝技术 (封装了 OS 底层的页缓存技术,消息无需落盘至应用层就被拷贝走了,即零拷贝)
    20250207183042

# kafka 消费者

# 消费方式

20250207183521

# 消费总体流程

20250207183800

offset 维护在系统主题中保存,老版本是维护在 zk 中,但是由于消费者需要与 zk 大量通信,新版本迁移到 broker 上的特殊主题当中

# 消费者组原理

# 消费者组初始化流程

20250207185811

  1. 首先确认消费者组的 coordinator,即哪个 broker 上的 coordinator 负责管理该消费者组,由 groupid 的 hashcode%50 计算得出
  2. 消费者组中的消费者向 coordinator 发送 joinGroup 请求,coordinator 选举消费者组的一个 cosumer 作为消费 leader
  3. coordinator 将消费的 topic 概况发送给 consumer leader
  4. consumer leader 负责制定当前消费者组的消费方案并上报给 coordinator
  5. coordinator 将消费方案下发给各个消费者
  6. 消费者组中的消费者与 coordinator 进行 rebalance,若超时消费或者心跳丢失就触发消费者组之间的再平衡

# 消费者组详细消费流程

20250207190338
consumer 启动 consumerNetworkClient,并按照批次大小或时间间隔向 broker 发送拉取请求,并基于回调函数拉取数据,然后经过反序列化、拦截器、最终处理数据

# 消费者分区分配再平衡

20250207193718

  1. range 分配策略针对一个 topic 而言
    20250207193830

  2. roundrobin 分配策略针对所有 topic 而言
    20250207194314

  3. sticky 分配策略: 尽量均匀又随机
    20250207194607

# offset 位置

20250207194919

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic + 分区号,value 就是当前 offset 的值。
每隔一段时间,kafka 内部会对这个 topic 进行 compact 压缩存储,也就是每个 group.id+topic + 分区号就保留最新数据。

1
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm atter" --from-beginning

为了使我们能够专注于自己的业务逻辑,kafka 提供了自动提交 offset 的功能。
20250207195252

手动提交:
20250207195440

指定 offset 位置:

1
2
3
4
5
6
7
8
9

Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
for (TopicPartition tp: assignment) { // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
kafkaConsumer.seek(tp, 1700);
}

指定时间消费:将时间转化成对应 offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new
HashMap<>();
for (TopicPartition topicPartition : assignment) { // 封装集合存储,每个分区对应一天前的数据
timestampToSearch.put(topicPartition,
System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}

Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); // 获取从 1 天前开始消费的每个分区的 offset
for (TopicPartition topicPartition : assignment) { // 遍历每个分区,对每个分区设置消费时间。
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); // 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}

# 消费者事务

20250207200506
为解决重复消费和漏消费的问题,采用消费者事务
20250207200654

# 章节小结

章节小结

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Euan Cai 微信支付

微信支付