比起Scala,我更喜欢Python。但是,由于Spark本身是用Scala编写的,我希望我的代码在Scala中比Python版本运行得更快,原因很明显。

基于这个假设,我想学习并编写一些非常常见的1 GB数据预处理代码的Scala版本。数据是从Kaggle上的SpringLeaf比赛中挑选的。只是简单介绍一下数据(它包含1936个维度和145232行)。数据由各种类型组成,例如int型,浮点型,字符串,布尔型。我使用8个内核中的6个进行Spark处理;这就是为什么我使用minPartitions=6,这样每个核心都有一些东西要处理。

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala的性能 第0阶段(38分钟),第1阶段(18秒)

Python的性能 第0阶段(11分钟),第1阶段(7秒)

两者都会生成不同的DAG可视化图(由于这两张图显示了Scala (map)和Python (reduceByKey)的不同阶段0函数)

但是,本质上,这两个代码都试图将数据转换为(维度_id,值列表字符串)RDD并保存到磁盘。输出将用于计算每个维度的各种统计信息。

在性能方面,对于这样的真实数据,Scala代码的运行速度似乎比Python版本慢4倍。 对我来说,好消息是它给了我继续使用Python的良好动力。坏消息是我不太明白为什么?



讨论代码的原始答案可以在下面找到。


首先,您必须区分不同类型的API,每种API都有自己的性能考虑因素。

抽样API

(纯Python结构与基于JVM的编制)

这是受Python代码性能和PySpark实现细节影响最大的组件。虽然Python的性能不太可能成为问题,但至少有几个因素是你必须考虑的:

Overhead of JVM communication. Practically all data that comes to and from Python executor has to be passed through a socket and a JVM worker. While this is a relatively efficient local communication it is still not free. Process-based executors (Python) versus thread based (single JVM multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect, it provides stronger isolation than its JVM counterpart and some control over executor lifecycle but potentially significantly higher memory usage: interpreter memory footprint footprint of the loaded libraries less efficient broadcasting (each process requires its own copy of a broadcast) Performance of Python code itself. Generally speaking Scala is faster than Python but it will vary on task to task. Moreover you have multiple options including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094. PySpark configuration provides the spark.python.worker.reuse option which can be used to choose between forking Python process for each task and reusing existing process. The latter option seems to be useful to avoid expensive garbage collection (it is more an impression than a result of systematic tests), while the former one (default) is optimal for in case of expensive broadcasts and imports. Reference counting, used as the first line garbage collection method in CPython, works pretty well with typical Spark workloads (stream-like processing, no reference cycles) and reduces the risk of long GC pauses.

MLlib

(Python和JVM混合执行)

基本的考虑因素与以前几乎相同,但有一些额外的问题。虽然MLlib使用的基本结构是普通的Python RDD对象,但所有算法都直接使用Scala执行。

这意味着将Python对象转换为Scala对象或将Python对象转换为Scala对象会增加额外的成本,增加内存使用,以及稍后我们将介绍的一些额外限制。

目前(Spark 2.x),基于rdd的API处于维护模式,计划在Spark 3.0中被移除。

DataFrame API和Spark ML

(使用Python代码的JVM执行仅限于驱动程序)

这些可能是标准数据处理任务的最佳选择。由于Python代码主要局限于驱动程序上的高级逻辑操作,因此Python和Scala之间应该没有性能差异。

A single exception is usage of row-wise Python UDFs which are significantly less efficient than their Scala equivalents. While there is some chance for improvements (there has been substantial development in Spark 2.0.0), the biggest limitation is full roundtrip between internal representation (JVM) and Python interpreter. If possible, you should favor a composition of built-in expressions (example. Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution.

随着向量化udf (SPARK-21190和进一步的扩展)的引入,这可能会在未来得到改善,它使用箭头流进行高效的零拷贝反序列化数据交换。对于大多数应用程序,它们的次要开销可以忽略不计。

还要确保避免在dataframe和rdd之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。

值得注意的是,Py4J调用具有相当高的延迟。这包括简单的调用,如:

from pyspark.sql.functions import col

col("foo")

通常,这并不重要(开销是恒定的,不依赖于数据量),但对于软实时应用程序,您可以考虑缓存/重用Java包装器。

GraphX和Spark数据集

至于现在(Spark 1.6 2.1),两者都没有提供PySpark API,所以你可以说PySpark比Scala差得多。

GraphX

实际上,GraphX的开发几乎完全停止了,项目目前处于维护模式,相关的JIRA票据已经关闭,因为无法修复。GraphFrames库通过Python绑定提供了另一种图形处理库。

Dataset

主观上讲,在Python中静态类型的数据集没有太多的空间,即使有,当前的Scala实现也太简单了,不能提供与DataFrame相同的性能优势。

流媒体

从我目前看到的情况来看,我强烈建议使用Scala而不是Python。如果PySpark支持结构化流,未来可能会有所改变,但现在Scala API似乎更健壮、更全面、更高效。我的经验很有限。

Spark 2中的结构化流。X似乎缩小了语言之间的差距,但目前它仍处于早期阶段。然而,基于RDD的API已经在Databricks文档中被引用为“遗留流”(访问日期为2017-03-03),因此有理由期待进一步的统一努力。

不履行注意事项

Feature parity

并非所有Spark特性都是通过PySpark API公开的。一定要检查您需要的部分是否已经实现,并尝试了解可能的限制。

当您使用MLlib和类似的混合上下文时,这一点尤其重要(参见从任务调用Java/Scala函数)。公平地说,PySpark API的某些部分,比如mllib。linalg提供了一组比Scala更全面的方法。

API design

PySpark API紧密地反映了它的Scala对等物,因此并不完全是python式的。这意味着在语言之间进行映射非常容易,但与此同时,Python代码可能非常难以理解。

Complex architecture

与纯JVM执行相比,PySpark数据流相对复杂。对PySpark程序或调试进行推理要困难得多。此外,至少要对Scala和JVM有基本的了解。

Spark 2.x and beyond

持续向数据集API的转变,以及冻结的RDD API,为Python用户带来了机遇和挑战。虽然API的高级部分更容易在Python中公开,但更高级的特性几乎不可能直接使用。

此外,本地Python函数在SQL世界中仍然是二等公民。希望将来Apache Arrow序列化能够改善这一点(目前的工作目标是数据收集,但UDF serde是一个长期目标)。

对于强烈依赖Python代码库的项目,纯Python替代品(如Dask或Ray)可能是一个有趣的替代品。

不一定非得是一个对另一个

Spark DataFrame (SQL, Dataset) API提供了一种优雅的方式来集成Scala/Java代码在PySpark应用程序。您可以使用dataframe将数据公开给本机JVM代码并读取结果。我已经在其他地方解释了一些选项,你可以在如何在Pyspark中使用Scala类中找到Python-Scala往返的工作示例。

它可以通过引入用户定义类型来进一步增强(参见如何在Spark SQL中为自定义类型定义模式?)


问题中提供的代码有什么问题

(免责声明:python的观点。很可能我错过了一些Scala技巧)

首先,你的代码中有一部分完全没有意义。如果你已经有(键,值)对创建使用zipWithIndex或枚举在创建字符串只是分割它之后的点是什么?flatMap不递归地工作,所以你可以简单地产生元组并跳过后面的map。

我发现问题的另一部分是reduceByKey。一般来说,如果应用聚合函数可以减少必须打乱的数据量,那么reduceByKey是有用的。因为您只是简单地连接字符串,所以这里没有什么可获得的。忽略低级别的东西,比如引用的数量,必须传输的数据量与groupByKey完全相同。

通常情况下,我不会详述这一点,但据我所知,这是Scala代码中的一个瓶颈。在JVM上连接字符串是一个相当昂贵的操作(例如:scala中的字符串连接和Java中的字符串连接一样昂贵吗?)这意味着像这样的_。reduceByKey((v1: String, v2: String) => v1 + ',' + v2)在你的代码中等价于input4.reduceByKey(valsConcat)不是一个好主意。

如果你想避免groupByKey,你可以尝试使用StringBuilder的aggregateByKey。类似的东西应该可以达到这个目的:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

但我怀疑这值得大惊小怪。

记住上面的内容,我重写了你的代码,如下所示:

生存巨:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

结果

在本地[6]模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器占用4GB内存(n = 3):

Scala - mean: 250.00s, stdev: 12.49 Python - mean: 246.66s, stdev: 1.15

我敢肯定,大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,这里是Python中简单的单线程代码,它在这台机器上执行相同的任务不到一分钟:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

延伸至以上答案-

Scala在许多方面都比python更快,但python比Scala更受欢迎是有一些合理的原因的,让我们看看其中的几个

Python for Apache Spark非常容易学习和使用。然而,这并不是Pyspark比Scala更好的唯一原因。有更多的。

用于Spark的Python API在集群上可能更慢,但最终,与Scala相比,数据科学家可以用它做更多的事情。Scala的复杂性是不存在的。界面简单全面。

谈到代码的可读性,维护和熟悉Apache的Python API, Spark远比Scala好。

Python附带了几个与机器学习和自然语言处理相关的库。这有助于数据分析,也有非常成熟和经过时间检验的统计数据。例如numpy, pandas, scikit-learn, seaborn和matplotlib。

注意:大多数数据科学家使用混合方法,即使用这两种api的优点。

最后,Scala社区对程序员的帮助很少。这使得Python成为一门非常有价值的学习。如果您对任何静态类型编程语言(如Java)都有足够的经验,那么您就不必担心完全不用Scala了。