0%

初探PySpark

初探PySpark

将介绍PySpark的单机与集群配置方法,以及基本使用示例。

Apache Spark是一个闪电般快速的实时处理框架。它进行内存计算以实时分析数据。由于 Apache Hadoop MapReduce 仅执行批处理并且缺乏实时处理功能,因此它开始出现。因此,引入了Apache Spark,因为它可以实时执行流处理,也可以处理批处理。

除了实时和批处理之外,Apache Spark还支持交互式查询和迭代算法。Apache Spark有自己的集群管理器,可以托管其应用程序。它利用Apache Hadoop进行存储和处理。它使用 HDFS (Hadoop分布式文件系统)进行存储,它也可以在 YARN 上运行Spark应用程序。

PySpark是通过python库使用Spark的工具。

前提条件

  1. anaconda
  2. 下载并安装好jdk 1.8 (PySpark与Java9及以上尚不兼容)

单机安装与配置

推荐使用anaconda虚拟python环境安装PySpark。以Mac用户为例。

1
2
3
cd anaconda3
touch pyspark.yml
vim pyspark.yml

在pyspark.yml中写入以下内容(注意:'-'符号前必须为两个空格):

1
2
3
4
5
6
7
8
9
10
11
name: pyspark
dependencies:
- python=3.6
- jupyter
- ipython
- numpy
- numpy-base
- pandas
- py4j
- pyspark
- pytz

新建PySpark虚拟环境:

1
conda env create -f pyspark.yml

确认安装:

1
2
3
4
5
6
conda env list
# 输出 pyspark
# 启用虚拟环境:
conda activate pyspark
# 进入PySpark shell
pyspark

单机应用示例

SparkContext是任何spark功能的入口点。当我们运行任何Spark应用程序时,启动一个驱动程序,它具有main函数,并在此处启动SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。

SparkContext使用Py4J启动 JVM 并创建 JavaSparkContext。默认情况下,PySpark将SparkContext作为 'sc'提供 ,因此创建新的SparkContext将不起作用。[2]

以下是单机示例代码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback

appname = "test"#任务名称
master ="local"#单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
try:
conf = SparkConf().setAppName(appname).setMaster(master)#spark资源配置
spark=SparkSession.builder.config(conf=conf).getOrCreate()
sc=spark.sparkContext
# 创建RDD单词,其中存储了一组提到的单词,在RDD中的计算会自动在集群中并行
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
counts = words.count() # 返回RDD中的元素数
print("Number of elements in RDD is %i" % counts)
sc.stop()
print('计算成功!')
except:
sc.stop()
traceback.print_exc()#返回出错信息
print('连接出错!')

集群安装与配置

需要在各个集群节点上安装Spark[4]。

安装Spark

  • 下载页面下载Spark

  • 下载到的文件以spark-2.4.5-bin-hadoop2.7.tgz为例(需要jdk1.8),解压

    1
    2
    tar xzvf spark-2.4.5-bin-hadoop2.7.tgz
    export SPARK_HOME=/xxx/xxx/spark-2.4.5 # 可以添加到bashrc中
  • 将SPARK_HOME/conf/spark-env.sh.template 复制一份重新命名为spark-env.sh

    1
    2
    3
    mkdir -p /tmp/spark_logs
    cd SPARK_HOME/conf/
    vim spark-env.sh

    master节点和slave节点的spark-env.sh均填入以下配置:

    1
    2
    3
    4
    SPARK_MASTER_HOST='192.168.0.102'
    export SPARK_LOG_DIR=/tmp/spark_logs
    export PYSPARK_PYTHON=/usr/bin/python3 # 指向你的pyspark环境的python
    export PYSPARK_DRIVER_PYTHON=/usr/bin/ipython # 指向你的pyspark环境的ipython
  • 启动master

    1
    2
    cd SPARK_HOME/sbin
    ./start-master.sh

    将会看到类似以下输出:

    1
    2
    ~$ ./start-master.sh
    starting org.apache.spark.deploy.master.Master, logging to /tmp/spark_logs/spark-arjun-org.apache.spark.deploy.master.Master-1-arjun-VPCEH26EN.out
  • 启动slave

    1
    ./start-slave.sh spark://<your.master.ip.address>:7077

    将会看到类似以下输出:

    1
    starting org.apache.spark.deploy.worker.Worker, logging to /tmp/spark_logs/spark-experiment-org.apache.spark.deploy.worker.Worker-1-slave026.out

集群应用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback

appname = "test_cluster"#任务名称
master ="spark://192.168.0.102:7077"#单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
try:
conf = SparkConf().setAppName(appname).setMaster(master)#spark资源配置
spark=SparkSession.builder.config(conf=conf).getOrCreate()
sc=spark.sparkContext
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
counts = words.count()
print("Number of elements in RDD is %i" % counts)
sc.stop()
print('计算成功!')
except:
sc.stop()
traceback.print_exc()#返回出错信息
print('连接出错!')

参考

[1] https://www.guru99.com/pyspark-tutorial.html

[2] http://codingdict.com/article/8880

[3] https://blog.csdn.net/qq_23860475/article/details/90476197

[4] https://www.tutorialkart.com/apache-spark/how-to-setup-an-apache-spark-cluster/