博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Beam和BigQuery的错误处理(Java SDK)
阅读量:5879 次
发布时间:2019-06-19

本文共 5775 字,大约阅读时间需要 19 分钟。

设计管道

假设我们有一个简单的场景:事件正在流向Kafka,我们希望使用管道中的事件,进行一些转换并将结果写入BigQuery表,以使数据可用于分析。

可以在作业开始之前创建BigQuery表,或者Beam本身可以创建它。

代码看起来很简单:

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

。作为(EventsProcessingOptions。类);

管道 p = 管道。创造(选项);

PCollection tableRows =

//阅读kafka主题               p。apply(“kafka-topic-read”,kafkaReader)                。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())                        。通过(记录 - > 记录。getKV。()的getValue()))                //将值转换为JsonNode                。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))                //创建TableRow                。申请(“建设-表行”,帕尔多。的(新 EventsRowFn()))                //将表格行保存到BigQuery                。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()                        。到(tableSpec)                        。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)                        。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);

少了什么东西?

在现实世界中,可能会发生错误,在大多数情况下,我们将需要处理它们。

在上面的管道中,当我们尝试将事件从Kafka解析为JsonNode,转换期间以及BigQuery插入阶段时,可能会发生错误。

错误处理计划

对于每个错误,我们将在不同的BigQuery表中创建一行,其中包含更多信息,例如来自Kafka的origin事件。

一旦发生错误,我们就可以分析错误记录并全面了解它。

然后,我们可以修复管道代码,重置/更改Kafka使用者组偏移,并再次使用固定代码重播事件。

我们还可以修复事件本身(例如,在JSON解析错误中)并将其重新发送到Kafka。

处理转换错误

让我们快速浏览一下我们的转换函数:

@ProcessElement

public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
TableRow convertedRow = new TableRow();
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多转变来
背景。输出(输出);
}
private void insertLong(JsonNode value,String key,TableRow convertedRow){

String  valueToInsert  =  value。asText();    如果(valueToInsert  !=  空 &&  !valueToInsert。的isEmpty()){        long  longValue  =  Long。parseLong(valueToInsert);        convertedRow。set(key,longValue);    }

}

private void insertFloat(JsonNode value,String key,TableRow convertedRow){

String  valueToInsert  =  getStringValue(value);    if(valueToInsert  !=  null){        float  floatValue  =  Float。parseFloat(valueToInsert);        convertedRow。set(key,floatValue);    }

}

是的,我们可能在解析过程中失败,因为我们将字符串解析为Float / Long,并且这对无法转换的数据失败。

我们需要从主函数输出中排除失败的数据并将这些数据发送到管道中的不同路径,然后我们将它们保存到BigQuery中的错误表中。

怎么样?让我们使用标签

当我们在ParDo 函数末尾输出一个元素时 ,我们可以在一个标签内输出它。然后我们可以获取所有标记为特定名称的元素,并对它们执行一些处理。

这里我们将使用两个标签,一个是MAIN标签,它包含所有成功的记录,另一个包含所有错误和一些上下文,例如 DEADLETTER_OUT。

该主标记必须与ParDo 函数本身的OUTPUT类型相同,并且 所有其他标记可以是不同类型。

现在,我们的 ParDo 函数将如下所示(注意标记添加):

@ProcessElement

public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
public static final TupleTag < JsonNode > MAIN_OUT = new TupleTag < JsonNode >(){};
public static final TupleTag < BigQueryProcessError > DEADLETTER_OUT = new TupleTag < BigQueryProcessError >(){};
TableRow convertedRow = new TableRow();
尝试 {

insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);  insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);  //更多转变来  背景。输出(输出);

} catch(例外 e){

记录器。误差(“失败变换” + ë。的getMessage(),ê);  背景。输出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));

}

}
我们如何通过标签处理元素?让我们改变管道,并进行拆分。该 MAIN 元素将大量查询表和 DEADLETTER_OUT 内容将被发送到错误表。

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

。作为(EventsProcessingOptions。类);

管道 p = 管道。创造(选项);

PCollectionTuple tableRows =

//阅读kafka主题               p。apply(“kafka-topic-read”,kafkaReader)                。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())                        。通过(记录 - > 记录。getKV。()的getValue()))                //将值转换为JsonNode                。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))                //创建TableRow                。申请(“建设-表行”,帕尔多。的(新 EventsRowFn())。withOutputTags(MAIN_OUT,TupleTagList。的(DEADLETTER_OUT)));              //将MAIN标签保存到BQ              tableRows                  。得到(MAIN_OUT)                  。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()                  。到(tableSpec)                  。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)                  。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);              //将DEADLETTER_OUT保存到BQ错误表              tableRows                。得到(DEADLETTER_OUT)                。申请(“BQ-进程的错误提取物”,帕尔多。的(新 BigQueryProcessErrorExtracFn()))                。申请(“BQ-进程的错误写”,BigQueryIO。writeTableRows()                        。to(errTableSpec)                        。withJsonSchema(errSchema)                        。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)                        。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));

p。run();

处理BigQuery插入错误
为了在BigQuery插入期间处理错误,我们必须使用BiqQueryIO API。

让我们放大写入阶段。并稍微改变一下:

WriteResult writeResult = tableRowToInsertCollection

。申请(“BQ-写”,BigQueryIO。写()    //指定将返回失败的行及其错误            。withExtendedErrorInfo()            。到(tableSpec)            。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)            。withWriteDisposition(BigQueryIO。写。writeDisposition会。WRITE_APPEND)    //指定处理失败插入的策略。            。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));

//将失败的行及其错误写入错误表

写结果

。getFailedInsertsWithErr()    。申请(窗口。到(FixedWindows。的(持续时间。standardMinutes(5))))    。申请(“BQ-插入错误提取物”,帕尔多。的(新 BigQueryInsertErrorExtractFn(tableRowToInsertView))。withSideInputs(tableRowToInsertView))    。申请(“BQ-插入错误写”,BigQueryIO。writeTableRows()            。to(errTableSpec)            。withJsonSchema(errSchema)            。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)            。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));

在上面的代码片段中,我们从BigQueryIO获取失败的TableRows及其错误。现在我们可以将它们转换为另一个 TableRow 并将它们写入错误表。在这种情况下,我们让作业在需要时创建表。

转载地址:http://ydcix.baihongyu.com/

你可能感兴趣的文章
POJ 2002
查看>>
MVC和MTV结构分析
查看>>
(转)微信网页扫码登录的实现
查看>>
mariadb启动报错:[ERROR] Can't start server : Bind on unix socket: Permission denied
查看>>
nginx的信号量
查看>>
《携程的技术演进之路》读后感
查看>>
股票新闻速递 隐私声明
查看>>
LeetCode--206--反转链表
查看>>
matlab list函数参数,Matlab 函数参数汇总
查看>>
云im php,网易云IM
查看>>
测试linux vsftpd,vsftpd配置、测试
查看>>
河南农业大学c语言平时作业答案,河南农业大学2004-2005学年第二学期《C语言程序设计》期末考试试卷(2份,有答案)...
查看>>
c语言打开alist文件,C语言 文件的打开与关闭详解及示例代码
查看>>
c语言 中的共用体和结构体如何联合定义,结构体(Struct)、联合体(Union)和位域
查看>>
iPad pro能运行c语言吗,关于iPad Pro的五大槽点 你必须知道
查看>>
wordcount源代码c语言,Word Count程序(C语言实现)
查看>>
( 译、持续更新 ) JavaScript 上分小技巧(一)
查看>>
CI框架 -- URI 路由
查看>>
RPi 2B Raspbian system install
查看>>
HTTP 响应
查看>>