我在日常工作中使用SAS,它的核心支持非常棒。然而,SAS作为一款软件,由于许多其他原因而很糟糕。

有一天,我希望用python和pandas来取代我对SAS的使用,但我目前缺乏用于大型数据集的核心工作流。我不是在谈论需要分布式网络的“大数据”,而是那些大到无法装入内存,但又小到可以装入硬盘的文件。

我的第一个想法是使用HDFStore将大型数据集保存在磁盘上,只将我需要的数据块放入数据框架中进行分析。其他人提到MongoDB是一种更容易使用的替代方案。我的问题是:

完成以下任务的最佳实践工作流程是什么:

将平面文件加载到永久的磁盘数据库结构中 查询该数据库以检索数据以输入pandas数据结构 在操作熊猫的碎片后更新数据库

现实世界的例子将非常受欢迎,尤其是那些在“大数据”上使用熊猫的人。

编辑—我希望这样工作的一个例子:

迭代地导入一个大型平面文件,并将其存储在一个永久的磁盘数据库结构中。这些文件通常太大,无法装入内存。 为了使用Pandas,我希望读取这些数据的子集(通常一次只有几列),这些子集可以放入内存中。 我将通过对所选列执行各种操作来创建新列。 然后,我必须将这些新列追加到数据库结构中。

我正在努力寻找执行这些步骤的最佳实践方法。阅读关于熊猫和pytables的链接,似乎添加一个新列可能是一个问题。

编辑——具体回答杰夫的问题:

I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc... The datasets I use every day have nearly 1,000 to 2,000 fields on average of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns. Typical operations involve combining several columns using conditional logic into a new, compound column. For example, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics trying to find interesting, intuitive relationships to model. A typical project file is usually about 1GB. Files are organized into such a manner where a row consists of a record of consumer data. Each row has the same number of columns for every record. This will always be the case. It's pretty rare that I would subset by rows when creating a new column. However, it's pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say Retail credit cards. To do this, I would select only those records where the line of business = retail in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. The columns that I explore are usually done in small sets. For example, I will focus on a set of say 20 columns just dealing with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I'm doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.

我很少向数据集中添加行。我几乎总是会创建新的列(统计/机器学习术语中的变量或特征)。


当前回答

正如其他人所指出的那样,几年之后,一个“脱离核心”的熊猫替代物出现了:dask。虽然dask并不是熊猫的替代品,但它的所有功能都有几个突出的原因:

Dask是一个用于分析计算的灵活并行计算库,它针对交互式计算工作负载的动态任务调度进行了优化 “大数据”集合,如并行数组、数据框架和列表,将NumPy、Pandas或Python迭代器等常用接口扩展到大于内存或分布式环境,并从笔记本电脑扩展到集群。

Dask emphasizes the following virtues: Familiar: Provides parallelized NumPy array and Pandas DataFrame objects Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects. Native: Enables distributed computing in Pure Python with access to the PyData stack. Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms Scales up: Runs resiliently on clusters with 1000s of cores Scales down: Trivial to set up and run on a laptop in a single process Responsive: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

替换一些pandas代码,像这样:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

并且,特别值得注意的是,通过并发提供。期货接口是提交自定义任务的通用基础设施:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

其他回答

还有一种变化

在pandas中完成的许多操作也可以作为数据库查询(sql, mongo)来完成。

使用RDBMS或mongodb允许您在DB查询中执行一些聚合(针对大数据进行了优化,并有效地使用缓存和索引)。

稍后,您可以使用pandas执行后期处理。

这种方法的优点是,您获得了处理大数据的DB优化,同时仍然用高级声明性语法定义逻辑——而不必处理决定在内存中做什么和在内核外做什么的细节。

尽管查询语言和pandas是不同的,但是将部分逻辑从一种语言转换到另一种逻辑通常并不复杂。

我想指出维克斯包裹。

Vaex is a python library for lazy Out-of-Core DataFrames (similar to Pandas), to visualize and explore big tabular datasets. It can calculate statistics such as mean, sum, count, standard deviation etc, on an N-dimensional grid up to a billion (109) objects/rows per second. Visualization is done using histograms, density plots and 3d volume rendering, allowing interactive exploration of big data. Vaex uses memory mapping, zero memory copy policy and lazy computations for best performance (no memory wasted).

看一下文档:https://vaex.readthedocs.io/en/latest/ 该API与熊猫的API非常接近。

我经常以这种方式使用数十gb的数据 例:我在磁盘上有一些表,我通过查询读取它们,创建数据,然后再追加回去。

值得一读文档,在这篇文章的后面,有一些关于如何存储数据的建议。

这些细节会影响你如何存储你的数据,比如: 尽可能多地描述细节;我可以帮你建立一个结构

Size of data, # of rows, columns, types of columns; are you appending rows, or just columns? What will typical operations look like. E.g. do a query on columns to select a bunch of rows and specific columns, then do an operation (in-memory), create new columns, save these. (Giving a toy example could enable us to offer more specific recommendations.) After that processing, then what do you do? Is step 2 ad hoc, or repeatable? Input flat files: how many, rough total size in Gb. How are these organized e.g. by records? Does each one contains different fields, or do they have some records per file with all of the fields in each file? Do you ever select subsets of rows (records) based on criteria (e.g. select the rows with field A > 5)? and then do something, or do you just select fields A, B, C with all of the records (and then do something)? Do you 'work on' all of your columns (in groups), or are there a good proportion that you may only use for reports (e.g. you want to keep the data around, but don't need to pull in that column explicity until final results time)?

解决方案

确保你至少安装了熊猫0.10.1。

逐块读取迭代文件和多个表查询。

由于pytables被优化为按行操作(这是您查询的对象),因此我们将为每组字段创建一个表。通过这种方式,可以很容易地选择一小组字段(这将与一个大表一起工作,但这样做更有效……我想我可以在将来修复这个限制…这是更直观的): (以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读入文件并创建存储(本质上是做append_to_multiple所做的事情):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在,文件中已经有了所有的表(实际上,如果您愿意,可以将它们存储在单独的文件中,您可能必须将文件名添加到group_map中,但这可能是不必要的)。

下面是获取列并创建新列的方法:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当你准备好post_processing时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于data_columns,实际上不需要定义ANY data_columns;它们允许您基于列再选择行。例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终报告生成阶段,它们可能是您最感兴趣的(本质上,数据列与其他列隔离,如果您定义很多,这可能会在一定程度上影响效率)。

你可能还想:

创建一个函数,该函数接受字段列表,在groups_map中查找组,然后选择这些组并连接结果,从而获得结果帧(这实际上是select_as_multiple所做的工作)。这种结构对你来说是相当透明的。 在某些数据列上建立索引(使行子集设置更快)。 启用压缩。

如果你有问题,请告诉我!

现在,在这个问题过去两年之后,又出现了一个“脱离核心”的熊猫版:dask。太棒了!虽然它不支持所有的熊猫功能,但你可以用它走得很远。更新:在过去的两年里,它一直在维护,有大量的用户社区使用Dask。

现在,这个问题已经过去四年了,在韦克斯又出现了另一只高性能的“超核心”熊猫。它“使用内存映射、零内存复制策略和惰性计算来获得最佳性能(没有内存浪费)。”它可以处理数十亿行的数据集,并且不将它们存储到内存中(甚至可以在次优硬件上进行分析)。

我最近遇到了一个类似的问题。我发现,简单地读取数据块,并在写入数据块时将其追加到相同的csv中就可以了。我的问题是根据另一个表中的信息添加一个日期列,使用某些列的值如下所示。这可能会帮助那些对dask和hdf5感到困惑但更熟悉熊猫的人,比如我自己。

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)