Kafka 是一个消息队列中间件,它可以结合 Flink、Spark、ELK、日志采集 等,构建强大的实时数据处理平台。

今天分享一下如何搭建一套zookeeper+Kafka的消息队列集群。虽然Kafka在新版本已经可以不通过zookeeper就可以实现高可用,但是企业中大多数还是zookeeper+Kafka这个黄金搭配。理论型的知识后续再展开讲解,先将集群部署起来。

过程比较详细,篇幅稍长,请耐心阅读!

1 基础环境准备

1.1 服务器规划

建议准备 3 台服务器,做一个高可用集群。

节点

IP

角色

操作系统

node1

10.0.0.190

Zookeeper + Kafka

RockyLinux 9.5

node2

10.0.0.191

Zookeeper + Kafka

RockyLinux 9.5

node3

10.0.0.192

Zookeeper + Kafka

RockyLinux 9.5

1.2 环境依赖

三台都需要操作

1.2.1 修改主机名

# 节点1
hostnamectl set-hostname node1

# 节点2
hostnamectl set-hostname node2

# 节点3 
hostnamectl set-hostname node3

1.2.2 安装 JDK 1.8+

下载jdk包:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

也可以直接yum安装openjdk,更快速一点

# 将下载上传三个服务器后解压到/data盘
tar -xvf jdk-8u461-linux-x64.tar.gz  -C /data

配置环境变量

vi /etc/profile

在文件末尾添加下面内容:

export JAVA_HOME=/data/jdk1.8.0_461
export JAVA_BIN=$JAVA_HOME/bin
export CLASSPATH=:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_BIN

让配置生效

source  /etc/profile

1.2.3 关闭防火墙

测试环境可关闭,生产需开放端口。

systemctl stop firewalld
systemctl disable firewalld

1.2.4 配置主机名解析

cat >> /etc/hosts <<EOF
10.0.0.190 node1
10.0.0.191 node2
10.0.0.192 node3
EOF

1.2.5 关闭SELinux

# 永久关闭
sed -i 's/enforcing/disabled/' /etc/selinux/config  

# 临时
setenforce 0  

2 部署 Zookeeper 集群

Kafka 依赖 Zookeeper 管理集群状态,所以需要先安装 Zookeeper。可以先在一个节点下载,配置完复制过去就行

2.1 下载并解压(节点1操作)

# 下载,可以选择合适的版本下载
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz

# 解压
tar -xvf apache-zookeeper-3.8.4-bin.tar.gz -C /data

2.2 配置 zoo.cfg

# 进入配置文件目录
cd /data/apache-zookeeper-3.8.4-bin/conf

# 复制参考配置文件
cp zoo_sample.cfg zoo.cfg

# 修改配置文件,主要修改下面两项
dataDir=/data/zookeeper/data
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

参数解析:

  • dataDir:存储快照和 myid 文件的路径。

  • server.X:集群成员配置,其中

    • 2888 端口:内部通信端口, 与 Leader 通信

    • 3888 端口:用于选举 Leader

2.3 拷贝到另外两个节点

scp -r /data/apache-zookeeper-3.8.4-bin root@10.0.0.191:/data
scp -r /data/apache-zookeeper-3.8.4-bin root@10.0.0.192:/data

2.4 配置节点 ID

# 每个节点新建文件夹
mkdir -p /data/zookeeper/data

# 节点1
echo 1 > /data/zookeeper/data/myid 

# 节点2
echo 2 > /data/zookeeper/data/myid

# 节点3
echo 3 > /data/zookeeper/data/myid

2.5 配置环境变量

将zookeeper加入环境变量,方便后期启动。

vi /etc/profile 

# 尾部添加
export ZOOKEEPER_HOME=/data/apache-zookeeper-3.8.4-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# 使配置文件生效
source /etc/profile

2.6 启动 Zookeeper

zkServer.sh start
zkServer.sh status

确认 3 台机器都正常启动,其中两台为fllower,一台为leader,则zookeeper集群安装成功。

3 部署 Kafka 集群

3.1 下载并解压

跟zookeeper一样,在节点1先下载,配置完复制到另外两个节点

# 下载安装包
wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz

# 解压到/data目录
tar -xvf kafka_2.13-3.9.1.tgz -C /data

3.2 修改配置文件

进入到安装目录,编辑配置文件server.properties

cd /data/kafka_2.13-3.9.1/config
vi server.properties

修改下面信息

broker.id=1                   # 不得重复,整个集群中唯一
listeners=PLAINTEXT://10.0.0.190:9092   # 监听端口
log.dirs=/data/kafka/logs
zookeeper.connect=node1:2181,node2:2181,node3:2181
num.partitions=3

参数解析:

  • broker.id:Kafka Broker 唯一 ID。node1=1, node2=2, node3=3。

  • listeners:服务监听地址和端口。需要换成真实的IP地址,每个节点填本机IP地址

  • log.dirs:存储 Kafka 消息日志的目录。建议挂载大容量磁盘。

  • num.partitions:Topic 默认分区数,决定并行消费能力。

  • zookeeper.connect:连接zookeeper。

3.3 配置拷贝到另外两个节点

scp -r /data/kafka_2.13-3.9.1 root@10.0.0.191:/data
scp -r /data/kafka_2.13-3.9.1 root@10.0.0.192:/data

注意:节点2和节点3需要修改broker.idlinsteners两个配置

新建日志目录(每个节点操作)

mkdir -p /data/kafka/logs

3.4 配置环境变量

将kafka加入环境变量,方便后期启动。三个节点都配置。

vi /etc/profile 

# 尾部添加
export KAFKA_HOME=/data/kafka_2.13-3.9.1
export PATH=$KAFKA_HOME/bin:$PATH

# 使配置文件生效
source /etc/profile

3.5 启动 Kafka

kafka-server-start.sh -daemon /data/kafka_2.13-3.9.1/config/server.properties

三台机器依次启动 Kafka。可以看到kafka进程已正常启动。

4 测试验证

4.1 创建 Topic

kafka-topics.sh --create --topic test \
--bootstrap-server node1:9092,node2:9092,node3:9092 \
--partitions 3 --replication-factor 3

参数解析:

  • --partitions 3:分区数,决定并发度。

  • --replication-factor 3:副本数,保证容错能力。

4.2 查看 Topic

kafka-topics.sh --list --bootstrap-server node1:9092

4.3 生产消息

kafka-console-producer.sh --broker-list node1:9092 --topic test
> hello lige
> hello kafka

4.4 消费消息

kafka-console-consumer.sh --bootstrap-server node2:9092 --topic test --from-beginning

至此,集群搭建完成。