我经常在超过1500万行左右的数据帧上执行pandas操作,我希望能够访问特定操作的进度指示器。
是否存在基于文本的熊猫分裂-应用-组合操作进度指示器?
例如:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
其中feature_rollup是一个有点复杂的函数,它采用许多DF列,并通过各种方法创建新的用户列。对于大数据帧,这些操作可能需要一段时间,所以我想知道是否有可能在iPython笔记本中有基于文本的输出,以更新我的进度。
到目前为止,我已经尝试了Python的规范循环进度指示器,但它们没有以任何有意义的方式与pandas交互。
我希望在pandas库/文档中有一些我忽略了的东西,可以让人们了解分裂-应用-组合的进展。一个简单的实现可能会查看apply函数正在处理的数据帧子集的总数,并将进度报告为这些子集的完成部分。
这可能是需要添加到库中的东西吗?
对于合并、concat、join等操作,可以使用Dask显示进度条。
您可以将Pandas数据框架转换为Dask数据框架。然后你可以显示Dask进度条。
下面的代码显示了一个简单的例子:
创建和转换Pandas数据框架
import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd
n = 450000
maxa = 700
df1 = pd.DataFrame({'lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n)})
df2 = pd.DataFrame({'rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n)})
sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)
使用进度条合并
from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()
with TqdmCallback(desc="compute"):
sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()
同样的操作,Dask比Pandas更快,需要的资源更少:
熊猫74.7毫秒
Dask 20.2 ms
详情如下:
进度条合并或连接操作与tqdm在熊猫
测试笔记本
注1:我已经测试了这个解决方案:https://stackoverflow.com/a/56257514/3921758,但它不适合我。不度量合并操作。
注2:我已经为熊猫的tqdm检查了“开放请求”,如下:
https://github.com/tqdm/tqdm/issues/1144
https://github.com/noamraph/tqdm/issues/28
调整Jeff的答案(并将其作为一个可重用的函数)。
def logged_apply(g, func, *args, **kwargs):
step_percentage = 100. / len(g)
import sys
sys.stdout.write('apply progress: 0%')
sys.stdout.flush()
def logging_decorator(func):
def wrapper(*args, **kwargs):
progress = wrapper.count * step_percentage
sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
sys.stdout.flush()
wrapper.count += 1
return func(*args, **kwargs)
wrapper.count = 0
return wrapper
logged_func = logging_decorator(func)
res = g.apply(logged_func, *args, **kwargs)
sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
sys.stdout.flush()
return res
注意:应用进度百分比内联更新。如果你的函数stout,那么这将不起作用。
In [11]: g = df_users.groupby(['userID', 'requestDate'])
In [12]: f = feature_rollup
In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]:
...
像往常一样,你可以把它添加到你的groupby对象作为一个方法:
from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply
In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]:
...
正如评论中提到的,这不是core pandas感兴趣的功能。但是python允许你为许多pandas对象/方法创建这些(这样做会有相当多的工作…尽管你应该能够概括这种方法)。
由于流行的需求,我在tqdm中添加了熊猫支持(pip install "tqdm>=4.9.0")。与其他答案不同,这不会明显地减慢熊猫的速度——下面是DataFrameGroupBy.progress_apply的一个例子:
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm # for notebooks
# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)
如果你对它的工作原理(以及如何为自己的回调修改它)感兴趣,请参阅GitHub上的示例,PyPI的完整文档,或导入模块并运行帮助(tqdm)。其他支持的函数包括map、applymap、聚合和转换。
EDIT
要直接回答原来的问题,请替换为:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
:
from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
注:tqdm <= v4.8:
对于4.8以下的tqdm版本,而不是tqdm.pandas(),你必须做:
from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
我已经改变了Jeff的答案,以包括一个总数,这样您就可以跟踪进度和一个变量,只打印每个X次迭代(这实际上提高了很多性能,如果“print_at”相当高)
def count_wrapper(func,total, print_at):
def wrapper(*args):
wrapper.count += 1
if wrapper.count % wrapper.print_at == 0:
clear_output()
sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
sys.stdout.flush()
return func(*args)
wrapper.count = 0
wrapper.total = total
wrapper.print_at = print_at
return wrapper
clear_output()函数来自
from IPython.core.display import clear_output
如果不是在IPython,安迪·海登的答案是没有它的