Spark 核心的概念是 Resilient Distributed Dataset (RDD) :一个可并行操作的有容错机制的数据集合。有 2 种方式创建 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。
并行集合
并行集合 (Parallelized collections ) 的创建是通过在一个已有的集合(Scala Seq
)上调用 SparkContext 的 parallelize
方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。例如,这里演示了如何在一个包含 1 到 5 的数组中创建并行集合:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建完成,这个分布式数据集(distData
)就可以被并行操作。例如,我们可以调用 distData.reduce((a, b) => a + b)
将这个数组中的元素相加。我们以后再描述在分布式上的一些操作。
并行集合一个很重要的参数是切片数(slices ),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数目。然而,你也可以通过 parallelize
的第二个参数手动地设置(例如:sc.parallelize(data, 10)
)。
外部数据集
Spark 可以从任何一个 Hadoop 支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其他 Hadoop InputFormat。
文本文件 RDDs 可以使用 SparkContext 的 textFile
方法创建。 在这个方法里传入文件的 URI (机器上的本地路径或 hdfs://
,s3n://
等),然后它会将文件读取成一个行集合。这里是一个调用例子:
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
一旦创建完成,distFiile
就能做数据集操作。例如,我们可以用下面的方式使用 map
和 reduce
操作将所有行的长度相加:distFile.map(s => s.length).reduce((a, b) => a + b)
。
注意,Spark 读文件时:
- 如果使用本地文件系统路径,文件必须能在 work 节点上用相同的路径访问到。要么复制文件到所有的 workers,要么使用网络的方式共享文件系统。
- 所有 Spark 的基于文件的方法,包括
textFile
,能很好地支持文件目录,压缩过的文件和通配符。例如,你可以使用textFile("/my/文件目录")
,textFile("/my/文件目录/*.txt")
和textFile("/my/文件目录/*.gz")
。 textFile
方法也可以选择第二个可选参数来控制切片(slices )的数目。默认情况下,Spark 为每一个文件块(HDFS 默认文件块大小是 64M)创建一个切片(slice )。但是你也可以通过一个更大的值来设置一个更高的切片数目。注意,你不能设置一个小于文件块数目的切片值。
除了文本文件,Spark 的 Scala API 支持其他几种数据格式:
SparkContext.wholeTextFiles
让你读取一个包含多个小文本文件的文件目录并且返回每一个(filename, content)对。与textFile
的差异是:它记录的是每个文件中的每一行。- 对于 SequenceFiles,可以使用 SparkContext 的
sequenceFile[K, V]
方法创建,K 和 V 分别对应的是 key 和 values 的类型。像 IntWritable 与 Text 一样,它们必须是 Hadoop 的 Writable 接口的子类。另外,对于几种通用的 Writables,Spark 允许你指定原生类型来替代。例如:sequenceFile[Int, String]
将会自动读取 IntWritables 和 Text。 - 对于其他的 Hadoop InputFormats,你可以使用
SparkContext.hadoopRDD
方法,它可以指定任意的JobConf
,输入格式(InputFormat),key 类型,values 类型。你可以跟设置 Hadoop job 一样的方法设置输入源。你还可以在新的 MapReduce 接口(org.apache.hadoop.mapreduce)基础上使用SparkContext.newAPIHadoopRDD
(译者注:老的接口是SparkContext.newHadoopRDD
)。 RDD.saveAsObjectFile
和SparkContext.objectFile
支持保存一个RDD,保存格式是一个简单的 Java 对象序列化格式。这是一种效率不高的专有格式,如 Avro,它提供了简单的方法来保存任何一个 RDD。
RDD 操作
RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map
是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce
是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行 reduceByKey
能返回一个分布式数据集)。
在 Spark 中,所有的转换(transformations)都是惰性(lazy)的,它们不会马上计算它们的结果。相反的,它们仅仅记录转换操作是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动作(action) 需要一个结果返回给驱动程序的时候。这个设计能够让 Spark 运行得更加高效。例如,我们可以实现:通过 map
创建一个新数据集在 reduce
中使用,并且仅仅返回 reduce
的结果给 driver,而不是整个大的映射过的数据集。
默认情况下,每一个转换过的 RDD 会在每次执行动作(action)的时候重新计算一次。然而,你也可以使用 persist
(或 cache
)方法持久化(persist
)一个 RDD 到内存中。在这个情况下,Spark 会在集群上保存相关的元素,在你下次查询的时候会变得更快。在这里也同样支持持久化 RDD 到磁盘,或在多个节点间复制。
基础
为了说明 RDD 基本知识,考虑下面的简单程序:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行是定义来自于外部文件的 RDD。这个数据集并没有加载到内存或做其他的操作:lines
仅仅是一个指向文件的指针。第二行是定义 lineLengths
,它是 map
转换(transformation)的结果。同样,lineLengths
由于懒惰模式也没有 立即计算。最后,我们执行 reduce
,它是一个动作(action)。在这个地方,Spark 把计算分成多个任务(task),并且让它们运行在多个机器上。每台机器都运行自己的 map 部分和本地 reduce 部分。然后仅仅将结果返回给驱动程序。
如果我们想要再次使用 lineLengths
,我们可以添加:
lineLengths.persist()
在 reduce
之前,它会导致 lineLengths
在第一次计算完成之后保存到内存中。
传递函数到 Spark
Spark 的 API 很大程度上依靠在驱动程序里传递函数到集群上运行。这里有两种推荐的方式:
- 匿名函数 (Anonymous function syntax),可以在比较短的代码中使用。
- 全局单例对象里的静态方法。例如,你可以定义
object MyFunctions
然后传递MyFounctions.func1
,像下面这样:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意,它可能传递的是一个类实例里的一个方法引用(而不是一个单例对象),这里必须传送包含方法的整个对象。例如:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这里,如果我们创建了一个 new MyClass
对象,并且调用它的 doStuff
,map
里面引用了这个 MyClass
实例中的 func1
方法,所以这个对象必须传送到集群上。类似写成 rdd.map(x => this.func1(x))
。
以类似的方式,访问外部对象的字段将会引用整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于写成 rdd.map(x => this.field + x)
,引用了整个 this
对象。为了避免这个问题,最简单的方式是复制 field
到一个本地变量而不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
使用键值对
虽然很多 Spark 操作工作在包含任意类型对象的 RDDs 上的,但是少数几个特殊操作仅仅在键值(key-value)对 RDDs 上可用。最常见的是分布式 "shuffle" 操作,例如根据一个 key 对一组数据进行分组和聚合。
在 Scala 中,这些操作在包含二元组(Tuple2)(在语言的内建元组中,通过简单的写 (a, b) 创建) 的 RDD 上自动地变成可用的,只要在你的程序中导入 org.apache.spark.SparkContext._
来启用 Spark 的隐式转换。在 PairRDDFunctions 的类里键值对操作是可以使用的,如果你导入隐式转换它会自动地包装成元组 RDD。
例如,下面的代码在键值对上使用 reduceByKey
操作来统计在一个文件里每一行文本内容出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用 counts.sortByKey()
,例如,将键值对按照字母进行排序,最后 counts.collect()
把它们作为一个对象数组带回到驱动程序。
注意:当使用一个自定义对象作为 key 在使用键值对操作的时候,你需要确保自定义 equals()
方法和 hashCode()
方法是匹配的。更加详细的内容,查看 Object.hashCode() 文档中的契约概述。