Developing with Scala

Step 1: Install sbt


$ curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
$ sudo yum install sbt

Configure the repositories:


# cat ~/.sbt/repositories
[repositories]
  # local repository
  local
  # Aliyun mirror
  aliyun: http://maven.aliyun.com/nexus/content/groups/public/
  typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
  sonatype-oss-releases
  maven-central
  sonatype-oss-snapshots

Step 2: Create a project

You can create a project from a Giter8 template.


$ sbt new imarios/frameless.g8

Several Spark-related templates:

  • holdenk/sparkProjectTemplate.g8 (Template for Scala Apache Spark project).
  • imarios/frameless.g8 (A simple frameless template to start with more expressive types for Spark)
  • nttdata-oss/basic-spark-project.g8 (Spark basic project.)
  • spark-jobserver/spark-jobserver.g8 (Template for Spark Jobserver)

Step 3: Write the code

The generated project directory contains the following main entries:


$ ls
build.sbt  project  src  target

Your source code goes in the 'src/main/scala/' directory:


$ ls src/main/scala/
SimpleApp.scala

$ cat src/main/scala/SimpleApp.scala
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "people.txt" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
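
For the above to compile, build.sbt needs to declare the Spark SQL dependency. A minimal sketch follows; the project name, version and the exact Scala/Spark versions are assumptions inferred from the build and run logs later in this section:


name := "frameless"

version := "0.1"

scalaVersion := "2.11.11"

// Spark is provided by the cluster at runtime, so the dependency is only needed at compile time
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"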

Step 4: Compile and package

Once the program is written, the first step is to compile and package it:


$ sbt package
[info] Loading project definition from /home/spark/scala/frameless/project
[info] Updating {file:/home/spark/scala/frameless/project/}frameless-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to frameless (in build file:/home/spark/scala/frameless/)
[info] Updating {file:/home/spark/scala/frameless/}root...
[info] Resolving org.sonatype.oss#oss-parent;9 ...
[info] downloading https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.11.11/scala-library-2.11.11.jar ...
[info]  [SUCCESSFUL ] org.scala-lang#scala-library;2.11.11!scala-library.jar (94788ms)

...

[info] downloading https://repo1.maven.org/maven2/jline/jline/2.14.3/jline-2.14.3.jar ...
[info]  [SUCCESSFUL ] jline#jline;2.14.3!jline.jar (4836ms)
[info] Done updating.
[info] Compiling 1 Scala source to /home/spark/scala/frameless/target/scala-2.11/classes...
[info] 'compiler-interface' not yet compiled for Scala 2.11.11. Compiling...
[info]   Compilation completed in 24.981 s
[info] Packaging /home/spark/scala/frameless/target/scala-2.11/frameless_2.11-0.1.jar ...
[info] Done packaging.
[success] Total time: 2795 s, completed Jan 16, 2018 3:03:12 PM

After a successful build, the packaged jar is written to the 'target/scala-2.11/' directory:

/home/spark/scala/frameless/target/scala-2.11/frameless_2.11-0.1.jar

Step 5: Deploy and run


# su - spark
$ export SPARK_MAJOR_VERSION=2

$ spark-submit --class SimpleApp --master yarn-client --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 target/scala-2.11/frameless_2.11-0.1.jar
SPARK_MAJOR_VERSION is set to 2, using Spark2
Warning: Master yarn-client is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
18/01/17 15:20:18 INFO SparkContext: Running Spark version 2.2.0.2.6.3.0-235
18/01/17 15:20:19 INFO SparkContext: Submitted application: Simple Application
18/01/17 15:20:19 INFO SecurityManager: Changing view acls to: spark
18/01/17 15:20:19 INFO SecurityManager: Changing modify acls to: spark
18/01/17 15:20:19 INFO SecurityManager: Changing view acls groups to:
18/01/17 15:20:19 INFO SecurityManager: Changing modify acls groups to:
18/01/17 15:20:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view
 permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
18/01/17 15:20:20 INFO Utils: Successfully started service 'sparkDriver' on port 43573.

...

18/01/17 15:21:30 INFO YarnScheduler: Adding task set 3.0 with 1 tasks
18/01/17 15:21:30 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, bdtest-002, executor 1, partition 0, NODE_LOCAL, 4737 bytes)
18/01/17 15:21:30 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on bdtest-002:44065 (size: 3.7 KB, free: 93.2 MB)
18/01/17 15:21:30 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.1.4:44312
18/01/17 15:21:30 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 141 bytes
18/01/17 15:21:30 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 96 ms on bdtest-002 (executor 1) (1/1)
18/01/17 15:21:30 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
18/01/17 15:21:30 INFO DAGScheduler: ResultStage 3 (count at SimpleApp.scala:10) finished in 0.096 s
18/01/17 15:21:30 INFO DAGScheduler: Job 1 finished: count at SimpleApp.scala:10, took 0.289165 s
Lines with a: 1, Lines with b: 0
18/01/17 15:21:30 INFO AbstractConnector: Stopped Spark@17b03218{HTTP/1.1,[http/1.1]}{0.0.0.0:4042}
18/01/17 15:21:30 INFO SparkUI: Stopped Spark web UI at http://192.168.1.5:4042
18/01/17 15:21:30 INFO YarnClientSchedulerBackend: Interrupting monitor thread
18/01/17 15:21:30 INFO YarnClientSchedulerBackend: Shutting down all executors
18/01/17 15:21:30 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/01/17 15:21:30 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
18/01/17 15:21:30 INFO YarnClientSchedulerBackend: Stopped
18/01/17 15:21:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/01/17 15:21:31 INFO MemoryStore: MemoryStore cleared
18/01/17 15:21:31 INFO BlockManager: BlockManager stopped
18/01/17 15:21:31 INFO BlockManagerMaster: BlockManagerMaster stopped
18/01/17 15:21:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/01/17 15:21:31 INFO SparkContext: Successfully stopped SparkContext
18/01/17 15:21:31 INFO ShutdownHookManager: Shutdown hook called
18/01/17 15:21:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-730219ef-2995-4225-8743-5769fa6269db

  • --master: specifies where the Spark driver runs.
    • yarn-cluster: the Spark driver runs inside YARN's ApplicationMaster process. This mode suits applications that only perform computation; if the application needs to interact with a frontend, consider yarn-client mode instead.
    • yarn-client: the Spark driver runs in the client process, and the ApplicationMaster is only responsible for requesting resources from YARN.

Developing with Python

Submitting jobs in an HDP environment

Create a Python file pi.py:

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonPi").getOrCreate()

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()

Run pi.py, specifying YARN cluster mode:

SPARK_MAJOR_VERSION=2 spark-submit --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m pi.py 10

Running a Jupyter server in an HDP environment

Running on YARN, the session holds YARN resources for as long as the notebook server is up; for a long-running Jupyter server, --master local is usually the better choice. Note that the interactive PySpark shell only supports client deploy mode on YARN, so the command below uses --deploy-mode client.

XDG_RUNTIME_DIR="" SPARK_MAJOR_VERSION=2 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 121.43.171.231' pyspark --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m

For setting a password on the Jupyter server and other configuration, refer to the Jupyter documentation.

Once it is running, the Jupyter service is reachable on port 8888 of the server.

Developing outside an HDP environment

Make sure a Spark binary distribution is available locally, e.g. spark-2.2.1-bin-hadoop2.7.

Install pyspark:

pip install pyspark

Set SPARK_HOME (optional):

export SPARK_HOME=/home/xxx/spark-2.2.1-bin-hadoop2.7

Submitting a job from code:

import os
from pyspark import SparkConf, SparkContext

# set SPARK_HOME if it is not already defined
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/home/xxx/spark-2.2.1-bin-hadoop2.7'

conf = SparkConf().setAppName('Demo').setMaster('yarn').set('spark.submit.deployMode', 'cluster')
sc = SparkContext(conf=conf)

# Do something with sc...

Or use the SparkSession API:

from pyspark.sql import SparkSession
spark = (SparkSession.builder
          .master("yarn")
          .appName("Demo")
          .config("spark.yarn.deploy.mode", "cluster")
          .getOrCreate())

# Do something with spark...

Oozie automation

TODO

Development

Before Spark 2.0, the primary development interface was the RDD (Resilient Distributed Dataset). From 2.0 on, the RDD API is still supported, but Datasets have superseded RDDs as the primary interface; programming with Datasets goes through Spark SQL.
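
For a quick feel of the difference, here is a small sketch, assuming an already-created SparkSession named spark and a text file people.txt as in the earlier example:


// RDD API (still supported)
val linesRDD = spark.sparkContext.textFile("people.txt")
println(linesRDD.count())

// Dataset API, backed by Spark SQL
val linesDS = spark.read.textFile("people.txt")
println(linesDS.filter(_.contains("a")).count())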

API

http://spark.apache.org/docs/1.3.1/api/scala/index.html

SparkSession vs SparkContext

Before Spark 2.0 there were three main entry-point objects:

  • SparkContext: the connection to the Spark execution environment, used to create RDDs
  • SQLContext: the Spark SQL entry point, built on top of a SparkContext
  • HiveContext: the entry point for accessing Hive

Starting with Spark 2.0, Datasets/DataFrames become the primary data access interface, and SparkSession (org.apache.spark.sql.SparkSession) becomes the primary entry point to the Spark execution environment.
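
The older entry points are still reachable from a SparkSession, which is convenient when porting pre-2.0 code; a small sketch:


import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Demo").getOrCreate()

val sc = spark.sparkContext    // the underlying SparkContext, for RDD work
val sqlCtx = spark.sqlContext  // a SQLContext kept for backwards compatibility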

SparkConf

Before Spark 2.0 you first created a SparkConf and then a SparkContext:


//set up the spark configuration and create contexts
val sparkConf = new SparkConf()
  .setAppName("SparkSessionZipsExample")
  .setMaster("local")
  .set("spark.some.config.option", "some-value")
// your handle to SparkContext to access other contexts like SQLContext
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

From Spark 2.0 on, a SparkSession can be created without explicitly creating a SparkConf object:


// Create a SparkSession. No need to create SparkContext
// You automatically get it as part of the SparkSession
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
   .builder()
   .appName("SparkSessionZipsExample")
   .config("spark.sql.warehouse.dir", warehouseLocation)
   .enableHiveSupport()
   .getOrCreate()

After the SparkSession has been created, Spark runtime configuration properties can still be read and modified through spark.conf:


//set new runtime options
spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
//get all settings
val configMap: Map[String, String] = spark.conf.getAll

SparkSQL

Before 2.0, using Spark SQL required creating a SQLContext:


val sparkConf = new SparkConf().set("spark.some.config.option", "some-value")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

From 2.0 on, Spark SQL is conveniently available through SparkSession.sql():


// Now create an SQL table and issue SQL queries against it without
// using the sqlContext but through the SparkSession object.
// Creates a temporary view of the DataFrame
zipsDF.createOrReplaceTempView("zips_table")
zipsDF.cache()
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM zips_table")
resultsDF.show(10)

Accessing catalog metadata

SparkSession exposes SparkSession.catalog, an interface to the metastore that can be used to browse catalog metadata (e.g. HCatalog):


//fetch metadata data from the catalog
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)

scala> spark.catalog.listDatabases.show(false)
+-------+---------------------+------------------------------------------+
|name   |description          |locationUri                               |
+-------+---------------------+------------------------------------------+
|default|Default Hive database|hdfs://bdtest-001:8020/apps/hive/warehouse|
+-------+---------------------+------------------------------------------+

scala> spark.catalog.listTables.show(false)
+-------------------------+--------+----------------------------------------+---------+-----------+
|name                     |database|description                             |tableType|isTemporary|
+-------------------------+--------+----------------------------------------+---------+-----------+
|activity_donate_record   |default |Imported by sqoop on 2018/01/08 19:03:33|MANAGED  |false      |
|activity_periods_config  |default |Imported by sqoop on 2018/01/08 19:04:20|MANAGED  |false      |
|activity_result          |default |Imported by sqoop on 2018/01/08 19:05:00|MANAGED  |false      |
|activity_school_info     |default |Imported by sqoop on 2018/01/08 19:05:46|MANAGED  |false      |
|activity_season_game_data|default |Imported by sqoop on 2018/01/08 19:06:23|MANAGED  |false      |
|activity_season_game_log |default |Imported by sqoop on 2018/01/08 19:07:00|MANAGED  |false      |
|ana_tag                  |default |Imported by sqoop on 2018/01/08 19:07:35|MANAGED  |false      |
|ana_user_tag             |default |Imported by sqoop on 2018/01/08 19:08:18|MANAGED  |false      |
|api_invoking_log         |default |Imported by sqoop on 2018/01/08 19:08:58|MANAGED  |false      |
|app_function             |default |Imported by sqoop on 2018/01/08 19:09:34|MANAGED  |false      |
|bank_card                |default |Imported by sqoop on 2018/01/08 19:10:09|MANAGED  |false      |
|bank_card_bin            |default |Imported by sqoop on 2018/01/08 19:10:46|MANAGED  |false      |
|command_invocation       |default |Imported by sqoop on 2018/01/08 19:11:26|MANAGED  |false      |
|coupon_grant_log         |default |Imported by sqoop on 2018/01/08 19:12:05|MANAGED  |false      |
|coupon_user              |default |Imported by sqoop on 2018/01/08 19:12:43|MANAGED  |false      |
|device_command           |default |Imported by sqoop on 2018/01/08 19:13:18|MANAGED  |false      |
|device_preorder          |default |Imported by sqoop on 2018/01/08 19:13:54|MANAGED  |false      |
|dro_dropdetails          |default |null                                    |MANAGED  |false      |
|dro_dropinfo             |default |Imported by sqoop on 2018/01/08 19:15:09|MANAGED  |false      |
|dro_exchange_record      |default |Imported by sqoop on 2018/01/08 19:15:43|MANAGED  |false      |
+-------------------------+--------+----------------------------------------+---------+-----------+
only showing top 20 rows

DataFrames and Datasets

DataFrame & Dataset

Dataset:

  • Strongly typed
  • Supports both functional and relational operations
  • Operations fall into two categories
    • Transformations: produce new Datasets
      • e.g. map, filter, select, aggregation (groupBy)
    • Actions: trigger computation and return results
      • e.g. count, show, writing data back to the file system
  • Lazy: computation only happens when an action is triggered. Internally a Dataset can be thought of as a plan describing how to compute the data; when an action runs, the Spark query optimizer generates and optimizes an execution plan and then executes it.
  • Encoder: the mechanism for converting between a JVM type T and Spark SQL's internal data representation (marshal/unmarshal).

  • Two ways to create a Dataset:

    • using SparkSession's read() method:

      
      val people = spark.read.parquet("...").as[Person]  // Scala
      Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
      
    • applying a transformation to an existing Dataset:

      
      val names = people.map(_.name)  // in Scala; names is a Dataset[String]
      Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING); // Java
      
  • Dataset operations can also be untyped, via functions on Dataset, Column, DataFrame, etc.

  • Column operations (see the sketch after this list)

    • Select a column (dropping the others): select("column")
    • Add a column: withColumn("newColumn", Column)
    • Modify a column: withColumn("column", Column)
    • Drop a column: drop("column")
    • Type conversion: Column.cast
  • Advanced column operations:

    • UDFs
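
A sketch of the column operations listed above, assuming a DataFrame df with columns "name" (string) and "age" (string); the column and UDF names are made up for illustration:


import org.apache.spark.sql.functions.{col, udf}

val selected = df.select("name")                                       // keep one column, drop the rest
val withFlag = df.withColumn("isAdult", col("age").cast("int") >= 18)  // add a column
val modified = df.withColumn("age", col("age").cast("int"))            // modify (and cast) a column
val dropped  = df.drop("age")                                          // drop a column

// UDF: wrap an ordinary Scala function so it can be applied to Columns
val toUpperUdf = udf((s: String) => s.toUpperCase)
val shouted = df.withColumn("name", toUpperUdf(col("name")))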

DataFrame:

A DataFrame is a collection of data organized into named columns, similar to a table in a relational database or a data frame in pandas (Python) or R.

A DataFrame can be constructed from many kinds of data sources, such as Hive tables or Spark RDDs (Resilient Distributed Datasets).

TODO: Datasets

SparkSession offers many ways to create DataFrames and Datasets.

DataFrame
  • Description:
  • Creation API: SparkSession.createDataFrame
  • Access:
  • Other:

Datasets
  • Description:
  • Creation API:
  • Access:
  • Other:
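
A small sketch of the creation APIs mentioned above; the Person case class and the file path are assumptions for illustration:


case class Person(name: String, age: Int)

import spark.implicits._

// DataFrame from a local collection
val df = spark.createDataFrame(Seq(Person("alice", 30), Person("bob", 25)))

// Dataset of Person from the same data, keeping the static type
val ds = Seq(Person("alice", 30), Person("bob", 25)).toDS()

// DataFrame from an external data source
val peopleDF = spark.read.json("people.json")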

Row

  • Creating a Row
  • Accessing fields (see the sketch after this list)

    • generic access, by ordinal position

      row(0): returns the first field of the row

    • primitive-typed access
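
A quick sketch of creating and reading Rows:


import org.apache.spark.sql.Row

val row = Row("alice", 30)        // create a Row from field values

val first = row(0)                // generic access by ordinal, returns Any
val age   = row.getInt(1)         // primitive-typed access
val name  = row.getAs[String](0)  // typed access via getAs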

Column

  • Dataset API:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
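
Column objects are usually obtained in one of the following ways (a small sketch, assuming a DataFrame df with "name" and "age" columns):


import org.apache.spark.sql.functions.col
import spark.implicits._                            // enables the $"..." syntax

val adults  = df.filter(col("age") >= 18)           // Column via functions.col
val renamed = df.select($"name".alias("userName"))  // Column via the $ interpolator
val doubled = df.withColumn("age2", df("age") * 2)  // Column from the Dataset itself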

Working with dates and timestamps

* ver <= 1.5

Use hiveContext and Hive's date functions for such queries (a sketch follows).
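
A sketch of the pre-1.5 approach, going through Hive's date functions in SQL; hiveContext and the events table are assumptions for illustration:


// Spark <= 1.5: unix_timestamp/to_date are not in the DataFrame API yet,
// so fall back to Hive UDFs via HiveContext
val parsed = hiveContext.sql(
  "SELECT id, to_date(from_unixtime(unix_timestamp(ts, 'dd-MMM-yyyy'))) AS date FROM events")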

* 1.5 <= ver < 2.2

Spark 1.5 introduced unix_timestamp(), so on Spark versions from 1.5 up to (but not including) 2.2 you can handle time values like this:


import org.apache.spark.sql.functions.{unix_timestamp, to_date}

val df = Seq((1L, "01-APR-2015")).toDF("id", "ts")

df.select(to_date(unix_timestamp(
  $"ts", "dd-MMM-yyyy"
).cast("timestamp")).alias("timestamp"))

* ver >= 2.2


import org.apache.spark.sql.functions.{to_date, to_timestamp}

df.select(to_date($"ts", "dd-MMM-yyyy").alias("date"))

df.select(to_timestamp($"ts", "dd-MM-yyyy HH:mm:ss").alias("timestamp"))

import spark.implicits._

This import brings the toDF() method and the $"..." column syntax into scope.
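
A small example of both in action (the sample data is made up):


import spark.implicits._

val df = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")  // toDF() on a local collection
df.filter($"age" > 26).show()                                 // $"..." builds a Column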

Programming pitfalls

Pitfall 1: the DataFrame has rows, but after looping over it and appending the values to a ListBuffer, the ListBuffer is empty


import scala.collection.mutable.ListBuffer

val valueList = ListBuffer[Int]()

val valueDF = allData.select("value").distinct()
valueDF.show()
//valueDF.foreach(r => valueList.append(r.getAs[Int]("value")))

for (row <- valueDF) {
  println("value:" + row.getAs[Int]("value"))
  valueList.append(row.getAs[Int]("value"))
}
println("Values:" + valueList)
valueList.foreach(println)

// Program output

+-------+
|value  |
+-------+
|      1|
|      3|
|      5|
|      4|
|     10|
|      2|
|      0|
+-------+

Values:ListBuffer()

Two problems are visible:

  • Problem 1: even though valueDF is clearly not empty, the println(...) inside the loop printed nothing on the driver console
  • Problem 2: the resulting valueList is empty

Cause:

Spark executes a job through a driver and executors: the driver coordinates the job and the executors run the tasks. The body of the foreach/for is shipped by the driver to the executors and runs there (distributed execution), and each executor works on its own copy of the ListBuffer, so everything appended to those copies is lost once the tasks finish.

See the related discussion on Stack Overflow: https://stackoverflow.com/questions/36203299/scala-listbuffer-emptying-itself-after-every-add-in-loop

Fix: use collect()

The definition of collect():


def collect(): Array[T]
Returns an array that contains all rows in this Dataset.
Running collect requires moving all the data into the application’s driver
process, and doing so on a very large dataset can crash the driver process with
OutOfMemoryError.
For Java API, use collectAsList.

The fixed code:


val valueDF = allData.select("value").distinct()
valueDF.show()
//valueDF.foreach(r => valueList.append(r.getAs[Int]("value")))

for (row <- valueDF.collect()) {
  println("value:" + row.getAs[Int]("value"))
  valueList.append(row.getAs[Int]("value"))
}
println("getAllChannels:" + valueList)

// Program output

+-------+
|value  |
+-------+
|      1|
|      3|
|      5|
|      4|
|     10|
|      2|
|      0|
+-------+

value:1
value:3
value:5
value:4
value:10
value:2
value:0
getAllChannels:ListBuffer(1, 3, 5, 4, 10, 2, 0)
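
As an aside, the same result can be obtained without a mutable ListBuffer at all, by collecting and mapping on the driver (a sketch using the same valueDF):


// bring the distinct values back to the driver and extract the ints directly
val values: Array[Int] = valueDF.collect().map(_.getAs[Int]("value"))
println(values.mkString("Values: [", ", ", "]"))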

Reference