现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?


当前回答

concurrent-log-handler似乎完美地完成了这项工作。在Windows上测试。还支持POSIX系统。

主要思想

使用返回记录器的函数创建一个单独的文件。记录器必须为每个进程拥有ConcurrentRotatingFileHandler的新实例。示例函数get_logger()如下所示。 创建记录器是在流程初始化时完成的。对于多处理。进程的子类,它将意味着run()方法的开始。

详细说明

在这个例子中,我将使用下面的文件结构

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

子进程

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

Simple child process that inherits multiprocessing.Process and simply logs to file text "Child process" Important: The get_logger() is called inside the run(), or elsewhere inside the child process (not module level or in __init__().) This is required as get_logger() creates ConcurrentRotatingFileHandler instance, and new instance is needed for each process. The do_something is used just to demonstrate that this works with 3rd party library code which does not have any clue that you are using concurrent-log-handler.

主要过程

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


主进程,在第二个“主进程”中两次登录到文件。同样继承自multiprocessing.Process。 get_logger()和do_something()的注释与子进程相同。

日志设置

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

这使用了concurrent-log-handler包中的ConcurrentRotatingFileHandler。每个进程都需要一个新的ConcurrentRotatingFileHandler实例。 注意,ConcurrentRotatingFileHandler的所有参数在每个进程中都应该是相同的。

示例应用程序

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

这只是一个关于如何启动多进程应用程序的简单示例

第三方模块使用标准日志记录的例子

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logging.info("doing something")

只是一个简单的例子来测试来自第三方代码的记录器是否正常工作。

示例输出

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]

其他回答

对于可能需要这个的人,我为multiprocessing_logging包写了一个装饰器,它将当前进程名添加到日志中,这样就可以清楚地看到谁记录了什么。

它还运行install_mp_handler(),因此在创建池之前运行它是没有用的。

这让我可以看到哪个工作人员创建了哪些日志消息。

下面是蓝图和示例:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')

下面是我简单的破解/变通方法…不是最全面的,但很容易修改,比我在写这篇文章之前找到的任何其他答案都更容易阅读和理解:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

到2020年,似乎有一种更简单的多处理日志记录方式。

这个函数将创建记录器。你可以在这里设置格式和你想要输出的位置(文件,stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(\
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

在init中实例化记录器:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

现在,你只需要在每个需要记录日志的函数中添加这个引用:

logger = create_logger()

并输出消息:

logger.info(f'My message from {something}')

希望这能有所帮助。

我也喜欢zzzeek的回答,但Andre是正确的,需要一个队列来防止乱码。我的运气还不错,但确实看到了乱码,这是意料之中的。实现它比我想象的要难,特别是在Windows上运行,在Windows上有一些关于全局变量和其他东西的额外限制(参见:如何在Windows上实现Python Multiprocessing ?)

但是,我终于让它工作了。这个例子可能并不完美,所以欢迎评论和建议。它也不支持设置格式化程序或根日志记录器以外的任何内容。基本上,您必须在每个池进程中用队列重新配置记录器,并在记录器上设置其他属性。

同样,欢迎提出任何关于如何使代码更好的建议。我当然还不知道所有的Python技巧:-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

然而,另一种选择可能是日志包中各种非基于文件的日志处理程序:

套接字处理程序 数据报处理程序 系统日志处理程序

(和其他人)

通过这种方式,您可以轻松地在某个地方创建一个日志守护进程,以便安全地对其进行写入并正确地处理结果。(例如,一个简单的套接字服务器,它只是解pickle消息并将其发送到自己的旋转文件处理程序。)

SyslogHandler也会为您处理这个问题。当然,您可以使用自己的syslog实例,而不是系统实例。