Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Viagens 2.0 - Cria tabela de segmentos de Shape #237

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
319 changes: 155 additions & 164 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Gabriel Gazola Milan <[email protected]>"]

[tool.poetry.dependencies]
python = ">=3.10,<3.11"
dbt-bigquery = "1.7.2"
dbt-bigquery = "1.7.3"
google-cloud-storage = "2.13.0"
prefect = "1.4.1"
prefeitura-rio = "1.1.3a1"
Expand Down
9 changes: 9 additions & 0 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,12 @@ models:
staging:
+materialized: view
+schema: transito_staging
planejamento:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: planejamento
staging:
+database: rj-smtr-dev
+materialized: view
+schema: planejamento_staging
+database: rj-smtr-dev
135 changes: 135 additions & 0 deletions queries/models/planejamento/segmento_shape.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
{{
    config(
        partition_by = {
            'field': 'feed_start_date',
            'data_type': 'date',
            'granularity': 'day'
        },
        tags=['geolocalizacao']
    )
}}

-- depends_on: {{ ref('feed_info_gtfs') }}
{% if execute and is_incremental() %}
    {% set last_feed_version = get_last_feed_start_date(var("data_versao_gtfs")) %}
{% endif %}

-- Builds one row per shape segment with its geography, buffers and the
-- validation flags used to decide whether the segment should be disregarded.

-- Parses each segment WKT into a GEOGRAPHY and measures its length
-- (rounded to one decimal place).
WITH aux_segmento AS (
    SELECT
        feed_version,
        feed_start_date,
        feed_end_date,
        shape_id,
        id_segmento,
        ST_GEOGFROMTEXT(wkt_segmento) AS segmento,
        wkt_segmento,
        ROUND(ST_LENGTH(ST_GEOGFROMTEXT(wkt_segmento)), 1) AS comprimento_segmento
    FROM
        {{ ref("aux_segmento_shape") }}
),
-- Single geography: union of the 50-unit buffers around every "Túnel" row.
tunel AS (
    SELECT
        ST_UNION_AGG(ST_BUFFER(geometry, 50)) AS buffer_tunel
    FROM
        {{ source("dados_mestres", "logradouro") }}
    WHERE
        tipo = "Túnel"
),
-- Full (uncut) 20-unit buffer around each segment.
buffer_segmento AS (
    SELECT
        *,
        ST_BUFFER(segmento, 20) AS buffer_completo
    FROM
        aux_segmento
),
-- For each segment, the union of the buffers of later segments of the same
-- shape that intersect its own buffer.
intercessao_segmento AS (
    SELECT
        b1.shape_id,
        b1.id_segmento,
        ST_UNION(ARRAY_AGG(b2.buffer_completo)) AS buffer_segmento_posterior
    FROM
        buffer_segmento b1
    JOIN
        buffer_segmento b2
    ON
        b1.shape_id = b2.shape_id
        -- id_segmento is produced as a string upstream; compare numerically so
        -- that segment "10" is not treated as coming before segment "2".
        AND CAST(b1.id_segmento AS INTEGER) < CAST(b2.id_segmento AS INTEGER)
        AND ST_INTERSECTS(b1.buffer_completo, b2.buffer_completo)
    GROUP BY
        b1.shape_id,
        b1.id_segmento
),
-- Removes from each buffer the area already covered by later segments.
-- Segments with no intersecting later segment keep their full buffer.
buffer_segmento_recortado AS (
    SELECT
        b.*,
        COALESCE(
            ST_DIFFERENCE(
                buffer_completo,
                i.buffer_segmento_posterior
            ),
            buffer_completo
        ) AS buffer
    FROM
        buffer_segmento b
    LEFT JOIN
        intercessao_segmento i
    USING(shape_id, id_segmento)
),
-- Individual validation flags per segment.
indicador_validacao_shape AS (
    SELECT
        s.*,
        ST_INTERSECTS(s.segmento, t.buffer_tunel) AS indicador_tunel,
        -- true when the cut buffer kept less than half of the full buffer area
        ST_AREA(s.buffer) / ST_AREA(s.buffer_completo) < 0.5 AS indicador_area_prejudicada,
        s.comprimento_segmento < 990 AS indicador_segmento_pequeno,
        CAST(id_segmento AS INTEGER) AS id_segmento_int
    FROM
        buffer_segmento_recortado s
    CROSS JOIN
        tunel t
)
SELECT
    * EXCEPT(id_segmento_int),
    (
        (
            indicador_tunel
            -- Only intermediate segments are disregarded because of tunnels:
            -- the segment must be neither the first nor the last of its shape.
            -- NOTE(review): the original joined these two tests with OR, which
            -- is true for every segment of any multi-segment shape - confirm
            -- that "strictly between first and last" is the intended rule.
            AND (
                (id_segmento_int > 1)
                AND (id_segmento_int < MAX(id_segmento_int) OVER (PARTITION BY feed_start_date, shape_id))
            )
        )
        OR indicador_area_prejudicada
        OR indicador_segmento_pequeno
    ) AS indicador_segmento_desconsiderado,
    '{{ var("version") }}' AS versao
FROM
    indicador_validacao_shape

{% if is_incremental() %}

UNION ALL

-- Re-emits the rows already stored for the feed returned by
-- get_last_feed_start_date (presumably the previous feed - confirm against
-- the macro), taking feed_end_date from feed_info_gtfs instead of the stored
-- value. Column order must match the SELECT above.
SELECT
    s.feed_version,
    s.feed_start_date,
    fi.feed_end_date,
    s.shape_id,
    s.id_segmento,
    s.segmento,
    s.wkt_segmento,
    s.comprimento_segmento,
    s.buffer_completo,
    s.buffer,
    s.indicador_tunel,
    s.indicador_area_prejudicada,
    s.indicador_segmento_pequeno,
    s.indicador_segmento_desconsiderado,
    s.versao
FROM
    {{ this }} s
JOIN
    {{ ref('feed_info_gtfs') }} fi
USING(feed_start_date)
WHERE
    feed_start_date = '{{ last_feed_version }}'
{% endif %}
49 changes: 49 additions & 0 deletions queries/models/planejamento/shapes_geom_planejamento.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{{
    config(
        partition_by={
            'field': 'feed_start_date',
            'data_type': 'date',
            'granularity': 'day'
        },
        alias='shapes_geom',
        tags=['geolocalizacao']
    )
}}

-- depends_on: {{ ref('feed_info_gtfs') }}
{% if execute and is_incremental() %}
    {% set last_feed_version = get_last_feed_start_date(var("data_versao_gtfs")) %}
{% endif %}

-- Collapses the GTFS shape points into one row per shape, exposing the path
-- both as a GEOGRAPHY line and as a LINESTRING WKT string.

-- One row per shape point: the point as a GEOGRAPHY plus its "lon lat" text.
WITH pontos AS (
    SELECT
        feed_version,
        feed_start_date,
        feed_end_date,
        shape_id,
        shape_pt_sequence,
        ST_GEOGPOINT(shape_pt_lon, shape_pt_lat) AS ponto_shape,
        CONCAT(shape_pt_lon, " ", shape_pt_lat) AS lon_lat
    FROM
        {{ ref("shapes_gtfs") }}
    {% if is_incremental() %}
    -- incremental runs only rebuild the two feeds named below
    WHERE
        feed_start_date IN ('{{ last_feed_version }}', '{{ var("data_versao_gtfs") }}')
    {% endif %}
)
SELECT
    feed_version,
    feed_start_date,
    feed_end_date,
    shape_id,
    -- both aggregates order the points by shape_pt_sequence
    ST_MAKELINE(ARRAY_AGG(ponto_shape ORDER BY shape_pt_sequence)) AS shape,
    CONCAT("LINESTRING(", STRING_AGG(lon_lat, ", " ORDER BY shape_pt_sequence), ")") AS wkt_shape,
    '{{ var("version") }}' AS versao
FROM
    pontos
GROUP BY
    feed_version,
    feed_start_date,
    feed_end_date,
    shape_id
80 changes: 80 additions & 0 deletions queries/models/planejamento/staging/aux_segmento_shape.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
import numpy as np
import pyproj
from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, StringType
from shapely import wkt

# from shapely.geometry import LineString, Point
from shapely.ops import substring, transform


def transform_projection(shape, from_utm=False):
    """Reproject a shapely geometry between EPSG:4326 and EPSG:31983.

    Args:
        shape: geometry to reproject.
        from_utm: when True, convert from EPSG:31983 to EPSG:4326;
            otherwise convert from EPSG:4326 to EPSG:31983.

    Returns:
        The geometry produced by ``shapely.ops.transform`` with the
        selected pyproj transformer.
    """
    geographic_crs = pyproj.CRS("EPSG:4326")
    projected_crs = pyproj.CRS("EPSG:31983")

    # Pick the direction once, then build a single transformer.
    if from_utm:
        source_crs, target_crs = projected_crs, geographic_crs
    else:
        source_crs, target_crs = geographic_crs, projected_crs

    reproject = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True).transform
    return transform(reproject, shape)


def cut(line, distance):
    """Split ``line`` into consecutive pieces of ``distance`` length units.

    The number of pieces is ``len(np.arange(0, line.length, distance))``.
    Every piece spans ``distance`` units except the one whose 1-based index
    equals ``number_of_pieces // 2``, which receives the remainder
    ``line.length % distance``.

    Args:
        line: shapely line in a projected CRS (it is passed to
            ``transform_projection(..., True)`` before being serialized).
        distance: target length of each piece, in the units of ``line``.

    Returns:
        A list of ``[segment_id, wkt]`` pairs, where ``segment_id`` is the
        1-based position as a string and ``wkt`` is the reprojected piece.
        An empty list is returned for a zero-length line.
    """
    line_len = line.length

    dist_mod = line_len % distance
    n_segments = len(np.arange(0, line_len, distance))
    middle_index = n_segments // 2

    last_final_dist = 0
    lines = []

    for i in range(1, n_segments + 1):
        # When the length is an exact multiple of ``distance`` the remainder is
        # 0. Using it would turn this piece into a zero-length point and leave
        # the final ``distance`` units of the line uncovered, so only apply the
        # remainder when it is positive.
        if i == middle_index and dist_mod > 0:
            cut_distance = dist_mod
        else:
            cut_distance = distance
        final_dist = last_final_dist + cut_distance
        # NOTE(review): for a line shorter than ``distance`` the requested end
        # lies past the line; this relies on ``substring`` stopping at the end
        # of the line - confirm against the shapely version in use.
        lines.append(
            [str(i), transform_projection(substring(line, last_final_dist, final_dist), True).wkt]
        )
        last_final_dist = final_dist

    return lines


def cut_udf(wkt_string):
    """Parse a WKT string, reproject it and cut it into 1000-unit pieces.

    Returns the list of ``[segment_id, wkt]`` pairs produced by ``cut``.
    """
    parsed_geometry = wkt.loads(wkt_string)
    projected_line = transform_projection(parsed_geometry)
    return cut(projected_line, distance=1000)


# Rebind the name to the Spark UDF wrapper: array of [id, wkt] string pairs.
cut_udf = udf(cut_udf, ArrayType(ArrayType(StringType())))


def model(dbt, session):
    """dbt Python model: explode each shape into one row per segment.

    Reads ``aux_shapes_geom_filtrada``, applies ``cut_udf`` to ``wkt_shape``
    and returns a DataFrame with the feed columns, ``shape_id``,
    ``id_segmento`` and ``wkt_segmento``. ``session`` is not used here.
    """
    dbt.config(materialized="table")

    shapes_df = dbt.ref("aux_shapes_geom_filtrada")

    # Each row gains an array of [segment_id, wkt] pairs.
    with_segments = shapes_df.withColumn("shape_lists", cut_udf(col("wkt_shape")))

    # One output row per pair.
    one_row_per_segment = with_segments.select(
        "feed_version",
        "feed_start_date",
        "feed_end_date",
        "shape_id",
        explode(col("shape_lists")).alias("shape_list"),
    )

    # Unpack the pair into named columns and discard the helper column.
    with_id = one_row_per_segment.withColumn("id_segmento", col("shape_list").getItem(0))
    with_wkt = with_id.withColumn("wkt_segmento", col("shape_list").getItem(1))
    return with_wkt.drop("shape_list")
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Keeps only the shapes of the feed selected by the data_versao_gtfs variable.
SELECT
    feed_version,
    feed_start_date,
    feed_end_date,
    shape_id,
    shape,
    wkt_shape,
    versao
FROM
    {{ ref("shapes_geom_planejamento") }}
WHERE
    feed_start_date = '{{ var("data_versao_gtfs") }}'
8 changes: 7 additions & 1 deletion queries/models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,10 @@ sources:

tables:
- name: autuacoes_citran
- name: receita_autuacao
- name: receita_autuacao

- name: dados_mestres
database: datario

tables:
- name: logradouro
31 changes: 31 additions & 0 deletions queries/models/teste_python/aux_shapes_wkt.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{{
    config(
        materialized="view"
    )
}}

-- Test view: builds the LINESTRING WKT of a single hard-coded shape
-- (shape_id "hj1m", feed_start_date 2024-09-01).

-- Points of the selected shape with their "lon lat" text.
WITH shapes AS (
    SELECT
        feed_start_date,
        shape_id,
        shape_pt_sequence,
        CONCAT(shape_pt_lon, " ", shape_pt_lat) AS lon_lat
    FROM
        `rj-smtr.gtfs.shapes`
    WHERE
        feed_start_date = '2024-09-01'
        AND shape_id = "hj1m"
)
SELECT
    feed_start_date,
    shape_id,
    -- An ORDER BY inside the CTE does not guarantee the order in which
    -- STRING_AGG sees its rows, so the ordering is given to the aggregate.
    CONCAT("LINESTRING(", STRING_AGG(lon_lat, ", " ORDER BY shape_pt_sequence), ")") AS shape_wkt
FROM
    shapes
GROUP BY
    feed_start_date,
    shape_id
Loading
Loading