Spark 部署
spark-submit 是用于在集群上部署 Spark 应用程序的 shell 命令。
它通过一个统一的界面使用所有相应的集群管理器。因此,您不必为每个应用程序配置您的应用程序。
范例
让我们使用之前使用的使用 shell 命令的单词计数的例子。在这里,我们考虑与Spark应用程序相同的例子。
示例输入
以下文本是输入数据,名为 in.txt 的文件。
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
看看下面的程序:
SparkWordCount.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = application name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ valcount = input.flatMap(line ⇒ line.split(" ")) .map(word ⇒ (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
将上述程序保存到一个名为 SparkWordCount.scala 的文件中,并将其放在名为 spark-application 的用户定义目录中。
注 - 将inputRDD转换为countRDD时,我们使用flatMap()将行(从文本文件)标记为单词,使用map()方法计算单词频率,并使用reduceByKey()方法计算每个单词的重复次数。
使用以下步骤提交此应用程序。通过终端执行 spark-application 目录中的所有步骤。
第1步:下载Spark Jar
Spark 核心 jar 包是编译所必需的,因此,从下面的链接: http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3.0
下载spark-core_2.10-1.3.0.jar,并将jar文件从download目录下载到 spark-application。
第2步:编译程序
使用下面给出的命令编译上述程序。该命令应该从 spark-application 目录执行。这里, /usr/local/spark/lib/spark- assembly-1.4.0-hadoop2.6.0.jar 是一个从Spark 库取得的 Hadoop 支持 jar包。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
第3步:创建一个JAR
使用以下命令创建 Spark 应用程序的 jar文件。这里, wordcount 是 jar文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
第4步:提交Spark应用程序
使用以下命令提交 spark 应用程序:
spark-submit --class SparkWordCount --master local wordcount.jar
如果它成功执行,那么你会发现下面给出的输出。该 行 下面的输出出租为用户识别,这是程序的最后一行。如果你仔细阅读下面的输出,你会发现不同的东西,比如
- 在端口42954上成功启动服务'sparkDriver'
- MemoryStore的容量为267.3 MB
- 启动SparkUI
http://192.168.1.217:4040
- 添加了JAR文件:/home/hadoop/piapplication/count.jar
- ResultStage 1(SparkPi.scala:saveAsTextFile:11)在0.566秒内完成
- 在
http://192.168.1.217:4040
处停止Spark Web UI - MemoryStore清零
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s OK 15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 15/07/08 13:56:14 INFO Utils: Shutdown hook called 15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
第5步:检查输出
在成功执行程序后,您会在spark-application目录中找到名为 outfile 的目录。
以下命令用于打开和检查outfile目录中的文件列表。
$ cd outfile $ ls Part-00000 part-00001 _SUCCESS
用于检查 零件00000 文件输出的命令是 -
$ cat part-00000 (people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
部分00001文件中用于检查输出的命令:
$ cat part-00001 (walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
阅读以下部分以更多地了解 spark-submit 命令。
Spark-submit语法
spark-submit [options] <app jar | python file> [app arguments]
选项
____下面给出的表格描述了一系列 选项:
序号 | 选项 | 描述 |
---|---|---|
1 | --master | spark://主机:端口,mesos://主机:端口,纱线或本地。 |
2 | --deploy-mode | 是在本地(“客户端”)还是在集群内的其中一台工作站计算机(“集群”)(默认:客户端)上启动驱动程序。 |
3 | --class | 您的应用程序的主类(用于Java / Scala应用程序)。 |
4 | --name | 您的应用程序的名称。 |
5 | --jars | 以逗号分隔的本地jar列表,包含在驱动程序和执行程序类路径中。 |
6 | --packages | 以逗号分隔的jar的maven坐标列表,包含在驱动程序和执行程序类路径中。 |
7 | --repositories | 以逗号分隔的其他远程存储库列表,用于搜索使用--packages指定的maven坐标。 |
8 | --py-files | 逗号分隔的.zip,.egg或.py文件列表,放在Python应用程序的PYTHON PATH上。 |
9 | --files | 以逗号分隔的文件列表,放在每个执行程序的工作目录中。 |
10 | --conf (prop=val) | 任意Spark配置属性。 |
11 | --properties-file | 从中加载额外属性的文件的路径。如果未指定,则会查找conf / spark-defaults。 |
12 | --driver-memory | 驱动程序的内存(例如1000M,2G)(默认值:512M)。 |
13 | --driver-java-options | 要传递给驱动程序的额外Java选项。 |
14 | --driver-library-path | 要传递给驱动程序的额外库路径条目。 |
15 | --driver-class-path | 要传递给驱动程序的额外类路径条目。请注意,添加了--jars的jar自动包含在类路径中。 |
16 | --executor-memory | 每个执行器的内存(例如1000M,2G)(默认值:1G)。 |
17 | --proxy-user | 用户在提交申请时进行冒充。 |
18 | --help, -h | 显示此帮助消息并退出。 |
19 | --verbose, -v | 打印其他调试输出。 |
20 | --version | 打印当前Spark的版本。 |
21 | --driver-cores NUM | 驱动程序的核心(默认值:1)。 |
22 | --supervise | 如果给定,则在失败时重新启动驱动程序。 |
23 | --kill | 如果给定,则杀死指定的驱动程序。 |
24 | --status | 如果给定,驱动程序特定的请求状态 |
25 | --total-executor-cores | 处理器核数 |
26 | --executor-cores | 每个执行器核数. (默认 : 1 in YARN mode, or all available cores on the worker in standalone mode). |
下一章:Spark 高级编程
Spark包含两种不同类型的共享变量:一个是 广播变量 ,另一个是 累加器 。广播变量 - 用于高效分配大值。累加器 - 用于汇总特定集合的信息。 广播变量广播变量允许程序员在每台机器上保存一个只读变量,而不 ...