# Manual Kafka Deployment

# Pre-installation Preparation

Before a manual deployment, disable SELinux; otherwise the systemd services may fail to start.

# Permanently disable SELinux
sudo sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config
# Reboot for the change to take effect
sudo reboot


# Temporarily disable SELinux (no reboot needed); use this to quickly verify the effect. Once confirmed, be sure to disable it permanently as above.
sudo setenforce 0
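
To confirm the current SELinux mode (Enforcing, Permissive or Disabled), you can run:

# Show the current SELinux mode
getenforce
# Or print a more detailed status report
sestatus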

Downloads: Kafka official site https://kafka.apache.org/downloads

Or download the package from the cloud-drive installer collection: https://pan.baidu.com/s/1_3IPQVkYb3X4dX35aVDDxg?pwd=2024

The tooling does not support Kafka 4.x; it is recommended to download the Scala 2.13 build kafka_2.13-3.9.1.tgz or a newer 3.9.x package.

# 1. Single-Node Deployment

# Single node: install the JDK and extract the package

Kafka depends on a Java JDK. Before installing, check the Kafka documentation for the compatible Java versions; the following is the support statement for Kafka 3.9.x:

Java 8, Java 11, and Java 17 are supported.
Note that Java 8 support project-wide has been deprecated since Apache Kafka 3.0 and Java 11 support for the broker and tools has been deprecated since Apache Kafka 3.7. Both will be removed in Apache Kafka 4.0.

Java 11 and later versions perform significantly better if TLS is enabled, so they are highly recommended (they also include a number of other performance improvements: G1GC, CRC32C, Compact Strings, Thread-Local Handshakes and more).
# Check whether a JDK is already installed
java -version
# If no JDK is present, install OpenJDK; once java -version reports a version, this step is done
dnf install -y java-11-openjdk

# Upload the downloaded tgz to the server and run the following in the directory containing it
mkdir -p /data/apps && tar -xf kafka_2.13-3.9.1.tgz -C /data/apps
# Rename the extracted directory to kafka for easier maintenance
mv /data/apps/kafka_2.13-3.9.1 /data/apps/kafka

# Run this whole block to add Kafka to the environment variables
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/data/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

# Reload the profile so the variables take effect
source /etc/profile

# Check that the environment variables work; any version output means OK
kafka-topics.sh --version
# Create the data directories for ZooKeeper and Kafka
mkdir -p /data/{zkData,kaData}

# Single node: adjust server.properties

vim /data/apps/kafka/config/server.properties

By default only the log.dirs= parameter needs to be changed and auto.create.topics.enable=true added; the remaining settings can be tuned later according to the customer's actual situation:

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# *** Change the log directory to the data disk
log.dirs=/data/kaData/logs

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# *** Enable automatic topic creation
auto.create.topics.enable=true
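
If you prefer to script these two changes instead of editing the file by hand, a minimal sketch (assuming the paths used above) is:

# Point log.dirs at the data disk and enable automatic topic creation
sed -i 's#^log.dirs=.*#log.dirs=/data/kaData/logs#' /data/apps/kafka/config/server.properties
echo "auto.create.topics.enable=true" >> /data/apps/kafka/config/server.properties
# Verify the result
grep -nE '^(log.dirs|auto.create.topics.enable)=' /data/apps/kafka/config/server.properties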

# Single node: service management - Kafka

Start and stop Kafka from the command line:

# Because Kafka is already on the PATH, the .sh scripts can be run from any directory; if they are not found, cd /data/apps/kafka/bin/ first.
# Start method 1: foreground start; logs print to the console, but Ctrl+C stops Kafka
kafka-server-start.sh /data/apps/kafka/config/server.properties
# Start method 2: start as a daemon (recommended); Ctrl+C does not stop it
kafka-server-start.sh -daemon /data/apps/kafka/config/server.properties

# Stop Kafka
kafka-server-stop.sh
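
After starting, you can confirm that the broker is listening and check its log; the log path below assumes the layout above, since Kafka writes its own logs under $KAFKA_HOME/logs by default:

# The broker should be listening on port 9092
lsof -i:9092
# Tail the broker log for errors (path assumes KAFKA_HOME=/data/apps/kafka)
tail -n 50 /data/apps/kafka/logs/server.log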

To run Kafka as a systemd service that starts on boot, run the following snippet to register kafka.service:

cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Kafka
After=network.target

[Service]
Type=simple
ExecStart=/data/apps/kafka/bin/kafka-server-start.sh /data/apps/kafka/config/server.properties
ExecStop=/data/apps/kafka/bin/kafka-server-stop.sh
Restart=always
# Runs as root by default
# User=kafka
# Group=kafka
# Environment=JAVA_HOME=/data/apps/jdk
Environment=KAFKA_HOME=/data/apps/kafka

[Install]
WantedBy=multi-user.target
EOF

Run the following to reload systemd, enable Kafka on boot and start it:

systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka

# Single node: Java client access

On a single node, Kafka listens on port 9092 by default; a Java client simply connects to <Kafka server IP>:9092.

Make sure the port is reachable on the Kafka server (firewall rules); adjust kafka/config/server.properties if the listener settings need to change.

For Kafka JVM memory tuning, modify kafka/bin/kafka-server-start.sh.
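
As a sketch of that JVM tuning: kafka-server-start.sh applies a built-in default heap (typically -Xmx1G -Xms1G) only when KAFKA_HEAP_OPTS is not already set, so the heap can be adjusted either by editing that default or by exporting the variable. The 2 GB values below are only an example:

# Option 1: edit the default heap size in the start script (check the exact string in your version first)
sed -i 's/-Xmx1G -Xms1G/-Xmx2G -Xms2G/' /data/apps/kafka/bin/kafka-server-start.sh
# Option 2: set the variable before starting (or add Environment=KAFKA_HEAP_OPTS=... to kafka.service)
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
kafka-server-start.sh -daemon /data/apps/kafka/config/server.properties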

# Single node: test and verification

# Create a topic for verification
kafka-topics.sh --create --bootstrap-server 172.20.20.66:9092 --replication-factor 1 --partitions 1 --topic test-cop-io
# List topics to confirm the creation succeeded
kafka-topics.sh --list --bootstrap-server 172.20.20.66:9092
# Clean up the test topic just created
kafka-topics.sh --delete --bootstrap-server 172.20.20.66:9092 --topic test-cop-io

# Parameter notes:
# --topic: the topic name
# --partitions: number of partitions
# --replication-factor: number of replicas for the topic; a topic can have multiple replicas

kafka-topics.sh --bootstrap-server 192.168.100.151:9092,192.168.100.152:9092,192.168.100.153:9092 --describe --topic seeyon-test
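
For an end-to-end message test, the console producer and consumer shipped with Kafka can also be used (the address and topic name follow the example above):

# Produce: type a few lines, then Ctrl+C to exit
kafka-console-producer.sh --bootstrap-server 172.20.20.66:9092 --topic test-cop-io
# Consume in another terminal: the lines typed above should appear; Ctrl+C to exit
kafka-console-consumer.sh --bootstrap-server 172.20.20.66:9092 --topic test-cop-io --from-beginning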

kafkactl: a lightweight Kafka command-line tool, similar in function to kafka-topics.sh or kafka-console-consumer.sh but more concise and easier to use.

The product does not depend on this tool; it is provided only as a reference for operations and debugging. Installation and configuration steps:

# kafkactl tool
# Download: https://github.com/deviceinsight/kafkactl/releases
# Reference blog post: https://lihuimintu.github.io/2021/04/26/kafkactl/

# Create config.yml and adjust the Kafka connection settings to match your environment
vi /root/.config/kafkactl/config.yml
contexts:
  # default context without SASL
  remote-cluster:
    brokers:
      - 192.168.1.10:9092
    requestTimeout: 15s
    kafkaversion: 2.8.1
    # optional: tls config
    tls:
      enabled: false
      ca: my-ca
      cert: my-cert
      certKey: my-key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: true

    # optional: sasl support
    sasl:
      enabled: true
      username: xx
      password: xxx
      # optional configure sasl mechanism as plaintext, scram-sha256, scram-sha512 (defaults to plaintext)
      mechanism: plaintext

current-context: remote-cluster

kafkactl command reference for debugging:

# Switch context
kafkactl config use-context dev
# Show the current context
kafkactl config get-contexts
# View the configuration
kafkactl config view

# Create a topic
kafkactl create topic my-topic --partitions=2
# List topics
kafkactl get topics
# Describe a topic
kafkactl describe topic my-topic
# Delete a topic
kafkactl delete topic my-topic

# Produce messages
echo "key##value1" | kafkactl produce my-topic --separator=##
kafkactl produce my-topic --key=my-key --value=my-value
# Consume messages
kafkactl consume my-topic --from-beginning --print-keys -o yaml

# Reset consumer-group offsets
kafkactl reset consumer-group-offset my-group --topic my-topic --oldest --execute
# List consumer groups
kafkactl get cg
# Describe a consumer group
kafkactl describe consumer-group my-group
# Delete a consumer group
kafkactl delete consumer-group my-group

# Alter partition replicas
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter topic my-topic 3 -r 102,103

# List brokers
kafkactl get brokers
kafkactl describe broker 1

# Single node: adjust zookeeper.properties (optional)

Newer Kafka versions no longer require ZooKeeper; this section is only for reference with older versions.

vim /data/apps/kafka/config/zookeeper.properties

By default only the dataDir= parameter in zookeeper.properties needs to be changed:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# *** Change the ZooKeeper data directory to the data disk
dataDir=/data/zkData
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

# Single node: service management - ZooKeeper (optional)

Newer Kafka versions no longer require ZooKeeper; this section is only for reference with older versions.

Kafka 3.9.x bundles ZooKeeper, which can be started and stopped directly from the command line:

# Because Kafka is already on the PATH, the .sh scripts can be run from any directory; if they are not found, cd /data/apps/kafka/bin/ first.
# ZooKeeper start method 1: foreground start; logs print to the console, but Ctrl+C stops ZooKeeper
zookeeper-server-start.sh /data/apps/kafka/config/zookeeper.properties
# ZooKeeper start method 2: start as a daemon (recommended); Ctrl+C does not stop it
zookeeper-server-start.sh -daemon /data/apps/kafka/config/zookeeper.properties

# Stop ZooKeeper
zookeeper-server-stop.sh

To run ZooKeeper as a systemd service that starts on boot, run the following snippet to register zookeeper.service:

cat > /etc/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=ZooKeeper Service
After=network.target
After=syslog.target

[Service]
Type=forking
# Environment=JAVA_HOME=/data/apps/jdk
# Runs as root by default
# User=zookeeper
# Group=zookeeper
ExecStart=/data/apps/kafka/bin/zookeeper-server-start.sh -daemon /data/apps/kafka/config/zookeeper.properties
ExecStop=/data/apps/kafka/bin/zookeeper-server-stop.sh

# If Kafka or ZooKeeper uses a remote filesystem, add a Requires option
# Requires=network.target remote-fs.target
# After=network.target remote-fs.target

[Install]
WantedBy=default.target
EOF

Run the following to reload systemd, enable ZooKeeper on boot and start it:

systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
systemctl status zookeeper
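
To confirm ZooKeeper is responding, the zookeeper-shell.sh client bundled with Kafka can be used:

# Listing the root znodes should return something like [zookeeper]
zookeeper-shell.sh localhost:2181 ls /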

# 2. Cluster Deployment

https://zookeeper.apache.org/
https://zookeeper.apache.org/doc/r3.8.3/zookeeperStarted.html
https://kafka.apache.org/
https://kafka.apache.org/documentation/#quickstart

# Cluster: system tuning

# Hostname mapping - this is important; otherwise connections between cluster nodes will fail
cat >> /etc/hosts << 'EOF'
192.168.100.163 v8-kafka-1
192.168.100.164 v8-kafka-2
192.168.100.165 v8-kafka-3
EOF
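
A quick check that the mappings are effective on each node:

# Each command should print the IP and hostname configured above
getent hosts v8-kafka-1
getent hosts v8-kafka-2
getent hosts v8-kafka-3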

# Install as needed
dnf install -y lsof nc

# Cluster: install the JDK

# Install the distribution's OpenJDK, or install the required JDK version yourself
dnf install -y java-11-openjdk

# Cluster: install Kafka

mkdir -p /apps && tar -xf kafka_2.13-3.9.1.tgz -C /apps
mv /apps/kafka_2.13-3.9.1 /apps/kafka

cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

source /etc/profile

# Create the data directories for ZooKeeper and Kafka (an NFS share can be mounted here)
mkdir -p /data/{zkData,kaData}

# Cluster: configure the bundled ZooKeeper

# Manually create the myid file
# The content of myid = the X in server.X in the configuration file
# Node 1
echo '1' > /data/zkData/myid
# Node 2
echo '2' > /data/zkData/myid
# Node 3
echo '3' > /data/zkData/myid
# Requirements when creating myid manually:
# 1. The file must be created in ZooKeeper's dataDir directory and must be named myid.
# 2. Its content must be the node's unique ID, an integer between 1 and 255.
# 3. Each node in the cluster must use a different number; otherwise nodes fail to start or misbehave.
# 4. The file must be readable and writable; otherwise the node fails to start.
# These rules ensure every ZooKeeper node is uniquely identified within the cluster.

# Adjust configuration - zookeeper.properties
sed -i 's?^dataDir=/tmp/zookeeper?dataDir=/data/zkData?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^clientPort=2181?clientPort=2181?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^admin.enableServer=false?admin.enableServer=true?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^# admin.serverPort=8080?admin.serverPort=8080?' /apps/kafka/config/zookeeper.properties
# Add the cluster nodes; add one line per node, adjusting as needed
# Note: for the three servers below, the X written into each node's myid file is the X in server.X=ip:2888:3888 for that node's IP
# server.X=A:B:C
# X - server ID
# A - IP address
# B and C - ports used for inter-node communication (data sync and leader election)
cat >> /apps/kafka/config/zookeeper.properties << 'EOF'
server.1=192.168.100.163:2888:3888
server.2=192.168.100.164:2888:3888
server.3=192.168.100.165:2888:3888
tickTime=2000
initLimit=5
syncLimit=2
EOF
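
After appending, the effective cluster-related settings can be verified with:

# Should show dataDir, clientPort, the three server.X lines, tickTime, initLimit and syncLimit
grep -nE '^(dataDir|clientPort|server\.|tickTime|initLimit|syncLimit)' /apps/kafka/config/zookeeper.properties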

# Or edit the file manually
vim /apps/kafka/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/data/zkData
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=192.168.100.163:2888:3888
server.2=192.168.100.164:2888:3888
server.3=192.168.100.165:2888:3888
tickTime=2000
initLimit=5
syncLimit=2
# Start
# zookeeper-server-start.sh /apps/kafka/config/zookeeper.properties
zookeeper-server-start.sh -daemon /apps/kafka/config/zookeeper.properties

# Stop
zookeeper-server-stop.sh

# When not running as root, uncomment the # User=zookeeper and # Group=zookeeper lines
# If Kafka or ZooKeeper uses a remote filesystem, add a Requires option:
# Requires=network.target remote-fs.target
# After=network.target remote-fs.target
cat > /etc/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=ZooKeeper Service
After=network.target
After=syslog.target

[Service]
Type=forking
# User=zookeeper
# Group=zookeeper
ExecStart=/apps/kafka/bin/zookeeper-server-start.sh -daemon /apps/kafka/config/zookeeper.properties
ExecStop=/apps/kafka/bin/zookeeper-server-stop.sh

[Install]
WantedBy=default.target
EOF

systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
systemctl status zookeeper
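
Once ZooKeeper is running on all three nodes, each node's role can be checked with the srvr four-letter command (permitted by ZooKeeper's default 4lw whitelist); one node should report Mode: leader and the other two Mode: follower:

echo srvr | nc 192.168.100.163 2181 | grep Mode
echo srvr | nc 192.168.100.164 2181 | grep Mode
echo srvr | nc 192.168.100.165 2181 | grep Mode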

# Cluster: configure and start Kafka

# Changes overview

broker.id: defaults to 0; set it to 1 on node1, 2 on node2 and 3 on node3
host.name: new entry; set it to 192.168.100.163 on node1, 192.168.100.164 on node2 and 192.168.100.165 on node3
log.dirs: defaults to /tmp/kafka-logs; change it to /data/kafka/logs
zookeeper.connect: defaults to localhost:2181; change it to the ZooKeeper cluster addresses 192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181
mkdir -p /data/kafka/logs

# Modify the file with commands

# Run on all nodes: kafka-1/kafka-2/kafka-3
sed -i 's#^log.dirs=.*#log.dirs=/data/kafka/logs#' /apps/kafka/config/server.properties
sed -i 's#^zookeeper.connect=.*#zookeeper.connect=192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181#' /apps/kafka/config/server.properties
sed -i 's/^#listener.security.protocol.map=/listener.security.protocol.map=/' /apps/kafka/config/server.properties
echo "host.name=$(ip addr | grep -Po '(?<=inet ).*(?=\/)' | grep -Ev '127.0.0|0.0.0' | head -1)" >> /apps/kafka/config/server.properties
echo "advertised.listeners=PLAINTEXT://$(hostname):9092,SSL://$(hostname):9093" >> /apps/kafka/config/server.properties
echo "auto.create.topics.enable=true" >> /apps/kafka/config/server.properties
echo "listeners=PLAINTEXT://:9092,SSL://:9093" >> /apps/kafka/config/server.properties
# echo "# default.replication.factor=1" >> /apps/kafka/config/server.properties

# kafka-1
sed -i 's,^broker.id=.*,broker.id=1,' /apps/kafka/config/server.properties

# kafka-2
sed -i 's,^broker.id=.*,broker.id=2,' /apps/kafka/config/server.properties

# kafka-3
sed -i 's,^broker.id=.*,broker.id=3,' /apps/kafka/config/server.properties

# After running on all nodes, verify the results with the following commands
grep -n '^broker.id=' /apps/kafka/config/server.properties
grep -n '^host.name=' /apps/kafka/config/server.properties
grep -n '^log.dirs=' /apps/kafka/config/server.properties
grep -n '^zookeeper.connect=' /apps/kafka/config/server.properties
grep -n '^auto.create.topics.enable=' /apps/kafka/config/server.properties
grep -n '^advertised.listeners=' /apps/kafka/config/server.properties
grep -n '^listener.security.protocol.map=' /apps/kafka/config/server.properties
grep -n '^listeners=' /apps/kafka/config/server.properties
grep -n 'default.replication.factor=' /apps/kafka/config/server.properties

# Edit the configuration file manually

vim /apps/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://:9092,SSL://:9093
host.name=192.168.100.163

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://kafka-1:9092,SSL://kafka-1:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Whether topics may be created automatically; if false, topics must be created with commands.
auto.create.topics.enable=true
# Default number of replicas per topic partition; must not exceed the number of brokers in the cluster
# default.replication.factor=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
# Start
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Stop
/apps/kafka/bin/kafka-server-stop.sh

# Cluster management
kafka-cluster.sh

# Service unit file
cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Kafka
After=network.target

[Service]
Type=simple
ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
ExecStop=/apps/kafka/bin/kafka-server-stop.sh
Restart=always
# User=kafka
# Group=kafka
# Environment=JAVA_HOME=/apps/jdk
Environment=KAFKA_HOME=/apps/kafka

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka
systemctl restart kafka
systemctl stop kafka

# Notes

If hostname resolution errors occur, use IP addresses when configuring advertised.listeners=PLAINTEXT://kafka-1:9092,SSL://kafka-1:9093.

# Verify the cluster

# When a client connects to a Kafka cluster it can connect to any node in the cluster, known as a bootstrap node. From it the client retrieves the cluster metadata, including the addresses of the other nodes.

# Create a topic for verification
kafka-topics.sh --create --bootstrap-server 192.168.100.163:9092 --replication-factor 1 --partitions 1 --topic test-cop-io
# List topics to confirm the creation succeeded
kafka-topics.sh --list --bootstrap-server 192.168.100.163:9092
# Clean up the test topic just created
kafka-topics.sh --delete --bootstrap-server 192.168.100.163:9092 --topic test-cop-io
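
To exercise replication across all three brokers as well, a topic with replication factor 3 can be created and described; the Replicas column should list broker IDs 1, 2 and 3 (the topic name is only an example):

kafka-topics.sh --create --bootstrap-server 192.168.100.163:9092 --replication-factor 3 --partitions 3 --topic test-replica
kafka-topics.sh --describe --bootstrap-server 192.168.100.163:9092 --topic test-replica
kafka-topics.sh --delete --bootstrap-server 192.168.100.163:9092 --topic test-replica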

# Cluster: nginx configuration

In general, any node can be accessed directly on the internal network. A Kafka cluster already provides its own load-balancing mechanism, so in most cases there is no need to place an additional front-end load balancer in front of it. In some special scenarios, such as providing a single unified entry point or SSL-encrypted access, a front-end load balancer may be worth introducing; its role is then traffic forwarding and access control, not message-level load balancing. The scenario of exposing the service through a front-end proxy under a domain name has not been verified yet and will be revisited when time allows.

# Exposing the cluster directly

1. The listeners and advertised.listeners entries do not need to be added.
2. On every broker, set advertised.host.name to the public IP or domain name you want to map it to.
3. On every broker, set advertised.port to the port you want exposed on the public side.
4. Every broker in the cluster must be exposed, i.e. every broker must have a mapping.
5. If several brokers share the same advertised.host.name (only one public IP), their advertised.port values must differ.
6. If the brokers use different advertised.host.name values (multiple public IPs), their advertised.port values may be the same.
7. When adding the mapping rules, each broker's IP and port 9092 (Kafka's default) must be mapped to the advertised.host.name (public IP or domain) and advertised.port configured on that broker.
8. When a Java client accesses Kafka over the public network, bootstrap.servers must list all of the mapped advertised.host.name:advertised.port pairs.

# Cluster: nginx proxy

Add the following at the top level of the nginx configuration file:

stream{
    upstream brokers{
        server broker1_ip:broker1_port;
        server broker2_ip:broker2_port;
        server broker3_ip:broker3_port;
    }
    server{
        listen 18001;
        proxy_pass brokers;
    }
}

Here broker1_ip is each broker's advertised.host.name; set it to the public IP or domain name you want to map.

broker1_port is each broker's advertised.port; set it to the port you want exposed on the public side.

Map each broker's IP and port 9092 (Kafka's default) to the broker1_ip (public IP or domain) and broker1_port (port) configured in this nginx file.

# Common commands

journalctl -u kafka -n 100 -o cat

lsof -i:9092
lsof -i:2181
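
To check connectivity between nodes, the relevant ports can be probed with nc (installed earlier); adjust the IP to the node being tested, and note that some nc variants may not support -z:

# Kafka broker port
nc -vz 192.168.100.163 9092
# ZooKeeper client port and inter-node ports
nc -vz 192.168.100.163 2181
nc -vz 192.168.100.163 2888
nc -vz 192.168.100.163 3888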
Authors: yangfc, het