Skip to content

Commit

Permalink
refactor(12089): handle join error when using SpawnedTask::join_unwind
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Aug 21, 2024
1 parent 780f1b6 commit 52f8595
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ impl DataSink for ArrowFileSink {
}
}

demux_task.join_unwind().await?;
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
Ok(row_count as u64)
}
}
Expand Down
18 changes: 14 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,10 @@ impl DataSink for ParquetSink {
}
}

demux_task.join_unwind().await?;
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;

Ok(row_count as u64)
}
Expand Down Expand Up @@ -942,7 +945,10 @@ fn spawn_rg_join_and_finalize_task(
let num_cols = column_writer_tasks.len();
let mut finalized_rg = Vec::with_capacity(num_cols);
for task in column_writer_tasks.into_iter() {
let (writer, _col_reservation) = task.join_unwind().await?;
let (writer, _col_reservation) = task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
finalized_rg.push(writer.close()?);
Expand Down Expand Up @@ -1070,7 +1076,8 @@ async fn concatenate_parallel_row_groups(
while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, mut rg_reservation, _cnt) = result?;
let (serialized_columns, mut rg_reservation, _cnt) =
result.map_err(DataFusionError::ExecutionJoin)??;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
rg_reservation.free();
Expand Down Expand Up @@ -1134,7 +1141,10 @@ async fn output_single_parquet_file_parallelized(
)
.await?;

launch_serialization_task.join_unwind().await?;
launch_serialization_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
Ok(file_metadata)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ pub(crate) async fn stateless_multipart_put(
write_coordinator_task.join_unwind(),
demux_task.join_unwind()
);
r1?;
r2?;
r1.map_err(DataFusionError::ExecutionJoin)??;
r2.map_err(DataFusionError::ExecutionJoin)??;

let total_count = rx_row_cnt.await.map_err(|_| {
internal_datafusion_err!("Did not receive row count from write coordinator")
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ impl DataSink for StreamWrite {
}
}
drop(sender);
write_task.join_unwind().await
write_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)?
}
}

0 comments on commit 52f8595

Please sign in to comment.