我有一个日志文件正在写的另一个进程,我想观察变化。每次发生更改时,我都希望将新数据读入并对其进行一些处理。

最好的方法是什么?我希望在PyWin32库中有某种钩子。我找到了win32文件。函数FindNextChangeNotification,但不知道如何要求它监视特定的文件。

如果有人做过类似的事情,我真的很感激能听到…

[编辑]我应该提到我追求的是一种不需要轮询的解决方案。

[编辑]诅咒!这似乎不能在映射的网络驱动器上工作。我猜windows不会像在本地磁盘上那样“听到”任何对文件的更新。


当前回答

这是对Tim Goldan脚本的另一种修改,它运行在unix类型上,并通过使用dict (file=>time)为文件修改添加了一个简单的监控器。

用法:whatvername .py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after

其他回答

因为我已经全局安装了它,所以我最喜欢的方法是使用nodemon。如果你的源代码在src中,你的入口点是src/app.py,那么就像这样简单:

nodemon -w 'src/**' -e py,html --exec python src/app.py

... 其中-e py,html允许您控制要监视更改的文件类型。

相关@4Oh4解决方案一个流畅的更改文件列表观看;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

在对蒂姆·戈尔登的剧本进行了一些修改后,我有了以下似乎工作得相当不错的内容:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

它可能需要进行更多的错误检查,但对于简单地观察日志文件并在将其输出到屏幕之前对其进行一些处理,这样做效果很好。

谢谢大家的投入-很棒的东西!

对我来说,最简单的解决方法就是使用看门狗的工具watchmedo

从https://pypi.python.org/pypi/watchdog我现在有一个进程,在一个目录中查找sql文件,并在必要时执行它们。

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

为了用轮询和最小依赖来观察单个文件,下面是一个完整的例子,基于Deestan的回答(上面):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going