在numpy数组上映射函数的最有效方法是什么?我目前正在做:

import numpy as np 

x = np.array([1, 2, 3, 4, 5])

# Obtain array of square of each element in x
squarer = lambda t: t ** 2
squares = np.array([squarer(xi) for xi in x])

然而,这可能非常低效,因为我在将新数组转换回numpy数组之前,使用列表推导式将其构造为Python列表。我们能做得更好吗?


当前回答

博士TL;

正如@user2357112所指出的,应用函数的“直接”方法总是在Numpy数组上映射函数的最快和最简单的方法:

import numpy as np
x = np.array([1, 2, 3, 4, 5])
f = lambda x: x ** 2
squares = f(x)

一般避免np。向量化,因为它性能不佳,并且已经(或曾经)有许多问题。如果您正在处理其他数据类型,您可能需要研究下面所示的其他方法。

方法比较

下面是一些简单的测试,比较三种映射函数的方法,本例使用Python 3.6和NumPy 1.15.4。首先,测试的设置函数:

import timeit
import numpy as np

f = lambda x: x ** 2
vf = np.vectorize(f)

def test_array(x, n):
    t = timeit.timeit(
        'np.array([f(xi) for xi in x])',
        'from __main__ import np, x, f', number=n)
    print('array: {0:.3f}'.format(t))

def test_fromiter(x, n):
    t = timeit.timeit(
        'np.fromiter((f(xi) for xi in x), x.dtype, count=len(x))',
        'from __main__ import np, x, f', number=n)
    print('fromiter: {0:.3f}'.format(t))

def test_direct(x, n):
    t = timeit.timeit(
        'f(x)',
        'from __main__ import x, f', number=n)
    print('direct: {0:.3f}'.format(t))

def test_vectorized(x, n):
    t = timeit.timeit(
        'vf(x)',
        'from __main__ import x, vf', number=n)
    print('vectorized: {0:.3f}'.format(t))

测试五个元素(从最快到最慢排序):

x = np.array([1, 2, 3, 4, 5])
n = 100000
test_direct(x, n)      # 0.265
test_fromiter(x, n)    # 0.479
test_array(x, n)       # 0.865
test_vectorized(x, n)  # 2.906

包含100个元素:

x = np.arange(100)
n = 10000
test_direct(x, n)      # 0.030
test_array(x, n)       # 0.501
test_vectorized(x, n)  # 0.670
test_fromiter(x, n)    # 0.883

并且使用1000个或更多的数组元素:

x = np.arange(1000)
n = 1000
test_direct(x, n)      # 0.007
test_fromiter(x, n)    # 0.479
test_array(x, n)       # 0.516
test_vectorized(x, n)  # 0.945

不同版本的Python/NumPy和编译器优化会有不同的结果,所以对您的环境进行类似的测试。

其他回答

squares = squarer(x)

数组上的算术运算以元素方式自动应用,高效的c级循环避免了适用于python级循环或理解的所有解释器开销。

您希望应用到NumPy数组elementwise的大多数函数都可以正常工作,尽管有些函数可能需要更改。例如,if不能在元素方面工作。你需要将它们转换为使用numpy.where这样的结构:

def using_if(x):
    if x < 5:
        return x
    else:
        return x**2

就变成了

def using_where(x):
    return numpy.where(x < 5, x, x**2)

似乎没有人提到在numpy包中生成ufunc的内置工厂方法:np.frompyfunc,我已经对np进行了测试。矢量化,并且比它的表现好大约20~30%。当然,它不能像规定的C代码或numba(我没有测试过)那样执行,但它是比np.vectorize更好的选择

f = lambda x, y: x * y
f_arr = np.frompyfunc(f, 2, 1)
vf = np.vectorize(f)
arr = np.linspace(0, 1, 10000)

%timeit f_arr(arr, arr) # 307ms
%timeit vf(arr, arr) # 450ms

我也测试了更大的样本,改进是成比例的。请在这里查看文档

以上所有答案都比较好,但如果您需要使用自定义函数进行映射,并且您有numpy。Ndarray,你需要保留数组的形状。

我只比较了两个,但它将保留ndarray的形状。我使用了包含100万个条目的数组进行比较。这里我使用square函数,它也内置在numpy中,具有很大的性能提升,因为需要一些东西,您可以使用您选择的函数。

import numpy, time
def timeit():
    y = numpy.arange(1000000)
    now = time.time()
    numpy.array([x * x for x in y.reshape(-1)]).reshape(y.shape)        
    print(time.time() - now)
    now = time.time()
    numpy.fromiter((x * x for x in y.reshape(-1)), y.dtype).reshape(y.shape)
    print(time.time() - now)
    now = time.time()
    numpy.square(y)  
    print(time.time() - now)

输出

>>> timeit()
1.162431240081787    # list comprehension and then building numpy array
1.0775556564331055   # from numpy.fromiter
0.002948284149169922 # using inbuilt function

在这里,你可以清楚地看到numpy.fromiter工作得很好,考虑到简单的方法,如果内置函数可用,请使用它。

博士TL;

正如@user2357112所指出的,应用函数的“直接”方法总是在Numpy数组上映射函数的最快和最简单的方法:

import numpy as np
x = np.array([1, 2, 3, 4, 5])
f = lambda x: x ** 2
squares = f(x)

一般避免np。向量化,因为它性能不佳,并且已经(或曾经)有许多问题。如果您正在处理其他数据类型,您可能需要研究下面所示的其他方法。

方法比较

下面是一些简单的测试,比较三种映射函数的方法,本例使用Python 3.6和NumPy 1.15.4。首先,测试的设置函数:

import timeit
import numpy as np

f = lambda x: x ** 2
vf = np.vectorize(f)

def test_array(x, n):
    t = timeit.timeit(
        'np.array([f(xi) for xi in x])',
        'from __main__ import np, x, f', number=n)
    print('array: {0:.3f}'.format(t))

def test_fromiter(x, n):
    t = timeit.timeit(
        'np.fromiter((f(xi) for xi in x), x.dtype, count=len(x))',
        'from __main__ import np, x, f', number=n)
    print('fromiter: {0:.3f}'.format(t))

def test_direct(x, n):
    t = timeit.timeit(
        'f(x)',
        'from __main__ import x, f', number=n)
    print('direct: {0:.3f}'.format(t))

def test_vectorized(x, n):
    t = timeit.timeit(
        'vf(x)',
        'from __main__ import x, vf', number=n)
    print('vectorized: {0:.3f}'.format(t))

测试五个元素(从最快到最慢排序):

x = np.array([1, 2, 3, 4, 5])
n = 100000
test_direct(x, n)      # 0.265
test_fromiter(x, n)    # 0.479
test_array(x, n)       # 0.865
test_vectorized(x, n)  # 2.906

包含100个元素:

x = np.arange(100)
n = 10000
test_direct(x, n)      # 0.030
test_array(x, n)       # 0.501
test_vectorized(x, n)  # 0.670
test_fromiter(x, n)    # 0.883

并且使用1000个或更多的数组元素:

x = np.arange(1000)
n = 1000
test_direct(x, n)      # 0.007
test_fromiter(x, n)    # 0.479
test_array(x, n)       # 0.516
test_vectorized(x, n)  # 0.945

不同版本的Python/NumPy和编译器优化会有不同的结果,所以对您的环境进行类似的测试。

我相信在numpy的新版本(我使用1.13)中,您可以简单地通过将numpy数组传递给您为标量类型编写的函数来调用该函数,它将自动应用函数调用到numpy数组上的每个元素,并返回另一个numpy数组

>>> import numpy as np
>>> squarer = lambda t: t ** 2
>>> x = np.array([1, 2, 3, 4, 5])
>>> squarer(x)
array([ 1,  4,  9, 16, 25])