Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-303] Port all Sedona Spark functions to Sedona Flink -- Step 7 #896

Merged
merged 3 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/api/flink/Constructor.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ SELECT ST_Point(x, y) AS pointshape
FROM pointtable
```

## ST_PointZ

Introduction: Construct a Point from X, Y and Z and an optional srid. If srid is not set, it defaults to 0 (unknown).

Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal)`
Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal, srid:integer)`

Since: `v1.5.0`

SQL example:
```sql
SELECT ST_PointZ(1.0, 2.0, 3.0) AS pointshape
```

## ST_PointFromText

Introduction: Construct a Point from Text, delimited by Delimiter
Expand Down
62 changes: 61 additions & 1 deletion docs/api/flink/Predicate.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ FROM pointdf
WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark)
```

## ST_Crosses

Introduction: Return true if A crosses B

Format: `ST_Crosses (A:geometry, B:geometry)`

Since: `v1.5.0`

SQL example:
```sql
SELECT *
FROM pointdf
WHERE ST_Crosses(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
```

## ST_Disjoint

Introduction: Return true if A and B are disjoint
Expand All @@ -21,13 +36,28 @@ Format: `ST_Disjoint (A:geometry, B:geometry)`

Since: `v1.2.1`

Spark SQL example:
SQL example:
```sql
SELECT *
FROM pointdf
WHERE ST_Disjoinnt(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark)
```

## ST_Equals

Introduction: Return true if A equals to B

Format: `ST_Equals (A:geometry, B:geometry)`

Since: `v1.5.0`

SQL example:
```sql
SELECT *
FROM pointdf
WHERE ST_Equals(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
```

## ST_Intersects

Introduction: Return true if A intersects B
Expand All @@ -43,6 +73,36 @@ FROM pointdf
WHERE ST_Intersects(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark)
```

## ST_Overlaps

Introduction: Return true if A overlaps B

Format: `ST_Overlaps (A:geometry, B:geometry)`

Since: `v1.5.0`

SQL example:
```sql
SELECT *
FROM geom
WHERE ST_Overlaps(geom.geom_a, geom.geom_b)
```

## ST_Touches

Introduction: Return true if A touches B

Format: `ST_Touches (A:geometry, B:geometry)`

Since: `v1.5.0`

SQL example:
```sql
SELECT *
FROM pointdf
WHERE ST_Touches(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0))
```

## ST_Within

Introduction: Return true if A is within B
Expand Down
5 changes: 5 additions & 0 deletions flink/src/main/java/org/apache/sedona/flink/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public static UserDefinedFunction[] getFuncs() {
new Aggregators.ST_Envelope_Aggr(),
new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
new Constructors.ST_PointZ(),
new Constructors.ST_PointFromText(),
new Constructors.ST_LineStringFromText(),
new Constructors.ST_LineFromText(),
Expand Down Expand Up @@ -138,11 +139,15 @@ public static UserDefinedFunction[] getPredicates() {
return new UserDefinedFunction[]{
new Predicates.ST_Intersects(),
new Predicates.ST_Contains(),
new Predicates.ST_Crosses(),
new Predicates.ST_Within(),
new Predicates.ST_Covers(),
new Predicates.ST_CoveredBy(),
new Predicates.ST_Disjoint(),
new Predicates.ST_Equals(),
new Predicates.ST_OrderingEquals(),
new Predicates.ST_Overlaps(),
new Predicates.ST_Touches(),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,19 @@ private static Geometry getGeometryByType(String geom, String inputDelimiter, Ge
public static class ST_Point extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) throws ParseException {
Coordinate coordinates = new Coordinate(x, y);
GeometryFactory geometryFactory = new GeometryFactory();
return geometryFactory.createPoint(coordinates);
return org.apache.sedona.common.Constructors.point(x, y);
}
}

public static class ST_PointZ extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z) throws ParseException {
return eval(x, y, z, 0);
}

@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z, @DataTypeHint("Integer") Integer srid) throws ParseException {
return org.apache.sedona.common.Constructors.pointZ(x, y, z, srid);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.intersects(geom2);
return org.apache.sedona.common.Predicates.intersects(geom1, geom2);
}
}

Expand All @@ -53,7 +53,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.contains(geom2);
return org.apache.sedona.common.Predicates.contains(geom1, geom2);
}
}

Expand All @@ -72,7 +72,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.within(geom2);
return org.apache.sedona.common.Predicates.within(geom1, geom2);
}
}

Expand All @@ -92,7 +92,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.covers(geom2);
return org.apache.sedona.common.Predicates.covers(geom1, geom2);
}
}

Expand All @@ -112,7 +112,25 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.coveredBy(geom2);
return org.apache.sedona.common.Predicates.coveredBy(geom1, geom2);
}
}

public static class ST_Crosses extends ScalarFunction
{
/**
* Constructor for relation checking without duplicate removal
*/
public ST_Crosses()
{
}

@DataTypeHint("Boolean")
public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.crosses(geom1, geom2);
}
}

Expand All @@ -132,7 +150,27 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.disjoint(geom2);
return org.apache.sedona.common.Predicates.disjoint(geom1, geom2);
}
}

public static class ST_Equals
extends ScalarFunction
{

/**
* Constructor for relation checking without duplicate removal
*/
public ST_Equals()
{
}

@DataTypeHint("Boolean")
public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.equals(geom1, geom2);
}
}

Expand All @@ -152,7 +190,47 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.equalsExact(geom2);
return org.apache.sedona.common.Predicates.orderingEquals(geom1, geom2);
}
}

public static class ST_Overlaps
extends ScalarFunction
{

/**
* Constructor for relation checking without duplicate removal
*/
public ST_Overlaps()
{
}

@DataTypeHint("Boolean")
public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.overlaps(geom1, geom2);
}
}

public static class ST_Touches
extends ScalarFunction
{

/**
* Constructor for relation checking without duplicate removal
*/
public ST_Touches()
{
}

@DataTypeHint("Boolean")
public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2)
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.touches(geom1, geom2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.wololo.jts2geojson.GeoJSONReader;

Expand Down Expand Up @@ -73,6 +74,31 @@ public void test2DPoint() {
assertEquals(expected, result);
}

@Test
public void testPointZ() {
List<Row> data = new ArrayList<>();
data.add(Row.of(2.0, 2.0, 5.0, "point"));
String[] colNames = new String[]{"x", "y", "z", "name_point"};

TypeInformation<?>[] colTypes = {
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
Table pointTable = tableEnv.fromDataStream(ds);

Table geomTable = pointTable
.select(call(Constructors.ST_PointZ.class.getSimpleName(), $(colNames[0]), $(colNames[1]), $(colNames[2]))
.as(colNames[3]));

Point result = first(geomTable)
.getFieldAs(colNames[3]);

assertEquals(5.0, result.getCoordinate().getZ(), 1e-6);
}

@Test
public void testPointFromText() {
List<Row> data = createPointWKT(testDataSize);
Expand Down
35 changes: 35 additions & 0 deletions flink/src/test/java/org/apache/sedona/flink/PredicateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package org.apache.sedona.flink;

import org.apache.flink.table.api.Table;
import org.apache.sedona.flink.expressions.Predicates;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class PredicateTest extends TestBase{
@BeforeClass
Expand Down Expand Up @@ -79,6 +82,22 @@ public void testCoveredBy() {
assertEquals(1, count(result));
}

@Test
public void testCrosses() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('MULTIPOINT((0 0), (2 2))') AS g1, ST_GeomFromWKT('LINESTRING(-1 -1, 1 1)') as g2");
table = table.select(call(Predicates.ST_Crosses.class.getSimpleName(), $("g1"), $("g2")));
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(true, actual);
}

@Test
public void testEquals() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1, 2 2)') as g2");
table = table.select(call(Predicates.ST_Equals.class.getSimpleName(), $("g1"), $("g2")));
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(true, actual);
}

@Test
public void testOrderingEquals() {
Table lineStringTable = createLineStringTable(testDataSize);
Expand All @@ -87,4 +106,20 @@ public void testOrderingEquals() {
Table result = lineStringTable.filter(expr);
assertEquals(1, count(result));
}

@Test
public void testOverlaps() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (1 1, 3 3)') as g2");
table = table.select(call(Predicates.ST_Overlaps.class.getSimpleName(), $("g1"), $("g2")));
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(true, actual);
}

@Test
public void testTouches() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 1 0)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1)') as g2");
table = table.select(call(Predicates.ST_Touches.class.getSimpleName(), $("g1"), $("g2")));
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(true, actual);
}
}
Loading