
## Spark 用Scala语言开发的、基于内存的、分布式计算框架
Spark SQL模块 进行结构化数据处理的组件,类似Hive
Spark Streaming模块 针对实时数据进行流处理的组件 (准实时[秒、分钟],微批次的数据处理框架)
执行器 Executer
接受器 Receiver
DStream 随时间推移而接收到的数据的序列
驱动器 Spark Streaming Driver
Receiver Tracker
Job Generator 定时运行
Job Schedulder 生成 SparkContext
Spark MLlib模块 提供机器学习算法库,包括模型评估、数据导入、更底层的机器学习原语
Spark GraphX模块 面向图计算的框架和算法库
Spark R模块 数据分析
## Spark 核心
Spark Core 提供Spark最基础、最核心的功能,其他功能都是在其基础上进行扩展的
Master & Worker 是Standalone部署环境中的资源调度框架
Master 类似于Yarn中的ResourceManager模块
Driver 用于执行Spark任务的main方法,负责实用户程序的执行工作。只要负责:
将用户程序转化为作业(job)
在Executer之间调度任务(task)
跟踪Executer的执行情况
通过UI展示查询运行情况
Worker 类似于Yarn中的NodeManager模块
Executor 运行在Worker中的JVM进程,用于计算的节点,可以指定Executor个数、资源(内存、CPU)
负责运行Spark作业(job)中的具体任务(task),并将结果返回给Driver进程
通过块管理器(Block Manager)为用户程序要求缓存的RDD提供内存式存储
ApplicationMaster 包含在Spark向YARN提交的程序中
向yarn申请执行任务的资源容器Container,运行用户程序的作业(job),控制任务的执行,跟着任务状态,处理任务失败等异常情况
安装java8
下载
运行 spark shell
bin/spark-shell
bin/spark-shell --master local
bin/spark-shell --master local[2]
# --master选项指定分布式集群的主 URL,或者用 local 使用单线程在本地运行,或者用 local[N] 使用 N 个线程在本地运行
# 在spark shell中提交任务
sc.textFile("RELEASE").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
# WEB,查看任务运行情况
http://localhost:4040
# 使用 spark-submit 启动应用程序
bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一些常用的选项是:
--class:您的应用程序的入口点(例如org.apache.spark.examples.SparkPi)
--master:集群的主 URL(例如spark://23.195.26.187:7077)
--deploy-mode: 是否将驱动程序部署在工作节点 ( cluster) 或本地作为外部客户端 ( client) (默认值: client) †
--conf: key=value 格式的任意 Spark 配置属性。对于包含空格的值,将“key=value”用引号括起来(如图所示)。多个配置应作为单独的参数传递。(例如--conf <key>=<value> --conf <key2>=<value2>)
application-jar: 包含您的应用程序和所有依赖项的捆绑 jar 的路径。URL 必须在集群内全局可见,例如,hdfs://路径或file://所有节点上都存在的路径。
application-arguments: 传递给你的主类的主方法的参数,如果有的话
# 使用 spark-submit 启动应用程序,本地模式
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.12-3.2.2.jar 10
需要启动 Master 和 Worker
sbin/start-all.sh
sbin/stop-all.sh
# 使用 spark-submit 启动应用程序,集群模式
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 ./examples/jars/spark-examples_2.12-3.2.2.jar 10
# WEB,Spark Master 查看任务运行情况
http://localhost:8080
vi conf/spark-defaults.conf
# 下面是 spark-submit 的配置,参考: conf/spark-defaults.conf.template
# spark.master spark://localhost:7077 # 内部服务通信端口
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/spark/log
# 下面是给 history-server 的配置: sbin/start-history-server.sh --help
# 等价于在 spark-env.sh 中配置 SPARK_HISTORY_OPTS='-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://localhost:9000/spark/log'
# History Server options:
spark.history.ui.port 18080
spark.history.retainedApplications 3 # 在内存中保持加载的历史记录的最大数量
# FsHistoryProvider options:
spark.history.fs.logDirectory hdfs://localhost:9000/spark/log
# 在 hdfs 上创建日志目录
cd /data/hadoop-*
bin/hdfs dfs -mkdir -p /spark/log
## 启动日志服务
sbin/start-history-server.sh
sbin/stop-history-server.sh
# WEB,查看 Spark 历史服务
http://localhost:18080
依赖 YARN 和 HDFS。在 YARN 模式下,ResourceManager 的地址是从 Hadoop 配置中获取的。因此,--master参数为yarn。
vi conf/spark-env.sh
# 指向包含 Hadoop 集群的(客户端)配置文件的目录
YARN_CONF_DIR=/data/hadoop-3.3.4/etc/hadoop/
vi conf/spark-defaults.conf
# 下面是用yarn做资源调度的配置
# Spark 历史服务器的地址,这样在yarn中查看日志可以正确跳转到spark的历史服务
spark.yarn.historyServer.address vm-0-7-centos:18080
# cluster 模式,Spark 驱动程序运行在集群上由 YARN 管理的应用程序主进程中,客户端可以在启动应用程序后离开。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.2.2.jar 10
# client 模式,驱动程序运行在客户端进程中,应用程序主控仅用于向 YARN 请求资源。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.12-3.2.2.jar 10
bin/spark-shell
spark.read.json("user.json")
var df = spark.read.json("user.json")
df.show
df.createTempView("user")
spark.sql("select username from user").show
[appadmin@iZuf6grebmbepcfxp8aezwZ cqy]$ spark-sql --help
Usage: ./bin/spark-sql [options] [cli option]
Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster") (Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
CLI options:
-d,--define <key=value> Variable substitution to apply to Hive commands. e.g. -d A=B or --define A=B
--database <databasename> Specify the database to use
-e <quoted-query-string> SQL from command line 带引号的查询字符串
-f <filename> SQL from files sql查询脚本
-H,--help Print help information
--hiveconf <property=value> Use value for given property
--hivevar <key=value> Variable substitution to apply to Hive commands. e.g. --hivevar A=B
-i <filename> Initialization SQL file
-S,--silent Silent mode in interactive shell 静默模式
-v,--verbose Verbose mode (echo executed SQL to the console) 打印Spark每步的执行信息
spark-sql --master local[*]
# 导出
spark-sql --master local[*] -e "
show databases;
use cdp_prod_static_label;
show tables;
" > results.txt
spark-sql --master local[*] --database cdp_prod_static_label -e "
select * from vehicle_profilebase_18b500cc202 limit 5;
" > results.txt
spark-sql --master local[*] -f 1.sql > results.txt
为了复用SQL语句,需要在SQL语句中添加参数,如下:
select * from table where year=${year} and month='09' and day='03' limit 10;
如果直接在命令行执行,这时会直接读取Shell变量,所以我们需要定义Shell变量“year”,如下:
export year=2018
spark-sql -e "select * from mydb.table where year=${year} and month='09' and day='03' limit 10;" > results.txt
如果需要进入Spark Sql Cli中执行,这时就可以使用“-d”定义参数,如下:
spark-sql --num-executors 100 -d year=2018
# 进入Spark Sql Cli,这时可以进行变量替换
select * from sec_ods.sdk where year=${year} and month='09' and day='03' limit 10;"