谁能给我解释一下map和flatMap之间的区别,以及它们各自的良好用例是什么?
“flatten the results”是什么意思? 它有什么好处?
谁能给我解释一下map和flatMap之间的区别,以及它们各自的良好用例是什么?
“flatten the results”是什么意思? 它有什么好处?
当前回答
所有的例子都很好....这是一个很好的视觉插图…资料来源:spark的DataFlair培训
Map: Map是Apache Spark中的转换操作。它应用于RDD的每个元素,并将结果作为新的RDD返回。在Map中,操作开发人员可以定义自己的自定义业务逻辑。同样的逻辑将应用于RDD的所有元素。
Spark RDD map函数以一个元素作为输入,根据自定义代码(由开发人员指定)处理它,每次返回一个元素。Map将一个长度为N的RDD转换为另一个长度为N的RDD。输入和输出RDD通常具有相同数量的记录。
使用scala的map示例:
val x = spark.sparkContext.parallelize(List("spark", "map", "example", "sample", "example"), 3)
val y = x.map(x => (x, 1))
y.collect
// res0: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// rdd y can be re writen with shorter syntax in scala as
val y = x.map((_, 1))
y.collect
// res1: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// Another example of making tuple with string and it's length
val y = x.map(x => (x, x.length))
y.collect
// res3: Array[(String, Int)] =
// Array((spark,5), (map,3), (example,7), (sample,6), (example,7))
FlatMap:
flatMap是一个转换操作。它应用于RDD的每个元素,并将结果作为新的RDD返回。它类似于Map,但是FlatMap允许从Map函数返回0,1或更多元素。在FlatMap操作中,开发人员可以定义自己的自定义业务逻辑。同样的逻辑将应用于RDD的所有元素。
“flatten the results”是什么意思?
FlatMap函数接受一个元素作为输入,根据自定义代码(由开发人员指定)处理它,并一次返回0个或多个元素。flatMap()将一个长度为N的RDD转换为另一个长度为M的RDD。
使用scala的flatMap示例:
val x = spark.sparkContext.parallelize(List("spark flatmap example", "sample example"), 2)
// map operation will return Array of Arrays in following case : check type of res0
val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
y.collect
// res0: Array[Array[String]] =
// Array(Array(spark, flatmap, example), Array(sample, example))
// flatMap operation will return Array of words in following case : Check type of res1
val y = x.flatMap(x => x.split(" "))
y.collect
//res1: Array[String] =
// Array(spark, flatmap, example, sample, example)
// RDD y can be re written with shorter syntax in scala as
val y = x.flatMap(_.split(" "))
y.collect
//res2: Array[String] =
// Array(spark, flatmap, example, sample, example)
其他回答
使用测试。以Md为例:
➜ spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.
scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3
scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15
scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))
scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
如果您使用映射方法,您将得到测试线。md,对于flatMap方法,您将得到字数。
map方法类似于flatMap,它们都返回一个新的RDD。map方法经常使用返回一个新的RDD, flatMap方法经常使用分割词。
map和flatMap输出的差异:
1. flatmap
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect()
输出:
1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2.地图:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length).collect()
输出:
3 6 6 3 8
Flatmap和Map都转换集合。
的区别:
地图(函数) 返回一个新的分布式数据集,该数据集通过函数func传递源的每个元素。
flatMap(函数) 类似于map,但是每个输入项可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)。
变换函数为: map:输入一个元素->输出一个元素。 flatMap:输入一个元素->输出0个或更多元素(一个集合)。
map
通过将函数应用到该RDD的每个元素,返回一个新的RDD。
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.map(lambda x: [(x, x), (x, x)]).collect())
[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
flatMap
返回一个新的RDD,首先对该RDD的所有元素应用一个函数,然后将结果平摊。 在这里,一个元素转化为多个元素是可能的
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
map返回相同数量元素的RDD,而flatMap可能不会。
flatMap过滤丢失或不正确数据的示例用例。
map在各种各样的情况下使用,其中输入和输出的元素数量是相同的。
number.csv
1
2
3
-
4
-
5
Map.py添加add.csv中的所有数字。
from operator import *
def f(row):
try:
return float(row)
except Exception:
return 0
rdd = sc.textFile('a.csv').map(f)
print(rdd.count()) # 7
print(rdd.reduce(add)) # 15.0
py使用flatMap在添加之前过滤掉缺失的数据。与以前的版本相比,增加的数字更少。
from operator import *
def f(row):
try:
return [float(row)]
except Exception:
return []
rdd = sc.textFile('a.csv').flatMap(f)
print(rdd.count()) # 5
print(rdd.reduce(add)) # 15.0