我有一个由多重yield返回的生成器对象。准备调用这个生成器是相当耗时的操作。这就是为什么我想重复使用发电机几次。

y = FunctionWithYield()
for x in y: print(x)
#here must be something to reset 'y'
for x in y: print(x)

当然,我正在考虑将内容复制到简单的列表中。有办法重置我的发电机吗?


请参见:如何提前查看Python生成器中的一个元素?


当前回答

我的答案解决了稍微不同的问题:如果初始化生成器的开销很大,生成每个生成的对象的开销也很大。但是我们需要在多个函数中多次使用生成器。为了只调用一次生成器和每个生成的对象,我们可以使用线程并在不同的线程中运行每个消费方法。由于GIL,我们可能无法实现真正的并行,但我们将实现我们的目标。

这种方法在以下情况下做得很好:深度学习模型处理了大量图像。结果是图像上的很多物体都有很多遮罩。每个掩码都会消耗内存。我们有大约10种方法来进行不同的统计和度量,但它们都是一次性拍摄所有图像。所有的图像都装不下内存。方法可以很容易地重写为接受迭代器。

class GeneratorSplitter:
'''
Split a generator object into multiple generators which will be sincronised. Each call to each of the sub generators will cause only one call in the input generator. This way multiple methods on threads can iterate the input generator , and the generator will cycled only once.
'''

def __init__(self, gen):
    self.gen = gen
    self.consumers: List[GeneratorSplitter.InnerGen] = []
    self.thread: threading.Thread = None
    self.value = None
    self.finished = False
    self.exception = None

def GetConsumer(self):
    # Returns a generator object. 
    cons = self.InnerGen(self)
    self.consumers.append(cons)
    return cons

def _Work(self):
    try:
        for d in self.gen:
            for cons in self.consumers:
                cons.consumed.wait()
                cons.consumed.clear()

            self.value = d

            for cons in self.consumers:
                cons.readyToRead.set()

        for cons in self.consumers:
            cons.consumed.wait()

        self.finished = True

        for cons in self.consumers:
            cons.readyToRead.set()
    except Exception as ex:
        self.exception = ex
        for cons in self.consumers:
            cons.readyToRead.set()

def Start(self):
    self.thread = threading.Thread(target=self._Work)
    self.thread.start()

class InnerGen:
    def __init__(self, parent: "GeneratorSplitter"):
        self.parent: "GeneratorSplitter" = parent
        self.readyToRead: threading.Event = threading.Event()
        self.consumed: threading.Event = threading.Event()
        self.consumed.set()

    def __iter__(self):
        return self

    def __next__(self):
        self.readyToRead.wait()
        self.readyToRead.clear()
        if self.parent.finished:
            raise StopIteration()
        if self.parent.exception:
            raise self.parent.exception
        val = self.parent.value
        self.consumed.set()
        return val

Ussage:

genSplitter = GeneratorSplitter(expensiveGenerator)

metrics={}
executor = ThreadPoolExecutor(max_workers=3)
f1 = executor.submit(mean,genSplitter.GetConsumer())
f2 = executor.submit(max,genSplitter.GetConsumer())
f3 = executor.submit(someFancyMetric,genSplitter.GetConsumer())
genSplitter.Start()

metrics.update(f1.result())
metrics.update(f2.result())
metrics.update(f3.result())

其他回答

我的答案解决了稍微不同的问题:如果初始化生成器的开销很大,生成每个生成的对象的开销也很大。但是我们需要在多个函数中多次使用生成器。为了只调用一次生成器和每个生成的对象,我们可以使用线程并在不同的线程中运行每个消费方法。由于GIL,我们可能无法实现真正的并行,但我们将实现我们的目标。

这种方法在以下情况下做得很好:深度学习模型处理了大量图像。结果是图像上的很多物体都有很多遮罩。每个掩码都会消耗内存。我们有大约10种方法来进行不同的统计和度量,但它们都是一次性拍摄所有图像。所有的图像都装不下内存。方法可以很容易地重写为接受迭代器。

class GeneratorSplitter:
'''
Split a generator object into multiple generators which will be sincronised. Each call to each of the sub generators will cause only one call in the input generator. This way multiple methods on threads can iterate the input generator , and the generator will cycled only once.
'''

def __init__(self, gen):
    self.gen = gen
    self.consumers: List[GeneratorSplitter.InnerGen] = []
    self.thread: threading.Thread = None
    self.value = None
    self.finished = False
    self.exception = None

def GetConsumer(self):
    # Returns a generator object. 
    cons = self.InnerGen(self)
    self.consumers.append(cons)
    return cons

def _Work(self):
    try:
        for d in self.gen:
            for cons in self.consumers:
                cons.consumed.wait()
                cons.consumed.clear()

            self.value = d

            for cons in self.consumers:
                cons.readyToRead.set()

        for cons in self.consumers:
            cons.consumed.wait()

        self.finished = True

        for cons in self.consumers:
            cons.readyToRead.set()
    except Exception as ex:
        self.exception = ex
        for cons in self.consumers:
            cons.readyToRead.set()

def Start(self):
    self.thread = threading.Thread(target=self._Work)
    self.thread.start()

class InnerGen:
    def __init__(self, parent: "GeneratorSplitter"):
        self.parent: "GeneratorSplitter" = parent
        self.readyToRead: threading.Event = threading.Event()
        self.consumed: threading.Event = threading.Event()
        self.consumed.set()

    def __iter__(self):
        return self

    def __next__(self):
        self.readyToRead.wait()
        self.readyToRead.clear()
        if self.parent.finished:
            raise StopIteration()
        if self.parent.exception:
            raise self.parent.exception
        val = self.parent.value
        self.consumed.set()
        return val

Ussage:

genSplitter = GeneratorSplitter(expensiveGenerator)

metrics={}
executor = ThreadPoolExecutor(max_workers=3)
f1 = executor.submit(mean,genSplitter.GetConsumer())
f2 = executor.submit(max,genSplitter.GetConsumer())
f3 = executor.submit(someFancyMetric,genSplitter.GetConsumer())
genSplitter.Start()

metrics.update(f1.result())
metrics.update(f2.result())
metrics.update(f3.result())

你可以使用itertools.cycle()来实现这一点。 您可以使用此方法创建一个迭代器,然后在迭代器上执行for循环,迭代器将对其值进行循环。

例如:

def generator():
for j in cycle([i for i in range(5)]):
    yield j

gen = generator()
for i in range(20):
    print(next(gen))

将生成20个数字,0到4重复。

医生说:

Note, this member of the toolkit may require significant auxiliary storage (depending on the length of the iterable).

另一种选择是使用itertools.tee()函数创建生成器的第二个版本:

import itertools
y = FunctionWithYield()
y, y_backup = itertools.tee(y)
for x in y:
    print(x)
for x in y_backup:
    print(x)

从内存使用的角度来看,如果原始迭代可能不处理所有的项,这可能是有益的。

如果希望使用预定义的参数集多次重用此生成器,可以使用functools.partial。

from functools import partial
func_with_yield = partial(FunctionWithYield, arg0, arg1)

for i in range(100):
    for x in func_with_yield():
        print(x)

这将把生成器函数包装到另一个函数中,因此每次调用func_with_yield()时,它都会创建相同的生成器函数。

现在可以使用more_itertools。Seekable(第三方工具),允许重置迭代器。

通过> pip Install more_itertools安装

import more_itertools as mit


y = mit.seekable(FunctionWithYield())
for x in y:
    print(x)

y.seek(0)                                              # reset iterator
for x in y:
    print(x)

注意:内存消耗会随着迭代器的增加而增加,所以要警惕大型迭代对象。