PyODPS DataFrame 的代码在哪里跑
使用 PyODPS DataFrame 写数据应用时,同一个脚本文件里的代码,可能跑到不同的地方去执行——本地、MaxCompute Executor,甚至是 DataWorks 这样的开发环境。这种“分身术”有时会带来一些意想不到的问题。这篇文章就来聊聊,怎么搞清楚代码到底在哪儿跑,以及遇到典型问题该怎么解决。
概述
先看一段示例代码:
from odps import ODPS, options
import numpy as np
o = ODPS(access_id, access_key, project, endpoint)
df = o.get_table('pyodps_iris').to_df()
coeffs = [0.1, 0.2, 0.4]
def handle(v):
import numpy as np
return float(np.cosh(v)) * sum(coeffs)
options.df.supersede_libraries = True
val = df.sepal_length.map(handle).sum().execute(libraries=['numpy.zip', 'other.zip'])
print(np.sinh(val))
先泼盆冷水:PyODPS 本质上只是一个 Python 包,不是什么魔改的 Python 解释器。它运行在标准的 Python 环境中,所以不会自动把单机代码变成分布式执行——你写的每一条语句,行为都和普通 Python 完全一致,别指望它自己“变魔法”。
下面解释这段代码的执行过程。

上图是执行时可能涉及的系统。代码本身执行的位置用紫色标出——这些系统都在 MaxCompute 外部,为了方便,下文统称“本地”。本地执行的代码包括 handle 函数之外的部分(注意 handle 传进 map 时只传了函数本身,还没执行)。所以这些代码的行为和普通 Python 一致,import 第三方包时引用的是本地的包。这就引出问题了:上面代码里 libraries=['numpy.zip', 'other.zip'] 引用的 other.zip 如果没在本地安装,那本地代码里一旦有 import other,立刻报错——即便 other.zip 已经上传到 MaxCompute 资源,本地根本找不到它。理论上,本地代码只要不涉及 PyODPS 包,就和 PyODPS 无关,需要用户自己排查。
那 handle 函数呢?情况完全不同。当你把 handle 传进 map 方法时(假设后端是 MaxCompute),它先被 cloudpickle 模块打包——闭包、字节码一并带走。然后 PyODPS DataFrame 用这些生成一个 Python UDF,提交给 MaxCompute。作业以 SQL 形式执行时,会调用这个 UDF,在 MaxCompute Executor 里 unpickle 并运行。所以:
handle函数体里的代码全在 MaxCompute Executor 里跑,本地不执行。handle里用不了本地安装的包,只有 Executor 里存在的包才管用。- 上传的第三方包必须兼容 Executor 的 Python 版本(目前是 Python 2.7,UCS2)。
handle里修改外部变量(比如coeffs)不会影响本地的值。- 如果在
handle外 import 了包,然后在handle里调用,可能报错——不同环境的包结构不一样,cloudpickle 会把本地引用带过去,导致报错。所以强烈建议 import 写在handle内部。 - 因为用了 cloudpickle,如果
handle调用了其他文件里的代码,那些文件所在的包必须存在于 Executor 里。不想搞第三方包的话,就把所有个人代码塞进同一个文件。
上述对 handle 的解释同样适用于自定义聚合、apply 和 map_reduce 中调用的自定义方法 / Agg 类。如果用的后端是 Pandas,那所有代码都会在本地跑,本地也需要装好相关包。不过 Pandas 后端调试完通常会转到 MaxCompute 运行,所以建议在本地装包的同时,按 MaxCompute 后端的惯例来开发。
使用第三方包
个人电脑 / 自有服务器在本地使用第三方包 / 其他文件中的代码
在相应的 Python 版本上安装即可。
DataWorks 中本地使用其他文件中的代码
该部分功能由 DataWorks 提供,参考 DataWorks 文档。
map / apply / map_reduce / 自定义聚合中使用第三方包 / 其他文件中的代码
参考阿里云官方文档的相关文章(https://yq.aliyun.com/articles/591508)。需要补充的是:在 DataWorks 上上传资源后,记得点击“提交”确保资源被正确上传到 MaxCompute。如果需要用自己的 Numpy 版本,上传正确版本的 wheel 包的同时,要配置 odps.df.supersede_libraries = True,并确保上传的 numpy 包名位于 libraries 最前面;如果指定了 options.df.libraries,则 numpy 包名需要位于 options.df.libraries 最前面。
引用其他 MaxCompute 表中的数据
个人电脑 / 自有服务器在本地访问 MaxCompute 表
如果 Endpoint 可以连接,使用 PyODPS / DataFrame 访问。
map / apply / map_reduce / 自定义聚合中访问其他 MaxCompute 表
MaxCompute Executor 里通常不支持访问 Endpoint / Tunnel Endpoint,上面也没有 PyODPS 包可用——所以不能直接使用 ODPS 入口对象或者 PyODPS DataFrame,也不能从自定义函数外部传入这些对象。如果表的数据量不大,建议将 DataFrame 作为资源传入(参考 PyODPS 官方文档:https://pyodps.readthedocs.io/zh_CN/latest/df-element.html#function-resource)。数据量大的话,改写成 join 更合适。
访问其他服务
个人电脑 / 自有服务器在本地访问其他服务
保证自己的环境能正常访问相关服务,生产服务器可以找 PE 沟通。
DataWorks 上的本地代码中访问其他服务
请咨询 DataWorks 技术支持。
map / apply / map_reduce / 自定义聚合中访问其他服务
参考上文提到的文档启用 Isolation,如果仍然遇到网络报错,请联系 MaxCompute 用户群,找售后帮忙解决。