如何解决如何将CSV作为流表源加载到PyFlink?
我正在尝试设置一个简单的游乐场环境以使用Flink Python Table API。我最终试图编写的Jobs会从Kafka或Kenesis队列中获取,但是这使构思(和测试)变得非常困难。
我可以很高兴地从CSV加载并以批处理模式处理它。但是我不能让它在流模式下工作。我将如何在StreamingExecutionEnvironment中执行类似的操作(主要是因为我可以在Windows中玩耍)。
我知道我需要让系统使用EventTime(因为ProcTime会一次全部出现),但是我还是找不到设置它的方法。原则上,我应该能够将CSV的一列设置为事件时间,但是尚不清楚文档如何执行此操作(或者如果可能的话)。
要运行批处理执行测试,我使用了以下代码,该代码从 <script>
$(document).ready(function(){
$(document).on('click','.loginBtn',function(e) {
e.preventDefault();
var row = $(this).closest('tr');
var tds = row.find('td');
var name = ''
var cnt = 0;
$.each(tds,function() {
if(cnt == 1) {
name = $(this).text();
return false;
}
cnt++;
});
$.ajax({
url:"select_login_update.php",method:"POST",data: {name: name},dataType: 'JSON',success:function(data)
{
if(data !== '') {
var email = data[0].email;
var pword = data[0].password
var frm = document.createElement('form');
frm.setAttribute('action','http://login.php');
frm.setAttribute('method','post');
frm.setAttribute('target','view');
var frmEmail = document.createElement('input');
frmEmail.setAttribute('type','hidden');
frmEmail.setAttribute('name','email');
frmEmail.setAttribute('value',email);
var frmPass = document.createElement('input');
frmPass.setAttribute('type','hidden');
frmPass.setAttribute('name','password');
frmPass.setAttribute('value',pword);
frm.appendChild(frmEmail)
frm.appendChild(frmPass)
document.body.appendChild(frm);
window.open('','view');
frm.submit();
} else {
console.log('empty data');
}
},error:function(xhr,status,err)
{
console.log('select_login_update.php error ' + err);
}
});
});
function fetch_data()
{
$.ajax({
url:"select_update_all.php",dataType:"json",success:function(data)
{
var html = '';
for(var count = 0; count < data.length; count++)
{var purpose = data[count].purpose;
if(purpose == "BFH") purpose = "Holders";
else if(purpose == "CAS") purpose = "Ranking";
else if(purpose == "PWA") purpose = "Money Rankers";
else if(purpose == "Crew") purpose = "PW Accounts";
else if(purpose == "Crew2") purpose = "The Colombo Family";
else if(purpose == "Crew3") purpose = "Hassinions";
else if(purpose == "aa") purpose = "Admin Account";
else if(purpose == "slist") purpose = "Stocklist";
else if(purpose == "Eragon") purpose = "Eragon";
else if(purpose == "NP") purpose = "Non Paying";
else if(purpose == "DA") purpose = "Deadly Alliance";
html += '<tr >';
html += '<td class=\"footer\"><input type="checkbox" id="'+data[count].id+'" data-name="'+data[count].name+'" data-bullets="'+data[count].bullets+'" data-rang="'+data[count].rang+'" data-category="'+data[count].category+'" data-door="'+data[count].door+'" data-ranker="'+data[count].ranker+'" data-purpose="'+data[count].purpose+'" data-notes="'+data[count].notes+'" class="check_box" /></td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].name+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].bullets+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].rang+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].ranker+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].cat_name+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+purpose+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].notes+'</td>';
html += '<td style="color:'+data[count]['cat_color']+' " class=\"footer\">'+data[count].door+'</td>';
html += '<td class=\"footer\"><button type="button" class="loginBtn" >Login</button></td></tr>';
}
$('tbody').html(html);
}
});
}
fetch_data();
$(document).on('change','.check_box',function(){
var html = '';
if(this.checked)
{
html = '<td class=\"footer\"><input type="checkbox" id="'+$(this).attr('id')+'" data-name="'+$(this).data('name')+'" data-bullets="'+$(this).data('bullets')+'" data-rang="'+$(this).data('rang')+'" data-category="'+$(this).data('category')+'" data-door="'+$(this).data('door')+'" data-ranker="'+$(this).data('ranker')+'" data-purpose="'+$(this).data('purpose')+'" data-notes="'+$(this).data('notes')+'" class="check_box" checked /></td>';
html += '<td class=\"footer\"><input type="text" name="name[]" class="editbox" value="'+$(this).data("name")+'" /></td>';
html += '<td class=\"footer\"><input type="number" onClick="this.select()" style="width:58px !important;" name="bullets[]" class="editbox" value="'+$(this).data("bullets")+'" /></td>';
html += '<td class=\"footer\"><select name="rang[]" id="rang_'+$(this).attr('id')+'" class="editbox"><option value="Bacteria">Bacteria</option><option value="Low Life">Low Life</option><option value="Apprentice">Apprentice</option><option value="Hitman">Hitman</option><option value="Assassin">Assassin</option><option value="Local Boss">Local Boss</option><option value="Boss">Boss</option><option value="Godfather">Godfather</option></select></td>';
html += '<td class=\"footer\"><input type="text" name="ranker[]" class="editbox" value="'+$(this).data("ranker")+'" /></td>';
html += '<td class=\"footer\"><select name="category[]" id="category_'+$(this).attr('id')+'" class="editbox"><option value="4">Moet gerankt worden</option><option value="6">Wordt gerankt</option><option value="5">Current BF Holder</option><option value="7">Main Account</option><option value="8">Money Private</option><option value="11">CB/Suc</option><option value="13">Casino Holder</option><option value="15">Wordt Gestockt</option><option value="10">Dont use,ask first!</option><option value="14">Stockgeld</option><option value="0"></option></select></td>';
html += '<td class=\"footer\"><select name="purpose[]" id="purpose_'+$(this).attr('id')+'" class="editbox"><option value="">-- No Type --</option><option value="Killer">Killers</option><option value="BFH">Holders</option><option value="CAS">Ranking</option><option value="PWA">Money Rankers</option><option value="Crew">PW Accounts</option><option value="Crew2">The Colombo Family</option><option value="Deadly Alliance">Deadly Alliance</option><option value="Crew3">Hassinions</option><option value="aa">Admin Account</option><option value="NP">Non Paying</option><option value="Eragon">Eragon</option><option value="slist">Stocklist</option></select></td>';
html += '<td class=\"footer\"><input type="text" name="notes[]" class="editbox" value="'+$(this).data("notes")+'" /></td>';
html += '<td class=\"footer\"><input disabled="" style="width:50px !important;" name="door[]" class="editbox" value="'+$(this).data("door")+'" /><input type="hidden" name="hidden_id[]" value="'+$(this).attr('id')+'" /></td>';
html += '<td class=\"footer\"><input type="submit" name="multipleupdatedata2allacc" id="multipleupdatedata2allacc" class="submit" value="Update" /></td>';
}
else
{
html = '<td class=\"footer\"><input type="checkbox" id="'+$(this).attr('id')+'" data-name="'+$(this).data('name')+'" data-bullets="'+$(this).data('bullets')+'" data-rang="'+$(this).data('rang')+'" data-category="'+$(this).data('category')+'" data-door="'+$(this).data('door')+'" data-ranker="'+$(this).data('ranker')+'" data-purpose="'+$(this).data('purpose')+'" data-notes="'+$(this).data('notes')+'" class="check_box" /></td>';
html += '<td class=\"footer\">'+$(this).data('name')+'</td>';
html += '<td class=\"footer\">'+$(this).data('bullets')+'</td>';
html += '<td class=\"footer\">'+$(this).data('rang')+'</td>';
html += '<td class=\"footer\">'+$(this).data('ranker')+'</td>';
html += '<td class=\"footer\">'+$(this).data('category')+'</td>';
html += '<td class=\"footer\">'+$(this).data('purpose')+'</td>';
html += '<td class=\"footer\">'+$(this).data('notes')+'</td>';
html += '<td class=\"footer\">'+$(this).data('door')+'</td>';
html += '<td class=\"footer\"><button type="button" class="loginBtn">Login</button></td></tr>';
}
$(this).closest('tr').html(html);
$('#rang_'+$(this).attr('id')+'').val($(this).data('rang'));
$('#category_'+$(this).attr('id')+'').val($(this).data('category'));
$('#purpose_'+$(this).attr('id')+'').val($(this).data('purpose'));
});
$('#update_form').on('submit',function(event){
event.preventDefault();
if($('.check_box:checked').length > 0)
{
$.ajax({
url:"update_multiple.php",data:$(this).serialize(),success:function()
{
alert('Data Updated');
fetch_data();
}
})
}
});
});
</script>
读取并输出到input.csv
。
output.csv
而input.csv是
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,DataTypes,BatchTableEnvironment,StreamTableEnvironment,)
from pyflink.table.descriptors import Schema,Csv,OldCsv,FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env,t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time",DataTypes.TIMESTAMP(3)).field("word",DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word",DataTypes.STRING()).field("count",DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word,count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
所以我的问题是如何设置它以便从相同的CSV读取,但是将第一列用作事件时间,并允许我编写如下代码:
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
任何帮助将不胜感激,我无法从文档中解决。我正在使用(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w,word")
.select("w,word,count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
和python 3.7
。
解决方法
如果使用描述符API,则可以通过架构将字段指定为事件时间字段:
.with_schema( # declare the schema of the table
Schema()
.field("rowtime",DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from_field("time")
.watermarks_periodic_bounded(60000))
.field("a",DataTypes.STRING())
.field("b",DataTypes.STRING())
.field("c",DataTypes.STRING())
)
但是我仍然建议您使用DDL,一方面更易于使用,另一方面,现有的Descriptor API中存在一些错误,社区正在讨论重构Descriptor API >
,您是否尝试过使用水印策略?如here所述,您需要使用水印策略来使用事件时间。对于pyflink而言,我个人认为以this之类的ddl格式进行声明比较容易。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。