页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

运行 Spark Python 应用

用 Java 和 Scala 访问 Spark 提供了许多优点 : 因为 Spark 它自己运行在 JVM 中,运行在 JVM 内部是平台无关的,独立的代码包和它打入到 JAR 文件中的依赖,以及更高的性能。如果您使用 Spark Python API 将会失去这些优势。

管理依赖并让它们可用于群集上的 Python Job 是很难的。为了确定哪些依赖在群集上是需要的,您必须了解运行在分布式 群集 中的 Spark executor 进程中的 Spark 应用程序的代码。如果您定义的 Python 转换使用的任何的第三方库,比如 NumPy 或者 nltk,当它们运行在远程的 executor 上时 Spark executor 需要访问这些库。

独立的依赖关系

常见的情况中,一个自定义的 Python 包包含了你想要应用到每个 RDD 元素的 功能。下面展示了一个简单的例子 : 

def import_my_special_package(x):
  import my.special.package
  return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x))
int_rdd.collect()

您创建了一个有 4 个元素名为 int_rdd 的简单的 RDD。然后您应用函数 import_my_special_package 到每个 int_rdd 的元素。这个函数导入了 my.sepcial.package 并且返回了传入的原始参数。这样和使用类或者定义的函数有相同的作用,因为 Spark 需要每个 Spark executor 在需要时导入 my.special.package。

如果你只需要 my.special.package 内部一个简单的文件,您可以通过在您的 spark-submit 命令中使用 --py-files 选项并指定文件的路径来直接让 Spark 的所有 executor 都可以获取。您也可以以编程的方式通过使用 sc.addPyFiles() 函数来指定。如果您使用的功能来自跨越多个文件的 package,为 package 制作一个 egg,因为 --py-files 标记也接受一个 egg 文件的路径。

如果您有一个独立的依赖关系,您可以使用两种方式让需要的 Python 依赖是可用的。

  • 如果您只依赖一个单独的文件,您可以使用 --py-files 命令行选项,或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。
  • 如果您有一个独立模块上的依赖(一个没有其它依赖的模块),您可以创建一个这些模块的 egg 或者 zip 文件,使用 --py-files 命令行选项或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。

复杂的依赖关系

一些操作依赖有许多依赖关系的复杂的 package。例如,下列的代码片段导入了 Python pandas 数据分析库 : 

def import_pandas(x):
 import pandas
 return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_pandas(x))
int_rdd.collect()

pandas 依赖 NumPy,SciPi,和许多其它的 package。尽管 pandas 作为一个 *.py 文件来分发是很复杂的,您可以为它和它的依赖建一个 egg 并发送它们到 executo 。

分发 Egg 文件的限制

在这两个独立的,复杂的依赖关系的情况中,发送 egg 文件是有问题的,因为带有原代码包必须在其上运行的特定主机进行编译。当行业业标准的硬件做分布式计算,你必须假设的是,硬件是多样化的。然而,由于所需的 C 编译,内置客户端主机上一个 Python egg 是特定的客户端的 CPU 架构。因此,分配复杂的 egg,像编译 NumPy,SciPy 的 package 和 pandas 经常出现故障。相反,分发 egg 文件,你应该在群集的每个主机上安装所需的 Python 包,并指定 Python 的二进制文件的路径为 worker 主机使用。

安装以及保持 Python 环境

安装和维护的Python环境可能比较复杂,但可以让你使用完整的Python包生态系统。系统管理员在用您需要的依赖在群集的每台主机上安装 Anaconda distribution 或者设置一个 虚拟环境

如果您使用 Cloudera Manager,您可以使用方法将 Anaconda distribution 作为一个 Parcel 来部署。

最低需要的角色 : 群集管理员(或者管理员)

  1. 添加下面的 URL https://repo.continuum.io/pkgs/misc/parcels/ 到远程的 Parcel 仓库 URL,像 Parcel 配置设置 中描述的一样。
  2. 管理 Parcels 中描述的一样下载,分发,并且激活 Parcel。

Anaconda 被安装在 parcel 目录/Anaconda 中,parcel 目录默认是 /opt/cloudera/parcels,但是可以在 parcel 配置设置中更改。Anaconda parcel 支持 Continuum Analytics

如果您没有使用 Cloudera Manager,您可以在您的群集中安装一个虚拟环境通过在每台主机上运行命令 Cluster SSHParallel SSH,或者 Fabric。假设每台主机已经安装了 Python 和 pip,在一个 RHEL 6 兼容的系统中上的虚拟环境中使用下面的命令来安装标准的数据栈(NumPy,SciPy,scikit-learn,和 pandas)。

# Install python-devel:
yum install python-devel

# Install non-Python dependencies required by SciPy that are not installed by default:
yum install atlas atlas-devel lapack-devel blas-devel

# install virtualenv:
pip install virtualenv

# create a new virtualenv:
virtualenv mynewenv

# activate the virtualenv:
source mynewenv/bin/activate

# install packages in mynewenv:
pip install numpy
pip install scipy
pip install scikit-learn
pip install pandas

设置 Python 路径

之后您想要使用的 Python 包在您群集中一致的位置,如下所示设置相应的环境变量路径到您的 Python 可执行的文件 : 

  • Client 模式 - 用 PYSPARK_PYTHON 设置 executor 路径,用 PYSPARK_DRIVER_PYTHON 设置 driver 路径。
  • Cluster 模式 - 用 spark.yarn.appMasterEnv.PYSPARK_PYTHON 设置 executor 路径,用 spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON 设置 driver 路径。

为了使这些变量一致,添加相应的 export 语句 : 

  • Anaconda parcel - export 变量 /opt/cloudera/parcels/Anaconda/bin/python
  • Virtual environment - export 变量 /path/to/mynewenv/bin/python

到 sparl-env.sh,检查其他用户没有用条件测试的变量,比如 : 

if [ -z "${PYSPARK_PYTHON}" ]; then
export PYSPARK_PYTHON=
fi

在 Cloudera Manager 中,如下所示在 spark-env.sh 中设置环境变量 : 

最低需求角色 : 配置员(也可以用群集管理员,管理员)

  1. 转到 Spark 服务。
  2. 点击 配置 标签。
  3. 搜索 spark-conf/spark-env.sh 的 Spark 服务高级配置代码段(安全阀)
  4. 添加变量到属性中。
  5. 点击 保存更改 以提交更改。
  6. 重启服务。
  7. 部署客户端配置。

在命令行中,在 /etc/spark/conf/spark-env.sh 中设置环境变量。