From 605d664f5eb8b27088a9650803ce3328a6461dfc Mon Sep 17 00:00:00 2001
From: Qian Sun
Date: Wed, 3 Apr 2024 02:14:30 +0000
Subject: [PATCH] support spark320

---
 pom.xml                                            |  2 +-
 .../parquet/ParquetFileFormat.scala                | 85 +++++++------
 2 files changed, 32 insertions(+), 55 deletions(-)

diff --git a/pom.xml b/pom.xml
index b5833b3adab0..f854e2387411 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,7 +134,7 @@
     3.2
     spark-sql-columnar-shims-spark32
-    3.2.2
+    3.2.0
     1.3.1
     2.0.1
     20
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6b383136590..2579002314c9 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -310,7 +310,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
@@ -324,7 +324,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
           pushDownStringStartWith,
           pushDownInFilterThreshold,
           isCaseSensitive,
-          datetimeRebaseSpec)
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -350,7 +350,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
         None
       }
 
-      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         int96RebaseModeInRead)
 
@@ -367,46 +367,30 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull,
-          datetimeRebaseSpec.mode.toString,
-          datetimeRebaseSpec.timeZone,
-          int96RebaseSpec.mode.toString,
-          int96RebaseSpec.timeZone,
+          datetimeRebaseMode.toString,
+          int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
-          capacity
-        )
-        // SPARK-37089: We cannot register a task completion listener to close this iterator here
-        // because downstream exec nodes have already registered their listeners. Since listeners
-        // are executed in reverse order of registration, a listener registered here would close the
-        // iterator while downstream exec nodes are still running. When off-heap column vectors are
-        // enabled, this can cause a use-after-free bug leading to a segfault.
-        //
-        // Instead, we use FileScanRDD's task completion listener to close this iterator.
+          capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
-        try {
-          vectorizedReader.initialize(split, hadoopAttemptContext)
-          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-          if (returningBatch) {
-            vectorizedReader.enableReturningBatches()
-          }
-
-          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-          iter.asInstanceOf[Iterator[InternalRow]]
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
         }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
         val readSupport = new ParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
+          datetimeRebaseMode,
+          int96RebaseMode)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -414,26 +398,19 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
           new ParquetRecordReader[InternalRow](readSupport)
         }
         val iter = new RecordReaderIterator[InternalRow](reader)
-        try {
-          reader.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
-          } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
-          }
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
-        }
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
         }
       }
     }