[TOC]
kafka tool 可视化
kafka-topics-ui
Kafka Web UI 可视化界面
Kafka Eagle 监控系统是一款用来监控 Kafka 集群的工具
Kafka 管理工具 CMAK,以前叫 kafka-manager
KIP-500:Replace ZooKeeper with a Metadata Quorum
提到将使用 Quorum 机制
多年前就有人提到Is Zookeeper a must for Kafka?
zookeeper 将从 kafka 中移除. - 2020.05.15
Kafka 2.6 版本还是需要 zookeeper(说明不是那么简单的事) - 2020.07.16
Kafka 2.8 What’s New in Apache Kafka 2.8.0 - 2021.04.19
我们很高兴地宣布2.8引入了不带ZooKeeper的Kafka的抢先体验!
该实现尚未完成功能,不应在生产中使用,但是可以在没有ZooKeeper的情况下启动新集群
Kafka 3.0 - 2021.09.20 进度:还没有完全移除ZooKeeper
改进了 KRaft--Apache Kafka 的内置共识机制,该机制将取代 Apache ZooKeeper™。
https://downloads.apache.org/kafka/3.0.0/RELEASE_NOTES.html
Kafka 总结(一):Kafka 概述
https://blog.csdn.net/qq_36807862/article/details/81283142
Kafka 总结(二):Kafka 核心组件
https://blog.csdn.net/qq_36807862/article/details/81283229
Kafka 总结(三):Kafka 核心流程分析
https://blog.csdn.net/qq_36807862/article/details/81283286
Kafka 总结(四):Kafka 命令操作
https://blog.csdn.net/qq_36807862/article/details/81283397
Kafka 总结(五):API 编程详解
https://blog.csdn.net/qq_36807862/article/details/81283460
Kafka 总结(六):Kafka Stream 详解
https://blog.csdn.net/qq_36807862/article/details/81283491
Kafka 总结(七):数据采集应用
https://blog.csdn.net/qq_36807862/article/details/81283546
Kafka 总结(八):KafKa 与 ELK 整合应用
https://blog.csdn.net/qq_36807862/article/details/81283568
Kafka 总结(九):KafKa 与 Spark 整合应用
https://blog.csdn.net/qq_36807862/article/details/81283606
大数据中台之 Kafka,到底好在哪里?
https://blog.csdn.net/csdn_bang/article/details/103797920
Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka 主要设计目标如下:
Kafka 是一个分布式的、可分区的、可复制的消息系统。
Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。
Kafka 就是一种发布-订阅模式
是特定类型的消息流(字节的有效负载),话题是消息的分类;
kafka 中消息订阅和发送都是基于某个 topic。
比如有个 topic 叫做 NBA 赛事信息,那么 producer 会把 NBA 赛事信息的消息发送到此 topic 下面。
所有订阅此 topic 的 consumer 将会拉取到此 topic 下的消息。
Topic 就像一个特定主题的收件箱,producer 往里丢,consumer 取走。
是能够发布消息到话题的任何对象;
生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。
如果 Partition 没填,那么情况会是这样的:
已发布的消息保存在一组服务器中,它们被称为代理(Broker)或 Kafka 集群;
一个 Borker 就是 Kafka 集群中的一个实例,或者说是一个服务单元。
连接到同一个 zookeeper 的多个 broker 实例组成 kafka 的集群。
在若干个 broker 中会有一个 broker 是 leader,其余的 broker 为 follower。
broker 存储 topic 的数据。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
如果某 topic 有 N 个 partition,集群有(N+M)个 broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据。
如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
topic 中的数据分割为一个或多个 partition。每个 topic 至少有一个 partition。每个 partition 中的数据使用多个 segment 文件存储。partition 中的数据是有序的,不同 partition 间的数据丢失了数据的顺序。如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。
每个分区可以设置备份数量
分区由一个 leader+多个 followers 组成,生产者直接与 leader 进行沟通,leader 接收消息后,其他的 followers 会同步这个消息。所有的 follwers 同步消息后,该消息才会成为可消费的状态。
Broker 中 Topic 与分区,分区与生产者,分区之间的选举备份等等信息都需要 zookeeper 进行协调。
每个 Topic 由多个分区组成,每个分区内部的数据保证了有序性,即是按照时间序列,append 到分区的尾部。分区是有固定大小的,容量不够时,会创建新的分区。Kafka 在一定时间内会定期清理过期的文件。
这种连续性的文件存储,一方面有效的利用磁盘的线性存取;另一方面减轻了内存的压力。
可以订阅一个或多个话题,并从 Broker 拉(pull)数据,从而消费这些已发布的消息;
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
Kafka 和其它消息系统有一个不一样的设计,在 consumer 之上加了一层 group(Consumer Group);
同一个 group 的 consumer 可以并行消费同一个 topic 的消息,但是同 group 的 consumer,不会重复消费。
如果同一个 topic 需要被多次消费,可以通过设立多个 consumer group 来实现。每个 group 分别消费,互不影响。
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
很多传统的 message queue 都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证 queue 的长度比较少,提高效率。而如上文所将,Kafka 并不删除已消费的消息,为了实现传统 message queue 消息只被消费一次的语义,Kafka 保证保证同一个 consumer group 里只有一个 consumer 会消费一条消息。与传统 message queue 不同的是,Kafka 还允许不同 consumer group 同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka 的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用 Storm 这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop 这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的 consumer 在不同的 consumer group 即可。
Kafka 的相关术语以及之间的关系 1
Kafka 的相关术语以及之间的关系 2
上图中一个 topic 配置了 3 个 partition。Partition1 有两个 offset:0 和 1。Partition2 有 4 个 offset。Partition3 有 1 个 offset。副本的 id 和副本所在的机器的 id 恰好相同。
如果一个 topic 的副本数为 3,那么 Kafka 将在集群中为每个 partition 创建 3 个相同的副本。集群中的每个 broker 存储一个或多个 partition。多个 producer 和 consumer 可同时生产和消费数据。
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。当 Follower 与 Leader 挂掉、卡住或者同步太慢,leader 会把这个 follower 从“in sync replicas”(ISR)列表中删除,重新创建一个 Follower。
我们将消息的发布(publish)称作 producer,
将消息的订阅(subscribe)表述为 consumer,
将中间的存储阵列称作 broker(代理);
多个 broker 协同合作,
producer 和 consumer 部署在各个业务逻辑中被频繁的调用,
三者通过 zookeeper 管理协调请求和转发。
这样一个高性能的分布式消息发布订阅系统就完成了。
Producer 如果生产了数据,会先通过 zookeeper 找到 broker,然后将数据存放到 broker;
Consumer 如果要消费数据,会先通过 zookeeper 找对应的 broker,然后消费;
在 Kafka 中很多节点的调度以及资源的分配,都要依赖于 zookeeper 来完成。
kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,
每个 topic 可以分多个 partition, consumer group 对 partition 进行 consume 操作;
kafka 集群支持热扩展(不停机的情况下扩展 kafka);
消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败);
支持数千个客户端同时读写;
可以使用 Storm 这种实时流处理系统对消息进行实时进行处理,
同时还可以使用 Hadoop 这种批处理系统进行离线处理;
注意:
Topic、partition、replica 的关系图
由上图可见,leader replica 做了大量的工作。所以如果不同 partition 的 leader replica 在 kafka 集群的 broker 上分布不均匀,就会造成负载不均衡。
注:kafka 通过轮询算法保证 leader replica 是均匀分布在多个 broker 上。如下图。
副本均匀分部图
可以看到每个 partition 的 leader replica 均匀的分布在三个 broker 上,follower replica 也是均匀分布的。
Replica 总结:
topic 下划分了多个 partition,消息的生产和消费最终都是发生在 partition 之上;
读写示意图
为了提高性能,现代操作系统往往使用内存作为磁盘的缓存;
虽然每个程序都在自己的线程里只缓存了一份数据,但在操作系统的缓存里还有一份,这等于存了两份数据。
与传统的将数据缓存在内存中然后刷到硬盘的设计不同,
Kafka 直接将数据写到了文件系统的日志中。
在业务中,常常都是使用 At least once 的模型,如果需要可重入的话,往往是业务自己实现。
先获取数据,再 Commit Offset,最后进行业务处理:
先获取数据,再进行业务处理,业务处理成功后 Commit Offset:
思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着 At least once 的原因来搞。
首先想出来的:
由于业务接口是否幂等,不是 Kafka 能保证的,所以 Kafka 这里提供的 Exactly once 是有限制的,消费者的下游也必须是 Kafka。
所以以下讨论的,没特殊说明,消费者的下游系统都是 Kafka(注:使用 Kafka Conector,它对部分系统做了适配,实现了 Exactly once)。生产者幂等性好做,没啥问题。
解决重复消费有两个方法:
本来 Exactly once 实现第 1 点就 OK 了。但是在一些使用场景下,我们的数据源可能是多个 Topic,处理后输出到多个 Topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。
既然要做事务,那么干脆把重复消费的问题从根源上解决,把 Commit Offset 和输出到其他 Topic 绑定成一个事务。
思路是这样的,为每个 Producer 分配一个 Pid,作为该 Producer 的唯一标识。
Producer 会为每一个维护一个单调递增的 Seq。类似的,Broker 也会为每个记录下最新的 Seq。
当 req_seq == broker_seq+1 时,Broker 才会接受该消息,因为:
场景是这样的:
其中第 2、3 点作为一个事务,要么全成功,要么全失败。这里得益于 Offset 实际上是用特殊的 Topic 去保存,这两点都归一为写多个 Topic 的事务性处理。
基本思路是这样的:
做事务时,先标记开启事务,写入数据,全部成功就在 Transaction Log 中记录为 Prepare Commit 状态,否则写入 Prepare Abort 的状态。
之后再去给每个相关的 Partition 写入一条 Marker(Commit 或者 Abort)消息,标记这个事务的 Message 可以被读取或已经废弃。成功后在 Transaction Log 记录下 Commit/Abort 状态,至此事务结束。
数据流:
这里 Prepare 的状态主要是用于事务恢复,例如给相关的 Partition 发送控制消息,没发完就宕机了,备机起来后,Producer 发送请求获取 Pid 时,会把未完成的事务接着完成。
当 Partition 中写入 Commit 的 Marker 后,相关的消息就可被读取。所以 Kafka 事务在 Prepare Commit 到 Commit 这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。
消费时,Partition 中会存在一些消息处于未 Commit 状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,Kafka 选择在消费者进程中进行过来,而不是在 Broker 中过滤,主要考虑的还是性能。
Kafka 高性能的一个关键点是 Zero Copy,如果需要在 Broker 中过滤,那么势必需要读取消息内容到内存,就会失去 Zero Copy 的特性。
文件组织
Kafka 的数据,实际上是以文件的形式存储在文件系统的。Topic 下有 Partition,Partition 下有 Segment,Segment 是实际的一个个文件,Topic 和 Partition 都是抽象概念。
在目录 /partitionid}/ 下,存储着实际的 Log 文件(即 Segment),还有对应的索引文件。
每个 Segment 文件大小相等,文件名以这个 Segment 中最小的 Offset 命名,文件扩展名是 .log。Segment 对应的索引的文件名字一样,扩展名是 .index。
有两个 Index 文件:
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个 Message 都记录下具体位置,而是每隔一定的字节数,再建立一条索引。
索引包含两部分:
BaseOffset:意思是这条索引对应 Segment 文件中的第几条 Message。这样做方便使用数值压缩算法来节省空间。例如 Kafka 使用的是 Varint。
Position:在 Segment 中的绝对位置。
查找 Offset 对应的记录时,会先用二分法,找出对应的 Offset 在哪个 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍历查找 Message。
消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
基本思想是保证每一条消息 commit 成功之后,再进行消费处理;
设置自动提交为 false,接受到消息之后,首先 commit,然后再进行消费
消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
基本思想是保证每一条消息处理成功之后,再进行 commit;
设置自动提交为 false;消息处理成功之后,手动进行 commit;
采用这种模式时,最好保证消费操作的“幂等性”,防止重复消费;
不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
核心思想是将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性;
设置自动提交为 false;消息处理成功之后再提交;
比如对于关系型数据库来说,可以将 id 设置为消息处理结果的唯一索引,再次处理时,如果发现该索引已经存在,那么就不处理;
如果 producer 发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,
这种情况虽然不常见,但是必须考虑进去,现在 Kafka 版本还没有解决这个问题,
将来的版本正在努力尝试解决。
并不是所有的情况都需要“精确的一次”这样高的级别,Kafka 允许 producer 灵活的指定级别。
比如 producer 可以指定必须等待消息被提交的通知,
或者完全的异步发送消息而不等待任何通知,或者仅仅等待 leader 声明它拿到了消息。
以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。
Producer 把消息集一块发送给服务端,而不是一条条的发送;
服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的 I/O 操作。
consumer 也可以一次性的请求一个消息集。
Kafka 采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,
并以压缩后的格式写入日志文件,以压缩的格式发送到 consumer,
消息从 producer 发出到 consumer 拿到都是被压缩的,只有在 consumer 使用的时候才被解压缩,所以叫做“端到端的压缩”。
Kafka 支持 GZIP 和 Snappy 压缩协议。
客户端控制消息将被分发到哪个分区。
可以通过负载均衡随机的选择,或者使用分区函数。
Kafka 允许用户实现分区函数,指定分区的 key,将消息 hash 到不同的分区上;
比如如果你指定的 key 是 user id,那么同一个用户发送的消息都被发送到同一个分区上。
经过分区之后,consumer 就可以有目的的消费某个分区的消息。
批量发送可以很有效的提高发送效率。
Kafka producer 的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。
这个策略可以配置的,比如可以指定缓存的消息达到某个量的时候就发出去,
或者缓存了固定的时间后就发送出去(比如 100 条消息就发送,或者每 5 秒发送一次)。
这种策略将大大减少服务端的 I/O 次数。
Kafa consumer 消费消息时,向 broker 发出"fetch"请求去消费特定分区的消息。
consumer 指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。
customer 拥有了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。
Kafka 遵循了一种大部分消息系统共同的传统的设计:
producer 将消息推送到 broker,consumer 从 broker 拉取消息。
latest 最后, earliest 最早, none
采用默认配置情况下,既不能完全保证 At-least-once 也不能完全保证 at-most-once;
比如:
在自动提交之后,数据消费流程失败,这样就会有丢失,不能保证 at-least-once;
数据消费成功,但是自动提交失败,可能会导致重复消费,这样也不能保证 at-most-once;
但是将自动提交时长设置的足够小,则可以最大限度的保证 at-most-once;
enable.auto.commit 是否自动提交自己的 offset 值;默认值时 true
auto.commit.interval.ms 自动提交时长间隔;默认值时 5000 ms
consumer.commitSync(); offset 提交命令;
在 Kafka 中 offset 是由 consumer 维护的(实际可以由 zookeeper 来完成,0.8 以后在名为__consumer_offsets 的 topic 中,该 topic 有 50 个分区)。这种机制有两个好处
创建副本的单位是 topic 的分区,每个分区都有一个 leader 和零或多个 followers;
所有的读写操作都由 leader 处理;同一个分区的副本数量不能多于 brokers 的数量;
各分区的 leader 均匀的分布在 brokers 中。
所有的 followers 都复制 leader 的日志,日志中的消息和顺序都和 leader 中的一致。
flowers 向普通的 consumer 那样从 leader 那里拉取消息并保存在自己的日志文件中。
Kafka 判断一个节点是否活着有两个条件:
leader 的选择
Kafka 的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
当存在多副本的情况下,会尽量把多个副本,分配到不同的 Broker 上。
Kafka 会为 Partition 选出一个 Leader,之后所有该 Partition 的请求,实际操作的都是 Leader,然后再同步到其他的 Follower。
当一个 Broker 歇菜后,所有 Leader 在该 Broker 上的 Partition 都会重新选举,选出一个 Leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)
然后这里就涉及两个细节:
Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。
Controller 会在 ZK 的 /brokers/ids 节点上注册 Watch,一旦有 Broker 宕机,它就能知道。
当 Broker 宕机后,Controller 就会给受到影响的 Partition 选出新 Leader。
Controller 从 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,读取对应 Partition 的 ISR(in-sync replica 已同步的副本)列表,选一个出来做 Leader。
选出 Leader 后,更新 ZK,然后发送 LeaderAndISRRequest 给受影响的 Broker,让它们知道改变这事。
为什么这里不是使用 ZK 通知,而是直接给 Broker 发送 RPC 请求,我的理解可能是这样做 ZK 有性能问题吧。
如果 ISR 列表是空,那么会根据配置,随便选一个 Replica 做 Leader,或者干脆这个 Partition 就是歇菜。
如果 ISR 列表的有机器,但是也歇菜了,那么还可以等 ISR 的机器活过来。
这里的策略,服务端这边的处理是 Follower 从 Leader 批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,通过 request.required.acks 参数来设置数据的可靠性。
在 Acks=-1 的时候,如果 ISR 少于 min.insync.replicas 指定的数目,那么就会返回不可用。
这里 ISR 列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从 ISR 列表中剔除。
以前还有根据落后多少条消息就踢出 ISR,在 1.0 版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出 ISR 列表。
从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。
因为新的 Leader 选出来后,Follower 上面的数据,可能比新 Leader 多,所以要截取。
这里高水位的意思,对于 Partition 和 Leader,就是所有 ISR 中都有的最新一条记录。消费者最多只能读到高水位。
从 Leader 的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一轮的 Fetch 中才能告诉 Leader。
也正是由于这个高水位延迟一轮,在一些情况下,Kafka 会出现丢数据和主备数据不一致的情况,0.11 开始,使用 Leader Epoch 来代替高水位。
首先这个 acks 参数,是在 KafkaProducer,也就是生产者客户端里设置的
也就是说,你往 kafka 写数据的时候,就可以来设置这个 acks 参数。然后这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。
第一种选择是把 acks 参数设置为 0,意思就是我的 KafkaProducer 在客户端,只要把消息发送出去,不管那条数据有没有在哪怕 Partition Leader 上落到磁盘,我就不管他了,直接就认为这个消息发送成功了。
如果你采用这种设置的话,那么你必须注意的一点是,可能你发送出去的消息还在半路。结果呢,Partition Leader 所在 Broker 就直接挂了,然后结果你的客户端还认为消息发送成功了,此时就会导致这条消息就丢失了。
第二种选择是设置 acks = 1,意思就是说只要 Partition Leader 接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的 Follower 有没有同步过去这条消息了。
这种设置其实是 kafka 默认的设置,大家请注意,划重点!这是默认的设置
也就是说,默认情况下,你要是不管 acks 这个参数,只要 Partition Leader 写成功就算成功。
但是这里有一个问题,万一 Partition Leader 刚刚接收到消息,Follower 还没来得及同步过去,结果 Leader 所在的 broker 宕机了,此时也会导致这条消息丢失,因为人家客户端已经认为发送成功了。
最后一种情况,就是设置 acks=all,这个意思就是说,Partition Leader 接收到消息之后,还必须要求 ISR 列表里跟 Leader 保持同步的那些 Follower 都要把消息同步过去,才能认为这条消息是写入成功了。
如果说 Partition Leader 刚接收到了消息,但是结果 Follower 没有收到消息,此时 Leader 宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。
此时可能 Partition 2 的 Follower 变成 Leader 了,此时 ISR 列表里只有最新的这个 Follower 转变成的 Leader 了,那么只要这个新的 Leader 接收消息就算成功了。
订阅 Topic 是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个 Partition。
换句话来说,就是一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。
因此,如果消费组内的消费者如果比 Partition 多的话,那么就会有个别消费者一直空闲。
订阅 Topic 时,可以用正则表达式,如果有新 Topic 匹配上,那能自动订阅上。
一个消费组消费 Partition,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次。
这里 ZK 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 总剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中。
写进消息的 Key 由 Groupid、Topic、Partition 组成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。总是保留最新的 Key,其余删掉。
一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回。
确定 Consumer Group 位移信息写入 consumers_offsets 的哪个 Partition,具体计算公式:
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。
类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。
看 Offset 保存在那个 Partition;该 Partition Leader 所在的 Broker 就是被选定的 Coordinator。
这里我们可以看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一台机器。
把 Coordinator 选出来之后,就是要分配了。
整个流程是这样的:
当 Partition 或者消费者的数量发生变化时,都得进行 Reblance。
架构模型方面
kafka 遵从一般的 MQ 结构,producer,broker,consumer,以 consumer 为中心,消息的消费信息保存的客户端 consumer 上,
consumer 根据消费的点,从 broker 上批量 pull 数据;无消息确认机制。
RabbitMQ 遵循 AMQP 协议,RabbitMQ 的 broker 由 Exchange,Binding,queue 组成,
其中 exchange 和 binding 组成了消息的路由键(消息到哪个队列);
客户端 Producer 通过连接 channel 和 server 进行通信,Consumer 从 queue 获取消息进行消费
(长连接,queue 有消息会推送到 consumer 端,consumer 循环从输入流读取数据)
rabbitMQ 以 broker 为中心;有消息的确认机制(消费过后剔除队列)。
在吞吐量方面
kafka 具有高的吞吐量,内部采用消息的批量处理,zero-copy 机制,
数据的存储和获取是本地磁盘顺序批量操作(文件系统),具有 O(1)的复杂度,消息处理的效率很高。
rabbitMQ 在吞吐量方面稍逊于 kafka,rabbitMQ 支持对消息的可靠的传递,支持事务,不支持批量的操作;
在可用性方面
rabbitMQ 支持 miror 的 queue,主 queue 失效,miror queue 接管。
kafka 的 broker 支持主备模式(副本)。
在集群负载均衡方面
kafka 采用 zookeeper 对集群中的 broker、consumer 进行管理,可以注册 topic 到 zookeeper 上;
通过 zookeeper 的协调机制,producer 保存对应 topic 的 broker 信息,可以随机或者轮询发送到 broker 上;
rabbitMQ 的负载均衡需要单独的 loadbalancer 进行支持。
Offset,路由啥都替我们干了,用起来很简单。
Offset 啥的都是要我们自己记录。(注:消息消费的时候,首先要知道去哪消费,这就是路由,消费完之后,要记录消费单哪,就是 Offset)
因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此 Kafka 提供两种策略去删除旧数据。一是基于时间,二是基于 partition 文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让 Kafka 删除一周前的数据,也可通过配置让 Kafka 在 partition 文件超过 1GB 时删除旧数据,如下所示。
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
# 可以删除的日志文件的最小年龄
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false
Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除文件与 Kafka 性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关