现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。
我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?
解决这个问题的唯一方法是非侵入性的:
Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped.
Your controller process can then do one of the following:
If using disk files: Coalesce the log files at the end of the run, sorted by timestamp
If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes' file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)
如何将所有日志记录委托给另一个进程,从队列中读取所有日志条目?
LOG_QUEUE = multiprocessing.JoinableQueue()
class CentralLogger(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
self.log = logger.getLogger('some_config')
self.log.info("Started Central Logging process")
def run(self):
while True:
log_level, message = self.queue.get()
if log_level is None:
self.log.info("Shutting down Central Logging process")
break
else:
self.log.log(log_level, message)
central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()
只需通过任何多进程机制甚至继承共享LOG_QUEUE,就可以很好地工作!
对于可能需要这个的人,我为multiprocessing_logging包写了一个装饰器,它将当前进程名添加到日志中,这样就可以清楚地看到谁记录了什么。
它还运行install_mp_handler(),因此在创建池之前运行它是没有用的。
这让我可以看到哪个工作人员创建了哪些日志消息。
下面是蓝图和示例:
import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging
# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
class MultiProcessLogFilter(logging.Filter):
def filter(self, record):
try:
process_name = multiprocessing.current_process().name
except BaseException:
process_name = __name__
record.msg = f'{process_name} :: {record.msg}'
return True
multiprocessing_logging.install_mp_handler()
f = MultiProcessLogFilter()
# Wraps is needed here so apply / apply_async know the function name
@wraps(fn)
def wrapper(*args, **kwargs):
logger.removeFilter(f)
logger.addFilter(f)
return fn(*args, **kwargs)
return wrapper
# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
logger.info(f'test function called via: {argument}')
# You can also redefine undecored functions
def undecorated_function():
logger.info('I am not decorated')
@logs_mp_process_names
def redecorated(*args, **kwargs):
return undecorated_function(*args, **kwargs)
# Enjoy
if __name__ == '__main__':
with multiprocessing.Pool() as mp_pool:
# Also works with apply_async
mp_pool.apply(test, ('mp pool',))
mp_pool.apply(redecorated)
logger.info('some main logs')
test('main program')