介绍对kafka 的理解 数据冗余 防丢失 容错性 高吞吐量 低延迟 业务解耦 扩展性
利用 正反向消息机制+定时任务 实现数据强一致性
一、 优点
解偶,冗余:规避数据丢失,灵活:限流消峰、有序、缓冲、异步、健壮
二、选型
性能第一:rabbitMQ/zeromq
吞吐第一:kafka,rocketMQ(topic)
数据可靠性:kafka replica/ha
二、原理
2.1 术语
名词: Broker,Topic,Partition,Segment,Offset
调度:Broker Controller,Partition Leader,Partition Follower,Consumer group
可靠性:Replizcas(LEO,HW),ISR(In-Sync Replicas),AR(Assigned Replicas),OSR(Outof-Sync Replicas)
2.2 为什么快
顺序写
顺序写磁盘效率比随机写内存还要高 index/log
- zero copy
减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和内核态切换开销,达到性能的提升。
传统io:
zerocopy:![传统io](kafka-src/io.jpg)
![zerocopy](kafka-src/zero.jpg)
- zero copy
log
sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
//文件跟文件,文件跟socket
索引
mmap
| –topic1-0
| –00000000000000000000.index
| –00000000000000000000.log
| –00000000000000368769.index
| –00000000000000368769.log
- 分区
2.3 为什么可靠
producer acks
如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表中删除。
Replica (机架感知
2.4 如何实现幂等性
- 为什么需要保证幂等性:
kafka需要要确保Exactly-Once语义。在Kafka Broker确认Ack时,出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。 - 实现原理:
Producer初始化时,会被分配一个唯一的ProducerID,对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条,不会出现重复发送的情况
2.5 如何实现事物
TransactionalID:
用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort。saram实现已提上日程issues:
https://github.com/beatlabs/patron/issues/222confluence关于事物实现的解释https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
三、调优
吞吐量、延时、持久性、可用性 。每一个方向的优化思路都是不同的,甚至是相反。
**producer **
- batch.size:每个Batch要存放多少数据就可以发送出去了
- linger.ms :Batch被创建之后,最多过多久,不管这个Batch有没有写满
- compression.type :压缩类型 gzip 、lz
- request.required.acks
1(默认):这意味着producer在ISR中的leader已成功收到数据并得到确认。如果leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。min.insync.replicas
**Consumer **
- fetch.min.bytes :每次fetch请求时,server应该返回的最小字节数。
- auto.commit.enable:consumer所fetch的消息的offset将会自动的同步到zookeeper
- num.replica.fetchers:拉取线程数
Broker
- log.flush.interval.ms/ log.flush.interval.messages刷盘策略
- num.network.threads: broker处理消息的最大线程数
- num.io.threads:broker处理磁盘IO的线程数
- jvm :Xmx,Xms
避坑
1、 重复消费问题(kafka 只能保证At least once,不能保证Exactly once)
- 同一个 Consumer 重复消费:消费超时引发重复消费
提交 Offset 时候,消费时间超时,则 Broker 认为这条消息未消费成功。这时就会产生重复消费问题
不同的 Consumer 重复消费:当 Consumer 消费了消息,但还没有提交 Offset 时宕机,则这些已经被消费过的消息会被重复消费。
msg-uuid/设置offset提交时间/consumer 手动维护offset。
2、消费者不关闭会话带来的一系列问题
新消费者无法介入
3、解题思路
充分利用命令行工具
kafka-console-producer.sh
kafka-console-consumer.sh
kafka-topics.sh –topic –describe
kafka-consumer-groups.sh –group –describe
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group –topic
kafka-producer-perf-test.sh –num-records 消息数 –record-size 每个记录的字节数 –throughput 每秒钟发送的记录数
SDK 对比
Kafka-go | Sarama | Confluent | |
---|---|---|---|
接入成本 | 低 | 低 | 高 |
性能-1w以下 | 低 | 高 | 高 |
性能-1w以上 | 中 | 中 | 高 |
性能-批量 | 高 | 中 | 中 |
灵活性 | 高 | 低 | 中 |
事物 | 否 | 否 | 是 |
ref: https://www.jianshu.com/p/8c6b056f73ce
https://mp.weixin.qq.com/s/9bYe095WzyRLZ3_B5PveZQ