如何解决Elixir Flow在本地计算机上工作,但不处理数据并抛出:[警告] ** AWS Fargate的“ GenStage.Streamer”中未定义的handle_info
我一直在尝试使用Elixir Flow(由GenStage构建的Lib)处理/流来自AWS S3存储桶文件的数据,并写入AWS RDS DB。
我已经能够在本地计算机上成功完成此操作,但是当我将应用程序部署到AWS ECS / Fargate或EC2时,它无法按预期运行。 下面是我的实现:
def load_file(file_name) do
window = Flow.Window.count(100)
file_name
|> HTTPStream.get()
|> HTTPStreamUtil.lines()
|> Flow.from_enumerable()
|> Flow.filter(&(String.match?(&1,~r/^rec/)))
|> Flow.map(fn line ->
line
|> String.replace("\n","")
|> String.replace("\"","")
|> String.split(";")
|> transform
end)
|> Flow.partition(window: window,key: {:key,"day_type_no"})
|> Flow.reduce(fn -> [] end,fn item,batch -> [item | batch] end)
|> Flow.on_trigger(fn items ->
items
|> add_timestamps
|> Database.Routes.create_multiple_day_type
{[items],items}
end)
|> Flow.run()
end
当我在AWS Fargate或EC2中运行应用程序时,我看到下面的日志,并且未处理部分/全部数据。
[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp,#Port<0.20>," \"SBST#c29 #01#06#2019#3\"\r\nrec; 20200512; 32510; \"SBST#37 #01#07#2019#4\"\r\nrec; 20200512; 32511; \"SBST#31 #03#00#2019#5\"\r\nrec; 20200512; 32512; \"SBST#298 #01#05#2019#6\"\r\nrec; 20200512; 32513; \"SBST#c40 #02#06#2019#7\"\r\nrec; 20200512; 32514; \"SBST#229 #02#06#2019#1\"\r\nrec; 20200512; 32515; \"SBST#298 #01#00#2019#2\"\r\nrec; 20200512; 32516; \"SBST#c291 #01#00#2019#3\"\r\nrec; 20200512; 32517; \"SBST#38 #03#00#2019#4\"\r\nrec; 20200512; 32518; \"SBST#33 #06#04#2019#5\"\r\nrec; 20200512; 32519; \"SBST#2N #01#05#2019#6\"\r\nrec; 20200512; 32520; \"SBST#401 #02#06#2019#7\"\r\nrec; 20200512; 32521; \"SBST#c23 #01#00#2019#1\"\r\nrec; 20200512; 32523; \"SBST#c291 #01#06#2019#3\"\r\nrec; 20200512; 32524; \"SBST#38 #01#07#2019#4\"\r\nrec; 20200512; 32525; \"SBST#33 #06#07#2019#5\"\r\nrec; 20200512; 32526; \"SBST#31 #01#05#2019#6\"\r\nrec; 20200512; 32527; \"SBST#42 #03#06#2019#7\"\r\nrec; 20200512; 32528; \"SBST#c23 #01#06#2019#1\"\r\nrec; 20200512; 32529; \"SBST#2N #01#65#2019#2\"\r\nrec; 20200512; 32530; \"SBST#292 #01#00#2019#3\"\r\nrec; 20200512; 32531; \"SBST#c39 #01#00#2019#4\"\r\nrec; 20200512; 32532; \"SBST#35 #02#00#2019#5\"\r\nrec; 20200512; 32533; \"SBST#33 #02#05#2019#6\"\r\nrec; 20200512; 32534; \"SBST#45 #01#06#2019#7\"\r\nrec; 20200512; 32535; \"SBST#c29 #01#00#2019#1\"\r\nrec; 20200512; 32536; \"SBST#31 #03#00#2019#2\"\r\nrec; 20200512; 32537; \"SBST#292 #01#06#2019#3\"\r\nrec; 20200512; 32538; \"SBST#c39 #01#07#2019#4\"\r\nrec; 20200512; 32539; \"SBST#37 #03#00#2019#5\"\r\nrec; 20200512; 32540; \"SBST#35 #02#05#2019#6\"\r\nrec; 20200512; 32541; \"SBST#c46 #03#06#2019#7\"\r\nrec; 20200512; 32542; \"SBST#c29 #01#06#2019#1\"\r\nrec; 20200512; 32543; \"SBST#33 #06#25#2019#2\"\r\nrec; 20200512; 32544; \"SBST#c293 #05#00#2019#3\"\r\nrec; 20200512; 32545; \"SBST#4 #02#00#2019#4\"\r\nrec; 20200512; 32546; \"SBST#37 #01#07#2019#5\"\r\nrec; 20200512; 32547; \"SBST#37 #01#05#2019#6\"\r\nrec; 20200512; 32548; \"SBST#47 #02#06#2019#7\"\r\nrec; 20200512; 32549; \"SBST#c291 #01#00#2019#1\"\r\nrec; 20200512; 32550; \"SBST#33 #06#07#2019#2\"\r\nrec; 20200512; 32551; \"SBST#c293 #01#06#2019#3\"\r\nrec; 20200512; 32552; \"SBST#c40 #02#00#2019#4\"\r\nrec; 20200512; 32553; \"SBST#38 #03#00#2019#5\"\r\nrec; 20200512; 32554; \"SBST#38 #03#05#2019#6\"\r\nrec; 20200512; 32555; \"SBST#4N #01#67#2019#7\"\r\nrec; 20200512; 32556; \"SBST#c291 #01#06#2019#1\"\r\nrec; 20200512; 32557; \"SBST#35 #02#00#2019#2\"\r\nrec; 20200512; 32558; \"SBST#298 #01#00#2019#3\"\r\nrec; 20200512; 32559; \"SBST#c40 #01#07#2019#4\"\r\nrec; 20200512; 32560; \"SBST#38 #01#07#2019#5\"\r\nrec; 20200512; 32561; \"SBST#c39 #01#05#2019#6\"\r\nrec; 20200512; 32562; \"SBST#506 #02#06#2019#7\"\r\nrec; 20200512; 32563; \"SBST#292 #01#00#2019#1\"\r\nrec; 20200512; 32564; \"SBST#37 #03#00#2019#2\"\r\nrec; 202" <> ...}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
我尝试使用不同的Flow.Window例如计数,全局等,它们都不如预期那样工作。另外我怀疑这可能是我的远程服务器/任务没有启用CPU /内存,我试图将任务CPU增加到2vCPU,它确实有助于处理更多数据,但不是所有数据。
有线问题是,此代码在我的本地计算机上可以完美工作(我的PC的强大cos功能强大?)。我想和大家一起检查,看看有人知道为什么会发生这种情况吗?
非常感谢您。
解决方法
我尝试了另一种方法,将文件从S3下载到已安装的docker卷,然后流传输数据,它起作用了。以下是更改。
def load_file(file_name) do
# download vdv452 file
file_path = Application.get_env(:ex_aws,:docker_volume) <> (String.split(file_name,"/") |> List.last)
ExAws.S3.download_file(Application.get_env(:ex_aws,:s3_bucket_name),file_name,file_path,timeout: 3600)
|> ExAws.request!
# stream file data
file_path
|> File.stream!
|> Flow.from_enumerable()
|> Flow.filter(&(String.match?(&1,~r/^rec/)))
|> Flow.map(fn line ->
line
|> String.replace("\n","")
|> String.replace("\"","")
|> String.split(";")
|> transform
end)
|> Flow.partition(window: Flow.Window.count(1_000))
|> Flow.reduce(fn -> [] end,fn item,batch -> [item | batch] end)
|> Flow.on_trigger(fn items ->
items
|> add_timestamps
|> Database.Routes.create_multiple_route_sequence
{[items],items}
end)
|> Flow.run()
end
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。