Kafka高可用
一、服务端
1、搭建集群

-
Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
-
这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一
2、主从备份机制(Replication)

kafka 为了提高 partition 的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。
每个分区可以有多个副本,并且在副本集合中会存在一个leader 的副本,所有的读写请求都是由 leader 副本来进行处理。
Kafka 定义了两类副本:
-
领导者副本(Leader Replica)
-
追随者副本(Follower Replica)
同步方式:ISR 主从同步备份机制
我们可以认为,副本集会存在一主多从的关系。一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同 broker 上,当 leader 副本所在的 broker 出现故障后,可以重新选举新的 leader 副本继续对外提供服务。通过这样的副本机制来提高 kafka 集群的可用性。

所有与leader保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas)
ISR(in-sync replica)需要同步复制保存的follower
如果leader失效后,需要选出新的leader,选举的原则如下:
-
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
-
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
极端情况,就是所有副本都失效了,这时有两种方案:
-
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
-
第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整
3、顺序写磁盘
相对于随机写性能提高百倍以上
当broker接收到producer发送过来的消息时,需要根据消息的主题和分区信息,将该消息写入到该分区当前最后的segment文件中,文件的写入方式是追加写。
由于是对segment文件追加写,故实现了对磁盘文件的顺序写,避免磁盘随机写时的磁盘寻道的开销,同时由于是追加写,故写入速度与磁盘文件大小无关,具体如图

二、生产端
1、发送类型
-
发送并忘记
发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理),效率高,无法保证消息可靠性
-
同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
- 异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。没有阻塞等待效果,效率更高。
//异步消息发送
producer.send(kvProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});
2、acks确认应答机制(保证消息不丢失)
- acks(Acknowledgments)

- 配置文件配置方式:

- 参数的选择说明
| 确认机制 | 说明 |
|---|---|
| acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
| acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
| acks=all | 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
3、retries重试机制(保证消息不丢失)

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
- 配置文件配置方式:
- retries : 10 最多重试次数

4、消息压缩
默认情况下, 消息发送时不会被压缩。
配置文件修改压缩算法:

| 压缩算法 | 说明 |
|---|---|
| snappy | 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
| lz4 | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 |
| gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
5、批量发送
配置文件修改批量发送大小:

| 参数 | 说明 |
|---|---|
| batch-size | Kafka的生产者发送数据到服务器,不是来一条就发一条,而是经过缓冲的,默认批量发送数据依次16K |
| buffer-memory | 缓存池大小,默认32M |
三、消费者端
1、消费者组

-
消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
-
一个发布在Topic上消息被分发给此消费者组中的一个消费者
-
所有的消费者都在一个组中,那么这就变成了queue模型
-
所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型
-
2、消费有序性
应用场景:
-
即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
-
充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
3、消费者重复消费的产生原因

-
生产者如果网络抖动相关问题,可能重复发送消息
-
消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交
-
kafka消费端会每隔5秒自动提交消费偏移量(auto.commit.interval.ms)如果网络问题没来及提交,其他消费者会重复消费消息
-
kafka消费者会每隔10秒向服务端发送心跳(session.timeout.ms)表明还活着,否则服务端认为消费者离组会触发重平衡,重平衡rebalance也会造成消息重复消费
3.1 重平衡
kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡

- 正常情况

如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
重平衡后不可避免会出现一些问题
量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
4、消费者消息重复如何解决
1、kafak可以手工处理偏移量(不推荐)
- 当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
2、从业务角度处理(实环情况不管采用任何消息中间件,重复消费都避免不了)
- 生产端: 发送的消息添加唯一id(雪花算法,或者纳秒时间戳) 【注意默认不使用messageID,需要显式配置】
- 消费端:
- 方案1 利用去重表去重判断
- 方案2 利用redis去重判断(Redis分布式锁)
SETNX key value
5、吞吐优化总结
1、增加分区数量
2、异步发送消息
3、消息压缩
4、批量发送