# Manual Deployment

# Single-Node Deployment

# Install the JDK and Extract the Package

# Install the distro's OpenJDK, or install the required JDK version yourself
dnf install -y java-11-openjdk

mkdir -p /data/apps && tar -xf kafka_2.13-3.5.1.tgz -C /data/apps
mv /data/apps/kafka_2.13-3.5.1 /data/apps/kafka

cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/data/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

source /etc/profile
# Create data directories for ZooKeeper and Kafka
mkdir -p /data/{zkData,kaData}
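
A quick optional sanity check that the JDK is installed, the variables from /etc/profile took effect, and the Kafka scripts resolve on the PATH (paths match the layout created above):

# Confirm the JDK is installed
java -version
# Confirm the environment variables are set
echo $KAFKA_HOME
# Confirm the Kafka scripts resolve and report the expected version
kafka-topics.sh --version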

# Adjust server.properties Parameters

vim /data/apps/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092,SSL://:9093
host.name=172.20.20.66

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://cop-kafka:9092,SSL://cop-kafka:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kaData/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
auto.create.topics.enable=true

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Adjust zookeeper.properties Parameters

vim /data/apps/kafka/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/data/zkData
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

# Service Management - ZooKeeper

# Start
# zookeeper-server-start.sh /data/apps/kafka/config/zookeeper.properties
zookeeper-server-start.sh -daemon /data/apps/kafka/config/zookeeper.properties

# Stop
zookeeper-server-stop.sh

# When running as a non-root user, uncomment the # User=zookeeper and # Group=zookeeper lines
# If Kafka or ZooKeeper uses a remote filesystem, add a Requires option as well:
# Requires=network.target remote-fs.target
# After=network.target remote-fs.target

cat > /etc/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=ZooKeeper Service
After=network.target
After=syslog.target

[Service]
Type=forking
# Environment=JAVA_HOME=/data/apps/jdk
# User=zookeeper
# Group=zookeeper
ExecStart=/data/apps/kafka/bin/zookeeper-server-start.sh -daemon /data/apps/kafka/config/zookeeper.properties
ExecStop=/data/apps/kafka/bin/zookeeper-server-stop.sh

[Install]
WantedBy=default.target
EOF

systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
systemctl status zookeeper
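
To confirm that ZooKeeper is actually serving requests, and not just that the unit started, a small optional check can be run with the zookeeper-shell.sh script shipped in the Kafka bin directory:

# List the root znodes; a healthy ZooKeeper answers immediately
zookeeper-shell.sh localhost:2181 ls /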

# Service Management - Kafka

# Start
/data/apps/kafka/bin/kafka-server-start.sh /data/apps/kafka/config/server.properties
# Run as a daemon
# /data/apps/kafka/bin/kafka-server-start.sh -daemon /data/apps/kafka/config/server.properties
# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Stop
/data/apps/kafka/bin/kafka-server-stop.sh

# Cluster management
kafka-cluster.sh
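
kafka-cluster.sh prints its usage when run without a subcommand; as a minimal sketch (assuming the broker from this guide is running on localhost:9092), it can be used to query the ID of the cluster the broker belongs to:

# Print the cluster ID reported by the running broker
kafka-cluster.sh cluster-id --bootstrap-server localhost:9092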

# Service unit file
cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Kafka
After=network.target

[Service]
Type=simple
ExecStart=/data/apps/kafka/bin/kafka-server-start.sh /data/apps/kafka/config/server.properties
ExecStop=/data/apps/kafka/bin/kafka-server-stop.sh
Restart=always
# User=kafka
# Group=kafka
# Environment=JAVA_HOME=/data/apps/jdk
Environment=KAFKA_HOME=/data/apps/kafka

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka
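
Once the unit is active, the broker's registration can be verified either through ZooKeeper or by querying the broker directly; a minimal check, assuming the defaults used above (broker.id=0, port 9092):

# The broker registers itself under /brokers/ids in ZooKeeper
zookeeper-shell.sh localhost:2181 ls /brokers/ids
zookeeper-shell.sh localhost:2181 get /brokers/ids/0
# Ask the broker directly; a list of supported API versions means it is reachable
kafka-broker-api-versions.sh --bootstrap-server localhost:9092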

# Testing and Verification

# Create a test topic for verification
kafka-topics.sh --create --bootstrap-server 172.20.20.66:9092 --replication-factor 1 --partitions 1 --topic test-cop-io
# List topics to check that it was created
kafka-topics.sh --list --bootstrap-server 172.20.20.66:9092
# Clean up the test topic just created
kafka-topics.sh --delete --bootstrap-server 172.20.20.66:9092 --topic test-cop-io

# Parameter notes:
# --topic: the topic name
# --partitions: the number of partitions
# --replication-factor: the number of replicas for the topic; each topic can have multiple replicas

kafka-topics.sh --bootstrap-server 192.168.100.151:9092,192.168.100.152:9092,192.168.100.153:9092 --describe --topic seeyon-test
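
Beyond creating and listing topics, a quick end-to-end smoke test with the bundled console producer and consumer confirms that messages actually flow. This is an optional sketch against the single-node broker configured above; since auto.create.topics.enable=true is set, producing to a non-existent topic will create it:

# Type a few test messages, then press Ctrl+C to exit
kafka-console-producer.sh --bootstrap-server 172.20.20.66:9092 --topic test-cop-io
# In another terminal, read the messages back from the beginning (Ctrl+C to exit)
kafka-console-consumer.sh --bootstrap-server 172.20.20.66:9092 --topic test-cop-io --from-beginning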

# kafkactl Tool

# kafkactl releases and usage reference:
# https://github.com/deviceinsight/kafkactl/releases
# https://lihuimintu.github.io/2021/04/26/kafkactl/

vi /root/.config/kafkactl/config.yml
contexts:
  # default context without SASL
  remote-cluster:
    brokers:
      - 192.168.1.10:9092
    requestTimeout: 15s
    kafkaversion: 2.8.1
    # optional: tls config
    tls:
      enabled: false
      ca: my-ca
      cert: my-cert
      certKey: my-key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: true

    # optional: sasl support
    sasl:
      enabled: true
      username: xx
      password: xxx
      # optional configure sasl mechanism as plaintext, scram-sha256, scram-sha512 (defaults to plaintext)
      mechanism: plaintext

current-context: remote-cluster

# Switch the context in use
kafkactl config use-context dev
# Show which context is currently in use
kafkactl config get-contexts
# View the configuration
kafkactl config view

# Create a topic
kafkactl create topic my-topic --partitions=2
# List topics
kafkactl get topics
# Describe a topic
kafkactl describe topic my-topic
# Delete a topic
kafkactl delete topic my-topic

# Produce data
echo "key##value1" | kafkactl produce my-topic --separator=##
kafkactl produce my-topic --key=my-key --value=my-value
# Consume data
kafkactl consume my-topic --from-beginning --print-keys -o yaml

# Reset consumer group offsets
kafkactl reset consumer-group-offset my-group --topic my-topic --oldest --execute
# List consumer groups
kafkactl get cg
# Describe a specific consumer group
kafkactl describe consumer-group my-group
# Delete a consumer group
kafkactl delete consumer-group my-group

# Alter partition replicas
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter topic my-topic 3 -r 102,103

# List brokers
kafkactl get brokers
kafkactl describe broker 1

# Cluster Deployment

https://zookeeper.apache.org/
https://zookeeper.apache.org/doc/r3.8.3/zookeeperStarted.html
https://kafka.apache.org/
https://kafka.apache.org/documentation/#quickstart

# System Tuning

# Hostname mapping - this is important; otherwise connections between cluster nodes will fail
cat >> /etc/hosts << 'EOF'
192.168.100.163 v8-kafka-1
192.168.100.164 v8-kafka-2
192.168.100.165 v8-kafka-3
EOF

# Install as needed
dnf install -y lsof nc
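
Before starting the cluster services it can help to confirm that each node can reach its peers on the ZooKeeper and Kafka ports. A small sketch using the nc just installed (hostnames are the ones mapped in /etc/hosts above; the checks only succeed once the corresponding services are listening):

# -z may not be supported by very old ncat builds; drop it and press Ctrl+C if needed
for host in v8-kafka-1 v8-kafka-2 v8-kafka-3; do
  for port in 2181 2888 3888 9092; do
    nc -zv -w 2 "$host" "$port"
  done
done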

# Install the JDK

# Install the distro's OpenJDK, or install the required JDK version yourself
dnf install -y java-11-openjdk

# Install Kafka

mkdir -p /apps && tar -xf kafka_2.13-3.5.1.tgz -C /apps
mv /apps/kafka_2.13-3.5.1 /apps/kafka

cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

source /etc/profile

# Create data directories for ZooKeeper and Kafka (an NFS share can be mounted at this location)
mkdir -p /data/{zkData,kaData}

# Configure the Bundled ZooKeeper Cluster

# Create myid manually
# The content of the myid file = the X in server.X in the configuration file
# Node 1
echo '1' > /data/zkData/myid
# Node 2
echo '2' > /data/zkData/myid
# Node 3
echo '3' > /data/zkData/myid
# When creating the myid file manually, note the following requirements:
# 1. The myid file must be created in ZooKeeper's dataDir directory, and its name must be myid.
# 2. The content of the myid file must be the unique ID of that server node, an integer between 1 and 255.
# 3. Within the cluster, each node's myid file must contain a different number; otherwise nodes may fail to start or run into other problems.
# 4. The myid file must be readable and writable; otherwise the node will fail to start.
# These requirements ensure that every ZooKeeper server node has a unique myid and is correctly identified within the cluster. Follow them when creating the myid file so that the ZooKeeper cluster runs properly.
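
A quick per-node check that the myid file is where ZooKeeper expects it and holds the right number:

# Run on every node; the value must match that node's server.X entry below
cat /data/zkData/myid
ls -l /data/zkData/myid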

# Adjust the configuration - zookeeper.properties
sed -i 's?^dataDir=/tmp/zookeeper?dataDir=/data/zkData?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^clientPort=2181?clientPort=2181?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^admin.enableServer=false?admin.enableServer=true?' /apps/kafka/config/zookeeper.properties
# sed -i 's?^# admin.serverPort=8080?admin.serverPort=8080?' /apps/kafka/config/zookeeper.properties
# Add the cluster nodes - one entry per node; adjust as needed
# Note: with three servers, the X written into each node's myid file is the X of the server.X=ip:2888:3888 entry whose IP belongs to that node
# server.X=A:B:C
# X - server number
# A - IP address
# B and C - ports used for communication between the servers
cat >> /apps/kafka/config/zookeeper.properties << 'EOF'
server.1=192.168.100.163:2888:3888
server.2=192.168.100.164:2888:3888
server.3=192.168.100.165:2888:3888
tickTime=2000
initLimit=5
syncLimit=2
EOF

# Or edit the file manually
vim /apps/kafka/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/data/zkData
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=192.168.100.163:2888:3888
server.2=192.168.100.164:2888:3888
server.3=192.168.100.165:2888:3888
tickTime=2000
initLimit=5
syncLimit=2

# Start
# zookeeper-server-start.sh /apps/kafka/config/zookeeper.properties
zookeeper-server-start.sh -daemon /apps/kafka/config/zookeeper.properties

# Stop
zookeeper-server-stop.sh

# When running as a non-root user, uncomment the # User=zookeeper and # Group=zookeeper lines
# If Kafka or ZooKeeper uses a remote filesystem, add a Requires option as well:
# Requires=network.target remote-fs.target
# After=network.target remote-fs.target
cat > /etc/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=ZooKeeper Service
After=network.target
After=syslog.target

[Service]
Type=forking
# User=zookeeper
# Group=zookeeper
ExecStart=/apps/kafka/bin/zookeeper-server-start.sh -daemon /apps/kafka/config/zookeeper.properties
ExecStop=/apps/kafka/bin/zookeeper-server-stop.sh

[Install]
WantedBy=default.target
EOF

systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
systemctl status zookeeper
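
Once ZooKeeper is running on all three nodes, each node's role in the quorum can be checked with the srvr four-letter command, which should be in ZooKeeper's default 4lw whitelist; one node should report Mode: leader and the other two Mode: follower (nc was installed earlier):

# Run on each node and look at the "Mode:" line of the output
echo srvr | nc localhost 2181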

# Configure and Start the Kafka Cluster

# Modification Notes

broker.id defaults to 0; set it to 1 on node1, 2 on node2, and 3 on node3
host.name is added as a new field; set it to 192.168.100.163 on node1, 192.168.100.164 on node2, and 192.168.100.165 on node3
log.dirs defaults to /tmp/kafka-logs; change it to /data/kafka/logs
zookeeper.connect defaults to localhost:2181; change it to the ZooKeeper cluster address 192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181
mkdir -p /data/kafka/logs

# Modify the Files with Commands

# Run on all nodes: kafka-1 / kafka-2 / kafka-3
sed -i 's#^log.dirs=.*#log.dirs=/data/kafka/logs#' /apps/kafka/config/server.properties
sed -i 's#^zookeeper.connect=.*#zookeeper.connect=192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181#' /apps/kafka/config/server.properties
sed -i 's/^#listener.security.protocol.map=/listener.security.protocol.map=/' /apps/kafka/config/server.properties
echo "host.name=$(ip addr | grep -Po '(?<=inet ).*(?=\/)' | grep -Ev '127.0.0|0.0.0' | head -1)" >> /apps/kafka/config/server.properties
echo "advertised.listeners=PLAINTEXT://$(hostname):9092,SSL://$(hostname):9093" >> /apps/kafka/config/server.properties
echo "auto.create.topics.enable=true" >> /apps/kafka/config/server.properties
echo "listeners=PLAINTEXT://:9092,SSL://:9093" >> /apps/kafka/config/server.properties
# echo "# default.replication.factor=1" >> /apps/kafka/config/server.properties

# kafka-1
sed -i 's,^broker.id=.*,broker.id=1,' /apps/kafka/config/server.properties

# kafka-2
sed -i 's,^broker.id=.*,broker.id=2,' /apps/kafka/config/server.properties

# kafka-3
sed -i 's,^broker.id=.*,broker.id=3,' /apps/kafka/config/server.properties

# After running on all nodes, use the following commands to verify that the results are correct
grep -n '^broker.id=' /apps/kafka/config/server.properties
grep -n '^host.name=' /apps/kafka/config/server.properties
grep -n '^log.dirs=' /apps/kafka/config/server.properties
grep -n '^zookeeper.connect=' /apps/kafka/config/server.properties
grep -n '^auto.create.topics.enable=' /apps/kafka/config/server.properties
grep -n '^advertised.listeners=' /apps/kafka/config/server.properties
grep -n '^listener.security.protocol.map=' /apps/kafka/config/server.properties
grep -n '^listeners=' /apps/kafka/config/server.properties
grep -n 'default.replication.factor=' /apps/kafka/config/server.properties

# Manually Edit the Configuration File

vim /apps/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://:9092,SSL://:9093
host.name=192.168.100.187

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://kafka-1:9092,SSL://kafka-1:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Whether to allow automatic topic creation; if false, topics must be created with a command.
auto.create.topics.enable=true
# Default number of replicas per partition for a topic; must not exceed the number of brokers in the cluster
# default.replication.factor=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.100.163:2181,192.168.100.164:2181,192.168.100.165:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Start
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Stop
/apps/kafka/bin/kafka-server-stop.sh

# Cluster management
kafka-cluster.sh

# Service unit file
cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Kafka
After=network.target

[Service]
Type=simple
ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
ExecStop=/apps/kafka/bin/kafka-server-stop.sh
Restart=always
# User=kafka
# Group=kafka
# Environment=JAVA_HOME=/apps/jdk
Environment=KAFKA_HOME=/apps/kafka

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka
systemctl restart kafka
systemctl stop kafka

Note: if hostname resolution errors appear, use IP addresses rather than hostnames when configuring advertised.listeners=PLAINTEXT://kafka-1:9092,SSL://kafka-1:9093.

# Verify the Cluster

# When a client connects to a Kafka cluster, it usually connects to any one of the nodes in the cluster, known as a "bootstrap" node. From these nodes the client obtains the cluster's metadata, including the addresses of the other nodes and other related information.

# Create a test topic for verification
kafka-topics.sh --create --bootstrap-server 192.168.100.163:9092 --replication-factor 1 --partitions 1 --topic test-cop-io
# List topics to check that it was created
kafka-topics.sh --list --bootstrap-server 192.168.100.163:9092
# Clean up the test topic just created
kafka-topics.sh --delete --bootstrap-server 192.168.100.163:9092 --topic test-cop-io
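
Because this is a three-broker cluster, a more meaningful check is a topic with a replication factor of 3; describing it should show a leader and an in-sync replica set spread across broker IDs 1, 2 and 3. The topic name below is just an example:

# Create a topic replicated across all three brokers
kafka-topics.sh --create --bootstrap-server 192.168.100.163:9092 --replication-factor 3 --partitions 3 --topic test-replicated
# Check leader and ISR placement per partition
kafka-topics.sh --describe --bootstrap-server 192.168.100.163:9092 --topic test-replicated
# Clean up
kafka-topics.sh --delete --bootstrap-server 192.168.100.163:9092 --topic test-replicated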

# Nginx Configuration

On an internal network it is normally sufficient to access any node directly. A Kafka cluster already provides its own load-balancing mechanism, so in most cases there is no need to place an additional front-end load balancer in front of it. In certain special scenarios, such as exposing a single unified access entry point to the outside or handling SSL-encrypted communication, a front-end load balancer in front of the Kafka cluster may be worth considering; in that case, however, its role is mainly traffic forwarding and security control, not message-level load balancing. The scenario of exposing the service under a domain name through a front-end proxy has not been verified yet and will be revisited when time allows.

# Direct Exposure

1. There is no need to add the listeners and advertised.listeners configuration items.
2. On each broker in the cluster, set advertised.host.name to the public IP or domain name you want to map to.
3. On each broker in the cluster, set advertised.port to the port you want to expose to the public network.
4. Every broker in the cluster must be exposed to the public network, i.e. every broker must have a mapping.
5. If multiple brokers share the same advertised.host.name (only one public IP is exposed), their advertised.port values must differ.
6. If the brokers have different advertised.host.name values (multiple public IPs are exposed), their advertised.port values can be the same.
7. When adding mapping rules, each broker's IP and port 9092 (Kafka's default port) must be mapped to the advertised.host.name (public IP or domain name) and advertised.port (port) configured on that broker.
8. When a Java client accesses Kafka from the public network, the bootstrap.servers setting must include all mapped advertised.host.name:advertised.port pairs.
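
For reference, a hedged sketch of what these rules could look like per broker. The public IP 203.0.113.10 and the ports 19092/29092/39092 are hypothetical placeholders; note also that advertised.host.name and advertised.port are legacy names, and on the Kafka 3.x version used in this guide the same effect is expressed through advertised.listeners:

# Hypothetical example for broker 1 only; brokers 2 and 3 would advertise ports 29092 and 39092
# (assumes a NAT rule mapping 203.0.113.10:19092 -> 192.168.100.163:9092)
sed -i 's#^advertised.listeners=.*#advertised.listeners=PLAINTEXT://203.0.113.10:19092#' /apps/kafka/config/server.properties
# External clients then use:
#   bootstrap.servers=203.0.113.10:19092,203.0.113.10:29092,203.0.113.10:39092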

# Nginx Proxy

Add the following configuration at the outermost level of the nginx configuration file:

stream{
    upstream brokers{
        server broker1_ip:broker1_port;
        server broker2_ip:broker2_port;
        server broker3_ip:broker3_port;
    }
    server{
        listen 18001;
        proxy_pass brokers;
    }
}

Here broker1_ip is each broker's advertised.host.name; set it to the public IP or domain name you want to map to.

broker1_port is each broker's advertised.port; set it to the port you want to expose to the public network.

Map each broker's IP and port 9092 (Kafka's default port) to the broker1_ip (public IP or domain name) and broker1_port (port) configured in the nginx configuration file above.
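
For a basic check through the proxy (assuming nginx listens on port 18001 as configured above; nginx_host is a placeholder for the proxy's address), cluster metadata can be fetched via the proxy. Note that after the bootstrap step the client reconnects to whatever addresses the brokers advertise, so advertised.listeners must still be resolvable and reachable from the client side:

# Verify the nginx configuration and reload it, then query Kafka through the proxy
nginx -t && nginx -s reload
kafka-topics.sh --list --bootstrap-server nginx_host:18001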

# Common Commands

journalctl -u kafka -n 100 -o cat

lsof -i:9092
lsof -i:2181
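
A few additional diagnostics that can be useful here, assuming the cluster layout from this guide (Kafka's application logs under /apps/kafka/logs):

# Check listening ports without lsof
ss -ntlp | grep -E '9092|2181'
# Follow the broker's application log
tail -f /apps/kafka/logs/server.log
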
Author: yangfc