谁能给我解释一下map和flatMap之间的区别,以及它们各自的良好用例是什么?

“flatten the results”是什么意思? 它有什么好处?


当前回答

map

通过将函数应用到该RDD的每个元素,返回一个新的RDD。

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.map(lambda x: [(x, x), (x, x)]).collect())
[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]

flatMap

返回一个新的RDD,首先对该RDD的所有元素应用一个函数,然后将结果平摊。 在这里,一个元素转化为多个元素是可能的

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

其他回答

使用测试。以Md为例:

➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.

scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3

scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15

scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))

scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)

如果您使用映射方法,您将得到测试线。md,对于flatMap方法,您将得到字数。

map方法类似于flatMap,它们都返回一个新的RDD。map方法经常使用返回一个新的RDD, flatMap方法经常使用分割词。

下面是一个不同的例子,作为一个spark-shell会话:

首先是一些数据——两行文本:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

    res0: Array[String] = Array("Roses are red", "Violets are blue")

现在,map将一个长度为N的RDD转换为另一个长度为N的RDD。

例如,它将两行映射为两行长度:

rdd.map(_.length).collect

    res1: Array[Int] = Array(13, 16)

但是flatMap(松散地说)将长度为N的RDD转换为N个集合的集合,然后将这些集合平展为单个结果RDD。

rdd.flatMap(_.split(" ")).collect

    res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

我们每行有多个单词,而且每行有多行,但我们最终得到一个单词输出数组

为了说明这一点,从一个行集合到一个单词集合的flatMapping如下:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

因此,对于flatMap,输入和输出rdd通常具有不同的大小。

如果我们试图使用map与我们的split函数,我们将以嵌套结构结束(RDD的单词数组,类型为RDD[Array[String]]),因为我们必须对每个输入只有一个结果:

rdd.map(_.split(" ")).collect

    res3: Array[Array[String]] = Array(
                                     Array(Roses, are, red), 
                                     Array(Violets, are, blue)
                                 )

最后,一个有用的特殊情况是映射到一个可能不返回答案的函数,因此返回一个Option。我们可以使用flatMap过滤出返回None的元素,并从返回Some的元素中提取值:

val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

    res3: Array[Int] = Array(10,20)

(注意这里Option的行为很像一个只有一个元素或者没有元素的列表)

对于所有想要PySpark相关的人:

示例转换:flatMap

>>> a="hello what are you doing"
>>> a.split()

['hello', 'what', 'are', 'you', 'doing']

>>> b=["hello what are you doing","this is rak"]
>>> b.split()

回溯(最近一次调用): 文件“”,第1行,在 AttributeError: 'list'对象没有属性'split'

>>> rline=sc.parallelize(b)
>>> type(rline)

>>> def fwords(x):
...     return x.split()


>>> rword=rline.map(fwords)
>>> rword.collect()

[[‘你好’,‘什么’,‘是’,‘你’,‘做’],[‘这个’,‘是’,'爱你']]

>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()

[‘你好’,‘什么’,‘是’,‘你’,‘做’,‘这’,‘是’,‘爱’)

希望能有所帮助。

区别可以从下面的pyspark代码示例中看到:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]


rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]

map返回相同数量元素的RDD,而flatMap可能不会。

flatMap过滤丢失或不正确数据的示例用例。

map在各种各样的情况下使用,其中输入和输出的元素数量是相同的。

number.csv

1
2
3
-
4
-
5

Map.py添加add.csv中的所有数字。

from operator import *

def f(row):
  try:
    return float(row)
  except Exception:
    return 0

rdd = sc.textFile('a.csv').map(f)

print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

py使用flatMap在添加之前过滤掉缺失的数据。与以前的版本相比,增加的数字更少。

from operator import *

def f(row):
  try:
    return [float(row)]
  except Exception:
    return []

rdd = sc.textFile('a.csv').flatMap(f)

print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0