Skip to content

Commit

Permalink
Flink's metrics for exactly once and at least once.
Browse files Browse the repository at this point in the history
  • Loading branch information
prashastia committed Sep 21, 2024
1 parent 71bca0b commit 5141c88
Showing 1 changed file with 5 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,8 @@ private static void runBoundedTableAPIFlinkJob(
Table sourceTable =
tEnv.from("bigQuerySourceTable")
.select($("*"))
.flatMap(
call(
"func",
Row.of(
$("unique_key"),
$("name"),
$("number"),
$("ts"),
$("description"))))
.as("unique_key", "name", "number", "ts", "description");
.flatMap(call("func", Row.of($("name"), $("number"), $("ts"))))
.as("name", "number", "ts");

BigQueryTableConfig sinkTableConfig =
BigQuerySinkTableConfig.newBuilder()
Expand Down Expand Up @@ -490,23 +482,13 @@ private static void runBoundedJoinFlinkJob(

/** Function to flatmap the Table API source Catalog Table. */
@FunctionHint(
input =
@DataTypeHint(
"ROW<`unique_key` STRING, `name` STRING, `number` BIGINT, `ts` TIMESTAMP(6), `description` STRING>"),
output =
@DataTypeHint(
"ROW<`unique_key` STRING, `name` STRING, `number` BIGINT, `ts` TIMESTAMP(6), `description` STRING>"))
input = @DataTypeHint("ROW<`name` STRING, `number` BIGINT, `ts` TIMESTAMP(6)>"),
output = @DataTypeHint("ROW<`name` STRING, `number` BIGINT, `ts` TIMESTAMP(6)>"))
public static class MyFlatMapFunction extends TableFunction<Row> {

public void eval(Row row) {
String str = (String) row.getField("name");
collect(
Row.of(
row.getField("unique_key"),
str + "_write_test",
row.getField("number"),
row.getField("ts"),
row.getField("description")));
collect(Row.of(str + "_write_test", row.getField("number"), row.getField("ts")));
}
}
}

0 comments on commit 5141c88

Please sign in to comment.