如何解决Apache Nifi Flowfile 到 Elasticsearch 批量 api
我正在尝试准备 flowfile 以使用标准 InvokeHttp 处理器通过版本控制对 elasticsearch 进行批量调用,因为我未能使其与 PutElasticsearch* 处理器一起使用
Elasticsearch 批量端点接受 ndjson 数据格式,如下所示:
{"index": {"_index":"foo","id":"bar","version":1,"version_type":"external}}\n
{"id":"bar","field1":"field1_value","field2":"field2_value"}\n
{"index": {"_index":"foo","id":"bizz","version":2,"version_type":"external}}\n
{"id":"bizz","field1":"field3_value","field2":"field4_value"}\n
我当前的处理器设置如下所示: Change to elasticsearch bulk format flow
我正在使用带有简单正则表达式 message.body
的 ExtractText 处理器将 id 提取到属性,将版本提取到属性并将流文件内容提取到属性 .*
,
通过属性中的内容,我可以将 ConvertRecord 处理器与 RecordReader 一起使用:JsonTreeReader
和 RecordWriter:FreeFormTextRecordSetWriter
,Text 属性设置为:
{"index": {"_index":"foo","_id":"${id}","version":"${version","version_type":"external"}}}
${message.body}
现在转换流文件后,我使用 MergeContent 处理器合并它们,并通过 InvokeHttp 处理器将它们发送到 elasticsearch,并将 url 设置为 ${elastic.url}/foo/_bulk 并且 Content-Type 设置为 x-ndjson 类型。
有没有可能做得更好?我有一种感觉,我的解决方案不是最好的,但我是 Nifi 的新手,我无法以其他方式解决它。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。