我有一个Python脚本,它把一个整数列表作为输入,我需要一次处理四个整数。不幸的是,我无法控制输入,否则我将它作为一个四元素元组列表传入。目前,我以这种方式迭代它:

for i in range(0, len(ints), 4):
    # dummy op for example code
    foo += ints[i] * ints[i + 1] + ints[i + 2] * ints[i + 3]

不过,它看起来很像“C-think”,这让我怀疑有一种更python的方式来处理这种情况。该列表在迭代后被丢弃,因此不需要保留。也许这样会更好?

while ints:
    foo += ints[0] * ints[1] + ints[2] * ints[3]
    ints[0:4] = []

不过,感觉还是不太对。: - /

相关问题:在Python中如何将列表分割成大小均匀的块?


当前回答

你可以使用函数库中的partition或chunks函数:

from funcy import partition

for a, b, c, d in partition(4, ints):
    foo += a * b * c * d

这些函数还有迭代器版本ipartition和ichunks,在这种情况下更有效。

您还可以查看它们的实现。

其他回答

关于J.F. Sebastian给出的解决方案:

def chunker(iterable, chunksize):
    return zip(*[iter(iterable)]*chunksize)

它很聪明,但有一个缺点——总是返回元组。如何获得字符串代替? 当然,你可以写“.join(chunker(…))”,但无论如何都要构造临时元组。

你可以通过编写自己的zip来摆脱临时元组,就像这样:

class IteratorExhausted(Exception):
    pass

def translate_StopIteration(iterable, to=IteratorExhausted):
    for i in iterable:
        yield i
    raise to # StopIteration would get ignored because this is generator,
             # but custom exception can leave the generator.

def custom_zip(*iterables, reductor=tuple):
    iterators = tuple(map(translate_StopIteration, iterables))
    while True:
        try:
            yield reductor(next(i) for i in iterators)
        except IteratorExhausted: # when any of iterators get exhausted.
            break

Then

def chunker(data, size, reductor=tuple):
    return custom_zip(*[iter(data)]*size, reductor=reductor)

使用示例:

>>> for i in chunker('12345', 2):
...     print(repr(i))
...
('1', '2')
('3', '4')
>>> for i in chunker('12345', 2, ''.join):
...     print(repr(i))
...
'12'
'34'

下面是一个支持生成器的无导入chunker:

def chunks(seq, size):
    it = iter(seq)
    while True:
        ret = tuple(next(it) for _ in range(size))
        if len(ret) == size:
            yield ret
        else:
            raise StopIteration()

使用示例:

>>> def foo():
...     i = 0
...     while True:
...         i += 1
...         yield i
...
>>> c = chunks(foo(), 3)
>>> c.next()
(1, 2, 3)
>>> c.next()
(4, 5, 6)
>>> list(chunks('abcdefg', 2))
[('a', 'b'), ('c', 'd'), ('e', 'f')]

我希望通过将迭代器从列表中删除,我不是简单地复制列表的一部分。生成器可以被切片,它们将自动仍然是一个生成器,而列表将被切片成1000个条目的大块,这是较低的效率。

def iter_group(iterable, batch_size:int):
    length = len(iterable)
    start = batch_size*-1
    end = 0
    while(end < length):
        start += batch_size
        end += batch_size
        if type(iterable) == list:
            yield (iterable[i] for i in range(start,min(length-1,end)))
        else:
            yield iterable[start:end]

用法:

items = list(range(1,1251))

for item_group in iter_group(items, 1000):
    for item in item_group:
        print(item)

如果你不介意使用外部包,你可以使用iteration_utilities。Grouper from iteration_utilities它支持所有可迭代对象(不仅仅是序列):

from iteration_utilities import grouper
seq = list(range(20))
for group in grouper(seq, 4):
    print(group)

打印:

(0, 1, 2, 3)
(4, 5, 6, 7)
(8, 9, 10, 11)
(12, 13, 14, 15)
(16, 17, 18, 19)

如果长度不是组大小的倍数,它还支持填充(不完整的最后一组)或截断(丢弃不完整的最后一组)最后一个:

from iteration_utilities import grouper
seq = list(range(17))
for group in grouper(seq, 4):
    print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16,)

for group in grouper(seq, 4, fillvalue=None):
    print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16, None, None, None)

for group in grouper(seq, 4, truncate=True):
    print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)

基准

我还决定比较上面提到的几种方法的运行时间。这是一个对数-对数图,根据不同大小的列表将“10”个元素分组。对于定性结果:较低意味着更快:

至少在这个基准测试中iteration_utilities。石斑鱼表现最好。接着是Craz。

基准是用simple_benchmark1创建的。运行这个基准测试的代码是:

import iteration_utilities
import itertools
from itertools import zip_longest

def consume_all(it):
    return iteration_utilities.consume(it, None)

import simple_benchmark
b = simple_benchmark.BenchmarkBuilder()

@b.add_function()
def grouper(l, n):
    return consume_all(iteration_utilities.grouper(l, n))

def Craz_inner(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

@b.add_function()
def Craz(iterable, n, fillvalue=None):
    return consume_all(Craz_inner(iterable, n, fillvalue))

def nosklo_inner(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

@b.add_function()
def nosklo(seq, size):
    return consume_all(nosklo_inner(seq, size))

def SLott_inner(ints, chunk_size):
    for i in range(0, len(ints), chunk_size):
        yield ints[i:i+chunk_size]

@b.add_function()
def SLott(ints, chunk_size):
    return consume_all(SLott_inner(ints, chunk_size))

def MarkusJarderot1_inner(iterable,size):
    it = iter(iterable)
    chunk = tuple(itertools.islice(it,size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it,size))

@b.add_function()
def MarkusJarderot1(iterable,size):
    return consume_all(MarkusJarderot1_inner(iterable,size))

def MarkusJarderot2_inner(iterable,size,filler=None):
    it = itertools.chain(iterable,itertools.repeat(filler,size-1))
    chunk = tuple(itertools.islice(it,size))
    while len(chunk) == size:
        yield chunk
        chunk = tuple(itertools.islice(it,size))

@b.add_function()
def MarkusJarderot2(iterable,size):
    return consume_all(MarkusJarderot2_inner(iterable,size))

@b.add_arguments()
def argument_provider():
    for exp in range(2, 20):
        size = 2**exp
        yield size, simple_benchmark.MultiArgument([[0] * size, 10])

r = b.run()

1免责声明:我是iteration_utilities和simple_benchmark库的作者。

使用小的函数和东西真的不吸引我;我更喜欢使用切片:

data = [...]
chunk_size = 10000 # or whatever
chunks = [data[i:i+chunk_size] for i in xrange(0,len(data),chunk_size)]
for chunk in chunks:
    ...