如何将任意长度的列表拆分为大小相等的块?
请参阅如果数据结果将直接用于循环,并且不需要存储,则如何以块形式遍历列表。
对于字符串输入的同一问题,请参见每n个字符拆分字符串?。相同的技术通常适用,但也有一些变化。
如何将任意长度的列表拆分为大小相等的块?
请参阅如果数据结果将直接用于循环,并且不需要存储,则如何以块形式遍历列表。
对于字符串输入的同一问题,请参见每n个字符拆分字符串?。相同的技术通常适用,但也有一些变化。
当前回答
任何可迭代的通用分块器,使用户可以选择如何在结尾处处理部分分块。
在Python 3上测试。
分块.py
from enum import Enum
class PartialChunkOptions(Enum):
INCLUDE = 0
EXCLUDE = 1
PAD = 2
ERROR = 3
class PartialChunkException(Exception):
pass
def chunker(iterable, n, on_partial=PartialChunkOptions.INCLUDE, pad=None):
"""
A chunker yielding n-element lists from an iterable, with various options
about what to do about a partial chunk at the end.
on_partial=PartialChunkOptions.INCLUDE (the default):
include the partial chunk as a short (<n) element list
on_partial=PartialChunkOptions.EXCLUDE
do not include the partial chunk
on_partial=PartialChunkOptions.PAD
pad to an n-element list
(also pass pad=<pad_value>, default None)
on_partial=PartialChunkOptions.ERROR
raise a RuntimeError if a partial chunk is encountered
"""
on_partial = PartialChunkOptions(on_partial)
iterator = iter(iterable)
while True:
vals = []
for i in range(n):
try:
vals.append(next(iterator))
except StopIteration:
if vals:
if on_partial == PartialChunkOptions.INCLUDE:
yield vals
elif on_partial == PartialChunkOptions.EXCLUDE:
pass
elif on_partial == PartialChunkOptions.PAD:
yield vals + [pad] * (n - len(vals))
elif on_partial == PartialChunkOptions.ERROR:
raise PartialChunkException
return
return
yield vals
测试.py
import chunker
chunk_size = 3
for it in (range(100, 107),
range(100, 109)):
print("\nITERABLE TO CHUNK: {}".format(it))
print("CHUNK SIZE: {}".format(chunk_size))
for option in chunker.PartialChunkOptions.__members__.values():
print("\noption {} used".format(option))
try:
for chunk in chunker.chunker(it, chunk_size, on_partial=option):
print(chunk)
except chunker.PartialChunkException:
print("PartialChunkException was raised")
print("")
test.py的输出
ITERABLE TO CHUNK: range(100, 107)
CHUNK SIZE: 3
option PartialChunkOptions.INCLUDE used
[100, 101, 102]
[103, 104, 105]
[106]
option PartialChunkOptions.EXCLUDE used
[100, 101, 102]
[103, 104, 105]
option PartialChunkOptions.PAD used
[100, 101, 102]
[103, 104, 105]
[106, None, None]
option PartialChunkOptions.ERROR used
[100, 101, 102]
[103, 104, 105]
PartialChunkException was raised
ITERABLE TO CHUNK: range(100, 109)
CHUNK SIZE: 3
option PartialChunkOptions.INCLUDE used
[100, 101, 102]
[103, 104, 105]
[106, 107, 108]
option PartialChunkOptions.EXCLUDE used
[100, 101, 102]
[103, 104, 105]
[106, 107, 108]
option PartialChunkOptions.PAD used
[100, 101, 102]
[103, 104, 105]
[106, 107, 108]
option PartialChunkOptions.ERROR used
[100, 101, 102]
[103, 104, 105]
[106, 107, 108]
其他回答
用户@tzot的解决方案zip_langest(*[iter(lst)]*n,fillvalue=padvalue)非常优雅,但如果lst的长度不能被n整除,它会填充最后一个子列表,以保持其长度与其他子列表的长度匹配。然而,如果这不可取,那么只需使用zip()生成类似的循环zip,并将lst的剩余元素(不能生成“完整”子列表)附加到输出即可。
输出示例为ABCDEFG,3->ABC DEF G。
单线版本(Python>=3.8):
list(map(list, zip(*[iter(lst)]*n))) + ([rest] if (rest:=lst[len(lst)//n*n : ]) else [])
A函数:
def chunkify(lst, chunk_size):
nested = list(map(list, zip(*[iter(lst)]*chunk_size)))
rest = lst[len(lst)//chunk_size*chunk_size: ]
if rest:
nested.append(rest)
return nested
生成器(尽管每个批次都是一个元组):
def chunkify(lst, chunk_size):
for tup in zip(*[iter(lst)]*chunk_size):
yield tup
rest = tuple(lst[len(lst)//chunk_size*chunk_size: ])
if rest:
yield rest
它比这里的一些最流行的答案产生相同的输出更快。
my_list, n = list(range(1_000_000)), 12
%timeit list(chunks(my_list, n)) # @Ned_Batchelder
# 36.4 ms ± 1.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit [my_list[i:i+n] for i in range(0, len(my_list), n)] # @Ned_Batchelder
# 34.6 ms ± 1.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit it = iter(my_list); list(iter(lambda: list(islice(it, n)), [])) # @senderle
# 60.6 ms ± 5.36 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit list(mit.chunked(my_list, n)) # @pylang
# 59.4 ms ± 4.92 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit chunkify(my_list, n)
# 25.8 ms ± 1.84 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
同样,从Python 3.12开始,这个功能将作为itertools模块中的批处理方法来实现(目前是一个配方),因此这个答案很可能会被Python 3.12淘汰。
由于我必须这样做,下面是我的解决方案,给出了一个生成器和一个批量大小:
def pop_n_elems_from_generator(g, n):
elems = []
try:
for idx in xrange(0, n):
elems.append(g.next())
return elems
except StopIteration:
return elems
非常简单的事情:
def chunks(xs, n):
n = max(1, n)
return (xs[i:i+n] for i in range(0, len(xs), n))
对于Python 2,使用xrange()代替range()。
延迟加载版本
导入pprintpprint.pprint(列表(块(范围(10,75),10))[范围(10、20),范围(20、30),范围(30、40),范围(40、50),范围(50、60),范围(60、70),范围(70,75)]将此实现的结果与接受答案的示例使用结果进行比较。
上面的许多函数都假定整个可迭代函数的长度是预先知道的,或者至少计算起来很便宜。
对于一些流式对象,这意味着首先将完整数据加载到内存中(例如下载整个文件)以获取长度信息。
但是,如果您还不知道完整大小,可以使用以下代码:
def chunks(iterable, size):
"""
Yield successive chunks from iterable, being `size` long.
https://stackoverflow.com/a/55776536/3423324
:param iterable: The object you want to split into pieces.
:param size: The size each of the resulting pieces should have.
"""
i = 0
while True:
sliced = iterable[i:i + size]
if len(sliced) == 0:
# to suppress stuff like `range(max, max)`.
break
# end if
yield sliced
if len(sliced) < size:
# our slice is not the full length, so we must have passed the end of the iterator
break
# end if
i += size # so we start the next chunk at the right place.
# end while
# end def
这之所以有效,是因为如果您传递了一个iterable的结尾,slice命令将返回less/no元素:
"abc"[0:2] == 'ab'
"abc"[2:4] == 'c'
"abc"[4:6] == ''
我们现在使用切片的结果,并计算生成的块的长度。如果它低于我们的预期,我们知道我们可以结束迭代。
这样,除非访问,否则不会执行迭代器。
下面我有一个解决方案确实有效,但比这个解决方案更重要的是对其他方法的一些评论。首先,一个好的解决方案不应该要求一个循环按顺序遍历子迭代器。如果我跑
g = paged_iter(list(range(50)), 11))
i0 = next(g)
i1 = next(g)
list(i1)
list(i0)
最后一个命令的适当输出是
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
not
[]
正如这里大多数基于itertools的解决方案所返回的那样。这不仅仅是关于按顺序访问迭代器的常见无聊限制。想象一个消费者试图清理输入不良的数据,该数据颠倒了5的块的适当顺序,即数据看起来像[B5,A5,D5,C5],应该像[A5,B5,C5,D5](其中A5只是五个元素,而不是子列表)。该使用者将查看分组函数的声明行为,并毫不犹豫地编写一个类似
i = 0
out = []
for it in paged_iter(data,5)
if (i % 2 == 0):
swapped = it
else:
out += list(it)
out += list(swapped)
i = i + 1
如果您偷偷摸摸地假设子迭代器总是按顺序完全使用,那么这将产生神秘的错误结果。如果你想交错块中的元素,情况就更糟了。
其次,大量建议的解决方案隐含地依赖于迭代器具有确定性顺序的事实(例如,迭代器没有设置),尽管使用islice的一些解决方案可能还可以,但我对此感到担忧。
第三,itertools-grouper方法有效,但该方法依赖于zip_langest(或zip)函数的内部行为,而这些行为不是其发布行为的一部分。特别是,grouper函数只起作用,因为在zip_langest(i0…In)中,下一个函数总是按next(i0)、next(i 1)、……的顺序调用。。。在重新开始之前。当grouper传递同一迭代器对象的n个副本时,它依赖于此行为。
最后,虽然下面的解决方案可以得到改进,但如果您对上面的假设进行了批评,即子迭代器是按顺序访问的,并且在没有这个假设的情况下被完全阅读,则必须隐式(通过调用链)或显式(通过deques或其他数据结构)为每个子迭代程序存储元素。所以,不要浪费时间(就像我所做的那样),假设人们可以用一些巧妙的技巧来解决这个问题。
def paged_iter(iterat, n):
itr = iter(iterat)
deq = None
try:
while(True):
deq = collections.deque(maxlen=n)
for q in range(n):
deq.append(next(itr))
yield (i for i in deq)
except StopIteration:
yield (i for i in deq)