目前公司做等保,需要有日志审计,初步考虑使用rsyslog把所有服务器的日志收集起来。同时考虑到我们运维过程中也要查询日志,要把rsyslog收集的日志可以统一界面来查询使用
收集的日志类型:系统日志,mysql日志,防火墙,waf,F5日志,sftp,smtp日志等
开源产品:Rsyslog、Kafka、ELK
处理流程为:Vm Rsyslog--> Rsyslog Server --omkafka--> Kafka --> Logstash --> Elasticsearch --> Kibana
ps:omkafka模块在rsyslog v8.7.0之后的版本才支持
环境:
ELK SERVER | 10.10.27.123 |
---|---|
Rsyslog Server | 10.10.27.121 |
Rsyslog client | 10.10.27.122 |
Rsyslog是高速的日志收集处理服务,它具有高性能、安全可靠和模块化设计的特点,能够接收来自各种来源的日志输入(例如:file,tcp,udp,uxsock等),并通过处理后将结果输出的不同的目的地(例如:mysql,mongodb,elasticsearch,kafka等),每秒处理日志量能够超过百万条。
Rsyslog作为syslog的增强升级版本已经在各linux发行版默认安装了,无需额外安装
1、rsyslog服务端
~]# cat /etc/rsyslog.conf # Provides UDP syslog reception $ModLoad imudp $UDPServerRun 514 # Provides TCP syslog reception $ModLoad imtcp $InputTCPServerRun 514 ~]# cat /etc/rsyslog.d/default.conf #### GLOBAL DIRECTIVES #### # Use default timestamp format # 使用自定义的日志格式 $ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat $template myFormat,"%timestamp% %fromhost-ip% %syslogtag% %msg%\n" $ActionFileDefaultTemplate myFormat # 根据客户端的IP单独存放主机日志在不同目录,rsyslog需要手动创建 $template RemoteLogs,"/data/rsyslog/%fromhost-ip%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log" # 排除本地主机IP日志记录,只记录远程主机日志 :fromhost-ip, !isequal, "127.0.0.1" ?RemoteLogs ~]# systemctl restart rsyslog
为了把rsyslog server收集的日志数据导入到ELK中,需要在rsyslog server使用到omkafka的模块
~]# yum -y install rsyslog-kafka ~]# cat /etc//rsyslog.d/kafka.conf # 加载omkafka和imfile模块 module(load="omkafka") module(load="imfile") # nginx template template(name="SystemlogTemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n") # ruleset ruleset(name="systemlog-kafka") { #日志转发kafka action ( type="omkafka" template="SystemlogTemplate" topic="system-log" broker="10.10.27.123:9092" ) } input(type="imfile" Tag="Systemlog" File="/data/rsyslog/*/*.log" Ruleset="systemlog-kafka" ~]# systemctl restart rsyslog
2、 Rsyslog客户端
~]# cat /etc/rsyslog.conf #追加一行 *.* @10.10.27.121:514 #所有日志通过UDP传输给rsyslog server ~]# systemctl restart rsyslog
至此,rsyslog准备完毕,验证/data/rsyslog下是否产生日志文件
1、搭建kafka依赖的zookeeper
~]# docker login ~]# docker pull wurstmeister/zookeeper ~]# mkdir -p /data/zookeeper ~]# docker run -d \ --name zookeeper \ --net=host \ -p 2181:2181 \ --restart always \ -v /data/zookeeper:/data/zookeeper \ -e ZOO_PORT=2181 \ -e ZOO_DATA_DIR=/data/zookeeper/data \ -e ZOO_DATA_LOG_DIR=/data/zookeeper/logs \ -e ZOO_MY_ID=1 \ -e ZOO_SERVERS="server.1=10.10.27.123:2888:3888" \ wurstmeister/zookeeper:latest 参数说明: --net=host: 容器网络设置为 host, 能够和宿主机共享网络 -p 2181:2181: 容器的 2181 端口映射到宿主机的 2181 端口 -v /data/zookeeper:/data/zookeeper:容器的/data/zookeeper 目录挂载到宿主机的 /data/zookeeper 目录 ZOO_PORT:zookeeper 的运行端口 ZOO_DATA_DIR:数据存放目录 ZOO_DATA_LOG_DIR: 日志存放目录 ZOO_MY_ID:zk 的节点唯一标识 ZOO_SERVERS:zk 集群服务配置 如果需要部署zookeeper集群:其他 2 个节点同理部署, 只需要修改 ZOO_MY_ID; 节点 2:ZOO_MY_ID=2,节点 3:ZOO_MY_ID=3
验证
~]# docker exec -it zookeeper /bin/bash bash # cd /opt/zookeeper-3.4.13/bin bash # zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg Mode: standalone
2、搭建kafka
~]# mkdir -p /data/kafka ~]# docker pull wurstmeister/kafka ~]# docker run -d \ --name kafka \ --net=host \ --restart always \ -v /data:/data \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_PORT=9092 \ -e KAFKA_HEAP_OPTS="-Xms1g -Xmx1g" \ -e KAFKA_HOST_NAME=10.10.27.123 \ -e KAFKA_ADVERTISED_HOST_NAME=10.10.27.123 \ -e KAFKA_LOG_DIRS=/data/kafka \ -e KAFKA_ZOOKEEPER_CONNECT="10.10.27.123:2181" \ wurstmeister/kafka:latest 参数说明: --net=host: 容器网络设置为 host, 能够和宿主机共享网络 -v /data:/data:容器的/data 目录挂载到宿主机的 /data 目录 KAFKA_BROKER_ID:kafka 的 broker 集群标识, 每台节点 broker 不一样 KAFKA_PORT:kafka 运行端口 KAFKA_HEAP_OPTS:kafka 启动时的 jvm 大小 KAFKA_HOST_NAME:kafka 主机名称,这里随便写,但是要与主机 IP 做 dns 映射 KAFKA_LOG_DIRS:kafka 日志存储目录 KAFKA_ZOOKEEPER_CONNECT:kafka 运行在 zk 里面,zk 提供的连接地址,集群的话写多个地址,逗号隔开 如果需要部署zookeeper集群:其他 2 个节点同理部署, 只需要修改 KAFKA_BROKER_ID、KAFKA_HOST_NAME、KAFKA_ADVERTISED_HOST_NAME 对应的值即可
验证
进入 kafka 容器 ~]# docker exec -it kafka /bin/bash bash-4.4# bin/kafka-topics.sh --list --zookeeper 10.10.27.123:2181 __consumer_offsets log-api system-log #rsyslog自动创建的topic
1、搭建elasticsearch
~]# docker pull elasticsearch:7.7.0 ~]# mkdir /data/es/conf -p ~]# cd /data/es/conf ~]# vim elasticsearch.yml cluster.name: es-cluster network.host: 10.10.27.123 http.cors.enabled: true http.cors.allow-origin: "*" network.publish_host: 10.10.27.123 discovery.zen.minimum_master_nodes: 1 discovery.zen.ping.unicast.hosts: ["10.10.27.123"] discovery.type: single-node 多节点集群使用以下配置: ~]# vim elasticsearch.yml #集群名称,多个节点用一个名称 cluster.name: es-cluster ## es 1.0 版本的默认配置是 “0.0.0.0”,所以不绑定 ip 也可访问 network.host: 0.0.0.0 node.name: master #跨域设置 http.cors.enabled: true http.cors.allow-origin: "*" node.master: true node.data: true network.publish_host: 192.168.1.140 discovery.zen.ping.unicast.hosts: ["192.168.1.140,192.168.1.142,192.168.1.147"] discovery.zen.minimum_master_nodes: 1 ~]# docker run -d \ --name=elasticsearch -p 9200:9200 -p 9300:9300 -p 5601:5601 \ --net=host \ --restart always \ -e ES_JAVA_OPTS="-Xms1g -Xmx1g" \ #启动内存设置 -v /data/es/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \ -v /data/es/data:/usr/share/elasticsearch/data \ elasticsearch:7.7.0 启动过程会有报错,提前做以下操作 ~]# mkdir /data/es/data ~]# chmod u+x /data/es/data ~]# vim /etc/sysctl.conf vm.max_map_count=655350 ~]# sysctl -p 如果需要部署zookeeper集群:其他 2 个节点同样的部署思路,只需要修改 elasticsearch 的配置文件中的 node.name 和 network.publish_host
2、部署kibana
~]# docker pull daocloud.io/library/kibana:7.7.0 ~]# mkdir -p /data/kibana/conf ~]# vi /data/kibana/conf/kibana.yml server.port: 5601 server.host: "0.0.0.0" # ES elasticsearch.hosts: ["http://10.10.27.123:9200"] i18n.locale: "zh-CN" xpack.security.enabled: false ps:多集群可忽略此配置文件,使用默认即可 ~]# docker run -d \ --restart always \ --name kibana \ --network=container:elasticsearch \ #容器网络要和连接的 es 容器共享网络 -v /data/kibana/config:/usr/share/kibana/config \ daocloud.io/library/kibana:7.7.0
3、部署logstash
~]# docker pull daocloud.io/library/logstash:7.7.0 ~]# mkdir /data/logstash/conf -p ~]# vim logstash.conf input{ kafka{ topics => ["system-log"] #必须可前文的topic统一 bootstrap_servers => ["10.10.27.123:9092"] } } output{ elasticsearch { hosts => ["10.10.27.123:9200"] index => "system-log-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } ~]# docker run -d \ --restart always \ --name logstash \ --net=host \ --link elasticsearch \ -v /data/logstash/conf/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \ daocloud.io/library/logstash:7.7.0
由于中国下载docker镜像很慢,配置镜像仓库
--registry-mirror=https://registry.docker-cn.com
建议在本地电脑,比如win10中安装windows版docker,配置好上面镜像仓库地址(setting-Docker Engine),打开powershell进行下载打包镜像:
PS C:\Users\suixin> docker pull kibana:7.7.0 PS C:\Users\suixin> docker save -o kibana.gz kibana:7.7.0
然后上传到服务器导入:
~]# docker load -i kibana.gz