Each nginx node ships its logs to a syslog server via syslog. The syslog server normalizes the log format and then, acting as the producer, sends the log stream to the corresponding Kafka topic.
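A minimal sketch of that producer side, under stated assumptions: the syslog server listens on UDP 514, extracts the level and message from each nginx error line with a rough regex, and publishes JSON to the topic with the kafka-python package. The broker address, port, and regex here are illustrative, not taken from the actual deployment; the field names mirror the Transfer record consumed by faust below.

import json
import re
import socketserver
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',  # illustrative broker address
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Rough pattern for an nginx error-log line inside a syslog payload, e.g.
# "2023/01/01 12:00:00 [error] 123#0: *45 connect() failed (111: ...) ..."
LINE_RE = re.compile(r'(?P<logtime>\S+ \S+) \[(?P<level>\w+)\] (?P<reason>.*)')

class SyslogHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data, _ = self.request              # UDP: (payload, socket)
        m = LINE_RE.search(data.decode('utf-8', errors='replace'))
        if not m:
            return
        producer.send('sec-waf-error-log', {
            'from_host_ip': self.client_address[0],
            'level': m.group('level'),
            'reason': m.group('reason'),
            'logtime': m.group('logtime'),
        })

if __name__ == '__main__':
    with socketserver.UDPServer(('0.0.0.0', 514), SyslogHandler) as server:
        server.serve_forever()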
The consumer is a stream-processing program built on the faust framework. It consumes the stream from the designated Kafka topic, uses stream.filter with a lambda expression to select error-level records for analysis, counts them in a tumbling window keyed by domain/IP, and sends an alert to notify the relevant people when the error count exceeds a threshold. The full consumer:
import faust
import redis
from feishuRobot import feishuRobot
from log import logger
from config.config import ReadConfig

# Load configuration (kafka/redis/faust settings).
try:
    obj = ReadConfig()
    conf = obj.get_config()
    logger.info("load config file successful")
except Exception as r:
    logger.error('Failed to load config file: {reason}', reason=r)

# Redis persists per-error counters with an expiry, so other tooling can read them.
try:
    pool = redis.ConnectionPool(host=conf['redis']['address'],
                                port=conf['redis']['port'],
                                decode_responses=True,
                                password=conf['redis']['password'])
    obj_r = redis.Redis(connection_pool=pool)
except Exception as r:
    logger.error("Failed to connect to redis pool: {reason}", reason=r)

app = faust.App(
    'error_log_alarm',
    store='rocksdb://',
    broker=conf['kafka']['access_broker'],
    stream_wait_empty=False,
    broker_max_poll_records=conf['kafka']['max_poll'],
    topic_partitions=1,
)


class Transfer(faust.Record):
    from_host_ip: str
    level: str
    reason: str
    logtime: str


# Tumbling-window table: counts error events per node (domain/IP) within a
# window of conf['faust']['window_size'] seconds.
record_error = app.Table(
    'record_error',
    default=int,
).tumbling(conf['faust']['window_size'],
           expires=conf['faust']['expires'],
           key_index=True).relative_to_stream()

error_topic = app.topic('sec-waf-error-log', value_type=Transfer)


@app.agent(error_topic)
async def greet(stream):
    # nginx error signatures classified below include:
    #   upstream timed out (110: Connection timed out) while reading response header from upstream
    #   httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer
    #   upstream prematurely closed connection while reading response header from upstream
    #   connect() failed (111: Connection refused) while connecting to upstream
    #   access forbidden by rule
    #   recv() failed (104: Connection reset by peer) while proxying upgraded connection
    try:
        feishu = feishuRobot()
    except Exception as r:
        logger.error("Failed to init feishuRobot object: {reason}", reason=r)

    # Keep only error-level records.
    async for value in stream.filter(lambda x: x.level == "error"):
        record_error[value.from_host_ip] += 1  # count per domain/IP key
        v = record_error[value.from_host_ip]
        if v.now() >= 10:  # alert threshold within the current window
            msg = ""
            err_key = ""
            if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_timed_out"
                msg += "Severity: medium\r\n"
            elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_reset_peer"
                msg += "Severity: medium\r\n"
            elif "upstream prematurely closed connection while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection"
                msg += "Severity: medium\r\n"
            elif "access forbidden by rule" in value.reason:
                err_key = "error_" + value.from_host_ip + "_access_forbidden_rule"
                msg += "Severity: low\r\n"
            elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_refused"
                msg += "Severity: medium\r\n"
            elif "client intended to send too large body" in value.reason:
                err_key = "error_" + value.from_host_ip + "_send_too_large_body"
                msg += "Severity: low\r\n"
            elif "failed to commit the pipelined (push_count_dict)" in value.reason:
                err_key = "error_" + value.from_host_ip + "_commit_the_pipelined"
                msg += "Severity: low\r\n"
            elif "could not build optimal server_names_hash" in value.reason:  # warning-level signature
                err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash"
                msg += "Severity: low\r\n"
            elif "no live upstreams while connecting to upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_no_live_upstreams"
                msg += "Severity: high\r\n"
            elif "SSL_do_handshake() failed" in value.reason:
                err_key = "error_" + value.from_host_ip + "_SSL_do_handshake"
                msg += "Severity: high\r\n"
            if not err_key:
                continue  # unclassified error pattern: skip alerting
            if obj_r.get(err_key) is not None:
                # Key already exists: refresh the count.
                obj_r.set(err_key, str(v.now()))
            else:
                # First occurrence: store the count with an expiry.
                obj_r.set(err_key, str(v.now()))
                obj_r.expire(err_key, conf['redis']['record_expire'])
            msg += "WAF node: " + value.from_host_ip + "\r\n"
            msg += "Error: " + value.reason + "\r\n"
            msg += "Time: " + value.logtime + "\r\n"
            # NB: the "2 minutes" wording assumes the configured window size.
            msg += "Error log frequency: " + str(v.now()) + " in 2 minutes\r\n"
            feishu.send_card_text(msg)


if __name__ == '__main__':
    app.main()