Kafka入门
AI-摘要
Tianli GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
1、kafka概述
- 消息中间件对比
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 开发语言 | java | erlang | java | scala |
| 单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 |
| 时效性 | ms | us | ms | ms级以内 |
| 可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
| 功能特性 | 成熟的产品、较全的文档、各种协议支持好 | 并发能力强、性能好、延迟低 | MQ功能比较完善,扩展性佳 | 只支持主要的MQ功能,主要应用于大数据领域 |
分区
不是随机写 (追加)
分段设计
高可用设计
- 消息中间件对比-选择建议
| 消息中间件 | 建议 |
|---|---|
| Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
| RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
| RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/

名词解释

-
producer: 发布消息的对象称之为主题生产者(Kafka topic producer)
-
topic: Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
-
consumer: 订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
-
broker: 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
2、Kafka入门使用
- 引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--kafka客户端依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
- 修改application.yml配置文件
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 101.42.152.102:9092
# 生产者配置
producer:
# 配置消息的序列化类型为String类型
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- 生产者代码
/**
* @Author :leaflei
* @Version: 1.0
* @Description :kafka生产者
*/
@RestController
public class ProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 生产消息:key+value
*/
@GetMapping("/send/{key}/{value}")
public String send1(@PathVariable("key") String key, @PathVariable("value") String value){
// 发送消息,key+value
kafkaTemplate.send("testTopic1",key, value);
return "ok";
}
/**
* 生产消息:value
*/
@GetMapping("/send/{value}")
public String send2( @PathVariable("value") String value){
// 发送消息,value
kafkaTemplate.send("testTopic2", value);
return "ok";
}
}
- 消费者代码
/**
* @Author :leaflei
* @Version: 1.0
* @Description :kafka消费者
*/
@Component
public class ConsumerListener {
/**
* 消费者监听 key+value
*
* @param consumerRecord 监听到的消息
*/
@KafkaListener(topics = "testTopic1")
public void receiveMsg1(ConsumerRecord<String, String> consumerRecord) {
// 使用java.util包下的Optional.ofNullable()方法,判断是否为空,并包装为一个Optional对象
Optional<ConsumerRecord<String, String>> optional = Optional.ofNullable(consumerRecord);
// ifPresent()方法:取出一个非空的值(consumerRecord对象)
optional.ifPresent(x -> System.out.println(x.key() + "==>" + x.value()));
}
/**
* 消费者监听 value
*
* @param consumerRecord 监听到的消息
*/
@KafkaListener(topics = "testTopic2")
public void receiveMsg2(ConsumerRecord<String, String> consumerRecord) {
Optional<ConsumerRecord<String, String>> optional = Optional.ofNullable(consumerRecord);
optional.ifPresent(x -> System.out.println(x.value()));
}
}
3、消费分组(多消费者)
- 同一个消费组下的消费者,只能有一个消费者收到消息(一对一)
# 配置多个消费者在相同的消费组内
# 消费者1
consumer:
#在消费组test1
group-id: ${spring.application.name}-test1
...
# 消费者2
consumer:
#在消费组test1
group-id: ${spring.application.name}-test1
...
# 消费者3
consumer:
#在消费组test1
group-id: ${spring.application.name}-test1
...
- 不同消费组下消费者,每个组内起码一个消费者能收到消息(1对多,广播效果)
# 配置多个消费者在不同的消费组
consumer:
# 配置消费者1所在群组 tes1
group-id: ${spring.application.name}-test1
...
consumer:
# 配置消费者2所在群组 test2
group-id: ${spring.application.name}-test2
...
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 leaflei
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果