Spark

Spark

Spark组件

## Spark 用Scala语言开发的、基于内存的、分布式计算框架

Spark SQL模块 进行结构化数据处理的组件,类似Hive
Spark Streaming模块 针对实时数据进行流处理的组件 (准实时[秒、分钟],微批次的数据处理框架)
    执行器 Executer
        接受器 Receiver
            DStream 随时间推移而接收到的数据的序列
    驱动器 Spark Streaming Driver
        Receiver Tracker
        Job Generator 定时运行
        Job Schedulder 生成 SparkContext
Spark MLlib模块 提供机器学习算法库,包括模型评估、数据导入、更底层的机器学习原语
Spark GraphX模块 面向图计算的框架和算法库
Spark R模块 数据分析

## Spark 核心

Spark Core 提供Spark最基础、最核心的功能,其他功能都是在其基础上进行扩展的

Master & Worker 是Standalone部署环境中的资源调度框架
Master 类似于Yarn中的ResourceManager模块
    Driver 用于执行Spark任务的main方法,负责实用户程序的执行工作。只要负责:
        将用户程序转化为作业(job)
        在Executer之间调度任务(task)
        跟踪Executer的执行情况
        通过UI展示查询运行情况
Worker 类似于Yarn中的NodeManager模块
    Executor 运行在Worker中的JVM进程,用于计算的节点,可以指定Executor个数、资源(内存、CPU)
        负责运行Spark作业(job)中的具体任务(task),并将结果返回给Driver进程
        通过块管理器(Block Manager)为用户程序要求缓存的RDD提供内存式存储
ApplicationMaster 包含在Spark向YARN提交的程序中
    向yarn申请执行任务的资源容器Container,运行用户程序的作业(job),控制任务的执行,跟着任务状态,处理任务失败等异常情况

Spark部署

安装java8
下载

Local 模式

运行 spark shell

bin/spark-shell
bin/spark-shell --master local
bin/spark-shell --master local[2]
# --master选项指定分布式集群的主 URL,或者用 local 使用单线程在本地运行,或者用 local[N] 使用 N 个线程在本地运行

# 在spark shell中提交任务
sc.textFile("RELEASE").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

# WEB,查看任务运行情况
http://localhost:4040

# 使用 spark-submit 启动应用程序
bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

一些常用的选项是:
--class:您的应用程序的入口点(例如org.apache.spark.examples.SparkPi)
--master:集群的主 URL(例如spark://23.195.26.187:7077)
--deploy-mode: 是否将驱动程序部署在工作节点 ( cluster) 或本地作为外部客户端 ( client) (默认值: client) †
--conf: key=value 格式的任意 Spark 配置属性。对于包含空格的值,将“key=value”用引号括起来(如图所示)。多个配置应作为单独的参数传递。(例如--conf <key>=<value> --conf <key2>=<value2>)
application-jar: 包含您的应用程序和所有依赖项的捆绑 jar 的路径。URL 必须在集群内全局可见,例如,hdfs://路径或file://所有节点上都存在的路径。
application-arguments: 传递给你的主类的主方法的参数,如果有的话

# 使用 spark-submit 启动应用程序,本地模式
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.12-3.2.2.jar 10

Standalone 模式

需要启动 Master 和 Worker

sbin/start-all.sh
sbin/stop-all.sh

# 使用 spark-submit 启动应用程序,集群模式
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 ./examples/jars/spark-examples_2.12-3.2.2.jar 10

# WEB,Spark Master 查看任务运行情况
http://localhost:8080

配置 Spark History 服务

修改配置文件

vi conf/spark-defaults.conf

# 下面是 spark-submit 的配置,参考: conf/spark-defaults.conf.template
# spark.master                     spark://localhost:7077       # 内部服务通信端口
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://localhost:9000/spark/log

# 下面是给 history-server 的配置: sbin/start-history-server.sh --help
# 等价于在 spark-env.sh 中配置 SPARK_HISTORY_OPTS='-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://localhost:9000/spark/log'
# History Server options:
spark.history.ui.port            18080
spark.history.retainedApplications  3   # 在内存中保持加载的历史记录的最大数量

# FsHistoryProvider options:
spark.history.fs.logDirectory    hdfs://localhost:9000/spark/log

# 在 hdfs 上创建日志目录
cd /data/hadoop-*
bin/hdfs dfs -mkdir -p /spark/log

## 启动日志服务
sbin/start-history-server.sh
sbin/stop-history-server.sh

# WEB,查看 Spark 历史服务
http://localhost:18080

YARN 模式

依赖 YARN 和 HDFS。在 YARN 模式下,ResourceManager 的地址是从 Hadoop 配置中获取的。因此,--master参数为yarn。

修改配置文件

vi conf/spark-env.sh

# 指向包含 Hadoop 集群的(客户端)配置文件的目录
YARN_CONF_DIR=/data/hadoop-3.3.4/etc/hadoop/

vi conf/spark-defaults.conf

# 下面是用yarn做资源调度的配置
# Spark 历史服务器的地址,这样在yarn中查看日志可以正确跳转到spark的历史服务
spark.yarn.historyServer.address    vm-0-7-centos:18080

# cluster 模式,Spark 驱动程序运行在集群上由 YARN 管理的应用程序主进程中,客户端可以在启动应用程序后离开。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.2.2.jar 10

# client 模式,驱动程序运行在客户端进程中,应用程序主控仅用于向 YARN 请求资源。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.12-3.2.2.jar 10

Spark Shell

bin/spark-shell
spark.read.json("user.json")
var df = spark.read.json("user.json")
df.show

df.createTempView("user")
spark.sql("select username from user").show

Spark SQL

[appadmin@iZuf6grebmbepcfxp8aezwZ cqy]$ spark-sql --help
Usage: ./bin/spark-sql [options] [cli option]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster") (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
CLI options:
 -d,--define <key=value>          Variable substitution to apply to Hive commands. e.g. -d A=B or --define A=B
    --database <databasename>     Specify the database to use
 -e <quoted-query-string>         SQL from command line 带引号的查询字符串
 -f <filename>                    SQL from files        sql查询脚本
 -H,--help                        Print help information
    --hiveconf <property=value>   Use value for given property
    --hivevar <key=value>         Variable substitution to apply to Hive commands. e.g. --hivevar A=B
 -i <filename>                    Initialization SQL file
 -S,--silent                      Silent mode in interactive shell 静默模式
 -v,--verbose                     Verbose mode (echo executed SQL to the console) 打印Spark每步的执行信息

示例

spark-sql --master local[*]

# 导出
spark-sql --master local[*] -e "
          show databases;
          use cdp_prod_static_label;
          show tables;
          " > results.txt 

spark-sql --master local[*] --database cdp_prod_static_label -e "
          select * from vehicle_profilebase_18b500cc202 limit 5;
          " > results.txt 

spark-sql --master local[*] -f 1.sql > results.txt 


为了复用SQL语句,需要在SQL语句中添加参数,如下:
select * from table where year=${year} and month='09' and day='03' limit 10;

如果直接在命令行执行,这时会直接读取Shell变量,所以我们需要定义Shell变量“year”,如下:
export year=2018
spark-sql -e "select * from mydb.table where year=${year} and month='09' and day='03' limit 10;" > results.txt

如果需要进入Spark Sql Cli中执行,这时就可以使用“-d”定义参数,如下:
spark-sql --num-executors 100 -d year=2018
#   进入Spark Sql Cli,这时可以进行变量替换
select * from sec_ods.sdk where year=${year} and month='09' and day='03' limit 10;"

Spark Streaming