I just wonder what the difference is between an RDD and a DataFrame (in Spark 2.0.0, DataFrame is a mere type alias for Dataset[Row]) in Apache Spark.
Can you convert one to the other?
A DataFrame is defined well by a google search for "DataFrame definition":
A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.
So, a DataFrame has additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalized query.
An RDD, on the other hand, is merely a Resilient Distributed Dataset that is more of a black box of data that cannot be optimized, because the operations that can be performed against it are not as constrained.
However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.
In general it is recommended to use a DataFrame where possible due to the built-in query optimization.
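A minimal sketch of that round trip, assuming a local SparkSession named spark (names and data here are only illustrative):
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
val spark = SparkSession.builder().appName("rdd-df-roundtrip").master("local[*]").getOrCreate()
import spark.implicits._  // enables .toDF() on RDDs of tuples and case classes
// RDD of tuples -> DataFrame (possible because the RDD already has a tabular shape)
val rdd = spark.sparkContext.parallelize(Seq(("alice", 30), ("bob", 25)))
val df = rdd.toDF("name", "age")
// DataFrame -> RDD[Row]
val rowsBack: RDD[Row] = df.rdd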
A DataFrame is equivalent to a table in an RDBMS and can also be manipulated in ways similar to the "native" distributed collections in RDDs. Unlike RDDs, DataFrames keep track of the schema and support various relational operations that lead to more optimized execution. Each DataFrame object represents a logical plan, but because of their "lazy" nature no execution occurs until the user calls a specific "output operation".
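A minimal sketch of that laziness, assuming a SparkSession named spark with spark.implicits._ imported (the data is illustrative): the filter call only extends the logical plan, and nothing runs until an output operation such as show().
val df = Seq(("alice", 34), ("bob", 19)).toDF("name", "age")
val adults = df.filter($"age" > 21)  // transformation: just builds the logical plan
adults.show()                        // output operation: the plan is optimized and executed here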
To put it simply, an RDD is the core component, while a DataFrame is an API introduced in Spark 1.3.
RDD
An RDD is a collection of data partitions. These RDDs have the following properties:
Immutable, fault-tolerant, distributed, and more.
An RDD can hold either structured or unstructured data.
DataFrame
DataFrame is an API available in Scala, Java, Python, and R that allows you to process any kind of structured and semi-structured data. A DataFrame is a distributed collection of data organized into named columns. You can easily optimize the underlying RDDs in a DataFrame, and you can process JSON data, Parquet data, and HiveQL data at once using DataFrames.
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
Here sample_DF is considered a DataFrame, while sampleRDD (the raw data) is called an RDD.
Because a DataFrame is weakly typed, developers don't get the benefits of the type system. For example, let's say you want to read something from SQL and run some aggregation on it:
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
当你说people("deptId")时,你得到的不是Int或Long对象,你得到的是你需要操作的Column对象。在具有丰富类型系统的语言(如Scala)中,您最终失去了所有类型安全,这增加了在编译时可以发现的运行时错误的数量。
相反,输入数据集[T]。当你这样做时:
val people: Dataset[People] = sqlContext.read.parquet("...").as[People]
you actually get back People objects, where deptId is an actual integer type rather than a Column, thus taking advantage of the type system.
As of Spark 2.0, the DataFrame and Dataset APIs are unified, with DataFrame being a type alias for Dataset[Row].
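As a rough illustration of that unification (a sketch assuming Spark 2.x and a SparkSession named spark; the People case class and its fields are made up for the example, and the parquet path stays elided as above):
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._
case class People(name: String, deptId: Long, salary: Double, age: Int)
val df: DataFrame = spark.read.parquet("...")    // DataFrame is just Dataset[Row]
val ds: Dataset[People] = df.as[People]          // typed view over the same logical plan
val rows: Dataset[Row] = ds.toDF()               // back to the untyped alias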
First of all, DataFrame evolved from SchemaRDD.
Yes.. conversion between a DataFrame and an RDD is absolutely possible.
Below are some sample code snippets.
df.rdd is simply RDD[Row]
Below are some of the options to create a DataFrame.
1) yourRddOfRow.toDF converts it to a DataFrame.
2) Using createDataFrame of the SQL context: val df = spark.createDataFrame(rddOfRow, schema)
where the schema can come from some of the options below, as described in a nice SO post..
From a Scala case class and the Scala reflection API:
import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalaCaseClass].dataType.asInstanceOf[StructType]
OR using Encoders:
import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema
As described, the schema can also be created using StructType and StructField:
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true))
etc...
Indeed there are now 3 Apache Spark APIs..
RDD API:
The RDD (Resilient Distributed Dataset) API has been in Spark since the 1.0 release. The RDD API provides many transformation methods, such as map(), filter(), and reduce() for performing computations on the data. Each of these methods results in a new RDD representing the transformed data. However, these methods are just defining the operations to be performed and the transformations are not performed until an action method is called. Examples of action methods are collect() and saveAsObjectFile().
RDD example:
rdd.filter(_.age > 21) // transformation
.map(_.last) // transformation
.saveAsObjectFile("under21.bin") // action
Example: Filter an RDD by attribute
rdd.filter(_.age > 21)
DataFrame API
Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization. The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then execute. The API is natural for developers who are familiar with building query plans.
Example SQL style:
df.filter("age > 21");
Limitations: Because the code is referring to data attributes by name, it is not possible for the compiler to catch any errors. If attribute names are incorrect, the error will only be detected at runtime, when the query plan is created.
Another downside of the DataFrame API is that it is very Scala-centric and, while it does support Java, the support is limited.
For example, when creating a DataFrame from an existing RDD of Java objects, Spark's Catalyst optimizer cannot infer the schema and assumes that any objects in the DataFrame implement the scala.Product interface. Scala case classes work out of the box because they implement this interface.
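A small sketch of why case classes work out of the box (names are illustrative, and a SparkSession named spark is assumed): a Scala case class implements scala.Product, so Spark can derive the schema by reflection when building a DataFrame from it.
import spark.implicits._
case class Employee(id: Long, name: String, deptId: Int)  // a case class implements scala.Product
val employees = Seq(Employee(1L, "alice", 10), Employee(2L, "bob", 20))
val empDF = spark.sparkContext.parallelize(employees).toDF()  // schema inferred from the case class
empDF.printSchema()
// root
//  |-- id: long (nullable = false)
//  |-- name: string (nullable = true)
//  |-- deptId: integer (nullable = false)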
Dataset API
The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds; the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API. When it comes to serializing data, the Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders which are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release. Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant.
Example Dataset API SQL style:
dataset.filter(_.age < 21);
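A hedged sketch of the typed API and its encoders (assuming a SparkSession named spark; the Person class and data are illustrative):
import org.apache.spark.sql.{Dataset, Encoders}
import spark.implicits._
case class Person(name: String, age: Int)
// An encoder translates between Person objects and Spark's internal binary format
val personEncoder = Encoders.product[Person]
println(personEncoder.schema)  // roughly: StructType(StructField(name,StringType,true), StructField(age,IntegerType,false))
val people: Dataset[Person] = spark.createDataset(Seq(Person("alice", 34), Person("bob", 19)))
val adults = people.filter(_.age > 21)  // typed lambda, checked at compile time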
Evaluation differs between DataFrame and DataSet:
Catalyst-level flow.. (from a Spark Summit presentation demystifying DataFrames and Datasets)
Further reading... the Databricks article: A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets
Apache Spark provides three types of APIs:
RDD, DataFrame, Dataset
Here is the API comparison between RDD, DataFrame, and Dataset.
RDD
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
RDD features:
Distributed collection: RDD uses MapReduce operations, which are widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance.
Immutable: RDDs are composed of a collection of records which are partitioned. A partition is a basic unit of parallelism in an RDD, and each partition is one logical division of data, which is immutable and created through some transformations on existing partitions. Immutability helps to achieve consistency in computations.
Fault tolerant: In case we lose some partition of an RDD, we can replay the transformation on that partition in the lineage to achieve the same computation, rather than doing data replication across multiple nodes. This characteristic is the biggest benefit of RDDs because it saves a lot of effort in data management and replication and thus achieves faster computations.
Lazy evaluations: All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program.
Functional transformations: RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
Data processing formats: It can easily and efficiently process data which is structured as well as unstructured.
Programming languages supported: The RDD API is available in Java, Scala, Python, and R.
RDD limitations:
No built-in optimization engine: When working with structured data, RDDs cannot take advantage of Spark's advanced optimizers, including the Catalyst optimizer and the Tungsten execution engine. Developers need to optimize each RDD based on its attributes.
Handling structured data: Unlike DataFrames and Datasets, RDDs don't infer the schema of the ingested data and require the user to specify it.
Dataframes
Spark introduced DataFrames in the Spark 1.3 release. DataFrames overcome the key challenges that RDDs had.
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or an R/Python dataframe. Along with DataFrames, Spark also introduced the Catalyst optimizer, which leverages advanced programming features to build an extensible query optimizer.
DataFrame features:
Distributed collection of Row objects: A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood.
Data processing: Processing structured and unstructured data formats (Avro, CSV, Elasticsearch, and Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc). It can read and write from all these various data sources.
Optimization using the Catalyst optimizer: It powers both SQL queries and the DataFrame API. DataFrames use the Catalyst tree transformation framework in four phases: 1. analyzing a logical plan to resolve references, 2. logical plan optimization, 3. physical planning, 4. code generation to compile parts of the query to Java bytecode (see the sketch after this list).
Hive compatibility: Using Spark SQL, you can run unmodified Hive queries on your existing Hive warehouses. It reuses the Hive frontend and MetaStore and gives you full compatibility with existing Hive data, queries, and UDFs.
Tungsten: Tungsten provides a physical execution backend which explicitly manages memory and dynamically generates bytecode for expression evaluation.
Programming languages supported: The DataFrame API is available in Java, Scala, Python, and R.
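To peek at those Catalyst phases, a small sketch (column names and data are made up; a SparkSession named spark is assumed): explain(true) prints the parsed, analyzed and optimized logical plans along with the physical plan for a query.
import spark.implicits._
val df = Seq(("alice", 34, "HR"), ("bob", 19, "IT")).toDF("name", "age", "dept")
val query = df.filter($"age" > 21).groupBy($"dept").count()
query.explain(true)  // parsed / analyzed / optimized logical plans + physical plan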
DataFrame limitations:
Compile-time type safety: As discussed, the DataFrame API does not support compile-time safety, which limits you from manipulating data when the structure is not known. The following example compiles; however, you will get a runtime exception when executing this code.
Example:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
This is challenging, especially when you are working with several transformation and aggregation steps.
Cannot operate on domain objects (lost domain objects): Once you have transformed a domain object into a DataFrame, you cannot regenerate the domain objects from it. In the example below, once we create personDF from personRDD, we cannot recover the original RDD of the Person class (RDD[Person]).
Example:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataFrame(personRDD)
personDF.rdd // returns RDD[Row], does not return RDD[Person]
Datasets
Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema. At the core of the Dataset, API is a new concept called an encoder, which is responsible for converting between JVM objects and tabular representation. The tabular representation is stored using Spark internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization. Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.
Dataset features:
Provides the best of both RDD and DataFrame: RDD (functional programming, type-safe), DataFrame (relational model, query optimization, Tungsten execution, sorting and shuffling).
Encoders: With the use of Encoders, it is easy to convert any JVM object into a Dataset, allowing users to work with both structured and unstructured data, unlike DataFrames.
Programming languages supported: The Dataset API is currently only available in Scala and Java. Python and R are not supported in version 1.6; Python support is slated for version 2.0.
Type safety: The Dataset API provides compile-time safety which was not available in DataFrames. In the example below, we can see how a Dataset can operate on domain objects with compiled lambda functions.
Example:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataFrame(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
Interoperability: Datasets allow you to easily convert your existing RDDs and DataFrames into Datasets without boilerplate code.
Dataset API limitations:
Requires type casting to String: Querying data from Datasets currently requires us to specify the fields in the class as strings. Once we have queried the data, we are forced to cast the columns to the required data type. On the other hand, if we use a map operation on a Dataset, it will not use the Catalyst optimizer.
Example:
ds.select(col("name").as[String], $"age".as[Int]).collect()
No support for Python and R: As of release 1.6, Datasets only support Scala and Java. Python support will be introduced in Spark 2.0.
The Dataset API brings several advantages over the existing RDD and DataFrame APIs, with better type safety and functional programming. Still, given the type-casting requirements of the API, you would not get all the type safety you need, and your code becomes brittle.
A DataFrame is an RDD of Row objects, each representing a record. A DataFrame also knows the schema (i.e. data fields) of its rows. While DataFrames look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema. In addition, they provide new operations not available on RDDs, such as the ability to run SQL queries. DataFrames can be created from external data sources, from the results of queries, or from regular RDDs.
Reference: Zaharia M., et al. Learning Spark (O'Reilly, 2015)
Most of the answers are correct; I only want to add one point here.
In Spark 2.0, the two APIs (DataFrame + DataSet) will be unified together into a single API.
"Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface."
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmitting over the network.
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
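A hedged sketch of both approaches, loosely following the Spark SQL programming guide (assuming a SparkSession named spark with its implicits imported; people.txt with lines like "alice,34" is an illustrative input):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import spark.implicits._
// 1) Reflection-based: the schema is inferred from a case class
case class Person(name: String, age: Int)
val peopleDS = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(a => Person(a(0), a(1).trim.toInt))
  .toDS()
// 2) Programmatic: build a StructType at runtime and apply it to an RDD[Row]
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
val rowRDD = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(a => Row(a(0), a(1).trim.toInt))
val peopleDF = spark.createDataFrame(rowRDD, schema)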
Here you can find the answer on how to convert an RDD object to a DataFrame:
How to convert rdd object to dataframe in spark
All of them (RDD, DataFrame, and DataSet) in one picture.
image credits
RDD
An RDD is a fault-tolerant collection of elements that can be operated on in parallel.
DataFrame
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
Dataset
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine.
Note: The Dataset of Rows (Dataset[Row]) in Scala/Java is often referred to as a DataFrame.
A nice comparison of all of them with a code snippet.
source
Q: Can you convert one to the other, like RDD to DataFrame or vice-versa?
Yes, both are possible.
1. RDD to DataFrame with .toDF() or createDataFrame()
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)
// An RDD[Row] carries no schema of its own, so one must be supplied to createDataFrame;
// .toDF("col", ...) works directly on RDDs of tuples or case classes (with spark.implicits._ imported)
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("val1", DoubleType),
  StructField("val2", DoubleType)
))
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
Convert an RDD object to a DataFrame in Spark
2. DataFrame/DataSet to RDD with the .rdd method
val rowsRdd: RDD[Row] = df.rdd // DataFrame to RDD
RDD vs DataFrame from a usage perspective:
RDDs are amazing! They give us all the flexibility to deal with almost any kind of data: unstructured, semi-structured and structured data. Since data is often not ready to be fit into a DataFrame (even JSON), RDDs can be used to do preprocessing on the data so that it can fit into a DataFrame. RDDs are the core data abstraction in Spark.
Not all transformations that are possible on an RDD are possible on a DataFrame; for example subtract() is for RDDs while except() is for DataFrames (see the sketch after this list).
Since DataFrames are like a relational table, they follow strict rules when using set/relational theory transformations. For example, if you want to union two dataframes, the requirement is that both dfs have the same number of columns and matching column datatypes. Column names can be different. These rules don't apply to RDDs. Here is a good tutorial explaining these facts.
There are performance gains when using DataFrames, as others have already explained in depth.
Using DataFrames you don't need to pass an arbitrary function as you do when programming with RDDs.
You need the SQLContext/HiveContext to program DataFrames, as they lie in the SparkSQL area of the Spark ecosystem, but for RDDs you only need the SparkContext/JavaSparkContext, which lie in the Spark Core libraries.
You can create a df from an RDD if you can define a schema for it. You can also convert a df to an rdd and an rdd to a df.
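A quick hedged sketch of that subtract()/except() difference mentioned above (assuming a SparkSession named spark; the data is made up):
import spark.implicits._
val rddA = spark.sparkContext.parallelize(Seq(1, 2, 3, 4))
val rddB = spark.sparkContext.parallelize(Seq(3, 4))
rddA.subtract(rddB).collect()   // elements 1 and 2 (order not guaranteed)
val dfA = Seq(1, 2, 3, 4).toDF("n")
val dfB = Seq(3, 4).toDF("n")
dfA.except(dfB).show()          // the DataFrame counterpart; both sides must have compatible schemas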
Hope this helps!
A DataFrame is an RDD that has a schema. You can think of it as a relational database table, in that each column has a name and a known type. The power of DataFrames comes from the fact that, when you create a DataFrame from a structured dataset (Json, Parquet..), Spark is able to infer a schema by making a pass over the entire (Json, Parquet..) dataset that's being loaded. Then, when calculating the execution plan, Spark, can use the schema and do substantially better computation optimizations. Note that DataFrame was called SchemaRDD before Spark v1.3.0
Spark RDD (Resilient Distributed Dataset):
RDD is the core data abstraction API and has been available since the very first release of Spark (Spark 1.0). It is a lower-level API for manipulating distributed collections of data. The RDD API exposes some extremely useful methods which can be used to get very tight control over the underlying physical data structure. It is an immutable (read-only) collection of partitioned data distributed on different machines. RDDs enable in-memory computation on large clusters to speed up big data processing in a fault-tolerant manner. To enable fault tolerance, an RDD uses a DAG (Directed Acyclic Graph) which consists of a set of vertices and edges. The vertices and edges in the DAG represent the RDDs and the operations to be applied on those RDDs, respectively. The transformations defined on an RDD are lazy and execute only when an action is called.
Spark DataFrame:
Spark 1.3 introduced a new data abstraction API, DataFrame (with the DataSet API following in Spark 1.6). The DataFrame API organizes the data into named columns like a table in a relational database. It enables programmers to define a schema on a distributed collection of data. Each row in a DataFrame is of object type Row. Like an SQL table, each column must have the same number of rows in a DataFrame. In short, a DataFrame is a lazily evaluated plan which specifies the operations that need to be performed on the distributed collection of data. A DataFrame is also an immutable collection.
Spark Dataset:
As an extension to the DataFrame APIs, Spark 1.6 introduced the DataSet APIs, which provide a strictly typed, object-oriented programming interface in Spark. A Dataset is an immutable, type-safe, distributed collection of data. Like DataFrame, the DataSet APIs also use the Catalyst engine for execution optimization. DataSet is an extension of the DataFrame APIs.
Other differences -
Apache Spark - RDD, DataFrame, and DataSet
Spark RDD –
RDD stands for Resilient Distributed Dataset. It is a read-only, partitioned collection of records. RDD is the fundamental data structure of Spark. It allows a programmer to perform in-memory computations on large clusters in a fault-tolerant manner, thus speeding up the task.
Spark DataFrame –
Unlike an RDD, data is organized into named columns, like a table in a relational database. It is an immutable distributed collection of data. A DataFrame in Spark allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction.
Spark Dataset –
Datasets in Apache Spark are an extension of the DataFrame API which provides a type-safe, object-oriented programming interface. A Dataset takes advantage of Spark's Catalyst optimizer by exposing expressions and data fields to the query planner.
a. RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)
b. RDDs let us decide HOW we want to do things, which limits the optimizations Spark can apply to the processing underneath. DataFrames/Datasets let us decide WHAT we want to do and leave everything to Spark to decide how to perform the computation.
c. Being in-memory JVM objects, RDDs involve the overhead of garbage collection and Java (or slightly better, Kryo) serialization, which become expensive as data grows. That degrades performance.
DataFrames offer a huge performance improvement over RDDs because of two powerful features they have:
Custom memory management (aka Project Tungsten); optimized execution plans (aka the Catalyst Optimizer). Performance-wise: RDD -> DataFrame -> Dataset.
d. How Dataset scores over DataFrame (which already has Project Tungsten and the Catalyst Optimizer) is an additional feature it has: Encoders.
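A small sketch of what the encoder buys you (assuming a SparkSession named spark with its implicits imported; Person and the data are illustrative): the encoder maps objects to and from Tungsten's binary row format, and typed operations can access individual fields on demand rather than deserializing whole Java objects.
import org.apache.spark.sql.Encoder
import spark.implicits._
case class Person(name: String, age: Int)
val enc: Encoder[Person] = implicitly[Encoder[Person]]  // the same encoder that .as[Person] uses
println(enc.schema.treeString)
val df = Seq(("alice", 34), ("bob", 19)).toDF("name", "age")
val ds = df.as[Person]      // untyped rows -> typed objects via the encoder
val names = ds.map(_.name)  // fields decoded on demand from the binary format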