现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?


当前回答

concurrent-log-handler似乎完美地完成了这项工作。在Windows上测试。还支持POSIX系统。

主要思想

使用返回记录器的函数创建一个单独的文件。记录器必须为每个进程拥有ConcurrentRotatingFileHandler的新实例。示例函数get_logger()如下所示。 创建记录器是在流程初始化时完成的。对于多处理。进程的子类,它将意味着run()方法的开始。

详细说明

在这个例子中,我将使用下面的文件结构

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

子进程

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

Simple child process that inherits multiprocessing.Process and simply logs to file text "Child process" Important: The get_logger() is called inside the run(), or elsewhere inside the child process (not module level or in __init__().) This is required as get_logger() creates ConcurrentRotatingFileHandler instance, and new instance is needed for each process. The do_something is used just to demonstrate that this works with 3rd party library code which does not have any clue that you are using concurrent-log-handler.

主要过程

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


主进程,在第二个“主进程”中两次登录到文件。同样继承自multiprocessing.Process。 get_logger()和do_something()的注释与子进程相同。

日志设置

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

这使用了concurrent-log-handler包中的ConcurrentRotatingFileHandler。每个进程都需要一个新的ConcurrentRotatingFileHandler实例。 注意,ConcurrentRotatingFileHandler的所有参数在每个进程中都应该是相同的。

示例应用程序

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

这只是一个关于如何启动多进程应用程序的简单示例

第三方模块使用标准日志记录的例子

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logging.info("doing something")

只是一个简单的例子来测试来自第三方代码的记录器是否正常工作。

示例输出

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]

其他回答

然而,另一种选择可能是日志包中各种非基于文件的日志处理程序:

套接字处理程序 数据报处理程序 系统日志处理程序

(和其他人)

通过这种方式,您可以轻松地在某个地方创建一个日志守护进程,以便安全地对其进行写入并正确地处理结果。(例如,一个简单的套接字服务器,它只是解pickle消息并将其发送到自己的旋转文件处理程序。)

SyslogHandler也会为您处理这个问题。当然,您可以使用自己的syslog实例,而不是系统实例。

我有一个解决方案,类似于ironhacker的,除了我使用日志。在我的一些代码中,我发现我需要在将异常传递回队列之前格式化它,因为回溯是不能pickle的:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s

如何将所有日志记录委托给另一个进程,从队列中读取所有日志条目?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

只需通过任何多进程机制甚至继承共享LOG_QUEUE,就可以很好地工作!

解决这个问题的唯一方法是非侵入性的:

Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped. Your controller process can then do one of the following: If using disk files: Coalesce the log files at the end of the run, sorted by timestamp If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes' file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)

其他线程的变体,它将日志记录和队列线程分开。

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)