Skip to content

Commit

Permalink
[SEDONA-637] Refactor multiple spark version build and package
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangfengcdt committed Aug 8, 2024
1 parent 8118894 commit dc8523b
Show file tree
Hide file tree
Showing 74 changed files with 17,754 additions and 2 deletions.
52 changes: 50 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
Setting a default value helps IDE:s that can't make sense of profiles. -->
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.3.0</spark.version>
<spark.compat.version>3.0</spark.compat.version>
<spark.compat.version>3.3</spark.compat.version>
<log4j.version>2.17.2</log4j.version>

<flink.version>1.19.0</flink.version>
Expand Down Expand Up @@ -684,11 +684,59 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>3.3.0</spark.version>
<spark.version>3.0.0</spark.version>
<spark.compat.version>3.0</spark.compat.version>
<log4j.version>2.17.2</log4j.version>
</properties>
</profile>
<profile>
<!-- This profile works for Spark 3.1 -->
<id>sedona-spark-3.1</id>
<activation>
<property>
<name>spark</name>
<value>3.1</value>
</property>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>3.1.0</spark.version>
<spark.compat.version>3.1</spark.compat.version>
<log4j.version>2.17.2</log4j.version>
</properties>
</profile>
<profile>
<!-- This profile works for Spark 3.2 -->
<id>sedona-spark-3.2</id>
<activation>
<property>
<name>spark</name>
<value>3.2</value>
</property>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>3.2.0</spark.version>
<spark.compat.version>3.2</spark.compat.version>
<log4j.version>2.17.2</log4j.version>
</properties>
</profile>
<profile>
<!-- This profile works for Spark 3.3 -->
<id>sedona-spark-3.3</id>
<activation>
<property>
<name>spark</name>
<value>3.3</value>
</property>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>3.3.0</spark.version>
<spark.compat.version>3.3</spark.compat.version>
<log4j.version>2.17.2</log4j.version>
</properties>
</profile>
<profile>
<id>sedona-spark-3.4</id>
<activation>
Expand Down
4 changes: 4 additions & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<profile>
<id>enable-all-submodules</id>
<modules>
<module>spark-3.0</module>
<module>spark-3.1</module>
<module>spark-3.2</module>
<module>spark-3.3</module>
<module>spark-3.4</module>
<module>spark-3.5</module>
</modules>
Expand Down
12 changes: 12 additions & 0 deletions spark/spark-3.1/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/target/
/.settings/
/.classpath
/.project
/dependency-reduced-pom.xml
/doc/
/.idea/
*.iml
/latest/
/spark-warehouse/
/metastore_db/
*.log
145 changes: 145 additions & 0 deletions spark/spark-3.1/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <!-- Parent artifactId is parameterized by spark/scala compat versions so the
       same parent POM serves every per-Spark-version submodule. -->
  <parent>
    <groupId>org.apache.sedona</groupId>
    <artifactId>sedona-spark-parent-${spark.compat.version}_${scala.compat.version}</artifactId>
    <version>1.6.1-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
  <!-- Spark-3.1-specific shim module; note the hard-coded "3.1" here versus the
       property-driven name in the parent reference above. -->
  <artifactId>sedona-spark-3.1_${scala.compat.version}</artifactId>

  <name>${project.groupId}:${project.artifactId}</name>
  <description>A cluster computing system for processing large-scale spatial data: SQL API for Spark 3.1.</description>
  <url>http://sedona.apache.org/</url>
  <packaging>jar</packaging>

  <properties>
    <!-- This module IS published (parent presumably skips deploy by default). -->
    <maven.deploy.skip>false</maven.deploy.skip>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.sedona</groupId>
      <artifactId>sedona-common</artifactId>
      <version>${project.version}</version>
      <!-- Exclude Jackson so the Spark runtime's Jackson version wins;
           mismatched Jackson versions cause runtime failures on Spark. -->
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sedona</groupId>
      <artifactId>sedona-spark-common-${spark.compat.version}_${scala.compat.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- Spark/Hadoop/logging dependencies; versions and scopes are managed by
         the parent's dependencyManagement (none are declared here). -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-1.2-api</artifactId>
    </dependency>
    <!-- GeoTools stack for raster/CRS support. -->
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-main</artifactId>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-referencing</artifactId>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-epsg-hsql</artifactId>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-geotiff</artifactId>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-coverage</artifactId>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-arcgrid</artifactId>
    </dependency>
    <dependency>
      <groupId>org.locationtech.jts</groupId>
      <artifactId>jts-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.wololo</groupId>
      <artifactId>jts2geojson</artifactId>
      <!-- Same Jackson exclusion rationale as sedona-common above. -->
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-collection-compat_${scala.compat.version}</artifactId>
    </dependency>
    <!-- Test-only dependencies; scope presumably set in parent — TODO confirm. -->
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-inline</artifactId>
    </dependency>
  </dependencies>
  <build>
    <!-- Scala-only module: sources live under src/main/scala. -->
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.util.Utils

import scala.util.Try

// Needed by Sedona to support Spark 3.0 - 3.3.
//
// Resolves the Parquet datetime/INT96 rebase configuration keys (which were
// renamed across Spark releases) and builds the corresponding rebase functions.
// NOTE(review): this appears to mirror Spark's internal rebase handling in
// `DataSourceUtils` — keep statement-for-statement parity with upstream when
// updating; verify against the targeted Spark versions.
object GeoDataSourceUtils {

  // Config key for datetime rebase mode on read. The non-"legacy" key exists on
  // newer Spark versions; the "legacy"-prefixed key is the older spelling.
  val PARQUET_REBASE_MODE_IN_READ = firstAvailableConf(
    "spark.sql.parquet.datetimeRebaseModeInRead",
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead")
  // Config key for datetime rebase mode on write.
  val PARQUET_REBASE_MODE_IN_WRITE = firstAvailableConf(
    "spark.sql.parquet.datetimeRebaseModeInWrite",
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
  // Config key for INT96 timestamp rebase mode on read. Oldest Spark versions
  // had no INT96-specific key, so the datetime key is the final fallback.
  val PARQUET_INT96_REBASE_MODE_IN_READ = firstAvailableConf(
    "spark.sql.parquet.int96RebaseModeInRead",
    "spark.sql.legacy.parquet.int96RebaseModeInRead",
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead")
  // Config key for INT96 timestamp rebase mode on write (same fallback chain).
  val PARQUET_INT96_REBASE_MODE_IN_WRITE = firstAvailableConf(
    "spark.sql.parquet.int96RebaseModeInWrite",
    "spark.sql.legacy.parquet.int96RebaseModeInWrite",
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite")

  // Returns the first config key recognized by the running Spark's SQLConf.
  // NOTE(review): `.get` throws NoSuchElementException if none of the candidate
  // keys exist — presumably unreachable on supported Spark versions; confirm.
  private def firstAvailableConf(confs: String*): String = {
    confs.find(c => Try(SQLConf.get.getConfString(c)).isSuccess).get
  }

  /**
   * Determines the datetime rebase policy for a Parquet file.
   *
   * @param lookupFileMeta looks up a key in the file's footer metadata,
   *                       returning null when absent
   * @param modeByConfig   fallback policy name taken from the session config
   * @return LEGACY, CORRECTED, or EXCEPTION (parsed from `modeByConfig`)
   */
  def datetimeRebaseMode(
      lookupFileMeta: String => String,
      modeByConfig: String): LegacyBehaviorPolicy.Value = {
    // Test-only escape hatch: force CORRECTED regardless of file metadata.
    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
      return LegacyBehaviorPolicy.CORRECTED
    }
    // If there is no version, we return the mode specified by the config.
    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY))
      .map { version =>
        // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
        // rebase the datetime values.
        // Files written by Spark 3.0 and later may also need the rebase if they were written with
        // the "LEGACY" rebase mode.
        // NOTE(review): lexicographic String comparison — relies on the
        // "major.minor.patch" shape of Spark version strings.
        if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
          LegacyBehaviorPolicy.LEGACY
        } else {
          LegacyBehaviorPolicy.CORRECTED
        }
      }
      .getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
  }

  /**
   * Determines the INT96 timestamp rebase policy for a Parquet file.
   * Same structure as [[datetimeRebaseMode]], but the version cutoff is 3.1.0
   * and the marker key is "org.apache.spark.legacyINT96".
   */
  def int96RebaseMode(
      lookupFileMeta: String => String,
      modeByConfig: String): LegacyBehaviorPolicy.Value = {
    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
      return LegacyBehaviorPolicy.CORRECTED
    }
    // If there is no version, we return the mode specified by the config.
    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY))
      .map { version =>
        // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
        // rebase the INT96 timestamp values.
        // Files written by Spark 3.1 and later may also need the rebase if they were written with
        // the "LEGACY" rebase mode.
        if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
          LegacyBehaviorPolicy.LEGACY
        } else {
          LegacyBehaviorPolicy.CORRECTED
        }
      }
      .getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
  }

  // NOTE(review): "crete" below is a typo for "create" in all four factory
  // methods; names are kept as-is for call-site compatibility.

  /**
   * Builds the day-count rebase function applied when reading DATE values.
   * EXCEPTION: fail on pre-switch Julian days; LEGACY: rebase Julian->Gregorian;
   * CORRECTED: identity.
   */
  def creteDateRebaseFuncInRead(
      rebaseMode: LegacyBehaviorPolicy.Value,
      format: String): Int => Int = rebaseMode match {
    case LegacyBehaviorPolicy.EXCEPTION =>
      days: Int =>
        if (days < RebaseDateTime.lastSwitchJulianDay) {
          throw DataSourceUtils.newRebaseExceptionInRead(format)
        }
        days
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
  }

  /**
   * Builds the day-count rebase function applied when writing DATE values.
   * Mirror image of [[creteDateRebaseFuncInRead]] (Gregorian->Julian on LEGACY).
   */
  def creteDateRebaseFuncInWrite(
      rebaseMode: LegacyBehaviorPolicy.Value,
      format: String): Int => Int = rebaseMode match {
    case LegacyBehaviorPolicy.EXCEPTION =>
      days: Int =>
        if (days < RebaseDateTime.lastSwitchGregorianDay) {
          throw DataSourceUtils.newRebaseExceptionInWrite(format)
        }
        days
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
  }

  /**
   * Builds the microsecond rebase function applied when reading TIMESTAMP
   * values (same policy semantics as the date variants above).
   */
  def creteTimestampRebaseFuncInRead(
      rebaseMode: LegacyBehaviorPolicy.Value,
      format: String): Long => Long = rebaseMode match {
    case LegacyBehaviorPolicy.EXCEPTION =>
      micros: Long =>
        if (micros < RebaseDateTime.lastSwitchJulianTs) {
          throw DataSourceUtils.newRebaseExceptionInRead(format)
        }
        micros
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
  }

  /**
   * Builds the microsecond rebase function applied when writing TIMESTAMP
   * values. Mirror image of [[creteTimestampRebaseFuncInRead]].
   */
  def creteTimestampRebaseFuncInWrite(
      rebaseMode: LegacyBehaviorPolicy.Value,
      format: String): Long => Long = rebaseMode match {
    case LegacyBehaviorPolicy.EXCEPTION =>
      micros: Long =>
        if (micros < RebaseDateTime.lastSwitchGregorianTs) {
          throw DataSourceUtils.newRebaseExceptionInWrite(format)
        }
        micros
    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
  }
}
Loading

0 comments on commit dc8523b

Please sign in to comment.