鸿 网 互 联 www.68idc.cn

当前位置 : 服务器租用 > 数据库 > 其他数据库 > >

spark快速上手

来源:互联网 作者:佚名 时间:2018-01-26 18:35
spark快速上手 前言 基于Spark 2.1版本 仅仅是快速上手,没有深究细节 主要参考是官方文档 代码均为官方文档中代码,语言为Scala 进入spark-shell 终端输入 spark-shell ,进入的是Scala环境的终端,也可以输入 pyspark 进入Python环境的终端 创建一个SparkS

spark快速上手

前言
  • 基于Spark 2.1版本
  • 仅仅是快速上手,没有深究细节
  • 主要参考是官方文档
  • 代码均为官方文档中代码,语言为Scala
进入spark-shell

终端输入spark-shell,进入的是Scala环境的终端,也可以输入pyspark进入Python环境的终端

创建一个SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
创建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

此处可以读其他格式文件,如text,parquet,对应函数为spark.read.textspark.read.parquet

执行一些操作
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
执行一些SQL操作

可以看出上一节中利用自带函数可以实现查询的效果,假如对函数不太熟悉,也可以执行SQL语句,如下:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • 此处创建的为TempView,还有GlobalTemporaryView,前者当创建它的session关闭时,view消失;后者则不会。
  • View的作用相当于数据库中的table
DataSet
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

第一次接触Spark,可能会搞不清楚DataSet和DataFrame的区别,可以参考下面链接:

  • http://www.jianshu.com/p/c0181667daa0
  • https://www.zhihu.com/question/48684460

还有一个RDD的概念,可以参考:

  • http://www.sohu.com/a/140074843_494939

  • http://blog.csdn.net/wo334499/article/details/51689549

?

网友评论
<