Kafka环境搭建配置及示例demo
环境搭建
服务器环境:centos7
JDK:1.8
如果没有java环境使用以下命令安装:
yum install java-1.8.0-openjdk.x86_64
安装成功校验
java -version openjdk version "1.8.0_242" OpenJDK Runtime Environment (build 1.8.0_242-b08) OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
安装Kafka
接下来安装Kafka,通过wget方式下载tar包到服务器上,下载地址到kafka官网下载页面
这里需要kafka版本需要与spring boot 和spring-kafka版本一一对应,不然可能无法启动,具体参考图:
来源:https://spring.io/projects/spring-kafka
cd /opt wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar zxvf kafka_2.12-2.8.1.tgz mv kafka_2.12-2.8.1 kafka
配置Kafka
cd /opt/kafka vi config/server.properties
修改或新增部分配置文件内容
listeners=PLAINTEXT://内网:9092 advertised.host.name=外网ip或者域名 advertised.listeners=PLAINTEXT://外网ip或者域名:9092
内网ip查看:
ifconfig
验证Kafka
启动zookeeper
cd /opt/kafka bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh config/server.properties
创建topic(3.x版本不适用此命令)
使用kafka-topics.sh 创建单分区单副本的topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
后台运行Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
整合SpringBoot
maven安装依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置文件
###########【Kafka集群】########### spring.kafka.bootstrap-servers=10.0.0.1:9092,10.0.0.2:9092 ###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory=33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 自定义分区器 # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner ###########【初始化消费者配置】########### # 默认的消费组ID spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms=1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms=120000 # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms=180000 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消费端监听的topic不存在时,项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatal=false
生产者:
@Component @Slf4j public class TestProducer { @Autowired private KafkaTemplate kafkaTemplate; @Value("${kafka.topic.test}") private String topic; /** * 生产者简单发送消息 * * @param msg */ public void send(String msg) { System.out.println("发送信息内容:" + msg); kafkaTemplate.send(topic, msg); } /** * 回调发送信息 是否成功 * * @param msg */ public void sendCallbackMessage(String msg) { System.out.println("发送信息内容:" + msg); ListenableFuture future = kafkaTemplate.send(topic, msg); future.addCallback(o -> log.info("kafka消息发送成功:" + msg), throwable -> log.error("kafka消息发送失败:" + msg)); } }
消费者:
@Component @Slf4j public class TestConsumer { @KafkaListener(topics = "${kafka.topic.test}") public void listen(ConsumerRecord<?, ?> record) { log.info("消费者接收到消息,topic={}, offset={}, message={}", record.topic(), record.offset(), record.value()); } }
发送消息:
@Override public void testKafka() { testProducer.send("发送一条普通测试消息"); }
评论