Bigdata: Quickly Building a Kafka Cluster, with Common CRUD Commands Explained

Posted by Yancy on 2017-03-28


Kafka and ZooKeeper cluster specifications

Kafka version: kafka_2.10-0.8.2.1
ZooKeeper version: zookeeper-3.4.5
Kafka service user: jollybi
Kafka install directory: /data/tools/
Kafka message (log) directory: /data/tools/kafka_2.10-0.8.2.1/kafka-logs
ZooKeeper service user: jollybi
ZooKeeper install directory: /data/tools/
ZooKeeper log directory: /data/tools/zookeeper-3.4.5/tmp/logs
Shared /etc/hosts configuration:
10.155.90.153 kafka1.jollychic.com kafka1
10.155.90.155 kafka2.jollychic.com kafka2
10.155.90.138 kafka3.jollychic.com kafka3

This article deploys the Kafka cluster on three machines; the IP-to-hostname mapping is as follows:

10.155.90.153 kafka1.jollychic.com kafka1
10.155.90.155 kafka2.jollychic.com kafka2
10.155.90.138 kafka3.jollychic.com kafka3

Step 1: Configure /etc/hosts (identical on all three machines)

127.0.0.1 localhost.localdomain localhost
10.155.90.153 kafka1.jollychic.com kafka1
10.155.90.155 kafka2.jollychic.com kafka2
10.155.90.138 kafka3.jollychic.com kafka3
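
Before going further, it is worth checking that every node resolves the others; a quick sanity check (assuming getent, available on any common Linux):

for h in kafka1 kafka2 kafka3; do getent hosts "$h"; done   # each line should show the IP from /etc/hosts above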

Step 2: Set up the Kafka cluster environment

Download URL: http://mirrors.hust.edu.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
[jollybi@kafka1 ~]$ tar -xvf kafka_2.10-0.8.2.1.tgz -C /data/tools/
[jollybi@kafka1 ~]$ cd /data/tools/kafka_2.10-0.8.2.1/config

Create the data directory; it is best not to use the default /tmp/kafka-logs:

mkdir -p /data/tools/kafka_2.10-0.8.2.1/kafka-logs/

Edit the Kafka configuration file

[jollybi@kafka1 config]$ vim server.properties
# Set broker.id (must be unique; number the three nodes 1, 2 and 3 in order, matching each node's ZooKeeper myid)
broker.id=1
auto.leader.rebalance.enable=true
# Local broker address and port (0.8.x uses host.name/port; the newer listeners property only applies to 0.9+):
port=9292
host.name=10.155.90.153
log.dirs=/data/tools/kafka_2.10-0.8.2.1/kafka-logs
# Register the broker by IP (important: by default the broker registers its hostname in ZooKeeper, and clients must resolve that hostname when connecting; registering the IP directly avoids hostname-resolution failures such as java.nio.channels.UnresolvedAddressException or java.io.IOException: Can not resolve address)
# ZooKeeper connection string:
#zookeeper.connect=10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181
zookeeper.connect=kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281
# hostnames from /etc/hosts are used here instead of raw IPs
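
After editing, a quick grep confirms the effective settings (a minimal check; adjust the path if your install directory differs):

grep -E '^(broker\.id|port|host\.name|log\.dirs|zookeeper\.connect)=' /data/tools/kafka_2.10-0.8.2.1/config/server.properties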

Configure the ZooKeeper settings

vim zookeeper.properties
dataDir=/data/tools/zookeeper-3.4.5/tmp
# the port at which the clients will connect
clientPort=2281
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
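
Note that zookeeper.properties is only Kafka's bundled single-node configuration; this cluster actually runs a standalone zookeeper-3.4.5 ensemble (the QuorumPeerMain process in the Step 4 jps output). Below is a minimal zoo.cfg sketch for such a 3-node ensemble; the 2888/3888 quorum ports are an assumption, not taken from the original setup:

# /data/tools/zookeeper-3.4.5/conf/zoo.cfg (hypothetical ensemble config)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/tools/zookeeper-3.4.5/tmp
clientPort=2281
server.1=kafka1.jollychic.com:2888:3888
server.2=kafka2.jollychic.com:2888:3888
server.3=kafka3.jollychic.com:2888:3888

Each node also needs its own id written into dataDir, e.g. on kafka1:

echo 1 > /data/tools/zookeeper-3.4.5/tmp/myid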

Configure the Kafka producer defaults

vim producer.properties
metadata.broker.list=kafka1.jollychic.com:9292,kafka2.jollychic.com:9292,kafka3.jollychic.com:9292
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
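
To exercise these producer settings, a quick test message can be pushed through the console producer; the --sync flag mirrors producer.type=sync above. A sketch using this article's kafka1 address and topic:

echo "producer config test" | ./bin/kafka-console-producer.sh --broker-list kafka1.jollychic.com:9292 --sync --topic mongotail_lz4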

Step 3: Quickly copy the configuration to the other cluster machines

Distribute the installation files to the rest of the cluster. It is best to tar them up first and scp the archive; copy the installation directory to the corresponding path on each of the other machines:

scp -P58958 -r kafka_2.10-0.8.2.1 jollybi@kafka2.jollychic.com:/data/tools/
scp -P58958 -r kafka_2.10-0.8.2.1 jollybi@kafka3.jollychic.com:/data/tools/
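
Since packing the files first is recommended, here is a tar-and-extract variant of the same copy (a sketch assuming the same SSH port and the /data/tools target path):

tar -czf kafka_2.10-0.8.2.1.tgz -C /data/tools kafka_2.10-0.8.2.1
scp -P58958 kafka_2.10-0.8.2.1.tgz jollybi@kafka2.jollychic.com:/data/tools/
ssh -p58958 jollybi@kafka2.jollychic.com 'cd /data/tools && tar -xzf kafka_2.10-0.8.2.1.tgz'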

Then edit server.properties on the other two brokers accordingly:

# on kafka2:
broker.id=2
host.name=10.155.90.155
# on kafka3:
broker.id=3
host.name=10.155.90.138

Step 4: Start the Kafka cluster

./kafka-server-start.sh -daemon ../config/server.properties
[jollybi@kafka1 config]$ jps
3443 QuorumPeerMain
16280 Jps
3628 Kafka
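
In this jps output, QuorumPeerMain is the ZooKeeper process and Kafka is the broker itself. To confirm that all three brokers registered, query ZooKeeper (the /brokers/ids paths are standard in 0.8.x):

./bin/zkCli.sh -server 127.0.0.1:2281
ls /brokers/ids          # expect [1, 2, 3]
get /brokers/ids/1       # shows the host and port that broker 1 registered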
Step 5: Test the cluster with basic commands

### Create Kafka topics with the defaults: 1 replica, 1 partition
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --replication-factor 1 --partitions 1 --topic mongotail_lz4
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --replication-factor 1 --partitions 1 --topic mongotail_lz4_imp
### The big-data side needs 12 partitions, so the topics are deleted and recreated below.
### Delete the Kafka topics:
Step 1: delete the topics
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --delete --topic mongotail_lz4
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --delete --topic mongotail_lz4_imp
### If a topic is not deleted cleanly, you may see this error:
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
### What the delete thread actually does:
1. It first sends an update-metadata request to every broker, announcing that the topic is being deleted so the brokers can evict it from their caches.
2. It then deletes all partitions of the topic:
2.1 It tells every broker that these partitions are being deleted; from then on the brokers stop accepting any client requests for them.
2.2 Every replica of each partition is put into the OfflineReplica state, so the ISR keeps shrinking; when the leader replica is finally marked OfflineReplica, the leader information is updated to -1.
2.3 All replicas are put into the ReplicaDeletionStarted state.
2.4 The replica state machine picks up the state change and sends a StopReplicaRequest to each broker; on receiving it, the broker stops all fetcher threads, removes the cached state, and deletes the underlying log files.
2.5 All idle fetcher threads are shut down.
### Step 2: remove the related ZooKeeper paths:
[jollybi@kafka1 zookeeper-3.4.5]$ ./bin/zkCli.sh -server 127.0.0.1:2281
[zk: 127.0.0.1:2281(CONNECTED) 1] rmr /brokers/topics/mongotail_lz4
[zk: 127.0.0.1:2281(CONNECTED) 0] rmr /consumers/group_ml_general/offsets/mongotail_lz4
[zk: 127.0.0.1:2281(CONNECTED) 1] rmr /consumers/group_ml_general/owners/mongotail_lz4
[zk: 127.0.0.1:2281(CONNECTED) 3] rmr /config/topics/mongotail_lz4
[zk: 127.0.0.1:2281(CONNECTED) 4] rmr /admin/delete_topics/mongotail_lz4
### Delete the consumer groups on the topic
Deleting the /consumers/[group_id] path in ZooKeeper is all it takes. The native ZooKeeper API can only delete empty paths, so it is recommended to use the Curator framework for this: ZkPaths.deleteChildren deletes the whole path recursively, children included.
[zk: 127.0.0.1:2281(CONNECTED) 0] rmr /consumers/kafka-node1-imp-group
[zk: 127.0.0.1:2281(CONNECTED) 1] rmr /consumers/kafka-node1-group
[zk: 127.0.0.1:2281(CONNECTED) 2] rmr /consumers/console-consumer-59053
[zk: 127.0.0.1:2281(CONNECTED) 3] rmr /consumers/
1. Allow deletion in the configuration: add delete.topic.enable=true to config/server.properties
2. Run the delete command:
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-topic
Then confirm in ZooKeeper that these nodes are gone:
/brokers/topics/test-topic
/config/topics/test-topic
/admin/delete_topics/test-topic
For consumers, delete the group id as well.
### Restart the Kafka cluster
### Do not create big-data topics with a single partition; throughput cannot keep up. We previously used 12 partitions.
### Create the new Kafka topics: 3 replicas, 12 partitions
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --replication-factor 3 --partitions 12 --topic mongotail_lz4
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --replication-factor 3 --partitions 12 --topic mongotail_lz4_imp
### Describe the topics just created:
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --describe --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --topic mongotail_lz4
./kafka_2.10-0.8.2.1/bin/kafka-topics.sh --describe --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --topic mongotail_lz4_imp
### Simulate a producer (console producer) sending data:
./bin/kafka-console-producer.sh --broker-list kafka1.jollychic.com:9292,kafka2.jollychic.com:9292,kafka3.jollychic.com:9292 --topic mongotail_lz4_imp
......
my test message 1
my test message 2
^C
### Simulate a consumer (console consumer); now read the messages back:
./bin/kafka-console-consumer.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --from-beginning --topic mongotail_lz4_imp
......
my test message 1
my test message 2
^C
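
The same round trip can be scripted as a one-shot smoke test; --max-messages makes the consumer exit after reading one record. A sketch against this cluster:

echo "smoke test $(date)" | ./bin/kafka-console-producer.sh --broker-list kafka1.jollychic.com:9292,kafka2.jollychic.com:9292,kafka3.jollychic.com:9292 --topic mongotail_lz4_imp
./bin/kafka-console-consumer.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --topic mongotail_lz4_imp --from-beginning --max-messages 1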

Common Kafka commands

A summary of frequently used Kafka command lines:

1. List all topics:
./bin/kafka-topics.sh --list --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281
2. Show detailed information for a topic:
./bin/kafka-topics.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --describe --topic mongotail_lz4_imp
3. Add partitions to a topic (--alter --partitions):
./bin/kafka-topics.sh --alter --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --partitions 20 --topic mongotail_lz4_imp
4. Kafka console producer:
./bin/kafka-console-producer.sh --broker-list kafka1.jollychic.com:9292,kafka2.jollychic.com:9292,kafka3.jollychic.com:9292 --topic mongotail_lz4_imp
5. Kafka console consumer:
./bin/kafka-console-consumer.sh --zookeeper kafka1.jollychic.com:2281,kafka2.jollychic.com:2281,kafka3.jollychic.com:2281 --from-beginning --topic mongotail_lz4_imp
6. Start the Kafka service:
./kafka-server-start.sh -daemon ../config/server.properties
7. Take a broker offline:
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
8. Delete a topic:
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1
9. Check a consumer group's offsets:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1
10. Show a consumer group's consumption details (0.9+):
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
11. New consumer (0.9+):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
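
For command 9, ConsumerOffsetChecker prints one row per partition, and the Lag column (logSize minus Offset) is the number to watch. A sketch against this cluster's group_ml_general group from the deletion step above:

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper kafka1.jollychic.com:2281 --group group_ml_general --topic mongotail_lz4
# output columns: Group, Topic, Pid (partition), Offset, logSize, Lag, Owner
# a steadily growing Lag means the consumers are falling behind the producers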

Step 6: Kafka storage layout

Each replica has its own directory:

[jollybi@kafka1 kafka_2.10-0.8.2.1]$ cd /data/tools/kafka_2.10-0.8.2.1/kafka-logs/
[jollybi@kafka1 kafka-logs]$ ls
__consumer_offsets-0 __consumer_offsets-20 __consumer_offsets-32 __consumer_offsets-44 my-replicated-topic1-0
__consumer_offsets-1 __consumer_offsets-21 __consumer_offsets-33 __consumer_offsets-45 my-replicated-topic1-1

Second-level structure (inside a partition directory):

[jollybi@kafka1 kafka-logs]$ cd my-replicated-topic1-0/
[jollybi@kafka1 my-replicated-topic1-0]$ ls
00000000000000000000.index 00000000000000000000.log
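
To peek inside a segment, Kafka ships a dump tool; --print-data-log also decodes the message payloads (a sketch; point --files at an existing segment):

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/tools/kafka_2.10-0.8.2.1/kafka-logs/my-replicated-topic1-0/00000000000000000000.log --print-data-log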

Discussion and learning:

🐧 Linux shell advanced ops QQ group: 459096184 (systems ops, application ops, automation, virtualization research; all welcome)
🐧 BigData-Exchange School QQ group: 521621407 (big-data ops, Hadoop developers, big-data enthusiasts; all welcome)

There are also internal WeChat groups for BigData discussion; invitation links are posted in the QQ groups.