如何解决将字符串转换为DynamicRecords并追加到DynamicFrame
在下面的代码中,我想创建一个新的AWS-glue / spark DynamicRecord。
有损坏的数据传入,我已使用正则表达式将其提取。
在下面的for循环中,每个提取的记录看起来像这样的{"pcode":"999.99","prodno":"123456"}
。
我需要将其转换为DynamicRecord
并将其附加到主动态框架dynamic_frame
。
然后删除错误的数据记录。
`
glueContext.forEachBatch(data_frame_datasource0,(dataFrame: Dataset[Row],batchId: Long) =>
{
if (dataFrame.count() > 0)
{
val dynamic_frame = DynamicFrame(dataFrame,glueContext)
def getGoodRecFromBadData(rec: DynamicRecord): DynamicRecord =
{
val crrptVal: String = getFieldValue("_corrupt_record",rec)
val crptVal = s"""${crrptVal}"""
if(crrptVal != "")
{
var recs_str_list = rec_patt.findAllIn(crptVal).toList
//now I have a list
//{"pcode":"999.98","prodno":"123456"}
//{"pcode":"999.99","prodno":"654321"}
//I'd like to either take that list and append directly as one item per row on the DF
//OR iterate over the list and append each item to the main DF
for(r <- recs_str_list)
{
//here is where I want to make a DynamicRecord from r and append it to
//dynamic_frame
new_rec = ??
//then delete the _corrupt_record,which I think can be done with 'dropField'
}
}
}
}
}
`
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。