本文介绍了 Py4j的使用以及 Flink官方如何使用 Py4j进行混合语言编程,最后会介绍下我们会应用这种技术在我们的 Flink Notebook 服务,来创建一个混合语言编程环境。
Flink Notebook 服务是我司自研的基于Notebook方式的Flink 开发平台,他支持用户通过SQL方式和JAR包方式进行混合编程,并通过一些配置,既可完全的在页面上完成FLINK任务的开发工作,如图:
通过不同的Notebook Type我们可以加载不同类型的组件,通过table结果集流转方式,承接上下游,以完成相应的功能。目前的插件类型主要是主要分:1.SQL组建:可以自由撰写SQL,2.格式化组建:sink或者source,有具体的格式,标准的前端组建对应,3.JAR包自定义组件,通过用户上传自己开发的jar包完成对应的逻辑。
对于Jar包自定义组件来说,他是为了解决1%的特异性需求的,但问题是其代码不可见,逻辑也相对自由,有违Notebook的初衷,因此,我们想设计一种Notebook的Type,支持可视化的Python编写,可以直接将代码在页面上进行开发。
Flink 本身来说,就有PyFlink 和 Python UDF support,因此python和 flink的耦合度应该很高,所以我们要了解Flink是怎么做的,从而研究我们应该如何去做,所以本文会分成以下3个部分来介绍整个混编逻辑:
1. Java与Python 通信:Py4J 2. Py4j in Flink 3. Notebook with Python
Py4j可以使运行于python解释器的python程序动态的访问java虚拟机中的java对象。Java方法可以像java对象就在python解释器里一样被调用, java collection也可以通过标准python collection方法调用。Py4j也可以使java程序回调python对象。
详细说明可以参考官网 https://www.py4j.org/
安装以及基本使用也可以参考官网
Py4j可以在系统中创建一个 java和python 之间通信的socket管道。
我们可以通过一个例子来看整个Py4j是如何工作的。
我们先创建一个想让python负责具体实现的Java 接口:
public interface TestEnterPoint { String gift(HashMap<String,String> a, String b); }
在java 服务端,我们通过以下代码可以启动一个简单的Py4j监听:
public static void main(String[] args) { ListenerApplication application = new ListenerApplication(); GatewayServer server = new GatewayServer(application); server.start(true); }
ListenerApplication 表示一个允许共享给python的类,她可以是任意java类,包括Map,List等复杂结构化数据:
public class ListenerApplication { TestEnterPoint enterPoint = new TestEnterPoint(); public void setListener(TestEnterPoint enterPoint) { this.enterPoint = enterPoint; } public void notifyAllListeners() { HashMap<String,String> map = new HashMap<>(); map.put("a","aaaa"); Object returnValue = listener.gift(map,"a"); System.out.println(returnValue); } }
而在Python端,我们可以通过以下代码运行一个python程序:
from py4j.java_gateway import JavaGateway, CallbackServerParameters class TestEnterPoint(object): def gift(self, map, key): return map.get(key) class Java: implements = ["com.xxxx.xxx.test.py.TestEnterPoint"] if __name__ == "__main__": gateway = JavaGateway( callback_server_parameters=CallbackServerParameters()) listener = TestEnterPoint() gateway.entry_point.setListener(listener) gateway.entry_point.notifyAllListeners() gateway.shutdown()
这样我们就通过Python来实现了一个 map.get(key) 的方法
整个过程中,我们看出几点对于python来说比较基本的使用方式,那就是,第一,通过Python 中implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]
的使用方式,我们可以实现一个Java的Interface,第二,通过gateway.entry_point的方式,我们可以拿到java中设置的可共享变量,第三个我们在例子中并没有呈现,但也是非常基础的使用,就是通过在python中使用 gateway.jvm.com.xxxx.xxx.test.py.TestServer
的方式,允许python使用任何java的class,允许初始化,允许调用方法,但是他们如果想和java端进行数据通信,则必须通过entry_point来实现。
讲完Py4j并且如果把上面的代码自己拿来试下,应该已经对整个python和java互通有一定理解了,那么我们Flink中如何使用Py4J来进行混编,也就顺理成章的很好理解了,在Flink中,有很多地方使用到了这种技术,包括PyFlink,以及Python UDF support,PyFlink 属于Pyton为主,也比较复杂,这边就先就以简单的Python UDF为例,梳理下Flink的执行逻辑。
在Flink Java中如何使用Python UDF
在Flink 中使用Python的UDF相对来说非常简单,创建一个Python代码,比如:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def func1(s: str): return s.replace('hello', 'ni hao')
在Flink Java中,需要配置Python环境变量,首先将Python文件加到环境中去,如果是集群提交,需要加到依赖中去(使用-pyfs 提交Python文件),或者远程的Hdfs文件。其次需要配置Python的程序依赖环境路径:
configuration.setString("python.files", "/Users/yourName/test.py"); configuration.setString("python.client.executable", "python3"); configuration.setString("python.executable", "/usr/bin/python3");
最后,我们在使用过程中,比如通过SQL使用时候,只需要如下SQL语句即可:
create temporary system function func1 as 'test1.func1' language python
其中test1是python的文件名"test1.py"而 func1就是上文中的那个python 的function name,如此既可以在java中使用python实现的UDF
Flink是如何实现这些的
在追踪Flink Sql是如何执行create function过程中,我们发现整个Flink的执行流程大致如图:
Flink会通过语法解析后的通过create function的后缀“ language python”判断是否是Python fuction,如果是,会调用PythonFunctionUtils来获取function,而PythonFunctionUtil最终通过动态加载的PythonFunctionFactory来最终调用Py4j。这里可以看见他的逻辑其实也比较简单,首先就是启动Py4j的Java端server,然后主要就是通过环境变量,以及configture 里的各种参数,最终拼接出python的cmd 执行命令,运行命令并通过entryPoint获取其中的贡献类。最终生成我们在java端可以用的function。
这块如果有兴趣,在Flink源码中搜索 PythonFunctionFactory 可以直接看见相关代码。
我们平台是类似Zeeplin的可视化Notebook编程页面,对于我们来说,要在页面上支持Python编程,有几种方案:
方案一对于我们来说并不难,可以看到Flink官方既是支持Python UDF的,我们只需要将这个Notebook Part里的内容,生成Python文件并添加到环境中一起提交即可,但这种方案没法解决我们上面提出的一大痛点,用户的1%需要Jar包开发的非标任务,不是单单可以通过UDF来实现的。
方案二对于我们来说,最大的问题是所有的优化,整个程序体系都是建立在Java 基础上的,改动会非常巨大。
如此,只能采取方案三,而方案三的问题是,Flink的原版PyFlink只创建了 PythonFunctionFactory 和一个 心跳2个 entryPoint,这对我们来说比较局限。所以我们会采取模仿 PythonFunctionFactory 的方式,自己创建Py4j进程,来完成Notebook的混编实现
这里的详细设计以及Demo 我们会在下篇文章(二)中放出。谢谢各位。