From 52f8595705d6961a98b01088af3323965417fc30 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 21 Aug 2024 14:26:30 -0700 Subject: [PATCH] refactor(12089): handle join error when using SpawnedTask::join_unwind --- .../core/src/datasource/file_format/arrow.rs | 5 ++++- .../core/src/datasource/file_format/parquet.rs | 18 ++++++++++++++---- .../file_format/write/orchestration.rs | 4 ++-- datafusion/core/src/datasource/stream.rs | 5 ++++- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 8b6a8800119d..95f76195e63d 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -341,7 +341,10 @@ impl DataSink for ArrowFileSink { } } - demux_task.join_unwind().await?; + demux_task + .join_unwind() + .await + .map_err(DataFusionError::ExecutionJoin)??; Ok(row_count as u64) } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index f233f3842c8c..83f77ca9371a 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -836,7 +836,10 @@ impl DataSink for ParquetSink { } } - demux_task.join_unwind().await?; + demux_task + .join_unwind() + .await + .map_err(DataFusionError::ExecutionJoin)??; Ok(row_count as u64) } @@ -942,7 +945,10 @@ fn spawn_rg_join_and_finalize_task( let num_cols = column_writer_tasks.len(); let mut finalized_rg = Vec::with_capacity(num_cols); for task in column_writer_tasks.into_iter() { - let (writer, _col_reservation) = task.join_unwind().await?; + let (writer, _col_reservation) = task + .join_unwind() + .await + .map_err(DataFusionError::ExecutionJoin)??; let encoded_size = writer.get_estimated_total_bytes(); rg_reservation.grow(encoded_size); finalized_rg.push(writer.close()?); @@ -1070,7 +1076,8 @@ async fn concatenate_parallel_row_groups( while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; let mut rg_out = parquet_writer.next_row_group()?; - let (serialized_columns, mut rg_reservation, _cnt) = result?; + let (serialized_columns, mut rg_reservation, _cnt) = + result.map_err(DataFusionError::ExecutionJoin)??; for chunk in serialized_columns { chunk.append_to_row_group(&mut rg_out)?; rg_reservation.free(); @@ -1134,7 +1141,10 @@ async fn output_single_parquet_file_parallelized( ) .await?; - launch_serialization_task.join_unwind().await?; + launch_serialization_task + .join_unwind() + .await + .map_err(DataFusionError::ExecutionJoin)??; Ok(file_metadata) } diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 1d32063ee9f3..6f27e6f3889f 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -298,8 +298,8 @@ pub(crate) async fn stateless_multipart_put( write_coordinator_task.join_unwind(), demux_task.join_unwind() ); - r1?; - r2?; + r1.map_err(DataFusionError::ExecutionJoin)??; + r2.map_err(DataFusionError::ExecutionJoin)??; let total_count = rx_row_cnt.await.map_err(|_| { internal_datafusion_err!("Did not receive row count from write coordinator") diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 682565aea909..b53fe8663178 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -438,6 +438,9 @@ impl DataSink for StreamWrite { } } drop(sender); - write_task.join_unwind().await + write_task + .join_unwind() + .await + .map_err(DataFusionError::ExecutionJoin)? } }