如何解决Flink docker compose-自定义库
我正在尝试使用docker-compose设置Flink会话集群。我想在Flink中加载一个自定义库,因为该库包含我所有工作使用的代码。通过创建如下所示的自定义docker映像来实现此目的:
FROM flink:1.10.0
WORKDIR /opt/flink/lib
RUN mkdir /opt/flink/usrlib
RUN chown flink:flink /opt/flink/usrlib
ADD --chown=flink:flink ./myLibrary.jar /opt/flink/lib/myLibary.jar
作业/任务管理器成功启动。当我使用Web UI提交作业时,我的作业可以正常运行,但有一个例外:
在我的库中,我有一个flink映射运算符(称为DeserialisationMapper),该运算符使用来自Kafka的JSON消息,并根据消息中的标记创建自定义Java对象。例如,如果消息是
{"objectType": "Address","street": "Street 1"}
我的DeserialisationMapper生成Java POJO,即Address类的实例,其字段“ street”的字段设置为“ Street 1”。我使用Java反射来做到这一点。 POJO 的自定义Java类仅在作业本身中可用(不是库)。当我在Eclipse中执行程序时(我的自定义库作为Maven依赖项提供),一切正常。 DeserialisationMapper能够找到作业项目中的自定义Java类。当我为该作业导出一个“胖” jar时,该jar包含了该作业的所有依赖项(例如myLibrary.jar)并将其部署到flink集群中,它也可以正常工作。但是,当我尝试将我的库放在flink集群中(使用上面显示的自定义图像)并将其从作业jar中排除时,我收到ClassNotFoundException,提示无法找到特定的类(例如Address),尽管该类看起来正确(例如,org.eclipse.myJob.datatypes.Address)-我确认该类位于作业jar中的正确位置。注意:我的工作实际上可以访问myLibrary.jar中的方法,例如Kafka使用者是在myLibrary.jar中的方法中创建的,而该方法由我的工作调用。)
为什么会这样? myLibrary.jar不能找到我的工作罐中包含的类吗?我应该进行任何特定的配置还是完全不可能?
解决方法
自己找到解决方案。根据{{3}},来自会话群集中用户jar的类是动态加载的,因此无法被Flink的classpath中加载的lib访问。一种解决方案是将用户jar放在lib文件夹中,这对我不起作用,因为我希望我的用户能够通过UI提交其作业。 this中介绍了另一个对我有用的解决方案。基本上,当需要来自用户jar的类时,您的Flink运算符应使用getRuntimeContext().getUserCodeClassLoader()
实例化类加载器。为了做到这一点,它们应该是Rich函数(例如RichFlatMapFunction)。然后,使用该类加载器,可以调用loadClass(className)
方法,该方法指向用户类所在的路径。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。