我需要锁定一个文件写在Python。它将被多个Python进程同时访问。我在网上找到了一些解决方案,但大多数都无法达到我的目的,因为它们通常只是基于Unix或Windows的。


锁定文件通常是特定于平台的操作,因此您可能需要考虑在不同的操作系统上运行的可能性。例如:

import os

def my_lock(f):
    if os.name == "posix":
        # Unix or OS X specific locking here
    elif os.name == "nt":
        # Windows specific locking here
    else:
        print "Unknown operating system, lock unavailable"

在操作系统级别协调对单个文件的访问充满了您可能不想解决的各种问题。

最好的办法是有一个单独的进程来协调对该文件的读写访问。


这里有一个跨平台的文件锁定模块:Portalocker

尽管正如Kevin所说,从多个进程同时写入文件是您希望尽可能避免的事情。

如果可以将问题硬塞到数据库中,则可以使用SQLite。它支持并发访问并处理自己的锁定。


锁定是平台和设备特定的,但一般来说,你有几个选择:

Use flock(), or equivalent (if your os supports it). This is advisory locking, unless you check for the lock, it's ignored. Use a lock-copy-move-unlock methodology, where you copy the file, write the new data, then move it (move, not copy - move is an atomic operation in Linux -- check your OS), and you check for the existence of the lock file. Use a directory as a "lock". This is necessary if you're writing to NFS, since NFS doesn't support flock(). There's also the possibility of using shared memory between the processes, but I've never tried that; it's very OS-specific.

对于所有这些方法,您必须使用自旋锁(失败后重试)技术来获取和测试锁。这确实为错误同步留下了一个小窗口,但它通常小到不会成为一个大问题。

如果您正在寻找跨平台的解决方案,那么您最好通过其他机制将日志记录到另一个系统(其次是上面的NFS技术)。

请注意,sqlite在NFS上受到与普通文件相同的约束,因此您不能在网络共享上写入sqlite数据库并免费获得同步。


好吧,所以我最终用我在这里写的代码,在我的网站上链接是死的,在archive.org上查看(也可在GitHub上)。我可以以以下方式使用它:

from filelock import FileLock

with FileLock("myfile.txt"):
    # work with the file as it is now locked
    print("Lock acquired.")

我更喜欢lockfile——平台独立的文件锁定


我从grizzed -python中找到了一个简单而有效的(!)实现。

简单使用os.open(…, O_EXCL) + os.close()在windows上不起作用。


我曾经处理过这样的情况,我在同一个目录/文件夹中运行同一个程序的多个副本,并记录了错误。我的方法是在打开日志文件之前向磁盘写入一个“锁定文件”。程序在继续之前检查是否存在“锁文件”,如果“锁文件”存在,则等待轮到它。

代码如下:

def errlogger(error):

    while True:
        if not exists('errloglock'):
            lock = open('errloglock', 'w')
            if exists('errorlog'): log = open('errorlog', 'a')
            else: log = open('errorlog', 'w')
            log.write(str(datetime.utcnow())[0:-7] + ' ' + error + '\n')
            log.close()
            remove('errloglock')
            return
        else:
            check = stat('errloglock')
            if time() - check.st_ctime > 0.01: remove('errloglock')
            print('waiting my turn')

编辑—— 在考虑了上面关于过期锁的一些评论之后,我编辑了代码,添加了一个检查“锁文件”是否过期的检查。在我的系统上计算这个函数的几千次迭代,得到的平均值为0.002066…几秒钟前:

lock = open('errloglock', 'w')

到之后:

remove('errloglock')

所以我想我将从5倍的量开始,以表示过时并监控问题的情况。

此外,当我在处理计时时,我意识到我有一些并不真正必要的代码:

lock.close()

这是我在公开声明之后立即得到的,所以我在这次编辑中删除了它。


我一直在寻找几种解决方案,我的选择是 oslo.concurrency

它功能强大,文档也相对完善。它是基于紧固件的。

其他的解决方案:

Portalocker:需要pywin32,这是一个exe安装,所以不可能通过pip 紧固件:缺乏记录 lockfile:弃用 flufl。lock:用于POSIX系统的nfs安全文件锁定。 simpleflock:上次更新2013-07 佐。lockfile:上一次更新2016-06(截至2017-03) lock_file: 2007-10年最后一次更新


你会发现pylocker很有用。它可以用于锁定文件或一般的锁定机制,并且可以从多个Python进程一次访问。

如果你只是想锁定一个文件,下面是它的工作原理:

import uuid
from pylocker import Locker

#  create a unique lock pass. This can be any string.
lpass = str(uuid.uuid1())

# create locker instance.
FL = Locker(filePath='myfile.txt', lockPass=lpass, mode='w')

# aquire the lock
with FL as r:
    # get the result
    acquired, code, fd  = r

    # check if aquired.
    if fd is not None:
        print fd
        fd.write("I have succesfuly aquired the lock !")

# no need to release anything or to close the file descriptor, 
# with statement takes care of that. let's print fd and verify that.
print fd

场景是这样的: 用户请求一个文件执行某些操作。然后,如果用户再次发送相同的请求,它会通知用户在第一个请求完成之前不会完成第二个请求。这就是为什么我使用锁定机制来处理这个问题。

下面是我的工作代码:

from lockfile import LockFile
lock = LockFile(lock_file_path)
status = ""
if not lock.is_locked():
    lock.acquire()
    status = lock.path + ' is locked.'
    print status
else:
    status = lock.path + " is already locked."
    print status

return status

其他解决方案引用了大量的外部代码库。如果您更愿意自己动手,这里有一些跨平台解决方案的代码,在Linux / DOS系统上使用各自的文件锁定工具。

try:
    # Posix based file locking (Linux, Ubuntu, MacOS, etc.)
    #   Only allows locking on writable files, might cause
    #   strange results for reading.
    import fcntl, os
    def lock_file(f):
        if f.writable(): fcntl.lockf(f, fcntl.LOCK_EX)
    def unlock_file(f):
        if f.writable(): fcntl.lockf(f, fcntl.LOCK_UN)
except ModuleNotFoundError:
    # Windows file locking
    import msvcrt, os
    def file_size(f):
        return os.path.getsize( os.path.realpath(f.name) )
    def lock_file(f):
        msvcrt.locking(f.fileno(), msvcrt.LK_RLCK, file_size(f))
    def unlock_file(f):
        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, file_size(f))


# Class for ensuring that all file operations are atomic, treat
# initialization like a standard call to 'open' that happens to be atomic.
# This file opener *must* be used in a "with" block.
class AtomicOpen:
    # Open the file with arguments provided by user. Then acquire
    # a lock on that file object (WARNING: Advisory locking).
    def __init__(self, path, *args, **kwargs):
        # Open the file and acquire a lock on the file before operating
        self.file = open(path,*args, **kwargs)
        # Lock the opened file
        lock_file(self.file)

    # Return the opened file object (knowing a lock has been obtained).
    def __enter__(self, *args, **kwargs): return self.file

    # Unlock the file and close the file object.
    def __exit__(self, exc_type=None, exc_value=None, traceback=None):        
        # Flush to make sure all buffered contents are written to file.
        self.file.flush()
        os.fsync(self.file.fileno())
        # Release the lock on the file.
        unlock_file(self.file)
        self.file.close()
        # Handle exceptions that may have come up during execution, by
        # default any exceptions are raised to the user.
        if (exc_type != None): return False
        else:                  return True        

现在,AtomicOpen可以在with块中使用,而在这里通常使用open语句。

警告:

如果在Windows上运行并且Python在调用exit之前崩溃,我不确定锁的行为是什么。 这里提供的锁定是建议的,而不是绝对的。所有潜在的竞争进程必须使用“AtomicOpen”类。 截至2020年11月9日,此代码仅锁定Posix系统上的可写文件。在发布之后的某一时刻,在此日期之前,使用fcntl是非法的。锁定只读文件。


下面是一个如何使用filelock库的例子,它类似于Evan Fossmark的实现:

from filelock import FileLock

lockfile = r"c:\scr.txt"
lock = FileLock(lockfile + ".lock")
with lock:
    file = open(path, "w")
    file.write("123")
    file.close()

with lock:块中的任何代码都是线程安全的,这意味着它将在另一个进程访问该文件之前完成。


这招对我很管用: 不占用大文件,分布在几个小文件中 创建文件Temp,删除文件A,然后将文件Temp重命名为A。

import os
import json

def Server():
    i = 0
    while i == 0:
        try:        
                with open(File_Temp, "w") as file:
                    json.dump(DATA, file, indent=2)
                if os.path.exists(File_A):
                    os.remove(File_A)
                os.rename(File_Temp, File_A)
                i = 1
        except OSError as e:
                print ("file locked: " ,str(e))
                time.sleep(1)
            
            
def Clients():
    i = 0
    while i == 0:
        try:
            if os.path.exists(File_A):
                with open(File_A,"r") as file:
                    DATA_Temp = file.read()
            DATA = json.loads(DATA_Temp)
            i = 1
        except OSError as e:
            print (str(e))
            time.sleep(1)

如果你只需要Mac/POSIX,这应该在没有外部包的情况下工作。

import sys
import stat
import os


filePath = "<PATH TO FILE>"
if sys.platform == 'darwin':
  flags = os.stat(filePath).st_flags
  if flags & ~stat.UF_IMMUTABLE:
    os.chflags(filePath, flags & stat.UF_IMMUTABLE)

如果你想解锁一个文件,只要修改一下,

  if flags & stat.UF_IMMUTABLE:
    os.chflags(filePath, flags & ~stat.UF_IMMUTABLE)