在写我自己的练手项目的时候,需要写一系列Python脚本来帮助我进行运维/环境配置,我希望这些脚本能够有比较好的日志记录。
这一篇博客中,我实现了日志同时向控制台和日志中进行输出,并且二者的日志等级、日志格式不相同。
这一篇博客中,我通过自定义日志的格式描述符,实现了定长的日志等级。
解决完日志问题后,我需要解决这些脚本的另一个问题:执行命令。
我写这一系列Python脚本就是因为我不擅长写shell脚本,而且Python脚本在我的开发环境(Windows)和运行环境(Linux)都可以运行。其需要担当的很重要的一个功能就是执行一些命令,如kubectl apply
、mvn test
等。
我希望执行这些命令的时候,能够实时地将命令的输出,记录到日志中。
本博客,我实现了执行命令,并将命令执行的输出进行劫持,实时记录到日志中。
关于多进程协同,Python主要有三个手段:
其中multiprocessing
主要用于解决计算密集型计算中,Python由于GIL(全局解释器锁)的存在,多线程无法提升性能,而需要创建多个进程来加快计算的场景。
而os.system()
则是阻塞式的,而且似乎无法将其创建的子进程的输出进行劫持。
subprocess
模块几乎是唯一的选择。
subprocess
虽然提供了简易使用的subprocess.run()
方法,但是这个方法无法做到实时输出,也就是命令执行完成之后才能一次性获取命令的输出,即使传入了stdout=subprocess.PIPE
要求其创建一个管道,仍旧是阻塞式的读取。stdout
也可以指定为一个流,因此我们可以将其直接重定向到本程序的标准输出,但是logger并不是一个流。
所以我们只能使用更为底层的subprocess.Popen对象来进行操作。
process = subprocess.Popen( cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, )
通过这样的参数,创建子进程运行cmd
指定的命令,同时创建与子进程关联的process
对象,此时process.stdout
和process.stderr
都是可以调用readline
方法来一行行读取字符串的管道。
通过subprocess.Popen()
创建Popen
对象后,子进程开始运行,我们可以通过管道来读取子进程的输出。因为有两个管道都需要读取,并且它们没有提供检查管道是否读取的方法,只能阻塞地尝试进行读取,所以我们需要创建读取线程来分别读取这两个管道。
实现如下:
import logging import subprocess import shlex import threading class CommandExecutionException(Exception): def __init__(self, command: str, exit_code: int) -> None: super().__init__(f"command executed fail with exit-code={exit_code}: {command}") _logger = logging.getLogger(__name__) class TextReadLineThread(threading.Thread): def __init__(self, readline, callback, *args, **kargs) -> None: super().__init__(*args, **kargs) self.readline = readline self.callback = callback def run(self): for line in iter(self.readline, ""): if len(line) == 0: break self.callback(line) def cmd_exec(command: str, ensure_success: bool=True) -> int: _logger.info("executing command: {}".format(command)) cmd = shlex.split(command) process = subprocess.Popen( cmd, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) _logger.debug("started command") def log_warp(func): def _wrapper(line: str): return func("\t" + line.strip()) return _wrapper read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info)) read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning)) read_stdout.start() read_stderr.start() read_stdout.join() _logger.debug("stdout reading finish") read_stderr.join() _logger.debug("stderr reading finish") ret = process.wait() _logger.debug("process finish") _logger.info("executed command with exit-code={}".format(ret)) if ensure_success and ret != 0: raise CommandExecutionException(command=command, exit_code=ret) return ret if __name__ == '__main__': _logger_trans = { "DEBUG": "DBG", "INFO": "INF", "WARNING": "WAR", "CRITICAL": "ERR" } _old_factory = logging.getLogRecordFactory() def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord: record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs) record.shortlevelname = _logger_trans[record.levelname] return record logging.setLogRecordFactory(factory) logging.basicConfig( level=logging.DEBUG, format='[%(asctime)s %(shortlevelname)s] %(message)s', datefmt="%Y/%m/%d %H:%M:%S" ) cmd_exec("ping localhost", ensure_success=False)
在TextReadLineThread
的run方法中,我们使用了内置函数iter()
的两个参数的形式,其第一个参数为一个可调用对象,第二个参数为该可调用对象可能的输出,通过iter(func, sentinel)
函数产生的迭代器,每次迭代时都无参数地调用func
,直到func()
返回sentinel
时结束迭代。
关于main
中日志配置,参考这一篇博客和[这一篇博客],对日志格式进行增强,使得我们能够明确看到子进程的输出被劫持了。
这里我创建了两个线程,一个用于读取stdout
,一个用于读取stderr
,然后阻塞掉当前线程。其实可以只创建一个新的线程用于读取stderr
,用当前线程读取stdout
。
我使用了ping
命令来进行测试,因为ping
明显是隔一段时间输出一段,可以明显看出是不是实时进行日志记录。但是linux上ping
默认是不会停止的,需要注意一下。