Skip to content

Commit

Permalink
[INLONG-11154][SDK] Transform SQL supports TIMESTAMP function
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Sep 21, 2024
1 parent 260a12d commit d77dc08
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import org.apache.inlong.sdk.transform.process.utils.DateUtil;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;

/**
* TimestampFunction -> timestamp(expr1[, expr2])
* description:
* - return NULL if expr1 or expr2 is NULL.
* - return the date or datetime expression expr as a datetime value if there is only one parameter
* - return the result of the date or date time expression expr1 plus the time expression expr2 if there are two parameters
*/
@TransformFunction(names = {"timestamp"})
public class TimestampFunction implements ValueParser {

private ValueParser dateTimeExprParser;
private ValueParser timeExprParser;

public TimestampFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
dateTimeExprParser = OperatorTools.buildParser(expressions.get(0));
if (expressions.size() == 2) {
timeExprParser = OperatorTools.buildParser(expressions.get(1));
}
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object dateTimeExprObj = dateTimeExprParser.parse(sourceData, rowIndex, context);
if (dateTimeExprObj == null) {
return null;
}
String dateTimeStr = dateTimeExprObj.toString();
LocalDateTime localDateTime = DateUtil.parseLocalDateTime(dateTimeExprObj.toString());
if (localDateTime == null) {
// Not meeting the format requirements
return null;
}
boolean hasMicroSecond = dateTimeStr.indexOf('.') != -1;
String formatStr = DateUtil.YEAR_TO_SECOND;
// Support the second parameter
if (timeExprParser != null) {
Object timeExprObj = timeExprParser.parse(sourceData, rowIndex, context);
if (timeExprObj != null) {
String timeStr = timeExprObj.toString();
LocalTime localTime = DateUtil.parseLocalTime(timeStr);
if (localTime == null) {
// Not meeting the format requirements
return null;
}
hasMicroSecond |= timeStr.indexOf('.') != -1;
localDateTime = DateUtil.dateAdd(localDateTime, localTime);
} else {
return null;
}
}
if (hasMicroSecond) {
formatStr = DateUtil.YEAR_TO_MICRO;
}
return localDateTime.format(DateUtil.getDateTimeFormatter(formatStr));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,28 @@
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.util.Arrays;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;

public class DateUtil {

// Need to follow this order
private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST = Arrays.asList(
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
DateTimeFormatter.ofPattern("yyyy-MM-dd"));
private static final List<DateTimeFormatter> TIME_FORMATTER_LIST = Arrays.asList(
DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), DateTimeFormatter.ofPattern("HH:mm:ss"));
private static final Map<String, DateTimeFormatter> DATE_TIME_FORMATTER_MAP = new LinkedHashMap<>();
private static final Map<String, DateTimeFormatter> TIME_FORMATTER_MAP = new LinkedHashMap<>();
public static String YEAR_TO_MICRO = "yyyy-MM-dd HH:mm:ss.SSSSSS";
public static String YEAR_TO_SECOND = "yyyy-MM-dd HH:mm:ss";
public static String YEAR_TO_MONTH = "yyyy-MM-dd";
public static String HOUR_TO_MICRO = "HH:mm:ss.SSSSSS";
public static String HOUR_TO_SECOND = "HH:mm:ss";

static {
DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MICRO, DateTimeFormatter.ofPattern(YEAR_TO_MICRO));
DATE_TIME_FORMATTER_MAP.put(YEAR_TO_SECOND, DateTimeFormatter.ofPattern(YEAR_TO_SECOND));
DATE_TIME_FORMATTER_MAP.put(YEAR_TO_MONTH, DateTimeFormatter.ofPattern(YEAR_TO_MONTH));

TIME_FORMATTER_MAP.put(HOUR_TO_MICRO, DateTimeFormatter.ofPattern(HOUR_TO_MICRO));
TIME_FORMATTER_MAP.put(HOUR_TO_SECOND, DateTimeFormatter.ofPattern(HOUR_TO_SECOND));
}

/**
* Time calculation
Expand All @@ -56,8 +65,27 @@ public static String dateAdd(String dateStr, Pair<Integer, Map<ChronoField, Long
return null;
}

Object dateParserObj = null;
for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_LIST) {
Object dateParserObj = parseLocalDateTime(dateStr);
if (dateParserObj != null) {
return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr);
}
dateParserObj = parseLocalTime(dateStr);
if (dateParserObj != null) {
return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr);
}
return null;
}

public static LocalDateTime dateAdd(LocalDateTime localDateTime, LocalTime localTime) {
return localDateTime.plusHours(localTime.getHour())
.plusMinutes(localTime.getMinute())
.plusSeconds(localTime.getSecond())
.plusNanos(localTime.getNano());
}

public static LocalDateTime parseLocalDateTime(String dateStr) {
LocalDateTime dateParserObj = null;
for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_MAP.values()) {
try {
dateParserObj = LocalDateTime.parse(dateStr, dateTimeFormatter);
} catch (Exception e) {
Expand All @@ -68,24 +96,35 @@ public static String dateAdd(String dateStr, Pair<Integer, Map<ChronoField, Long
}
}
if (dateParserObj != null) {
return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr);
return dateParserObj;
}
}
return null;
}

for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_LIST) {
public static LocalTime parseLocalTime(String dateStr) {
LocalTime dateParserObj = null;
for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_MAP.values()) {
try {
dateParserObj = LocalTime.parse(dateStr, dateTimeFormatter);
} catch (Exception ignored) {

}
if (dateParserObj != null) {
return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr);
return dateParserObj;
}
}

return null;
}

public static DateTimeFormatter getDateTimeFormatter(String formatStr) {
DateTimeFormatter formatter = DATE_TIME_FORMATTER_MAP.get(formatStr);
if (formatter != null) {
return formatter;
}
return TIME_FORMATTER_MAP.get(formatStr);
}

private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign,
LocalDateTime dateTime, String dataStr) {
int factor = intervalPair.getKey();
Expand Down Expand Up @@ -127,16 +166,14 @@ private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> interval
return null;
}
}

String result = dateTime.toLocalDate().toString();
if (hasTime) {
if (hasMicroSecond) {
result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0));
return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_MICRO));
} else {
result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1));
return dateTime.format(DATE_TIME_FORMATTER_MAP.get(YEAR_TO_SECOND));
}
}
return result;
return dateTime.toLocalDate().toString();
}

private static String addTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign, LocalTime time,
Expand Down Expand Up @@ -168,9 +205,9 @@ private static String addTime(Pair<Integer, Map<ChronoField, Long>> intervalPair
}

if (hasMicroSecond) {
return time.format(TIME_FORMATTER_LIST.get(0));
return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_MICRO));
} else {
return time.format(TIME_FORMATTER_LIST.get(1));
return time.format(TIME_FORMATTER_MAP.get(HOUR_TO_SECOND));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.temporal;

import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.TransformProcessor;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;

public class TestTimestampFunction extends AbstractFunctionTemporalTestBase {

@Test
public void testTimestamp() throws Exception {
String transformSql = null;
TransformConfig config = null;
TransformProcessor<String, String> processor = null;
List<String> output = null;

transformSql = "select timestamp(string1,string2) from source";
config = new TransformConfig(transformSql);
processor = TransformProcessor
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
// case1: timestamp('2003-12-31 12:00:00.600000','12:00:00')
output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0));

// case2: timestamp('2003-12-31 12:00:00','12:00:00.600000')
output = processor.transform("2003-12-31 12:00:00|12:00:00.600000", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=2004-01-01 00:00:00.600000", output.get(0));

// case3: timestamp('2003-12-31','12:00:00.600000')
output = processor.transform("2003-12-31|12:00:00.600000", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=2003-12-31 12:00:00.600000", output.get(0));

transformSql = "select timestamp(string1,stringx) from source";
config = new TransformConfig(transformSql);
processor = TransformProcessor
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
// case4: timestamp('2003-12-31 12:00:00.600000',null)
output = processor.transform("2003-12-31 12:00:00.600000|12:00:00", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=", output.get(0));

transformSql = "select timestamp(string1) from source";
config = new TransformConfig(transformSql);
processor = TransformProcessor
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
// case5: timestamp('2003-12-31 12:00:00')
output = processor.transform("2003-12-31 12:00:00", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=2003-12-31 12:00:00", output.get(0));

// case6: timestamp('2003-12-31')
output = processor.transform("2003-12-31", new HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals("result=2003-12-31 00:00:00", output.get(0));

}
}

0 comments on commit d77dc08

Please sign in to comment.