Spark RDD 的创建

Posted by Jackson on 2017-10-13

Rdd 的创建

官方文档:http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#resilient-distributed-datasets-rdds

There are two ways to create RDDs: parallelizing an existing collection in your driver program,
or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase,
or any data source offering a Hadoop InputFormat.

两种创建RDD的方式:第一种是基于已经存在的集合进行创建,第二种是基于外部已经存在的存储系统来创建比如HDFS、HBASE。。

  1. Parallelized Collections

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.reduce((a, b) => a + b)

One important parameter for parallel collections is the number of partitions to cut the dataset into.
将数据集拆成多个Partition,Spark中一个Task对应于一个Partition

you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
可以手动控制Partition的个数

  1. External Datasets
    val distFile = sc.textFile(“data.txt”)
    distFile.map(s => s.length).reduce((a, b) => a + b).

注意:
1)如果使用本地文件系统作为路径,那么这个文件必须在所有的Worker Node节点的相同位置上面有一份
2)Spark的输入文件的方法(包括textFile) 都支持在目录、压缩、正则匹配路径、
3)默认情况下,Spark针对每一个Block创建一个Partition,但是也可以通过参数来将此Partition的个数调整到更大,但是不能调到更小

Spark的Scala API也支持一些其他的数据格式:
1)SparkContext.wholeTextFile 方法可以允许你读取一个包含很多小文件的目录,并且以键值对的方式返回(fileName,Content)
这个与textFile刚好相反,textFile是读取一个文件将文件中的每一行作为一条记录