Commit

support spark320
dcoliversun committed Apr 3, 2024
1 parent 6f50ca9 commit 605d664
Showing 2 changed files with 32 additions and 55 deletions.
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -134,7 +134,7 @@
     <properties>
       <sparkbundle.version>3.2</sparkbundle.version>
       <sparkshim.artifactId>spark-sql-columnar-shims-spark32</sparkshim.artifactId>
-      <spark.version>3.2.2</spark.version>
+      <spark.version>3.2.0</spark.version>
       <iceberg.version>1.3.1</iceberg.version>
       <delta.version>2.0.1</delta.version>
       <delta.binary.version>20</delta.binary.version>
@@ -310,7 +310,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging

       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
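
For context on the rename in this hunk: Spark 3.2.0 exposes DataSourceUtils.datetimeRebaseMode (and int96RebaseMode), which return a bare rebase-mode value, while Spark 3.2.1+ replaced them with datetimeRebaseSpec and int96RebaseSpec, which bundle the mode with an origin time zone (hence the spec.mode.toString and spec.timeZone accessors removed further down in this file). The sketch below only illustrates the two calling conventions as they appear in this diff; the simplified types and helper names are assumptions, not Spark's actual implementation.

    // Minimal, hypothetical sketch of the two API shapes this commit switches between;
    // not Spark's actual code, only the calling convention visible in this diff.
    object RebaseSketch {
      object LegacyBehaviorPolicy extends Enumeration {
        val EXCEPTION, LEGACY, CORRECTED = Value
      }

      // Spark 3.2.1+ shape: the rebase info is a spec carrying both mode and time zone,
      // hence call sites like datetimeRebaseSpec.mode.toString and datetimeRebaseSpec.timeZone.
      final case class RebaseSpec(mode: LegacyBehaviorPolicy.Value, timeZone: String)

      // Spark 3.2.0 shape: only the mode value itself is passed around,
      // hence call sites like datetimeRebaseMode.toString.
      def readerArgsFor320(mode: LegacyBehaviorPolicy.Value): Seq[String] =
        Seq(mode.toString)

      def readerArgsFor321Plus(spec: RebaseSpec): Seq[String] =
        Seq(spec.mode.toString, spec.timeZone)
    }
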
@@ -324,7 +324,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
           pushDownStringStartWith,
           pushDownInFilterThreshold,
           isCaseSensitive,
-          datetimeRebaseSpec)
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
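
The comment in the context lines above describes how the pushed-down predicate is built: each source filter is converted with ParquetFilters.createFilter, which returns an Option, so unconvertible filters are dropped via flatMap and the remaining predicates are combined. A rough sketch of that pattern with simplified stand-in types (not Spark's real classes or signatures):

    // Rough sketch of the filter pushdown pattern described above (stand-in types only).
    object PushdownSketch {
      sealed trait SourceFilter
      final case class GreaterThan(col: String, value: Int) extends SourceFilter
      final case class Unsupported(name: String) extends SourceFilter

      final case class ParquetPredicate(expr: String)

      // Not every source filter maps to a Parquet predicate, so conversion returns an Option.
      def createFilter(f: SourceFilter): Option[ParquetPredicate] = f match {
        case GreaterThan(c, v) => Some(ParquetPredicate(s"$c > $v"))
        case _                 => None
      }

      def and(l: ParquetPredicate, r: ParquetPredicate): ParquetPredicate =
        ParquetPredicate(s"(${l.expr}) and (${r.expr})")

      def main(args: Array[String]): Unit = {
        val filters = Seq(GreaterThan("a", 1), Unsupported("b"), GreaterThan("c", 3))
        // flatMap drops the unconvertible filters; reduceOption AND-s whatever survived.
        val pushed: Option[ParquetPredicate] = filters.flatMap(createFilter).reduceOption(and)
        println(pushed) // Some(ParquetPredicate((a > 1) and (c > 3)))
      }
    }
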
@@ -350,7 +350,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
         None
       }

-      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         int96RebaseModeInRead)

@@ -367,73 +367,50 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull,
-          datetimeRebaseSpec.mode.toString,
-          datetimeRebaseSpec.timeZone,
-          int96RebaseSpec.mode.toString,
-          int96RebaseSpec.timeZone,
+          datetimeRebaseMode.toString,
+          int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
-          capacity
-        )
-        // SPARK-37089: We cannot register a task completion listener to close this iterator here
-        // because downstream exec nodes have already registered their listeners. Since listeners
-        // are executed in reverse order of registration, a listener registered here would close the
-        // iterator while downstream exec nodes are still running. When off-heap column vectors are
-        // enabled, this can cause a use-after-free bug leading to a segfault.
-        //
-        // Instead, we use FileScanRDD's task completion listener to close this iterator.
+          capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
-        try {
-          vectorizedReader.initialize(split, hadoopAttemptContext)
-          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-          if (returningBatch) {
-            vectorizedReader.enableReturningBatches()
-          }
-
-          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-          iter.asInstanceOf[Iterator[InternalRow]]
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
-        }
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
         val readSupport = new ParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
+          datetimeRebaseMode,
+          int96RebaseMode)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
           new ParquetRecordReader[InternalRow](readSupport)
         }
         val iter = new RecordReaderIterator[InternalRow](reader)
-        try {
-          reader.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
-          } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
-          }
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
-        }
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        }
       }
     }
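
The bulk of the hunk above swaps the cleanup strategy around reader initialization. The removed Spark 3.2.1/3.2.2-style code wraps initialization in try/catch and, per its SPARK-37089 comment, leaves closing the iterator to FileScanRDD's task completion listener; the added Spark 3.2.0-compatible code registers its own task completion listener before initialization, per SPARK-23457. A schematic comparison of the two styles, using placeholder reader and task-context types (assumptions, not the real Spark classes):

    // Schematic comparison of the two cleanup styles in this hunk (placeholder types only).
    import scala.collection.mutable.ArrayBuffer

    object CleanupSketch {
      final class FakeReader(log: ArrayBuffer[String]) {
        def initialize(): Unit = log += "initialize"
        def close(): Unit = log += "close"
      }

      // Stand-in for TaskContext: completion listeners run when the task finishes.
      final class FakeTaskContext {
        private val listeners = ArrayBuffer.empty[() => Unit]
        def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
        def completeTask(): Unit = listeners.reverseIterator.foreach(_())
      }

      // Spark 3.2.0 style (the added lines): register the close listener first, then
      // initialize; the listener closes the reader when the task completes (SPARK-23457).
      def spark320Style(ctx: FakeTaskContext, reader: FakeReader): Unit = {
        ctx.addTaskCompletionListener(() => reader.close())
        reader.initialize()
      }

      // Spark 3.2.1+ style (the removed lines): no listener of its own; close only if
      // initialization throws, otherwise rely on the caller's (FileScanRDD's) completion
      // listener so downstream listeners are not closed out of order (SPARK-37089).
      def spark321Style(reader: FakeReader): Unit =
        try reader.initialize()
        catch { case e: Throwable => reader.close(); throw e }
    }
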