PySpark SparkContext
SparkContext 是任何 Spark 功能的入口点。当我们运行任何 Spark 应用程序时,启动一个驱动程序,它具有 main 函数,并在此处启动 SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。
SparkContext 使用 Py4J 启动 JVM 并创建 JavaSparkContext。默认情况下,PySpark 将 SparkContext 作为 'sc' 提供 ,因此创建新的 SparkContext 将不起作用。
以下代码块包含PySpark类的详细信息以及SparkContext可以采用的参数。
class pyspark.SparkContext ( master = None, appName = None, sparkHome = None, pyFiles = None, environment = None, batchSize = 0, serializer = PickleSerializer(), conf = None, gateway = None, jsc = None, profiler_cls = <class 'pyspark.profiler.BasicProfiler'> )
参数
以下是SparkContext的参数。
- Master - 它是连接到的集群的URL。
- appName - 您的工作名称。
- sparkHome - Spark安装目录。
- pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。
- environment - 工作节点环境变量。
- batchSize - 表示为单个Java对象的Python对象的数量。 设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。
- serializer - RDD序列化器。
- Conf - L {SparkConf}的一个对象,用于设置所有Spark属性。
- gateway - 使用现有网关和JVM,否则初始化新JVM。
- JSC - JavaSparkContext实例。
- profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)。
在上述参数中,主要使用 master 和 appname 。任何PySpark程序的前两行如下所示:
from pyspark import SparkContext sc = SparkContext("local", "First App")
SparkContext示例 - PySpark Shell
现在你已经对SparkContext有了足够的了解,让我们在PySpark shell上运行一个简单的例子。在此示例中,我们将计算 README.md 文件中带有字符“a”或“b”的行 数 。那么,让我们说一个文件中有5行,3行有'a'字符,那么输出将是→ Line with a:3 。字符'b'也是如此。
注 - 我们不会在以下示例中创建任何SparkContext对象,因为默认情况下,当PySpark shell启动时,Spark会自动创建名为sc的SparkContext对象。 如果您尝试创建另一个SparkContext对象,您将收到以下错误 “ValueError:无法一次运行多个SparkContexts”。
<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md" <<< logData = sc.textFile(logFile).cache() <<< numAs = logData.filter(lambda s: 'a' in s).count() <<< numBs = logData.filter(lambda s: 'b' in s).count() <<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs) Lines with a: 62, lines with b: 30
SparkContext示例 - Python程序
让我们使用Python程序运行相同的示例。创建一个名为 firstapp.py 的Python文件,并在该文件中输入以下代码。
----------------------------------------firstapp.py--------------------------------------- from pyspark import SparkContext logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md" sc = SparkContext("local", "first app") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print "Lines with a: %i, lines with b: %i" % (numAs, numBs) ----------------------------------------firstapp.py---------------------------------------
然后我们将在终端中执行以下命令来运行此Python文件。我们将得到与上面相同的输出。
$SPARK_HOME/bin/spark-submit firstapp.py Output: Lines with a: 62, lines with b: 30
下一章:PySpark RDD
现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。但在此之前,让我们了解Spark - RDD中的一个基本概念。RDD代表 Resilient Distri ...