一、服务端

1、搭建集群

1-Kafka高可用.png

  • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成

  • 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一

2、主从备份机制(Replication)

2-主从备份机制.png

kafka 为了提高 partition 的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个leader 的副本,所有的读写请求都是由 leader 副本来进行处理。

Kafka 定义了两类副本:

  • 领导者副本(Leader Replica)

  • 追随者副本(Follower Replica)

同步方式:ISR 主从同步备份机制

我们可以认为,副本集会存在一主多从的关系。一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同 broker 上,当 leader 副本所在的 broker 出现故障后,可以重新选举新的 leader 副本继续对外提供服务。通过这样的副本机制来提高 kafka 集群的可用性。

3-主从备份机制.png

所有与leader保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas)

ISR(in-sync replica)需要同步复制保存的follower

如果leader失效后,需要选出新的leader,选举的原则如下:

  • 第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的

  • 第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

极端情况,就是所有副本都失效了,这时有两种方案:

  • 第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

  • 第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

3、顺序写磁盘

相对于随机写性能提高百倍以上

当broker接收到producer发送过来的消息时,需要根据消息的主题和分区信息,将该消息写入到该分区当前最后的segment文件中,文件的写入方式是追加写。

由于是对segment文件追加写,故实现了对磁盘文件的顺序写,避免磁盘随机写时的磁盘寻道的开销,同时由于是追加写,故写入速度与磁盘文件大小无关,具体如图

4-顺序写磁盘.png

二、生产端

1、发送类型

  • 发送并忘记

    发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理),效率高,无法保证消息可靠性

  • 同步发送

    使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
  • 异步发送

调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。没有阻塞等待效果,效率更高。

//异步消息发送
producer.send(kvProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

2、acks确认应答机制(保证消息不丢失)

  • acks(Acknowledgments)

5-Kafka高可用.png

  • 配置文件配置方式:

6-acks配置.png

  • 参数的选择说明
确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

3、retries重试机制(保证消息不丢失)

7-Kafka高可用.png
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

  • 配置文件配置方式:
  • retries : 10 最多重试次数

8-重试机制.png

4、消息压缩

默认情况下, 消息发送时不会被压缩。

配置文件修改压缩算法:

9-消息压缩.png

压缩算法说明
snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

5、批量发送

配置文件修改批量发送大小:

10批量发送.png

参数说明
batch-sizeKafka的生产者发送数据到服务器,不是来一条就发一条,而是经过缓冲的,默认批量发送数据依次16K
buffer-memory缓存池大小,默认32M

三、消费者端

1、消费者组

11-消费者组.png

  • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体

  • 一个发布在Topic上消息被分发给此消费者组中的一个消费者

    • 所有的消费者都在一个组中,那么这就变成了queue模型

    • 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型

2、消费有序性

应用场景:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致

  • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

12-消费有序性.png

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

3、消费者重复消费的产生原因

13-重复消费.png

  • 生产者如果网络抖动相关问题,可能重复发送消息

  • 消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交

  • kafka消费端会每隔5秒自动提交消费偏移量(auto.commit.interval.ms)如果网络问题没来及提交,其他消费者会重复消费消息

  • kafka消费者会每隔10秒向服务端发送心跳(session.timeout.ms)表明还活着,否则服务端认为消费者离组会触发重平衡,重平衡rebalance也会造成消息重复消费

3.1 重平衡

kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡

14-重平衡.png

  • 正常情况

15-正常情况.png

如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费

重平衡后不可避免会出现一些问题

量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

4、消费者消息重复如何解决

1、kafak可以手工处理偏移量(不推荐)

  • 当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去

2、从业务角度处理(实环情况不管采用任何消息中间件,重复消费都避免不了)

  • 生产端: 发送的消息添加唯一id(雪花算法,或者纳秒时间戳) 【注意默认不使用messageID,需要显式配置】
  • 消费端:
    • 方案1 利用去重表去重判断
    • 方案2 利用redis去重判断(Redis分布式锁) SETNX key value

5、吞吐优化总结

1、增加分区数量

2、异步发送消息

3、消息压缩

4、批量发送