Building an ELK log platform with Kafka as the message hub (Elasticsearch 2.x, Logstash 2.x, Kibana 4.5.x, ZooKeeper 3.4.6, Kafka)

Posted by Yancy on 2016-12-29


Introduction

ELK is the industry-standard solution for log collection, storage and indexing, and visualization/analysis.

Logstash provides a rich set of plugins supporting many different inputs and outputs.

Redis or Kafka is commonly used as the intermediate hop for logs/messages.

If you already have a Kafka environment, Kafka is a better choice than Redis.

What follows is a minimal configuration, kept as notes; the Elastic website has very thorough documentation.

Don't bother searching the web for this, there are few useful results; read the official docs directly.

Versions and links

Elasticsearch version: 2.4.3

System requirements

For testing only, you don't need two machines: both nodes can run on the same host. Disk and CPU demands are low, and more than 2 GB of memory is basically enough.

For production, size according to log volume. For example, if logs take about 10 GB of disk per day and are kept for 30 days, that is about 300 GB. Elasticsearch's default low disk watermark is 85%: once a node's disk is that full it stops receiving new shards and starts logging warnings, so plan for at least 300/0.85 ≈ 354 GB. Each replica adds another full copy on top of that.

Prepare 4 machines on the same LAN (able to ping each other), deploy an Elasticsearch node on each, and build a log cluster.

4 machines is the minimum, but that alone does not give high availability, so one more machine is needed to prevent split-brain; see the end (two main machines plus one stable machine is enough).

Cluster nodes: at least 4 machines
Memory: 8 GB or more
CPU: 4 cores or more
Disk: 800 GB or more, 1 TB recommended; cluster capacity around 1 billion documents (depends on log size)
OS: CentOS

Preparation: application / network environment

SLB: Alibaba Cloud load balancer, or a self-built nginx

ELK server cluster:

OS CentOS 6.7, JDK 1.8, version: Elasticsearch-2.4.3

es_01 10.47.88.206
es_02 10.47.88.188

Kibana server cluster:

OS CentOS 6.7, JDK 1.8, version: kibana-4.5.1

es_01 10.47.88.206
es_02 10.47.88.188

Kafka cluster

OS CentOS 6.7, JDK 1.8, version: kafka_2.10-0.9

kafka_01 10.46.72.172
kafka_02 10.47.88.103
kafka_03 10.47.102.137

ZooKeeper cluster

OS CentOS 6.7, JDK 1.8, version: zookeeper-3.4.6

kafka_01 10.46.72.172
kafka_02 10.47.88.103
kafka_03 10.47.102.137

logstash-2.4

Client: OS CentOS 6.7, JDK 1.8, version: logstash-2.4

tomcat-account_01: 10.27.232.85

Everything needs JDK 1.8.

Overview

Data flow => overall path of logs/messages

logstash (shipper on the application host) => kafka => logstash (indexer on the ES hosts) => elasticsearch => kibana

Deployment

1. Check the JDK version and install it

Elasticsearch 2.x needs at least Java 1.7; this setup uses 1.8 everywhere.

If no JDK is installed, the command returns bash: java: command not found and a JDK must be installed.

If a JDK is installed, confirm the version is above 1.7, otherwise upgrade.

java -version
java version "1.7.0_51"
Java(TM) SE Runtime Environment (build 1.7.0_51-b13)
Java HotSpot(TM) Server VM (build 24.51-b03, mixed mode)

Install or upgrade Java (use the install command matching your distribution):
# Redhat/CentOS/Fedora
sudo yum install java-1.8.0-openjdk
Or download the latest JDK rpm from Oracle's site and install it:
wget http://download.oracle.com/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.rpm
rpm -Uvh jdk-8u91-linux-x64.rpm

Confirm again that the installation succeeded:

java -version

Basic configuration: set the FQDN:

# set the hostname
cat /etc/hostname
es_01
# edit hosts
cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.47.88.206 es.ihaozhuo.com es_01
# apply it to the running system (on CentOS 6 the name that survives a reboot is HOSTNAME= in /etc/sysconfig/network, so set that too)
hostname -F /etc/hostname
# verify the result
hostname -f
es.ihaozhuo.com
hostname
es_01

Firewall configuration

#service iptables stop
#setenforce 0
I kept the firewall enabled here and simply add the needed ports later.
Alternatively leave the firewall on and open the relevant ports in iptables:
# vim /etc/sysconfig/iptables
# 80: nginx reverse proxy
-A INPUT -m state --state NEW -m tcp -p tcp --dport 80 -j ACCEPT
# 9200: Elasticsearch HTTP; only the Kibana/Logstash hosts need it, so add -s <their network>
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9200 -j ACCEPT
# 9300: Elasticsearch transport, required between the two ES nodes for the unicast discovery configured below (this is es_01; on es_02 use 10.47.88.206)
-A INPUT -s 10.47.88.188/32 -m state --state NEW -m tcp -p tcp --dport 9300 -j ACCEPT
# 9292 was in the original rules but nothing in this setup listens there (it was Kibana 3's port)
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9292 -j ACCEPT
# service iptables restart

Kibana (5601) should only be reachable from the nginx host, so open it with a -s restriction too rather than globally.

Quick install from RPM

Every ELK component can be installed from RPM binary packages: add the Elastic yum repository and install with yum.

For Elasticsearch see the official Elasticsearch RPM docs

For Logstash see the official Logstash RPM docs

For Kibana see the official Kibana RPM docs

es_01 server install from the tarball

I installed from the tarball here.
Download Elasticsearch. By default Elasticsearch serves HTTP on port 9200 and nodes talk to each other over TCP port 9300.

Download page: Elasticsearch

2.4 version:
Elasticsearch 2.4.3

Unpack the tarball:
[root@es_01 ~]# tar -zxvf elasticsearch-2.4.3.tar.gz -C /usr/local/
Then symlink the directory:
[root@es_01 local]# ln -s /usr/local/elasticsearch-2.4.3/ /usr/local/elasticsearch
The configuration file needs editing; first create a few directories.
Create the data directories, assuming /srv/data is the mount with the most disk (500 GB+):
[root@es_01 srv]# mkdir -p /srv/data/es-data
[root@es_01 srv]# mkdir /srv/data/es-work
[root@es_01 local]# mkdir /usr/local/elasticsearch/logs
# Elasticsearch 2.x loads plugins from plugins/ under the install directory (the original notes created config/plugins, which is not read)
[root@es_01 local]# mkdir -p /usr/local/elasticsearch/plugins
Create the user and give it ownership of the install tree.
Elasticsearch 2.x refuses to start as root, so a dedicated unprivileged account is mandatory, not optional.
The trailing slash matters: chown -R does not descend into a symlink given as an argument, so /usr/local/elasticsearch without the slash would change only the link itself:
[root@es_01 elasticsearch]# useradd -s /sbin/nologin elasticsearch
[root@es_01 elasticsearch]# chown -R elasticsearch:elasticsearch /usr/local/elasticsearch/
[root@es_01 elasticsearch]# chown -R elasticsearch:elasticsearch /srv/data/
Switch user
Switch to the elasticsearch user and enter the Elasticsearch directory. Because the account's login shell is /sbin/nologin, a plain su elasticsearch fails; pass a shell explicitly:
su -s /bin/bash elasticsearch
cd /usr/local/elasticsearch/

Configure Elasticsearch:

Work as the elasticsearch user.

File: config/elasticsearch.yml
Edit these settings (everything in the original file is commented out with #; remove the comment marker and set the value):

  • Cluster name: cluster.name; note: identical on both machines
cluster.name: elk_cluster
  • Node name: node.name; note: different on each machine, 01 on the first and 02 on the second (the file below uses es_01/es_02; any names are fine as long as they differ)
# first machine
node.name: inner_es_node_01
# second machine
node.name: inner_es_node_02

[root@es_01 config]# vim elasticsearch.yml
# Use a descriptive name for your cluster:
#
#
cluster.name: elk_cluster
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: es_01
#
# Add custom attributes to the node:
#
# node.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /srv/data/es-data
#
# Path to log files:
#
path.logs: /usr/local/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
# bootstrap.memory_lock: true
#
# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
# available on the system and that the owner of the process is allowed to use this limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 10.47.88.206

network.host is deliberately the node's private address. Bind the private interface, never 0.0.0.0: Elasticsearch 2.x has no built-in authentication (Shield was a separate commercial plugin), so whoever can reach 9200 owns the data. Note that with this setting the node does not listen on localhost, so tests must use the private IP.

Start the service as the elasticsearch user.

For the tarball install run /usr/local/elasticsearch/bin/elasticsearch & (or bin/elasticsearch -d to daemonize)
to start it.

Test that the service answers:

[elasticsearch@es_01 elasticsearch]$ curl http://10.47.88.206:9200
{
  "name" : "es_01",
  "cluster_name" : "elk_cluster",
  "cluster_uuid" : "mspLZT5nTL-d124suNbBBQ",
  "version" : {
    "number" : "2.4.3",
    "build_hash" : "d38a34e7b75af4e17ead16f156feffa432b22be3",
    "build_timestamp" : "2016-12-07T16:28:56Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}

Next, a boot-time startup script. If you don't want one, just switch to the es user, go to the install directory and start with -d to run in the background.
A start script is needed in /etc/init.d/.

[root@ELK ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
Initialized empty Git repository in /root/elasticsearch-servicewrapper/.git/
remote: Counting objects: 184, done.
remote: Total 184 (delta 0), reused 0 (delta 0), pack-reused 184
Receiving objects: 100% (184/184), 4.55 MiB | 511 KiB/s, done.
Resolving deltas: 100% (53/53), done.
[root@ELK elasticsearch-servicewrapper]# mv service/ /usr/local/elasticsearch/bin/
[root@ELK elasticsearch-servicewrapper]# cd /usr/local/elasticsearch
[root@ELK elasticsearch]# /usr/local/elasticsearch/bin/service/elasticsearch install    # installs the es service
Detected RHEL or Fedora:
Installing the Elasticsearch daemon..
[root@ELK elasticsearch]# vim /etc/init.d/elasticsearch    # review the generated init script
[root@ELK elasticsearch]# service elasticsearch start    # start es
Starting Elasticsearch...
Waiting for Elasticsearch......
running: PID:31360    # the service is up
Start the service
service elasticsearch start
service elasticsearch status
Start elasticsearch automatically at boot
# chkconfig --add elasticsearch
Test that the Elasticsearch service is healthy; expect HTTP status 200 (localhost does not work here because network.host binds only the private IP)
# curl -i -X GET http://10.47.88.206:9200

Before relying on the wrapper, confirm which account it starts the JVM as: the standard Java Service Wrapper script has a commented RUN_AS_USER= line, and it must be set to elasticsearch. Elasticsearch 2.x refuses to start as root, and running the indexer for your logs as root is not something you want even if it did.

es_02 server node:

The basic setup is the same as for es_01. Copy the install tree from es_01, create the es user, and adjust the configuration.

Copy /usr/local/elasticsearch to the es_02 machine.
Configuration changes needed:
create the directories first
[root@es_02 srv]# mkdir -p /srv/data/es-data
[root@es_02 srv]# mkdir /srv/data/es-work
create the user and change ownership of the install tree:
[root@es_02 elasticsearch]# useradd -s /sbin/nologin elasticsearch
[root@es_02 elasticsearch]# chown -R elasticsearch:elasticsearch /usr/local/elasticsearch/*
[root@es_02 elasticsearch]# chown -R elasticsearch:elasticsearch /srv/data/
Edit the configuration
vim elasticsearch.yml
node.name: es_02
network.host: 10.47.88.188
nothing else changes

Test the es_02 cluster node:

[root@es_02 home]# curl http://10.47.88.188:9200
{
  "name" : "es_02",
  "cluster_name" : "elk_cluster",
  "cluster_uuid" : "-4Rqn4IzS1GfnsodqZD8Tg",
  "version" : {
    "number" : "2.4.3",
    "build_hash" : "d38a34e7b75af4e17ead16f156feffa432b22be3",
    "build_timestamp" : "2016-12-07T16:28:56Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}

Look at cluster_uuid: es_01 reported mspLZT5nTL-d124suNbBBQ and es_02 reports -4Rqn4IzS1GfnsodqZD8Tg. Two nodes with the same cluster.name but different cluster_uuid are two separate single-node clusters, not one cluster. They only join once discovery works, which on Alibaba Cloud means the unicast configuration further below. Verify with curl http://10.47.88.206:9200/_cat/nodes?v, which must list both nodes, or curl http://10.47.88.206:9200/_cluster/health?pretty, whose number_of_nodes must be 2.

The ELK cluster is now installed and configured. I put nginx in front as a reverse proxy on port 80, and nginx is configured to allow access only from inside the company, not from the public internet.

Install the head, marvel and bigdesk plugins:

On ES 1.5 the plugin command is ./plugin -install xxx; on ES 2.4 there is no dash: ./plugin install xxx

1.5 method:

* head plugin
Install method 1:
/usr/local/elasticsearch/bin/plugin -install mobz/elasticsearch-head
Restart es.
Open http://localhost:9200/_plugin/head/
Install method 2:
1. Download the zip from https://github.com/mobz/elasticsearch-head and unpack it
2. Create /usr/local/elasticsearch/plugins/head/
3. Copy the contents of the unpacked elasticsearch-head-master folder into /usr/local/elasticsearch/plugins/head/
Restart es.
Open http://localhost:9200/_plugin/head/

2.4 and above:

* head plugin
Install method 1:
/usr/local/elasticsearch/bin/plugin install mobz/elasticsearch-head
Restart es.
Open http://localhost:9200/_plugin/head/
Install method 2:
1. Download the zip from https://github.com/mobz/elasticsearch-head and unpack it
2. Create plugins/head/_site under the Elasticsearch install directory (the original notes show this as elasticsearch-1.0.0\plugins\head\_site)
3. Copy the contents of the unpacked elasticsearch-head-master folder into _site
Restart es.
Open http://localhost:9200/_plugin/head/

Two things to keep in mind. Since network.host is the private IP, the node does not answer on localhost; use http://10.47.88.206:9200/_plugin/head/ (or the nginx name). And head is a full administration console served by Elasticsearch itself with no authentication: anyone who can reach 9200 can browse, modify and delete every index through it, so 9200 must stay reachable only from the nginx proxy with its IP allowlist and from the Logstash/Kibana hosts.

To make the search service more robust I added a machine and deployed Elasticsearch as a cluster. In production the new node was never discovered; it turned out Alibaba Cloud does not support multicast, so I had to switch to unicast discovery, after which everything went smoothly.

Below is the test-environment configuration: add the cluster IPs and ports to ping.

  • es_01
[root@es_01 config]# vim elasticsearch.yml
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["10.47.88.206:9300","10.47.88.188:9300"]
  • es_02
[root@es_02 config]# vim elasticsearch.yml
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["10.47.88.206:9300","10.47.88.188:9300"]

Then restart the services and check the cluster nodes: curl http://10.47.88.206:9200/_cat/nodes?v should list both nodes, one of them marked as master, and both should now report the same cluster_uuid. Unicast fixes discovery but not the split-brain risk mentioned at the start: with two master-eligible nodes, a network partition lets each elect itself master and the two halves diverge. The setting for that is discovery.zen.minimum_master_nodes, which must be a majority of the master-eligible nodes, (N/2)+1, so 2 for a two- or three-node cluster. With only two nodes that means the cluster stops accepting writes when one node is down, which is exactly why a third, stable node is the practical answer.
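The following is an added example, not part of the original notes and not Elasticsearch's own code. It parses the handful of elasticsearch.yml settings used in this article for both nodes, checks that they can actually form one cluster over unicast, flags settings that expose a node or leave it open to split-brain, and compares the cluster_uuid from each node's GET / response, which is how you tell two same-named single-node clusters from one real cluster. The two YAML fragments are constructed from the settings shown above; the YAML parser only handles the flat key: value and inline-list forms those settings use.

```python
"""Sanity-check the two-node Elasticsearch 2.x configuration from the article."""
import json

# elasticsearch.yml fragments as configured on es_01 and es_02 above (constructed)
ES_01_YML = """
cluster.name: elk_cluster
node.name: es_01
path.data: /srv/data/es-data
network.host: 10.47.88.206
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["10.47.88.206:9300","10.47.88.188:9300"]
"""
ES_02_YML = ES_01_YML.replace("es_01", "es_02").replace("network.host: 10.47.88.206", "network.host: 10.47.88.188")


def parse_yml(text):
    """Parse the flat `key: value` lines of elasticsearch.yml.

    Only what the article uses is handled: scalars, true/false, and inline
    lists such as ["a:9300","b:9300"]. Comment and blank lines are skipped.
    """
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(":")   # first colon ends the dotted key
        value = value.strip()
        if value.startswith("["):
            value = [v.strip().strip("\"'") for v in value.strip("[]").split(",") if v.strip()]
        elif value.lower() in ("true", "false"):
            value = value.lower() == "true"
        settings[key.strip()] = value
    return settings


def quorum(master_eligible_nodes):
    """Majority required for discovery.zen.minimum_master_nodes: (N/2)+1."""
    return master_eligible_nodes // 2 + 1


def check_cluster(configs):
    """Return a list of problems across the nodes' configs.

    `configs` maps a label (es_01, es_02, ...) to its parsed elasticsearch.yml.
    """
    problems = []
    names = {c.get("cluster.name") for c in configs.values()}
    if len(names) != 1:
        problems.append("cluster.name differs between nodes: %s" % sorted(map(str, names)))
    node_names = [c.get("node.name") for c in configs.values()]
    if len(set(node_names)) != len(node_names):
        problems.append("node.name must be unique per node: %s" % node_names)
    # every node must list every node's transport address for unicast discovery
    transport_addrs = {"%s:9300" % c.get("network.host") for c in configs.values()}
    need = quorum(len(configs))
    for label, c in configs.items():
        host = c.get("network.host")
        if host in (None, "0.0.0.0", "_global_"):
            problems.append("%s: network.host binds every interface; bind the private IP" % label)
        if c.get("discovery.zen.ping.multicast.enabled") is not False:
            problems.append("%s: multicast still enabled (not available on Aliyun)" % label)
        missing = transport_addrs - set(c.get("discovery.zen.ping.unicast.hosts") or [])
        if missing:
            problems.append("%s: unicast.hosts is missing %s" % (label, sorted(missing)))
        # not set anywhere in the article: without it a partition yields two masters
        mmn = c.get("discovery.zen.minimum_master_nodes")
        if mmn is None or int(mmn) < need:
            problems.append("%s: set discovery.zen.minimum_master_nodes: %d" % (label, need))
    return problems


def same_cluster(root_responses):
    """True only if every `GET /` JSON response carries the same cluster_uuid."""
    return len({json.loads(r)["cluster_uuid"] for r in root_responses}) == 1


if __name__ == "__main__":
    parsed = {"es_01": parse_yml(ES_01_YML), "es_02": parse_yml(ES_02_YML)}
    for problem in check_cluster(parsed):
        print(problem)
```

Against the article's own configuration the only finding is the missing minimum_master_nodes on each node. Feed same_cluster() the two GET / bodies quoted earlier and it returns False, because their cluster_uuid values differ; after unicast discovery is working it must return True.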

Install Kibana on es_02:

Find a suitable version at https://www.elastic.co/downloads/kibana.
wget https://download.elastic.co/kibana/kibana/kibana-4.5.1-linux-x64.tar.gz
# unpack (the original notes unpacked 4.1.2 here; the version downloaded and used is 4.5.1)
tar zxvf kibana-4.5.1-linux-x64.tar.gz -C /usr/local
cd /usr/local/ && mv kibana-4.5.1-linux-x64 kibana
# create the kibana init script
vi /etc/rc.d/init.d/kibana
#!/bin/bash
### BEGIN INIT INFO
# Provides: kibana
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Runs kibana daemon
# Description: Runs the kibana daemon as a non-root user
### END INIT INFO
# Process name
NAME=kibana
DESC="Kibana4"
PROG="/etc/init.d/kibana"
# Configure location of Kibana bin
KIBANA_BIN=/usr/local/kibana/bin
# PID Info
PID_FOLDER=/var/run/kibana/
PID_FILE=/var/run/kibana/$NAME.pid
LOCK_FILE=/var/lock/subsys/$NAME
PATH=/bin:/usr/bin:/sbin:/usr/sbin:$KIBANA_BIN
DAEMON=$KIBANA_BIN/$NAME
# Configure User to run daemon process.
# The header promises a non-root user; the original script nevertheless had DAEMON_USER=root.
# Create the account first: useradd -r -s /sbin/nologin kibana && chown -R kibana:kibana /usr/local/kibana
DAEMON_USER=kibana
# Configure logging location
KIBANA_LOG=/var/log/kibana.log
# Begin Script
RETVAL=0
if [ `id -u` -ne 0 ]; then
echo "You need root privileges to run this script"
exit 1
fi
# Function library
. /etc/init.d/functions
start() {
echo -n "Starting $DESC : "
pid=`pidofproc -p $PID_FILE kibana`
if [ -n "$pid" ] ; then
echo "Already running."
exit 0
else
# Start Daemon
if [ ! -d "$PID_FOLDER" ] ; then
mkdir $PID_FOLDER
fi
daemon --user=$DAEMON_USER --pidfile=$PID_FILE $DAEMON 1>"$KIBANA_LOG" 2>&1 &
sleep 2
# records the first node process found; fine as long as Kibana is the only node app on this host
pidofproc node > $PID_FILE
RETVAL=$?
[ $RETVAL -eq 0 ] && success || failure
echo
[ $RETVAL = 0 ] && touch $LOCK_FILE
return $RETVAL
fi
}
reload()
{
echo "Reload command is not implemented for this service."
return $RETVAL
}
stop() {
echo -n "Stopping $DESC : "
killproc -p $PID_FILE $DAEMON
RETVAL=$?
echo
[ $RETVAL = 0 ] && rm -f $PID_FILE $LOCK_FILE
}
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status -p $PID_FILE $DAEMON
RETVAL=$?
;;
restart)
stop
start
;;
reload)
reload
;;
*)
# Invalid Arguments, print the following message.
echo "Usage: $0 {start|stop|status|restart}" >&2
exit 2
;;
esac
Make it executable
chmod +x /etc/rc.d/init.d/kibana

Configure Kibana:

Edit kibana.yml: set the port and the host to bind (the server's own private IP). The settings that matter are server.host, which decides who can reach the UI directly, and elasticsearch.url, which is the node the Kibana server queries on behalf of every browser session; Kibana 4 has no authentication of its own, so both must stay behind the nginx allowlist below.

vim /usr/local/kibana/config/kibana.yml
# Kibana is served by a back end server. This controls which port to use.
server.port: 5601
# The host to bind the server to.
server.host: "10.47.88.188"
# If you are running kibana behind a proxy, and want to mount it at a path,
# specify that path here. The basePath can't end in a slash.
# server.basePath: ""
# The maximum payload size in bytes on incoming server requests.
# server.maxPayloadBytes: 1048576
# The Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://10.47.88.188:9200"
# preserve_elasticsearch_host true will send the hostname specified in `elasticsearch`. If you set it to false,
# then the host you use to connect to *this* Kibana instance will be sent.
elasticsearch.preserveHost: true
# Kibana uses an index in Elasticsearch to store saved searches, visualizations
# and dashboards. It will create a new index if it doesn't already exist.
# kibana.index: ".kibana"
# The default application to load.
kibana.defaultAppId: "discover"
# If your Elasticsearch is protected with basic auth, these are the user credentials
# used by the Kibana server to perform maintenance on the kibana_index at startup. Your Kibana
# users will still need to authenticate with Elasticsearch (which is proxied through
# the Kibana server)
# elasticsearch.ssl.key: /path/to/your/client.key
# If you need to provide a CA certificate for your Elasticsearch instance, put
# the path of the pem file here.
# elasticsearch.ssl.ca: /path/to/your/CA.pem
# Set to false to have a complete disregard for the validity of the SSL
# certificate.
# elasticsearch.ssl.verify: true
# Time in milliseconds to wait for elasticsearch to respond to pings, defaults to
# request_timeout setting
# elasticsearch.pingTimeout: 1500
# Time in milliseconds to wait for responses from the back end or elasticsearch.
# This must be > 0
elasticsearch.requestTimeout: 30000
# Time in milliseconds for Elasticsearch to wait for responses from shards.
# Set to 0 to disable.
# elasticsearch.shardTimeout: 0

Start the kibana service

service kibana start
service kibana status

Check the ports

[root@es_02 config]# netstat -nltp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:32000 0.0.0.0:* LISTEN 2517/java
tcp 0 0 10.47.88.188:5601 0.0.0.0:* LISTEN 6474/node
tcp 0 0 10.47.88.188:10050 0.0.0.0:* LISTEN 305/zabbix_agentd
tcp 0 0 10.47.88.188:9200 0.0.0.0:* LISTEN 5198/java
tcp 0 0 10.47.88.188:9300 0.0.0.0:* LISTEN 5198/java
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 25265/sshd

This is what it should look like: Kibana (5601) and Elasticsearch (9200, 9300) are bound to the private address 10.47.88.188, 127.0.0.1:32000 is the Java service wrapper's local control port, and only sshd listens on all interfaces.

Download the kibana startup script from my GitHub.

For es_01, copy the install from es_02 and adjust the configuration (server.host and elasticsearch.url become 10.47.88.206).

Kibana plugin installation reference:

Installing Marvel

I put nginx in front of Kibana as a reverse proxy, balancing across both instances.

nginx reverse-proxy configuration for Kibana:

Only my company's IP is allowed here. Note that allow/deny act on the TCP peer address ($remote_addr). If the Alibaba Cloud SLB mentioned at the start sits in front of nginx in layer-7 (HTTP) mode, every request arrives from the SLB's address and this allowlist stops working; in that case use nginx's real_ip module with set_real_ip_from limited to the SLB's own addresses. Never derive the allowlist decision from X-Forwarded-For sent by arbitrary clients, since that header is client-controlled.

upstream kibana.ihaozhuo.com {
    server 10.47.88.206:5601 weight=1;
    server 10.47.88.188:5601 weight=1;
}
server {
    listen 80;
    server_name kibana.ihaozhuo.com;
    location / {
        index index.html index.php index.jsp index.htm;
        allow 202.107.202.82/32;
        deny all;
        proxy_pass http://kibana.ihaozhuo.com;
        proxy_ignore_client_abort on;
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

(The original had one closing brace too many at the end, which nginx -t rejects.)

Kafka cluster setup

Download: http://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
[root@kafka_01 srv]# tar -xvf kafka_2.10-0.9.0.0.tgz
[root@kafka_01 srv]# mv kafka_2.10-0.9.0.0 kafka

Edit the Kafka configuration

First create the data directory; don't keep the default /tmp/kafka-logs, which is wiped on reboot:
[root@kafka_01 config]# mkdir -p /srv/kafka/data/kafka-logs
[root@kafka_01 config]# vim /srv/kafka/config/server.properties
# broker id: starts at 0; the three nodes get 0, 1 and 2 with no duplicates (using the same number as the node's zookeeper id is fine; number the cluster machines in order)
broker.id=0
# this node's own private IP: bind that, not 0.0.0.0, because a PLAINTEXT listener has no authentication at all
listeners=PLAINTEXT://10.46.72.172:9092
log.dirs=/srv/kafka/data/kafka-logs
# advertised address (important: by default the broker registers its hostname in zk and clients must resolve it, which fails with java.nio.channels.UnresolvedAddressException or java.io.IOException: Can not resolve address; with an IP in listeners and advertised.listeners left unset, that IP is what gets registered)
# zookeeper ensemble. The /kafka chroot matters: the Logstash consumer below connects with the same /kafka suffix and ls /kafka/brokers/ids in zkCli shows the brokers registered there, so every broker and every client must use the identical chroot (the original notes omitted it on this line)
zookeeper.connect=10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka

Configure the ZooKeeper address

The zookeeper.properties shipped with Kafka is a single-node configuration. This fragment, with clientPort 2281 and a dataDir from another machine, does not match the 2181 ensemble that server.properties points at; the real ensemble was built following the ZooKeeper article linked below, and this file is only relevant if you run the bundled single-node ZooKeeper for a test.

vim zookeeper.properties
dataDir=/home/jollybi/tools/zookeeper-3.4.5/tmp
# the port at which the clients will connect
clientPort=2281
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

Configure the Kafka broker addresses (used by the console producer tools)

vim producer.properties
# must list this cluster's brokers on 9092; the original notes had addresses copied from a different cluster (169.44.62.139:9292, ...)
metadata.broker.list=10.46.72.172:9092,10.47.88.103:9092,10.47.102.137:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder

Common Kafka commands (primer)

A summary of frequently used Kafka command-line operations. When the brokers register under a chroot, as in this setup, every --zookeeper argument needs the same suffix (127.0.0.1:2181/kafka); without it the tools look at an empty tree.
1. Show details of a topic
./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1
2. Add replicas to a topic
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3. Create a topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4. Add partitions to a topic
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 20 --topic testKJ1
5. Console producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
6. Console consumer
./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
7. Start the broker
./kafka-server-start.sh -daemon ../config/server.properties
8. Take a broker offline
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
shutdown broker
9. Delete a topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1
(the second form is the one for 0.9; it only takes effect when the brokers run with delete.topic.enable=true, which is off by default)
10. Check a consumer group's offsets
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1

Create a new topic on the Kafka cluster

called logstash: this is the topic the Logstash shipper and indexer below use. The original snippet used hostnames from another environment (zk1.yazuoyw.com, kafka1.yazuoyw.com, ...) and the /usr/local/kafka path; the commands below use this cluster's addresses, the /srv/kafka install path and the /kafka chroot the brokers register under.

# list topics (--zookeeper can be any one zk node; it is used to fetch the cluster metadata)
/srv/kafka/bin/kafka-topics.sh --zookeeper 10.46.72.172:2181/kafka --describe
# create the topic (--replication-factor is how many brokers hold a copy, --partitions the partition count; typically 2 or equal to the broker count, and never more than the number of brokers)
/srv/kafka/bin/kafka-topics.sh --zookeeper 10.46.72.172:2181/kafka --create --topic logstash --replication-factor 2 --partitions 2
# send messages (--topic selects the topic)
/srv/kafka/bin/kafka-console-producer.sh --broker-list 10.46.72.172:9092,10.47.88.103:9092,10.47.102.137:9092 --topic logstash
message1
message2
# consume messages
/srv/kafka/bin/kafka-console-consumer.sh --zookeeper 10.46.72.172:2181/kafka --topic logstash
# replica check
/srv/kafka/bin/kafka-replica-verification.sh --broker-list 10.46.72.172:9092,10.47.88.103:9092,10.47.102.137:9092

Every message published to a Kafka cluster has a category called a topic. (Physically, messages of different topics are stored separately; logically, a topic's messages may be spread across one or more brokers, but producers and consumers only need to name the topic and need not care where the data lives.)

Logstash on the Elasticsearch machines: configuration to move data from Kafka into Elasticsearch

zk_connect can use any of the Kafka-cluster nodes that run zk; all three are listed here, with the /kafka chroot the brokers register under.

topic_id is the topic created in Kafka, logstash.

One thing to pin down explicitly on both sides is the codec. If the kafka output on the shipper uses the plain codec, one text line per event is written to Kafka and the event's fields (host, type, path) are lost; the kafka input here then receives only a message field and cannot resolve %{host} in the index name below, which stays literal. Set codec => json on the shipper's kafka output and on this kafka input so the whole event crosses Kafka.

Logstash configuration installed on the es nodes:
/usr/local/logstash/config/kafka_to_es.conf

input {
  kafka {
    zk_connect => "10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka"
    group_id => "logstash"
    topic_id => "logstash"
    reset_beginning => false    # boolean (optional), default: false
    consumer_threads => 2       # number (optional), default: 1
    decorate_events => false    # boolean (optional), default: false
  }
}
output {
  elasticsearch {
    hosts => ["10.47.88.206:9200","10.47.88.188:9200"]
    index => "%{host}-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }
}
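An added consistency check, not part of the original notes and not Kafka's or Logstash's code. A zk_connect-style consumer like the one in kafka_to_es.conf finds brokers by reading <chroot>/brokers/ids in ZooKeeper: if the brokers registered under /kafka but the consumer connects without the chroot (or the other way round) it sees an empty broker list and quietly consumes nothing. The function below parses each broker's server.properties and the consumer's zk_connect string and reports the mismatches to look for first. The three property files are constructed from the values given in this article, with the /kafka chroot on every broker.

```python
"""Check that the brokers' server.properties and the Logstash consumer agree."""

# constructed from the article's three brokers
BROKER_PROPERTIES = {
    "kafka_01": """
broker.id=0
listeners=PLAINTEXT://10.46.72.172:9092
log.dirs=/srv/kafka/data/kafka-logs
zookeeper.connect=10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka
""",
    "kafka_02": """
broker.id=1
listeners=PLAINTEXT://10.47.88.103:9092
log.dirs=/srv/kafka/data/kafka-logs
zookeeper.connect=10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka
""",
    "kafka_03": """
broker.id=2
listeners=PLAINTEXT://10.47.102.137:9092
log.dirs=/srv/kafka/data/kafka-logs
zookeeper.connect=10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka
""",
}
# the zk_connect string from kafka_to_es.conf
LOGSTASH_ZK_CONNECT = "10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka"


def parse_properties(text):
    """Java .properties subset: key=value lines, '#' comment lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def split_zk(connect):
    """'h1:2181,h2:2181/chroot' -> (frozenset of host:port, chroot or '')."""
    hosts, slash, chroot = connect.partition("/")
    members = frozenset(h.strip() for h in hosts.split(",") if h.strip())
    return members, ("/" + chroot) if slash else ""


def check_kafka(broker_texts, consumer_zk_connect):
    """Return the list of problems found across the brokers and the consumer."""
    problems = []
    brokers = {name: parse_properties(text) for name, text in broker_texts.items()}
    ids = [p.get("broker.id") for p in brokers.values()]
    if len(set(ids)) != len(ids):
        problems.append("broker.id values are not unique: %s" % ids)
    consumer_hosts, consumer_chroot = split_zk(consumer_zk_connect)
    for name, p in brokers.items():
        # Kafka's default log.dirs is /tmp/kafka-logs, gone after a reboot
        if p.get("log.dirs", "/tmp/kafka-logs").startswith("/tmp"):
            problems.append("%s: log.dirs under /tmp is lost on reboot" % name)
        listener = p.get("listeners", "")
        # PLAINTEXT://0.0.0.0:9092 and PLAINTEXT://:9092 both bind every interface
        if "://0.0.0.0:" in listener or "://:" in listener:
            problems.append("%s: listeners binds all interfaces on an unauthenticated port" % name)
        zk_hosts, zk_chroot = split_zk(p.get("zookeeper.connect", ""))
        if zk_hosts != consumer_hosts:
            problems.append("%s: zookeeper.connect ensemble differs from the consumer's" % name)
        if zk_chroot != consumer_chroot:
            problems.append("%s: broker chroot %r != consumer chroot %r; the consumer will find no brokers"
                            % (name, zk_chroot, consumer_chroot))
    return problems


if __name__ == "__main__":
    for problem in check_kafka(BROKER_PROPERTIES, LOGSTASH_ZK_CONNECT) or ["ok"]:
        print(problem)
```

With the chroot on every broker the check passes; drop /kafka from one broker's zookeeper.connect, as the original notes had it, and the chroot mismatch is reported for that broker.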

I also created a small test pipeline to check that indexing works (run it with bin/logstash -f and type a line; the event should echo to stdout and land in Elasticsearch): /usr/local/logstash/config/stdin_to_es.conf

input {
  stdin {}
}
output {
  elasticsearch {
    hosts => "10.47.88.206"
  }
  stdout {
    codec => rubydebug
  }
}

Step 2: start the services

Kafka uses ZooKeeper, so start ZooKeeper first; for a quick test the single-instance ZooKeeper bundled with Kafka is enough. Append & to the command to get the console back after starting.
# now start Kafka (the original notes mixed /srv/kafka and /usr/local/kafka paths; the install above is /srv/kafka):
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
# start at boot
echo '
# start kafka
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
' >> /etc/rc.local
# stop
/srv/kafka/bin/kafka-server-stop.sh

Started from rc.local like this the broker runs as root. Like Elasticsearch, it has no reason to: give it its own unprivileged account that owns /srv/kafka and the log.dirs directory, and start it under that account.

Kafka firewall configuration:

# 9092 is what the broker listens on (see listeners in server.properties); the original rule opened 4888, which nothing here uses
-A INPUT -i eth0 -p tcp -m state --state NEW -m tcp --dport 9092 -j ACCEPT

Restrict the rule with -s to the Logstash hosts and the other brokers. ZooKeeper additionally needs 2181 from its clients and 2888/3888 between ensemble members, again only from those hosts.

ZooKeeper cluster

See my earlier article: Quick setup and tuning of a ZooKeeper cluster

Check through Kafka that all nodes are up:

[root@kafka_03 bin]# sh zkCli.sh
Connecting to localhost:2181
2017-01-04 19:20:24,849 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-01-04 19:20:24,853 [myid:] - INFO [main:Environment@100] - Client environment:host.name=kafka_03
2017-01-04 19:20:24,853 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_66
2017-01-04 19:20:24,856 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2017-01-04 19:20:24,856 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/srv/jdk1.8.0_66/jre
2017-01-04 19:20:24,856 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/srv/zookeeper-3.4.6/bin/../build/classes:/srv/zookeeper-3.4.6/bin/../build/lib/*.jar:/srv/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/srv/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/srv/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/srv/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/srv/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/srv/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/srv/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/srv/zookeeper-3.4.6/bin/../conf:
2017-01-04 19:20:24,856 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
[zk: localhost:2181(CONNECTED) 0] ls /
[controller_epoch, brokers, zookeeper, kafka, dubbo, admin, isr_change_notification, consumers, config, sthp]
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[0, 1, 2]

All three Kafka brokers are registered: their ids show up here, under the /kafka chroot. This ensemble is shared with other services (dubbo and others appear at the root), which is exactly what the chroot is for, and why every zookeeper.connect / zk_connect string for this cluster must end in /kafka.

Security

Pay particular attention to the addresses every ELK component listens on: never expose them on a public address, and restrict access even inside the intranet. None of Elasticsearch 2.x (9200/9300, including the head plugin), Kafka 0.9 on a PLAINTEXT listener (9092), ZooKeeper (2181) or Kibana 4 (5601) authenticates its clients, so anyone who can open a TCP connection can read, write or delete everything. Bind each service to the private interface, open its port in iptables only for the hosts that need it, and publish Kibana only through the nginx proxy with its IP allowlist.
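The following is an added example, not part of the original notes. It turns the advice above into a check: it parses a netstat -nltp listing and reports every ELK port bound to a wildcard or non-private address, then emits the iptables lines that would confine those ports to one network. The two listings are constructed from the ones in this article: on es_02 everything is bound to the private address 10.47.88.188, whereas on the Logstash host shown further down Elasticsearch (9200, 9300, 9301), Kibana (5601) and the Logstash input on 5000 all listen on 0.0.0.0, which includes the public interface if the host has one. The port table and the 10.47.88.0/24 allowed network are assumptions for the example.

```python
"""Find ELK ports reachable on every interface in `netstat -nltp` output."""
import re

# ports this stack uses (assumed for the example); sshd and zabbix are deliberately left out
ELK_PORTS = {
    9200: "elasticsearch http",
    9300: "elasticsearch transport",
    9301: "elasticsearch transport (second java process)",
    5601: "kibana",
    5000: "logstash input",
    9092: "kafka broker",
    2181: "zookeeper client",
}

# constructed from the es_02 listing above
ES_02_NETSTAT = """\
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:32000 0.0.0.0:* LISTEN 2517/java
tcp 0 0 10.47.88.188:5601 0.0.0.0:* LISTEN 6474/node
tcp 0 0 10.47.88.188:10050 0.0.0.0:* LISTEN 305/zabbix_agentd
tcp 0 0 10.47.88.188:9200 0.0.0.0:* LISTEN 5198/java
tcp 0 0 10.47.88.188:9300 0.0.0.0:* LISTEN 5198/java
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 25265/sshd
"""

# constructed from the Logstash-host listing further down
LOGSTASH_HOST_NETSTAT = """\
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1765/java
tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1765/java
tcp 0 0 0.0.0.0:9301 0.0.0.0:* LISTEN 2309/java
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1509/sshd
tcp 0 0 0.0.0.0:5601 0.0.0.0:* LISTEN 1876/node
tcp 0 0 0.0.0.0:5000 0.0.0.0:* LISTEN 2309/java
tcp 0 0 :::22 :::* LISTEN 1509/sshd
"""

# "tcp 0 0 <addr>:<port> <foreign> LISTEN <pid>/<program>"; the address may itself contain colons (IPv6)
LISTEN_RE = re.compile(
    r"^tcp6?\s+\d+\s+\d+\s+(?P<addr>\S+):(?P<port>\d+)\s+\S+\s+LISTEN\s+(?P<pid>\d+|-)/(?P<prog>\S*)"
)


def parse_listeners(netstat_output):
    """Return [(bind_address, port, program), ...] for every LISTEN line."""
    found = []
    for line in netstat_output.splitlines():
        m = LISTEN_RE.match(line.strip())
        if m:
            found.append((m.group("addr"), int(m.group("port")), m.group("prog")))
    return found


def is_local_only(addr):
    """True for loopback or an RFC 1918 IPv4 bind address; 0.0.0.0 and :: are wildcards."""
    if addr in ("127.0.0.1", "::1"):
        return True
    parts = addr.split(".")
    if len(parts) != 4 or not all(p.isdigit() for p in parts):
        return False  # wildcard, IPv6 or anything unexpected: treat as exposed
    a, b = int(parts[0]), int(parts[1])
    return a == 10 or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168)


def exposed_elk_ports(netstat_output):
    """Map port -> program for ELK ports not bound to loopback or a private IPv4 address."""
    return {
        port: prog
        for addr, port, prog in parse_listeners(netstat_output)
        if port in ELK_PORTS and not is_local_only(addr)
    }


def restrict_rules(exposed, allowed_cidr):
    """iptables lines admitting each flagged port only from allowed_cidr, then dropping the rest."""
    rules = []
    for port in sorted(exposed):
        rules.append('-A INPUT -s %s -p tcp -m tcp --dport %d -m comment --comment "%s" -j ACCEPT'
                     % (allowed_cidr, port, ELK_PORTS[port]))
        rules.append("-A INPUT -p tcp -m tcp --dport %d -j DROP" % port)
    return rules


if __name__ == "__main__":
    for host, listing in (("es_02", ES_02_NETSTAT), ("logstash host", LOGSTASH_HOST_NETSTAT)):
        exposed = exposed_elk_ports(listing)
        print(host, "exposed:", exposed or "none")
        for rule in restrict_rules(exposed, "10.47.88.0/24"):
            print("   ", rule)
```

Run on a real host, pipe netstat -nltp into it instead of the embedded samples. es_02 produces no findings; the Logstash host yields five, one per wildcard-bound ELK port, and the generated rules go into /etc/sysconfig/iptables before the default policy line.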

Installing the Logstash client:

Install from the tarball
I installed from the tarball here.
# wget https://download.elasticsearch.org/logstash/logstash/logstash-2.4.0.tar.gz
# curl -O https://download.elastic.co/logstash/logstash/logstash-2.4.0.tar.gz
# tar -zxvf logstash-2.4.0.tar.gz
# mv logstash-2.4.0 /usr/local/
# ln -s /usr/local/logstash-2.4.0/ /usr/local/logstash
Download the startup script
In production it runs in the background, and the tarball install has no init script. Get one from GitHub: https://github.com/benet1006/ELK_config.git
# cp logstash.init /etc/init.d/logstash
# chmod +x /etc/init.d/logstash
I modified this script.
# start the logstash service
service logstash start
service logstash status
# check port 5000
netstat -nltp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1765/java
tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1765/java
tcp 0 0 0.0.0.0:9301 0.0.0.0:* LISTEN 2309/java
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1509/sshd
tcp 0 0 0.0.0.0:5601 0.0.0.0:* LISTEN 1876/node
tcp 0 0 0.0.0.0:5000 0.0.0.0:* LISTEN 2309/java
tcp 0 0 :::22 :::* LISTEN 1509/sshd

Port 5000 is opened by whatever input is in /etc/logstash.conf, the pipeline this init script loads; the file-to-kafka shipper configured below opens no listening port at all. Note also that on this host Elasticsearch, Kibana and Logstash are all bound to 0.0.0.0, unlike the es_02 listing earlier where everything sits on the private IP; that is the situation the Security section warns about.

Edit the startup script
vim /etc/init.d/logstash
Point the paths at your tarball install location.
name=logstash
pidfile="/var/run/$name.pid"
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
LS_USER=logstash
LS_GROUP=logstash
LS_HOME=/usr/local/logstash    # install path
LS_HEAP_SIZE="1000m"
LS_JAVA_OPTS="-Djava.io.tmpdir=${LS_HOME}"
LS_LOG_DIR=/usr/local/logstash
LS_LOG_FILE="${LS_LOG_DIR}/$name.log"
LS_CONF_FILE=/etc/logstash.conf    # the pipeline config with the collection rules
LS_OPEN_FILES=16384
LS_NICE=19
LS_OPTS=""

The script runs Logstash as LS_USER=logstash, which is the right choice, but that account has to exist (useradd) and must be able to read the Tomcat logs the file input below tails, so give it group read access to /srv/tomcat/logs/account rather than running the shipper as root.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
This is the official Logstash documentation for the file input configuration.
Using it I first adjusted my earlier configuration file.

Logstash agent configuration:

Configure Logstash to collect the application log: the input
file_to_kafka.conf reads the log file and writes it to Kafka
input {
  file {
    path => "/srv/tomcat/logs/account/logFile.*.log"
    type => "tomcat"
    discover_interval => 15    # seconds between scans for new files matching path
  }
}
output {
  #stdout { codec => rubydebug }
  kafka {
    bootstrap_servers => "10.46.72.172:9092,10.47.88.103:9092,10.47.102.137:9092"
    #group_id => "logstash"
    topic_id => "logstash"
  }
}
2.2 Logstash indexer configuration
kafka_to_es.conf
input {
  kafka {
    zk_connect => "10.46.72.172:2181,10.47.88.103:2181,10.47.102.137:2181/kafka"    # the original was missing the slash before the chroot
    group_id => "logstash"
    topic_id => "logstash"
    reset_beginning => false    # boolean (optional), default: false
    consumer_threads => 2       # number (optional), default: 1
    decorate_events => false    # boolean (optional), default: false
  }
}
output {
  elasticsearch {
    hosts => ["10.47.88.206:9200","10.47.88.188:9200"]
    index => "%{host}-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }
}
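An added example, not the author's code and not Logstash's, to make concrete what crosses Kafka between file_to_kafka.conf and kafka_to_es.conf. It models the two codecs the kafka output and input can use and then renders the indexer's index name "%{host}-%{+YYYY.MM.dd}" with a minimal Logstash-style sprintf. With the plain codec the output writes one text line per event (Logstash's plain codec formats an event as "<@timestamp> <host> <message>" when no format is given), so the indexer ends up with a message field and nothing else: host, type and path are gone, and a missing field is left literal by sprintf, so the index becomes "%{host}-2016.12.29". With codec => json on both sides the whole event survives and the index is "tomcat-account_01-2016.12.29". The sample event and the receive timestamp are constructed; the sprintf only implements the two forms the article uses.

```python
"""Model the shipper -> Kafka -> indexer round trip under the plain and json codecs."""
import json
import re
from datetime import datetime

# constructed: what the file input on tomcat-account_01 produces for one log line
SHIPPER_EVENT = {
    "@timestamp": "2016-12-29T10:15:00.000Z",
    "message": "2016-12-29 18:15:00 INFO AccountService - login ok user=alice",
    "host": "tomcat-account_01",
    "path": "/srv/tomcat/logs/account/logFile.2016-12-29.log",
    "type": "tomcat",
}
INDEXER_RECEIVE_TIME = "2016-12-29T10:15:02.000Z"  # constructed: when the indexer read the message

_JODA = {"YYYY": "%Y", "MM": "%m", "dd": "%d"}  # only the date pieces the article uses


def sprintf(template, event):
    """Tiny subset of Logstash sprintf: %{field} and %{+YYYY.MM.dd}.

    A field that is not in the event stays as the literal %{field}, which is
    what Logstash does and what makes a broken index name visible.
    """
    def repl(match):
        spec = match.group(1)
        if spec.startswith("+"):
            ts = datetime.strptime(event["@timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
            return ts.strftime(re.sub(r"YYYY|MM|dd", lambda m: _JODA[m.group(0)], spec[1:]))
        return str(event[spec]) if spec in event else match.group(0)
    return re.sub(r"%\{([^}]*)\}", repl, template)


def kafka_output_encode(event, codec):
    """The bytes (as str) the kafka output writes to the topic for one event."""
    if codec == "json":
        return json.dumps(event, sort_keys=True)
    if codec == "plain":
        return sprintf("%{@timestamp} %{host} %{message}", event)  # plain codec's default line
    raise ValueError("unknown codec %r" % codec)


def kafka_input_decode(payload, codec):
    """The event the kafka input builds from one Kafka message."""
    if codec == "json":
        return json.loads(payload)
    if codec == "plain":
        # the whole line becomes `message`; @timestamp is the time of receipt, not the log's
        return {"message": payload, "@timestamp": INDEXER_RECEIVE_TIME}
    raise ValueError("unknown codec %r" % codec)


def index_name_after_kafka(codec, template="%{host}-%{+YYYY.MM.dd}"):
    """Round-trip the sample event through Kafka with `codec` on both ends, return the ES index name."""
    event = kafka_input_decode(kafka_output_encode(SHIPPER_EVENT, codec), codec)
    return sprintf(template, event)


def fields_after_kafka(codec):
    """Which fields of the shipper's event reach the indexer."""
    event = kafka_input_decode(kafka_output_encode(SHIPPER_EVENT, codec), codec)
    return sorted(event)


if __name__ == "__main__":
    for codec in ("plain", "json"):
        print(codec, "->", index_name_after_kafka(codec), fields_after_kafka(codec))
```

The practical rule this demonstrates: set codec => json explicitly on both the shipper's kafka output and the indexer's kafka input rather than relying on either plugin's default, otherwise every host's logs land in one index literally named "%{host}-<date>" with no type or path to filter on.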

Install the head plugin on es and look at the result:

Then open: http://elk.ihaozhuo.com/_plugin/head/

Kibana web UI result:
