Kafka环境搭建配置及示例demo

流氓凡 资源分享 2023-01-18 127 0

环境搭建

服务器环境:centos7

JDK:1.8


如果没有java环境使用以下命令安装:

yum install java-1.8.0-openjdk.x86_64

安装成功校验

java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)


安装Kafka

接下来安装Kafka,通过wget方式下载tar包到服务器上,下载地址到kafka官网下载页面

这里需要kafka版本需要与spring boot 和spring-kafka版本一一对应,不然可能无法启动,具体参考图:

image.png

来源:https://spring.io/projects/spring-kafka

cd /opt
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar zxvf kafka_2.12-2.8.1.tgz
mv kafka_2.12-2.8.1 kafka

配置Kafka

cd /opt/kafka
vi config/server.properties

修改或新增部分配置文件内容

listeners=PLAINTEXT://内网:9092          
advertised.host.name=外网ip或者域名
advertised.listeners=PLAINTEXT://外网ip或者域名:9092

内网ip查看:

ifconfig

在这里插入图片描述


验证Kafka

启动zookeeper

cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

创建topic(3.x版本不适用此命令)

使用kafka-topics.sh 创建单分区单副本的topic test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

后台运行Kafka

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

整合SpringBoot

maven安装依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

配置文件

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=10.0.0.1:9092,10.0.0.2:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory=33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false

生产者:

@Component
@Slf4j
public class TestProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Value("${kafka.topic.test}")
    private String topic;


    /**
     * 生产者简单发送消息
     *
     * @param msg
     */
    public void send(String msg) {
        System.out.println("发送信息内容:" + msg);
        kafkaTemplate.send(topic, msg);
    }

    /**
     * 回调发送信息 是否成功
     *
     * @param msg
     */
    public void sendCallbackMessage(String msg) {
        System.out.println("发送信息内容:" + msg);
        ListenableFuture future = kafkaTemplate.send(topic, msg);
        future.addCallback(o -> log.info("kafka消息发送成功:" + msg), throwable -> log.error("kafka消息发送失败:" + msg));
    }


}

消费者:

@Component
@Slf4j
public class TestConsumer {

    @KafkaListener(topics = "${kafka.topic.test}")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("消费者接收到消息,topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
    }

}


发送消息:

@Override
    public void testKafka() {
        testProducer.send("发送一条普通测试消息");
    }



评论