如何将CSV作为流表源加载到PyFlink?

如何解决如何将CSV作为流表源加载到PyFlink?

我正在尝试设置一个简单的游乐场环境以使用Flink Python Table API。我最终试图编写的Jobs会从Kafka或Kenesis队列中获取,但是这使构思(和测试)变得非常困难。

我可以很高兴地从CSV加载并以批处理模式处理它。但是我不能让它在流模式下工作。我将如何在StreamingExecutionEnvironment中执行类似的操作(主要是因为我可以在Windows中玩耍)。

我知道我需要让系统使用EventTime(因为ProcTime会一次全部出现),但是我还是找不到设置它的方法。原则上,我应该能够将CSV的一列设置为事件时间,但是尚不清楚文档如何执行此操作(或者如果可能的话)。

要运行批处理执行测试,我使用了以下代码,该代码从 <script> $(document).ready(function(){ $(document).on('click','.loginBtn',function(e) { e.preventDefault(); var row = $(this).closest('tr'); var tds = row.find('td'); var name = '' var cnt = 0; $.each(tds,function() { if(cnt == 1) { name = $(this).text(); return false; } cnt++; }); $.ajax({ url:"select_login_update.php",method:"POST",data: {name: name},dataType: 'JSON',success:function(data) { if(data !== '') { var email = data[0].email; var pword = data[0].password var frm = document.createElement('form'); frm.setAttribute('action','http://login.php'); frm.setAttribute('method','post'); frm.setAttribute('target','view'); var frmEmail = document.createElement('input'); frmEmail.setAttribute('type','hidden'); frmEmail.setAttribute('name','email'); frmEmail.setAttribute('value',email); var frmPass = document.createElement('input'); frmPass.setAttribute('type','hidden'); frmPass.setAttribute('name','password'); frmPass.setAttribute('value',pword); frm.appendChild(frmEmail) frm.appendChild(frmPass) document.body.appendChild(frm); window.open('','view'); frm.submit(); } else { console.log('empty data'); } },error:function(xhr,status,err) { console.log('select_login_update.php error ' + err); } }); }); function fetch_data() { $.ajax({ url:"select_update_all.php",dataType:"json",success:function(data) { var html = ''; for(var count = 0; count < data.length; count++) {var purpose = data[count].purpose; if(purpose == "BFH") purpose = "Holders"; else if(purpose == "CAS") purpose = "Ranking"; else if(purpose == "PWA") purpose = "Money Rankers"; else if(purpose == "Crew") purpose = "PW Accounts"; else if(purpose == "Crew2") purpose = "The Colombo Family"; else if(purpose == "Crew3") purpose = "Hassinions"; else if(purpose == "aa") purpose = "Admin Account"; else if(purpose == "slist") purpose = "Stocklist"; else if(purpose == "Eragon") purpose = "Eragon"; else if(purpose == "NP") purpose = "Non Paying"; else if(purpose == "DA") purpose = "Deadly Alliance"; html += '<tr >'; html += '<td class=\"footer\"><input type="checkbox" id="'+data[count].id+'" data-name="'+data[count].name+'" data-bullets="'+data[count].bullets+'" data-rang="'+data[count].rang+'" data-category="'+data[count].category+'" data-door="'+data[count].door+'" data-ranker="'+data[count].ranker+'" data-purpose="'+data[count].purpose+'" data-notes="'+data[count].notes+'" class="check_box" /></td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].name+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].bullets+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].rang+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].ranker+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].cat_name+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+purpose+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].notes+'</td>'; html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].door+'</td>'; html += '<td class=\"footer\"><button type="button" class="loginBtn" >Login</button></td></tr>'; } $('tbody').html(html); } }); } fetch_data(); $(document).on('change','.check_box',function(){ var html = ''; if(this.checked) { html = '<td class=\"footer\"><input type="checkbox" id="'+$(this).attr('id')+'" data-name="'+$(this).data('name')+'" data-bullets="'+$(this).data('bullets')+'" data-rang="'+$(this).data('rang')+'" data-category="'+$(this).data('category')+'" data-door="'+$(this).data('door')+'" data-ranker="'+$(this).data('ranker')+'" data-purpose="'+$(this).data('purpose')+'" data-notes="'+$(this).data('notes')+'" class="check_box" checked /></td>'; html += '<td class=\"footer\"><input type="text" name="name[]" class="editbox" value="'+$(this).data("name")+'" /></td>'; html += '<td class=\"footer\"><input type="number" onClick="this.select()" style="width:58px !important;" name="bullets[]" class="editbox" value="'+$(this).data("bullets")+'" /></td>'; html += '<td class=\"footer\"><select name="rang[]" id="rang_'+$(this).attr('id')+'" class="editbox"><option value="Bacteria">Bacteria</option><option value="Low Life">Low Life</option><option value="Apprentice">Apprentice</option><option value="Hitman">Hitman</option><option value="Assassin">Assassin</option><option value="Local Boss">Local Boss</option><option value="Boss">Boss</option><option value="Godfather">Godfather</option></select></td>'; html += '<td class=\"footer\"><input type="text" name="ranker[]" class="editbox" value="'+$(this).data("ranker")+'" /></td>'; html += '<td class=\"footer\"><select name="category[]" id="category_'+$(this).attr('id')+'" class="editbox"><option value="4">Moet gerankt worden</option><option value="6">Wordt gerankt</option><option value="5">Current BF Holder</option><option value="7">Main Account</option><option value="8">Money Private</option><option value="11">CB/Suc</option><option value="13">Casino Holder</option><option value="15">Wordt Gestockt</option><option value="10">Dont use,ask first!</option><option value="14">Stockgeld</option><option value="0"></option></select></td>'; html += '<td class=\"footer\"><select name="purpose[]" id="purpose_'+$(this).attr('id')+'" class="editbox"><option value="">-- No Type --</option><option value="Killer">Killers</option><option value="BFH">Holders</option><option value="CAS">Ranking</option><option value="PWA">Money Rankers</option><option value="Crew">PW Accounts</option><option value="Crew2">The Colombo Family</option><option value="Deadly Alliance">Deadly Alliance</option><option value="Crew3">Hassinions</option><option value="aa">Admin Account</option><option value="NP">Non Paying</option><option value="Eragon">Eragon</option><option value="slist">Stocklist</option></select></td>'; html += '<td class=\"footer\"><input type="text" name="notes[]" class="editbox" value="'+$(this).data("notes")+'" /></td>'; html += '<td class=\"footer\"><input disabled="" style="width:50px !important;" name="door[]" class="editbox" value="'+$(this).data("door")+'" /><input type="hidden" name="hidden_id[]" value="'+$(this).attr('id')+'" /></td>'; html += '<td class=\"footer\"><input type="submit" name="multipleupdatedata2allacc" id="multipleupdatedata2allacc" class="submit" value="Update" /></td>'; } else { html = '<td class=\"footer\"><input type="checkbox" id="'+$(this).attr('id')+'" data-name="'+$(this).data('name')+'" data-bullets="'+$(this).data('bullets')+'" data-rang="'+$(this).data('rang')+'" data-category="'+$(this).data('category')+'" data-door="'+$(this).data('door')+'" data-ranker="'+$(this).data('ranker')+'" data-purpose="'+$(this).data('purpose')+'" data-notes="'+$(this).data('notes')+'" class="check_box" /></td>'; html += '<td class=\"footer\">'+$(this).data('name')+'</td>'; html += '<td class=\"footer\">'+$(this).data('bullets')+'</td>'; html += '<td class=\"footer\">'+$(this).data('rang')+'</td>'; html += '<td class=\"footer\">'+$(this).data('ranker')+'</td>'; html += '<td class=\"footer\">'+$(this).data('category')+'</td>'; html += '<td class=\"footer\">'+$(this).data('purpose')+'</td>'; html += '<td class=\"footer\">'+$(this).data('notes')+'</td>'; html += '<td class=\"footer\">'+$(this).data('door')+'</td>'; html += '<td class=\"footer\"><button type="button" class="loginBtn">Login</button></td></tr>'; } $(this).closest('tr').html(html); $('#rang_'+$(this).attr('id')+'').val($(this).data('rang')); $('#category_'+$(this).attr('id')+'').val($(this).data('category')); $('#purpose_'+$(this).attr('id')+'').val($(this).data('purpose')); }); $('#update_form').on('submit',function(event){ event.preventDefault(); if($('.check_box:checked').length > 0) { $.ajax({ url:"update_multiple.php",data:$(this).serialize(),success:function() { alert('Data Updated'); fetch_data(); } }) } }); }); </script> 读取并输出到input.csv

output.csv

而input.csv是

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,DataTypes,BatchTableEnvironment,StreamTableEnvironment,)
from pyflink.table.descriptors import Schema,Csv,OldCsv,FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env,t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

try:
    out_path.unlink()
except:
    pass

from pyflink.table.window import Tumble

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time",DataTypes.TIMESTAMP(3)).field("word",DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word",DataTypes.STRING()).field("count",DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word,count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

所以我的问题是如何设置它以便从相同的CSV读取,但是将第一列用作事件时间,并允许我编写如下代码:

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

任何帮助将不胜感激,我无法从文档中解决。我正在使用( t_env.from_path("mySource") .window(Tumble.over("10.minutes").on("time").alias("w")) .group_by("w,word") .select("w,word,count(1) as count") .filter("count > 1") .insert_into("mySink") ) python 3.7

解决方法

如果使用描述符API,则可以通过架构将字段指定为事件时间字段:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime",DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a",DataTypes.STRING())
             .field("b",DataTypes.STRING())
             .field("c",DataTypes.STRING())
         )

但是我仍然建议您使用DDL,一方面更易于使用,另一方面,现有的Descriptor API中存在一些错误,社区正在讨论重构Descriptor API >

,

您是否尝试过使用水印策略?如here所述,您需要使用水印策略来使用事件时间。对于pyflink而言,我个人认为以this之类的ddl格式进行声明比较容易。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-