Skip to content

Commit

Permalink
Improve alias handling when removing duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
BjoernWaechter committed Nov 11, 2023
1 parent fc2dc30 commit 189e0cd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
11 changes: 7 additions & 4 deletions osm_address/transform/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ def remove_duplicate_rows(
Dataframe with the same schema as the input with removed duplicates
"""
order_col = col(decision_col)

if decision_max_first:
order_col = order_col.desc()
order_col = col(decision_col).desc_nulls_last()
else:
order_col = col(decision_col).asc_nulls_last()

w = Window.partitionBy(unique_col).orderBy(order_col)

df_input_ordered = df_input.withColumn(
f'{unique_col}_row_no', row_number().over(w)
f'{unique_col.replace(".","_")}_row_no', row_number().over(w)
)

df_result = df_input_ordered.where(f'{unique_col}_row_no = 1').drop(f'{unique_col}_row_no')
df_result = df_input_ordered\
.where(f'{unique_col.replace(".","_")}_row_no = 1')\
.drop(f'{unique_col.replace(".","_")}_row_no')

return df_result
24 changes: 24 additions & 0 deletions tests/unit/osm_address/transform/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,27 @@ def test_remove_duplicate_rows_multiple_min(self, test_context):
).orderBy("id")

assert df_result.count() == 3

def test_remove_duplicate_rows_with_alias_df(self, test_context):
    """Deduplicate an aliased dataframe via alias-qualified column names.

    The input holds two rows that share the same ``id``; the dataframe is
    aliased as ``test`` and the columns are passed as ``test.id`` and
    ``test.order_no``. Exactly one row is expected in the result.
    """
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("str", StringType(), True),
        StructField("order_no", IntegerType(), False),
    ])
    rows = [(1, "ag", 1), (1, "kas", 1)]

    df_aliased = test_context.spark.createDataFrame(
        data=rows,
        schema=schema,
    ).alias("test")

    df_deduplicated = remove_duplicate_rows(
        df_input=df_aliased,
        unique_col="test.id",
        decision_col="test.order_no",
        decision_max_first=False,
    )

    # Order by the alias-qualified column before counting, as the result is
    # still addressed through the "test" alias here.
    df_sorted = df_deduplicated.orderBy("test.id")

    assert df_sorted.count() == 1

0 comments on commit 189e0cd

Please sign in to comment.