如何解决NiFi-ScanHBase处理器-使用原始流文件
我试图弄清楚如何使用NiFi解决以下问题。
场景:
我的NiFi流正在接收JSON消息,它们是输入流文件(来自Kafka主题),对于每个输入JSON,我将执行以下操作
- 我想基于输入json中的属性值扫描HBase表。这将是行前缀扫描,预计将返回 结果最多6条记录
- 对于结果中收到的每个行键,我想在同一张表中进行更新。
- 一旦我对收到的所有行键进行了更新,我想传递原始流程文件(从卡夫卡收到的) 进入后续流程组以继续我的处理。
我已经使用ScanHBase处理器从HBase过滤记录,但是我面临的问题是-ScanHBase处理器没有丰富选项 输入json,但相反,它将以不同的格式返回原始流文件或结果。 我知道可能是在HBaseScan之后充实JSON没有意义,因为预期Hbase扫描会返回记录列表,并且可能是 将输入的JSON和结果一起添加是没有意义的。
我可以使用的可能选项是-使用带有合并策略和相关属性的MergeContent处理器,但是此后MergeContent的结果有点难以使用 进行处理。
还有其他方法可以解决此问题吗?
我的流程如下所示
合并内容配置
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。