feat: funnel actors queries on udf #25013

Open · wants to merge 54 commits into master

Conversation

@aspicer aspicer commented Sep 17, 2024

Moves funnel actors queries over to the UDF when the feature flag is on.

Passes the UUID of each event to the UDF. The UDF returns, for each step, a list of event UUIDs that correspond to the funnel matches for each user.
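
For reference, each element the UDF returns per user is a tuple that the generated SQL below unpacks as af_tuple.1 through af_tuple.4. A rough Python sketch of that shape (illustrative values, not the actual UDF source):

# One result tuple per funnel attempt, mirroring the aliases in the SQL below.
step_reached = 2      # highest step reached; the SQL filters on >= 0, then >= 2
breakdown = []        # breakdown value attributed to this attempt
timings = [1.5, 3.0]  # step timings consumed by correlation and user paths
matched_event_uuids_array_array = [
    ["uuid-a"],       # step 1: UUIDs of the matching events
    ["uuid-b"],       # step 2
    ["uuid-c"],       # step 3; joined back to full event tuples via user_events_map
]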

Adds in timing fields used by correlation and user paths.

Also includes a small cleanup of superfluous SQL leftovers from the old queries.

Testing

Added unit tests for the UDF actors query. Also tested quite a bit on dev.

/* user_id:2 celery:posthog.tasks.tasks.process_query_task */ SELECT
    persons.id AS id,
    persons.created_at AS created_at,
    source.matching_events AS matching_events
FROM
    (SELECT
        aggregation_target AS actor_id,
        matched_events_array[3] AS matching_events
    FROM
        (SELECT
            arrayJoin(aggregate_funnel_array(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), uuid, [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
            af_tuple.1 AS step_reached,
            af_tuple.2 AS breakdown,
            af_tuple.3 AS timings,
            af_tuple.4 AS matched_event_uuids_array_array,
            groupArray(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS user_events,
            mapFromArrays(arrayMap(x -> x.2, user_events), user_events) AS user_events_map,
            arrayMap(matched_event_uuids_array -> arrayMap(event_uuid -> user_events_map[event_uuid], matched_event_uuids_array), matched_event_uuids_array_array) AS matched_events_array,
            aggregation_target AS aggregation_target
        FROM
            (SELECT
                toTimeZone(e.timestamp, 'UTC') AS timestamp,
                if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS aggregation_target,
                e.uuid AS uuid,
                e.`$session_id` AS `$session_id`,
                e.`$window_id` AS `$window_id`,
                if(and(equals(e.event, '$pageview'), and(313131313131, startsWith(toString(e.distinct_id), 'a'))), 1, 0) AS step_0,
                if(equals(e.event, '$pageview'), 1, 0) AS step_1,
                if(equals(e.event, '$pageview'), 1, 0) AS step_2
            FROM
                events AS e
                LEFT OUTER JOIN (SELECT
                    argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
                    person_distinct_id_overrides.distinct_id AS distinct_id
                FROM
                    person_distinct_id_overrides
                WHERE
                    equals(person_distinct_id_overrides.team_id, 2)
                GROUP BY
                    person_distinct_id_overrides.distinct_id
                HAVING
                    ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0)
                SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
            WHERE
                and(equals(e.team_id, 2), and(and(greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('2024-09-10 00:00:00.000000', 6, 'UTC')), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), toDateTime64('2024-09-17 23:59:59.999999', 6, 'UTC'))), in(e.event, tuple('$pageview'))), or(ifNull(equals(step_0, 1), 0), ifNull(equals(step_1, 1), 0), ifNull(equals(step_2, 1), 0))))
        GROUP BY
            aggregation_target
        HAVING
            ifNull(greaterOrEquals(step_reached, 0), 0))
    WHERE
        ifNull(greaterOrEquals(step_reached, 2), 0)
    ORDER BY
        aggregation_target ASC) AS source
    INNER JOIN (SELECT
        argMax(toTimeZone(person.created_at, 'UTC'), person.version) AS created_at,
        person.id AS id
    FROM
        person
    WHERE
        equals(person.team_id, 2)
    GROUP BY
        person.id
    HAVING
        and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))
    SETTINGS optimize_aggregation_in_order=1) AS persons ON equals(persons.id, source.actor_id)
ORDER BY
    persons.created_at DESC
LIMIT 101
OFFSET 0 SETTINGS readonly=2, max_execution_time=600, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0, allow_experimental_analyzer=1
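
For context on the mechanism: ClickHouse executable UDFs like aggregate_funnel_array run as an external process that reads argument rows from stdin and writes result rows to stdout, one row per line. A minimal sketch of that loop, assuming a JSONEachRow-style serialization (field names are illustrative, not the actual PostHog UDF source):

import json
import sys

# Each stdin line is one JSON-encoded row of UDF arguments from ClickHouse.
for line in sys.stdin:
    args = json.loads(line)
    # ... funnel matching over the (timestamp, uuid, breakdown, steps) tuples
    # would happen here ...
    result = {"result": []}  # placeholder for the per-user result tuples
    sys.stdout.write(json.dumps(result) + "\n")
    sys.stdout.flush()  # ClickHouse expects output after each input block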

@aspicer aspicer changed the title from "wip" to "feat: funnel actors queries on udf" on Sep 17, 2024
@aspicer aspicer marked this pull request as ready for review September 17, 2024 08:37

@skoob13 skoob13 left a comment


Everything is perfect! I didn't manage to test associated session recordings locally: the list is always empty, but it's also empty without UDFs. One potential low-hanging fruit is using the orjson module instead of json, since we deal with massive data arrays. Once the Ansible scripts for UDFs are in place, extra modules should be easier to roll out.

select: list[ast.Expr] = [
    ast.Alias(alias="actor_id", expr=ast.Field(chain=["aggregation_target"])),
    *self._get_funnel_person_step_events(),
    # *self._get_timestamp_outer_select(),

Redundant line

@aspicer aspicer commented Sep 18, 2024

@skoob13 Just did a quick benchmark to see what's what.

In the benchmark, I parsed a JSON blob, iterated through a 100-item array and multiplied each item by 2 (to emulate doing some logic in the UDF), and then printed out the result, 1,000,000 times.

Python JSON: 14.8s
Python orjson: 5.4s
nodejs: 7.2s
rust: 2.3s

A little surprised by the Node performance; I thought it would be a bit faster!

Using orjson definitely makes sense, but can you think of an easy way to administer it? Would we have to set up Ansible to maintain a virtualenv on each ClickHouse node?

Or maybe their distro provides a package they can install natively.

j = """{
        "id": "0178a3ab-d163-0000-4b55-bceadebb03fa",
        "name": "Hogflix Movies",
        "created_at": "2021-04-05T20:14:09.763753Z",
        "updated_at": "2021-04-05T20:14:25.443181Z",
        "membership_level": 15,
        "plugins_access_level": 9,
        "teams": [
            {
                "id": 1,
                "uuid": "0178a3ab-d1e5-0000-c5ca-da746c68f506",
                "organization": "0178a3ab-d163-0000-4b55-bceadebb03fa",
                "api_token": "tJy-b6mTLwvNP_ZJHrfgn99pQCYOGFE3-nwpb8utFa8",
                "name": "Hogflix Demo App",
                "completed_snippet_onboarding": true,
                "ingested_event": true,
                "is_demo": true,
                "timezone": "Europe/Kiev"
            }
        ],
        "available_product_features": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

    }"""

Python

import json

# Parse the blob, double each feature value, and re-serialize, 1,000,000 times.
for i in range(1000000):
    r = json.loads(j)
    r['available_product_features'] = [x * 2 for x in r['available_product_features']]
    print(json.dumps(r))
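
The orjson variant isn't shown; assuming the same loop with orjson swapped in, it would look roughly like this (note that orjson.dumps returns bytes, hence the binary write):

import sys
import orjson

for i in range(1000000):
    r = orjson.loads(j)
    r['available_product_features'] = [x * 2 for x in r['available_product_features']]
    # orjson.dumps returns bytes, so write to the raw stdout buffer
    sys.stdout.buffer.write(orjson.dumps(r) + b'\n')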

JS

// j holds the same JSON string as in the Python version above
for (let i = 0; i < 1000000; i++) {
    let r = JSON.parse(j)
    r.available_product_features = r.available_product_features.map((x) => x * 2)
    process.stdout.write(JSON.stringify(r) + '\n')
}

Rust

use serde_json::{from_str, to_string};
use serde::{Deserialize, Serialize};


#[derive(Serialize, Deserialize)]
struct Team {
    id: u64,
    uuid: String,
    organization: String,
    api_token: String,
    name: String,
    completed_snippet_onboarding: bool,
    ingested_event: bool,
    is_demo: bool,
    timezone: String
}

#[derive(Serialize, Deserialize)]
struct Person {
    id: String,
    name: String,
    created_at: String,
    updated_at: String,
    membership_level: u64,
    plugins_access_level: u64,
    teams: Vec<Team>,
    available_product_features: Vec<u64>,
}

fn main() {
    // JSON string
    let j = r#"{
        "id": "0178a3ab-d163-0000-4b55-bceadebb03fa",
        "name": "Hogflix Movies",
        "created_at": "2021-04-05T20:14:09.763753Z",
        "updated_at": "2021-04-05T20:14:25.443181Z",
        "membership_level": 15,
        "plugins_access_level": 9,
        "teams": [
            {
                "id": 1,
                "uuid": "0178a3ab-d1e5-0000-c5ca-da746c68f506",
                "organization": "0178a3ab-d163-0000-4b55-bceadebb03fa",
                "api_token": "tJy-b6mTLwvNP_ZJHrfgn99pQCYOGFE3-nwpb8utFa8",
                "name": "Hogflix Demo App",
                "completed_snippet_onboarding": true,
                "ingested_event": true,
                "is_demo": true,
                "timezone": "Europe/Kiev"
            }
        ],
        "available_product_features": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

    }"#;

    // Loop 1,000,000 times
    for _ in 0..1_000_000 {
        // Deserialize the JSON string into a Rust value
        let mut r: Person = from_str(j).unwrap();

        r.available_product_features = r.available_product_features.into_iter().map(|x| x * 2).collect();

        // Serialize the Rust value back into a JSON string
        let z = to_string(&r).unwrap();
        println!("{}", z);
    }
}
