现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?


当前回答

其中一个替代方案是将多处理日志写入一个已知文件,并注册一个atexit处理程序来加入这些进程,并在stderr上读取它;但是,您无法通过这种方式获得stderr上输出消息的实时流。

其他回答

下面是一个可以在Windows环境下使用的类,需要ActivePython。 您还可以继承其他日志处理程序(StreamHandler等)。

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

下面是一个演示用法的例子:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass

下面是我简单的破解/变通方法…不是最全面的,但很容易修改,比我在写这篇文章之前找到的任何其他答案都更容易阅读和理解:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

由于我们可以将多进程日志记录表示为多个发布者和一个订阅者(侦听器),因此使用ZeroMQ实现PUB-SUB消息传递确实是一种选择。

此外,PyZMQ模块(ZMQ的Python绑定)实现了PUBHandler,这是通过ZMQ发布日志消息的对象。酒吧的套接字。

在web上有一个解决方案,使用PyZMQ和PUBHandler从分布式应用程序集中记录日志,可以很容易地在本地使用多个发布进程。

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()

然而,另一种选择可能是日志包中各种非基于文件的日志处理程序:

套接字处理程序 数据报处理程序 系统日志处理程序

(和其他人)

通过这种方式,您可以轻松地在某个地方创建一个日志守护进程,以便安全地对其进行写入并正确地处理结果。(例如,一个简单的套接字服务器,它只是解pickle消息并将其发送到自己的旋转文件处理程序。)

SyslogHandler也会为您处理这个问题。当然,您可以使用自己的syslog实例,而不是系统实例。

有一个很棒的套餐

包: https://pypi.python.org/pypi/multiprocessing-logging/

代码: https://github.com/jruere/multiprocessing-logging

安装:

pip install multiprocessing-logging

然后添加:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()