diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java new file mode 100644 index 0000000000..698942badf --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.List; + +/** + * TimestampFunction -> timestamp(expr1[, expr2]) + * description: + * - return NULL if expr1 or expr2 is NULL. + * - return the date or datetime expression expr as a datetime value if there is only one parameter + * - return the result of the date or date time expression expr1 plus the time expression expr2 if there are two parameters + */ +@TransformFunction(names = {"timestamp"}) +public class TimestampFunction implements ValueParser { + + private ValueParser dateTimeExprParser; + private ValueParser timeExprParser; + + public TimestampFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + dateTimeExprParser = OperatorTools.buildParser(expressions.get(0)); + if (expressions.size() == 2) { + timeExprParser = OperatorTools.buildParser(expressions.get(1)); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object dateTimeExprObj = dateTimeExprParser.parse(sourceData, rowIndex, context); + if (dateTimeExprObj == null) { + return null; + } + String dateTimeStr = dateTimeExprObj.toString(); + LocalDateTime localDateTime = DateUtil.parseLocalDateTime(dateTimeExprObj.toString()); + if (localDateTime == null) { + // Not meeting the format requirements + return null; + } + boolean hasMicroSecond = dateTimeStr.indexOf('.') != -1; + String formatStr = DateUtil.YEAR_TO_SECOND; + // Support the second parameter + if (timeExprParser != null) { + Object timeExprObj = timeExprParser.parse(sourceData, rowIndex, context); + if (timeExprObj != null) { + String timeStr = timeExprObj.toString(); + LocalTime localTime = DateUtil.parseLocalTime(timeStr); + if (localTime == null) { + // Not meeting the format requirements + return null; + } + hasMicroSecond |= timeStr.indexOf('.') != -1; + localDateTime = DateUtil.dateAdd(localDateTime, localTime); + } else { + return null; + } + } + if (hasMicroSecond) { + formatStr = DateUtil.YEAR_TO_MICRO; + } + return localDateTime.format(DateUtil.getDateTimeFormatter(formatStr)); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java index 998f838cf2..e703bb543f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java @@ -24,19 +24,28 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoField; -import java.util.Arrays; -import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; public class DateUtil { // Need to follow this order - private static final List DATE_TIME_FORMATTER_LIST = Arrays.asList( - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"), - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"), - DateTimeFormatter.ofPattern("yyyy-MM-dd")); - private static final List TIME_FORMATTER_LIST = Arrays.asList( - DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), DateTimeFormatter.ofPattern("HH:mm:ss")); + private static final Map DATE_TIME_FORMATTER_MAP = new LinkedHashMap<>(); + private static final Map TIME_FORMATTER_MAP = new LinkedHashMap<>(); + public static String YEAR_TO_MICRO = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + public static String YEAR_TO_SECOND = "yyyy-MM-dd HH:mm:ss"; + public static String YEAR_TO_MONTH = "yyyy-MM-dd"; + public static String HOUR_TO_MICRO = "HH:mm:ss.SSSSSS"; + public static String HOUR_TO_SECOND = "HH:mm:ss"; + + static { + DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MICRO, DateTimeFormatter.ofPattern(YEAR_TO_MICRO)); + DATE_TIME_FORMATTER_MAP.put(YEAR_TO_SECOND, DateTimeFormatter.ofPattern(YEAR_TO_SECOND)); + DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MONTH, DateTimeFormatter.ofPattern(YEAR_TO_MONTH)); + + TIME_FORMATTER_MAP.put(HOUR_TO_MICRO, DateTimeFormatter.ofPattern(HOUR_TO_MICRO)); + TIME_FORMATTER_MAP.put(HOUR_TO_SECOND, DateTimeFormatter.ofPattern(HOUR_TO_SECOND)); + } /** * Time calculation @@ -56,8 +65,27 @@ public static String dateAdd(String dateStr, Pair> intervalPair, int sign, LocalDateTime dateTime, String dataStr) { int factor = intervalPair.getKey(); @@ -127,16 +166,14 @@ private static String addDateTime(Pair> interval return null; } } - - String result = dateTime.toLocalDate().toString(); if (hasTime) { if (hasMicroSecond) { - result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0)); + return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_MICRO)); } else { - result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1)); + return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_SECOND)); } } - return result; + return dateTime.toLocalDate().toString(); } private static String addTime(Pair> intervalPair, int sign, LocalTime time, @@ -168,9 +205,9 @@ private static String addTime(Pair> intervalPair } if (hasMicroSecond) { - return time.format(TIME_FORMATTER_LIST.get(0)); + return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_MICRO)); } else { - return time.format(TIME_FORMATTER_LIST.get(1)); + return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_SECOND)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java new file mode 100644 index 0000000000..1efb66cd6b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestTimestampFunction.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.temporal; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestTimestampFunction extends AbstractFunctionTemporalTestBase { + + @Test + public void testTimestamp() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select timestamp(string1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: timestamp('2003-12-31 12:00:00.600000','12:00:00') + output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0)); + + // case2: timestamp('2003-12-31 12:00:00','12:00:00.600000') + output = processor.transform("2003-12-31 12:00:00|12:00:00.600000", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0)); + + // case3: timestamp('2003-12-31','12:00:00.600000') + output = processor.transform("2003-12-31|12:00:00.600000", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2003-12-31 12:00:00.600000", output.get(0)); + + transformSql = "select timestamp(string1,stringx) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case4: timestamp('2003-12-31 12:00:00.600000',null) + output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select timestamp(string1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case5: timestamp('2003-12-31 12:00:00') + output = processor.transform("2003-12-31 12:00:00", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2003-12-31 12:00:00", output.get(0)); + + // case6: timestamp('2003-12-31') + output = processor.transform("2003-12-31", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2003-12-31 00:00:00", output.get(0)); + + } +}