Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: pipelines and queries #20

Merged
merged 15 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pipelines/chatbot/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.chatbot.dbt_chatbot_metricas.flows import * # noqa
Empty file.
44 changes: 44 additions & 0 deletions pipelines/chatbot/dbt_chatbot_metricas/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
"""
DBT flows that materialize the chatbot tables.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.run_dbt_model.flows import (
templates__run_dbt_model__flow,
)
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.chatbot.dbt_chatbot_metricas.schedules import update_schedule
from pipelines.constants import constants

# Chatbot materialization flow, built from a deep copy of the shared dbt
# template flow so the settings below are applied to the copy only.
run_rbt_chatbot_flow = deepcopy(templates__run_dbt_model__flow)
run_rbt_chatbot_flow.name = "Chatbot: Materializar tabelas"

# Storage location and execution environment of the flow.
run_rbt_chatbot_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
run_rbt_chatbot_flow.run_config = KubernetesRun(
    labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
    image=constants.DOCKER_IMAGE.value,
)

# State handlers attached to the flow, in this order.
run_rbt_chatbot_flow.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]

# Default parameter values applied to the flow.
# NOTE(review): the "identidade_unica" prefix looks inherited from another
# pipeline; the name is kept because it is a module-level name - confirm
# whether it can be renamed.
identidade_unica_default_parameters = {
    "dataset_id": "dialogflowcx",
    "upstream": False,
    "dbt_alias": False,
}
run_rbt_chatbot_flow = set_default_parameters(
    run_rbt_chatbot_flow,
    default_parameters=identidade_unica_default_parameters,
)

# Run the flow on the chatbot update schedule.
run_rbt_chatbot_flow.schedule = update_schedule
57 changes: 57 additions & 0 deletions pipelines/chatbot/dbt_chatbot_metricas/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
"""
Schedules for the chatbot metrics dbt pipeline
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple

from pipelines.constants import constants

#####################################
#
# Chatbot Metrics Schedules
#
#####################################

# Tables materialized by the chatbot schedule. Each entry provides the
# `table_id` and `upstream` parameter defaults used by one clock below.
chatbot_tables = {
    "fim_conversas": {
        "table_id": "fim_conversas",
        "upstream": True,
    },
    "historico_conversas_legivel": {
        "table_id": "historico_conversas_legivel",
        "upstream": False,
    },
    "conversas_completas_metricas": {
        "table_id": "conversas_completas_metricas",
        "upstream": False,
    },
}

# One clock per table, repeating every 12 hours, with each table's start
# shifted 45 minutes after the previous one.
# The start date is built as a naive wall-clock time and then attached to the
# zone with `localize()`: handing a pytz zone to the `datetime` constructor via
# `tzinfo=` attaches the zone's local-mean-time offset instead of its standard
# UTC offset.
chatbot_clocks = [
    IntervalClock(
        interval=timedelta(hours=12),
        start_date=pytz.timezone("America/Sao_Paulo").localize(
            datetime(2024, 9, 17, 19, 0) + timedelta(minutes=45 * count)
        ),
        labels=[
            constants.RJ_ESCRITORIO_AGENT_LABEL.value,
        ],
        parameter_defaults={
            "dataset_id": "dialogflowcx",
            "table_id": parameters["table_id"],
            "upstream": parameters["upstream"],
            "infisical_credential_dict": {
                "secret_path": "/rj-chatbot-dev",
                # NOTE(review): "SEVICE" looks like a typo of "SERVICE", but this
                # string must match the stored secret's name - confirm before
                # renaming.
                "secret_name": "DBT_SEVICE_ACCOUNT",
            },
            "dbt_project_materialization": "rj-chatbot-dev",
        },
    )
    # Only the per-table settings are needed; the dict keys are not used.
    for count, parameters in enumerate(chatbot_tables.values())
]

# Schedule built from all the clocks above.
update_schedule = Schedule(clocks=untuple(chatbot_clocks))
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
from prefect import task
from prefeitura_rio.pipelines_utils.gcs import upload_file_to_bucket
from prefeitura_rio.pipelines_utils.infisical import get_secret
from prefeitura_rio.pipelines_utils.io import to_partitions
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.pandas import parse_date_columns
from prefeitura_rio.pipelines_utils.pandas import parse_date_columns, to_partitions
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
from prefeitura_rio.pipelines_utils.time import TimeoutError
from redis_pal import RedisPal
Expand Down
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Imports all flows for every project so we can register all of them.
"""
from pipelines.chatbot import * # noqa
from pipelines.deteccao_alagamento_cameras import * # noqa
from pipelines.exemplo import * # noqa
from pipelines.healthcheck import * # noqa
Expand Down
2 changes: 1 addition & 1 deletion pipelines/templates/run_dbt_model/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
MATERIALIZA MODELOS DO DBT.
MATERIALIZA MODELOS DO DBT.....
"""

from copy import deepcopy
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ profile = "black"
lint = "black . && isort . && flake8 ."

[tool.uv.sources]
prefeitura-rio = { git = "https://github.com/prefeitura-rio/prefeitura-rio.git", rev = "62236b527b3528437b8d460f60c79ee39309b151" }
prefeitura-rio = { git = "https://github.com/prefeitura-rio/prefeitura-rio.git", rev = "1e20ad960b680d8982046715e18e1a66bd4386ea" }
3 changes: 3 additions & 0 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ models:
identidade_unica:
+materialized: table
+schema: identidade_unica
dialogflowcx:
+materialized: table
+schema: dialogflowcx
11 changes: 11 additions & 0 deletions queries/models/dialogflowcx/conversas_completas_metricas.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- One row per `conversa_completa_id`, aggregated from the rows of the
-- `fim_conversas` model.
select
    min(request_time) as request_time,
    conversa_completa_id,
    max(ambiente) as ambiente,
    array_agg(distinct conversation_name) as conversation_name_list,
    avg(conversa_completa_turnos_em_menus) as conversa_completa_turnos_em_menus,
    max(conversa_completa_fluxos_interagidos) as conversa_completa_fluxos_interagidos,
    max(conversa_completa_duracao) as conversa_completa_duracao,
    max(conversa_completa_ultimo_fluxo_servico) as conversa_completa_ultimo_fluxo_servico
from {{ ref('fim_conversas') }}
group by conversa_completa_id
146 changes: 146 additions & 0 deletions queries/models/dialogflowcx/fim_conversas.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Highest turn position recorded for each conversation.
WITH ultima_interacao AS (
    SELECT
        conversation_name,
        MAX(turn_position) AS last_turn
    FROM `rj-chatbot-dev.dialogflowcx.historico_conversas`
    GROUP BY conversation_name
),

# Captures only conversations coming from ASC, identified through the message
# that starts the conversation (turn 1).
primeira_interacao AS (
    SELECT
        conversation_name,
        `rj-chatbot-dev.dialogflowcx.inicial_sentence_to_flow_name`(
            JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.text'))
        ) AS fluxo_primeira_interacao,
        request_time AS hora_primeira_interacao,
        JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.text')) AS primeira_mensagem,
        JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.ambiente')) AS ambiente
    FROM `rj-chatbot-dev.dialogflowcx.historico_conversas`
    WHERE
        turn_position = 1
        AND `rj-chatbot-dev.dialogflowcx.inicial_sentences`(
            JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.text'))
        )
),

# One row per conversation: the last recorded turn (joined through
# ultima_interacao) of every conversation present in primeira_interacao,
# with the final state of the conversation classified from the response JSON.
fim_conversas_1746 AS (
SELECT
hist.conversation_name,
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.usuario_cpf')) AS cpf,
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.phone')) as telefone,
# Only "production" is mapped to "Produção"; any other value becomes "Homologação".
CASE
WHEN pi.ambiente = "production" THEN "Produção"
ELSE "Homologação"
END ambiente,
INITCAP(pi.fluxo_primeira_interacao) as fluxo_nome,
# NOTE(review): timestamps are converted using the "America/Buenos_Aires" zone - confirm this is the intended zone.
FORMAT_DATETIME("%d/%m/%Y às %H:%M", DATETIME(request_time, "America/Buenos_Aires")) as horario,
DATETIME(request_time, "America/Buenos_Aires") as request_time,
# Minutes between the first interaction and this last turn, rounded to one decimal.
ROUND(DATE_DIFF(request_time, hora_primeira_interacao, SECOND)/60,1) as duracao_minutos,
hist.turn_position as ultimo_turno,
pi.primeira_mensagem,
response,
# True when the last turn's current page is "End Session".
CASE
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentPage.displayName')) = "End Session" THEN true
ELSE false
END conversa_finalizada,
# Conversation classification. Branches are evaluated top-down and the first
# match wins, so their order is significant.
CASE
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_protocolo')) IS NOT NULL THEN "chamado_aberto"
WHEN hist.turn_position = 1 THEN "hard_bounce"
WHEN
hist.turn_position = 2
AND (
ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'VOLTAR')
OR ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'SAIR')
)
THEN "soft_bounce"
WHEN
hist.turn_position = 2
THEN "timeout_usuario_pos_transacao"
WHEN
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_retorno')) = "erro_interno_timeout"
THEN "timeout SGRC"
WHEN ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')),'TRANSBORDO') THEN "transbordo"
WHEN
# Cases in which the chatbot identified the ineligibility
(JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_abertura_chamado_justificativa')) != "erro_desconhecido"
AND
(JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_abertura_chamado')) = "false"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_endereco_abertura_chamado')) = "false")
)
# Cases in which SGRC identified the ineligibility
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_abertura_chamado_justificativa')) = "chamado_aberto"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_abertura_chamado_justificativa')) = "chamado_fechado_12_dias"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_endereco_abertura_chamado_justificativa')) = "chamado_aberto"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_endereco_abertura_chamado_justificativa')) = "chamado_fechado_12_dias"
THEN "impedimento_regra_negocio"
WHEN
JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')) IS NULL
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_retorno')) = "erro_interno"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_endereco_abertura_chamado_justificativa')) = "erro_desconhecido"
OR JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.rebi_elegibilidade_abertura_chamado_justificativa')) = "erro_desconhecido"
THEN "timeout interno"
WHEN
ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'VOLTAR')
OR ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'SAIR')
THEN "desistencia"
ELSE "timeout_usuario_pos_transacao"
END classificacao_conversa,
# "sucesso" when a protocol number exists or the last agent utterance ends
# with VOLTAR or SAIR; "falha" otherwise.
CASE
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_protocolo')) IS NOT NULL
OR ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'VOLTAR')
OR ENDS_WITH(JSON_VALUE(JSON_EXTRACT(derived_data, '$.agentUtterances')), 'SAIR')
THEN "sucesso"
ELSE "falha"
END status_final_conversa,
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentPage.displayName')) as passo_falha,
# Maps the current flow's display name to a readable service name; unmapped
# flows keep their raw display name.
CASE
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '1647 (RRL)' THEN 'Remoção de Resíduo'
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '1614 (PAL)' THEN 'Poda de Árvore'
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '1464 (VACIO)' THEN 'Verificação de Ar Condicionado Inoperante em Ônibus'
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '152 (RLU)' THEN 'Reparo de Luminária'
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '182 (RBDAP)' THEN 'Reparo de Buraco, Deformação ou Afundamento na pista'
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '3581 (FEIV)' THEN "Fiscalização de Estacionamento Irregular de Veículo"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '3802 (RSTA)' THEN "Reparo de Sinal de Trânsito Apagado"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '1607 (REBI)' THEN "Remoção de Entulho e Bens Inservíveis"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '192 (DBGRR)' THEN "Desobstrução de bueiros, galerias, ramais de águas pluviais e ralos"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '2569 (RTG)' THEN "Reposição de Tampão ou Grelha"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '223 (RSTAP)' THEN "Reparo de Sinal de Trânsito em Amarelo Piscante"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '1618 (CRCA)' THEN "Controle de Roedores e Caramujos Africanos"
WHEN JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName')) = '3803 (RSTAAV)' THEN "Reparo de Sinal de Trânsito Abalroado ou Ausente ou Virado"
ELSE JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.currentFlow.displayName'))
END as fluxo_falha,
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_retorno')) erro_abertura_ticket,
JSON_VALUE(JSON_EXTRACT(response, '$.queryResult.parameters.solicitacao_protocolo')) protocolo,
# Typed NULL placeholder columns. Presumably they keep the column list aligned
# with the other models combined through UNION ALL at the end of this file -
# TODO confirm.
SAFE_CAST(NULL AS INT64) AS turnos_em_menus,
SAFE_CAST(NULL AS INT64) AS estimativa_turnos_menu,
SAFE_CAST(NULL AS INT64) AS turnos_em_servico,
SAFE_CAST(NULL AS INT64) AS estimativa_turnos_servico,
SAFE_CAST(NULL AS INT64) AS turnos_em_endereco,
SAFE_CAST(NULL AS INT64) AS turnos_em_identificacao,
hist.conversation_name as conversa_completa_id,
SAFE_CAST(NULL AS INT64) AS conversa_completa_turnos_em_menus,
SAFE_CAST(NULL AS INT64) AS conversa_completa_fluxos_interagidos,
SAFE_CAST(NULL AS INT64) AS conversa_completa_duracao,
SAFE_CAST(NULL AS STRING) AS conversa_completa_ultimo_fluxo_servico,
FROM `rj-chatbot-dev.dialogflowcx.historico_conversas` as hist
# Keep only the last turn of each conversation.
INNER JOIN ultima_interacao as ui
ON hist.conversation_name = ui.conversation_name AND hist.turn_position = ui.last_turn
# Keep only conversations that have a qualifying first interaction.
INNER JOIN primeira_interacao as pi
ON hist.conversation_name = pi.conversation_name
# NOTE(review): this ordering happens inside a CTE that is later combined with
# UNION ALL; it likely has no effect on the final result - confirm whether it
# is needed.
ORDER BY request_time DESC)

# Final result: the rows of fim_conversas_1746 followed by the rows of the
# `fim_conversas_da` and `fim_conversas_macrofluxos` models, each with
# `codigo_servico` extracted from the response parameters.
SELECT
    *,
    JSON_VALUE(
        JSON_EXTRACT(response, '$.queryResult.parameters.codigo_servico_1746')
    ) AS codigo_servico
FROM fim_conversas_1746

UNION ALL

SELECT
    *,
    JSON_VALUE(
        JSON_EXTRACT(response, '$.queryResult.parameters.codigo_servico_1746')
    ) AS codigo_servico
FROM {{ ref('fim_conversas_da') }}

UNION ALL

SELECT
    *,
    JSON_VALUE(
        JSON_EXTRACT(response, '$.queryResult.parameters.codigo_servico_1746')
    ) AS codigo_servico
FROM {{ ref('fim_conversas_macrofluxos') }}
Loading
Loading