讨论代码的原始答案可以在下面找到。
首先,您必须区分不同类型的API,每种API都有自己的性能考虑因素。
抽样API
(纯Python结构与基于JVM的编制)
这是受Python代码性能和PySpark实现细节影响最大的组件。虽然Python的性能不太可能成为问题,但至少有几个因素是你必须考虑的:
Overhead of JVM communication. Practically all data that comes to and from Python executor has to be passed through a socket and a JVM worker. While this is a relatively efficient local communication it is still not free.
Process-based executors (Python) versus thread based (single JVM multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect, it provides stronger isolation than its JVM counterpart and some control over executor lifecycle but potentially significantly higher memory usage:
interpreter memory footprint
footprint of the loaded libraries
less efficient broadcasting (each process requires its own copy of a broadcast)
Performance of Python code itself. Generally speaking Scala is faster than Python but it will vary on task to task. Moreover you have multiple options including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094.
PySpark configuration provides the spark.python.worker.reuse option which can be used to choose between forking Python process for each task and reusing existing process. The latter option seems to be useful to avoid expensive garbage collection (it is more an impression than a result of systematic tests), while the former one (default) is optimal for in case of expensive broadcasts and imports.
Reference counting, used as the first line garbage collection method in CPython, works pretty well with typical Spark workloads (stream-like processing, no reference cycles) and reduces the risk of long GC pauses.
MLlib
(Python和JVM混合执行)
基本的考虑因素与以前几乎相同,但有一些额外的问题。虽然MLlib使用的基本结构是普通的Python RDD对象,但所有算法都直接使用Scala执行。
这意味着将Python对象转换为Scala对象或将Python对象转换为Scala对象会增加额外的成本,增加内存使用,以及稍后我们将介绍的一些额外限制。
目前(Spark 2.x),基于rdd的API处于维护模式,计划在Spark 3.0中被移除。
DataFrame API和Spark ML
(使用Python代码的JVM执行仅限于驱动程序)
这些可能是标准数据处理任务的最佳选择。由于Python代码主要局限于驱动程序上的高级逻辑操作,因此Python和Scala之间应该没有性能差异。
A single exception is usage of row-wise Python UDFs which are significantly less efficient than their Scala equivalents. While there is some chance for improvements (there has been substantial development in Spark 2.0.0), the biggest limitation is full roundtrip between internal representation (JVM) and Python interpreter. If possible, you should favor a composition of built-in expressions (example. Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution.
随着向量化udf (SPARK-21190和进一步的扩展)的引入,这可能会在未来得到改善,它使用箭头流进行高效的零拷贝反序列化数据交换。对于大多数应用程序,它们的次要开销可以忽略不计。
还要确保避免在dataframe和rdd之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。
值得注意的是,Py4J调用具有相当高的延迟。这包括简单的调用,如:
from pyspark.sql.functions import col
col("foo")
通常,这并不重要(开销是恒定的,不依赖于数据量),但对于软实时应用程序,您可以考虑缓存/重用Java包装器。
GraphX和Spark数据集
至于现在(Spark 1.6 2.1),两者都没有提供PySpark API,所以你可以说PySpark比Scala差得多。
GraphX
实际上,GraphX的开发几乎完全停止了,项目目前处于维护模式,相关的JIRA票据已经关闭,因为无法修复。GraphFrames库通过Python绑定提供了另一种图形处理库。
Dataset
主观上讲,在Python中静态类型的数据集没有太多的空间,即使有,当前的Scala实现也太简单了,不能提供与DataFrame相同的性能优势。
流媒体
从我目前看到的情况来看,我强烈建议使用Scala而不是Python。如果PySpark支持结构化流,未来可能会有所改变,但现在Scala API似乎更健壮、更全面、更高效。我的经验很有限。
Spark 2中的结构化流。X似乎缩小了语言之间的差距,但目前它仍处于早期阶段。然而,基于RDD的API已经在Databricks文档中被引用为“遗留流”(访问日期为2017-03-03),因此有理由期待进一步的统一努力。
不履行注意事项
Feature parity
并非所有Spark特性都是通过PySpark API公开的。一定要检查您需要的部分是否已经实现,并尝试了解可能的限制。
当您使用MLlib和类似的混合上下文时,这一点尤其重要(参见从任务调用Java/Scala函数)。公平地说,PySpark API的某些部分,比如mllib。linalg提供了一组比Scala更全面的方法。
API design
PySpark API紧密地反映了它的Scala对等物,因此并不完全是python式的。这意味着在语言之间进行映射非常容易,但与此同时,Python代码可能非常难以理解。
Complex architecture
与纯JVM执行相比,PySpark数据流相对复杂。对PySpark程序或调试进行推理要困难得多。此外,至少要对Scala和JVM有基本的了解。
Spark 2.x and beyond
持续向数据集API的转变,以及冻结的RDD API,为Python用户带来了机遇和挑战。虽然API的高级部分更容易在Python中公开,但更高级的特性几乎不可能直接使用。
此外,本地Python函数在SQL世界中仍然是二等公民。希望将来Apache Arrow序列化能够改善这一点(目前的工作目标是数据收集,但UDF serde是一个长期目标)。
对于强烈依赖Python代码库的项目,纯Python替代品(如Dask或Ray)可能是一个有趣的替代品。
不一定非得是一个对另一个
Spark DataFrame (SQL, Dataset) API提供了一种优雅的方式来集成Scala/Java代码在PySpark应用程序。您可以使用dataframe将数据公开给本机JVM代码并读取结果。我已经在其他地方解释了一些选项,你可以在如何在Pyspark中使用Scala类中找到Python-Scala往返的工作示例。
它可以通过引入用户定义类型来进一步增强(参见如何在Spark SQL中为自定义类型定义模式?)
问题中提供的代码有什么问题
(免责声明:python的观点。很可能我错过了一些Scala技巧)
首先,你的代码中有一部分完全没有意义。如果你已经有(键,值)对创建使用zipWithIndex或枚举在创建字符串只是分割它之后的点是什么?flatMap不递归地工作,所以你可以简单地产生元组并跳过后面的map。
我发现问题的另一部分是reduceByKey。一般来说,如果应用聚合函数可以减少必须打乱的数据量,那么reduceByKey是有用的。因为您只是简单地连接字符串,所以这里没有什么可获得的。忽略低级别的东西,比如引用的数量,必须传输的数据量与groupByKey完全相同。
通常情况下,我不会详述这一点,但据我所知,这是Scala代码中的一个瓶颈。在JVM上连接字符串是一个相当昂贵的操作(例如:scala中的字符串连接和Java中的字符串连接一样昂贵吗?)这意味着像这样的_。reduceByKey((v1: String, v2: String) => v1 + ',' + v2)在你的代码中等价于input4.reduceByKey(valsConcat)不是一个好主意。
如果你想避免groupByKey,你可以尝试使用StringBuilder的aggregateByKey。类似的东西应该可以达到这个目的:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
但我怀疑这值得大惊小怪。
记住上面的内容,我重写了你的代码,如下所示:
生存巨:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Python:
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
结果
在本地[6]模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器占用4GB内存(n = 3):
Scala - mean: 250.00s, stdev: 12.49
Python - mean: 246.66s, stdev: 1.15
我敢肯定,大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,这里是Python中简单的单线程代码,它在这台机器上执行相同的任务不到一分钟:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])