diff --git a/.github/actions/run-backend-tests/action.yml b/.github/actions/run-backend-tests/action.yml
index 34c0a34cebe..cd632a491c2 100644
--- a/.github/actions/run-backend-tests/action.yml
+++ b/.github/actions/run-backend-tests/action.yml
@@ -31,11 +31,13 @@ runs:
steps:
# Pre-tests
+ # Copies the fully versioned UDF xml file for use in CI testing
- name: Stop/Start stack with Docker Compose
shell: bash
run: |
export CLICKHOUSE_SERVER_IMAGE=${{ inputs.clickhouse-server-image }}
export DOCKER_REGISTRY_PREFIX="us-east1-docker.pkg.dev/posthog-301601/mirror/"
+ cp posthog/user_scripts/latest_user_defined_function.xml docker/clickhouse/user_defined_function.xml
docker compose -f docker-compose.dev.yml down
docker compose -f docker-compose.dev.yml up -d
diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml
index fb15ed052a7..193bece8bd9 100644
--- a/.github/workflows/ci-backend.yml
+++ b/.github/workflows/ci-backend.yml
@@ -147,6 +147,20 @@ jobs:
run: |
npm run schema:build:python && git diff --exit-code
+ - uses: actions/checkout@v4
+ with:
+ repository: 'PostHog/posthog-cloud-infra'
+ path: 'posthog-cloud-infra'
+ token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }}
+
+ - name: Assert user_defined_function.xml is deployed to US
+ run: |
+ cmp posthog/user_scripts/latest_user_defined_function.xml posthog-cloud-infra/ansible/us/clickhouse/config/common/user_defined_function.xml
+
+ - name: Assert user_defined_function.xml is deployed to EU
+ run: |
+ cmp posthog/user_scripts/latest_user_defined_function.xml posthog-cloud-infra/ansible/eu/clickhouse/config/common/user_defined_function.xml
+
check-migrations:
needs: changes
if: needs.changes.outputs.backend == 'true'
diff --git a/posthog/cloud_utils.py b/posthog/cloud_utils.py
index 6e6aaeabf63..0af4834187e 100644
--- a/posthog/cloud_utils.py
+++ b/posthog/cloud_utils.py
@@ -1,3 +1,4 @@
+import os
from typing import TYPE_CHECKING, Any, Optional
from django.conf import settings
@@ -16,6 +17,10 @@ def is_cloud() -> bool:
return bool(settings.CLOUD_DEPLOYMENT)
+def is_ci() -> bool:
+ return os.environ.get("GITHUB_ACTIONS") is not None
+
+
def get_cached_instance_license() -> Optional["License"]:
"""Returns the first valid license and caches the value for the lifetime of the instance, as it is not expected to change.
If there is no valid license, it returns None.
diff --git a/posthog/datetime.py b/posthog/date_util.py
similarity index 100%
rename from posthog/datetime.py
rename to posthog/date_util.py
diff --git a/posthog/hogql/functions/mapping.py b/posthog/hogql/functions/mapping.py
index cf151a8f8ae..4d810875243 100644
--- a/posthog/hogql/functions/mapping.py
+++ b/posthog/hogql/functions/mapping.py
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from itertools import chain
from typing import Optional
+
+from posthog.cloud_utils import is_cloud, is_ci
from posthog.hogql import ast
from posthog.hogql.ast import (
ArrayType,
@@ -834,15 +836,8 @@ HOGQL_CLICKHOUSE_FUNCTIONS: dict[str, HogQLFunctionMeta] = {
"leadInFrame": HogQLFunctionMeta("leadInFrame", 1, 1),
# table functions
"generateSeries": HogQLFunctionMeta("generate_series", 3, 3),
- ## UDFS
- "aggregate_funnel": HogQLFunctionMeta("aggregate_funnel", 6, 6, aggregate=False),
- "aggregate_funnel_array": HogQLFunctionMeta("aggregate_funnel_array", 6, 6, aggregate=False),
- "aggregate_funnel_cohort": HogQLFunctionMeta("aggregate_funnel_cohort", 6, 6, aggregate=False),
- "aggregate_funnel_trends": HogQLFunctionMeta("aggregate_funnel_trends", 7, 7, aggregate=False),
- "aggregate_funnel_array_trends": HogQLFunctionMeta("aggregate_funnel_array_trends", 7, 7, aggregate=False),
- "aggregate_funnel_cohort_trends": HogQLFunctionMeta("aggregate_funnel_cohort_trends", 7, 7, aggregate=False),
- "aggregate_funnel_test": HogQLFunctionMeta("aggregate_funnel_test", 6, 6, aggregate=False),
}
+
# Permitted HogQL aggregations
HOGQL_AGGREGATIONS: dict[str, HogQLFunctionMeta] = {
# Standard aggregate functions
@@ -1034,6 +1029,26 @@ HOGQL_POSTHOG_FUNCTIONS: dict[str, HogQLFunctionMeta] = {
"hogql_lookupOrganicMediumType": HogQLFunctionMeta("hogql_lookupOrganicMediumType", 1, 1),
}
+
+UDFS: dict[str, HogQLFunctionMeta] = {
+ "aggregate_funnel": HogQLFunctionMeta("aggregate_funnel", 6, 6, aggregate=False),
+ "aggregate_funnel_array": HogQLFunctionMeta("aggregate_funnel_array", 6, 6, aggregate=False),
+ "aggregate_funnel_cohort": HogQLFunctionMeta("aggregate_funnel_cohort", 6, 6, aggregate=False),
+ "aggregate_funnel_trends": HogQLFunctionMeta("aggregate_funnel_trends", 7, 7, aggregate=False),
+ "aggregate_funnel_array_trends": HogQLFunctionMeta("aggregate_funnel_array_trends", 7, 7, aggregate=False),
+ "aggregate_funnel_cohort_trends": HogQLFunctionMeta("aggregate_funnel_cohort_trends", 7, 7, aggregate=False),
+ "aggregate_funnel_test": HogQLFunctionMeta("aggregate_funnel_test", 6, 6, aggregate=False),
+}
+# We want CI to fail if there is a breaking change and the version hasn't been incremented
+if is_cloud() or is_ci():
+ from posthog.udf_versioner import augment_function_name
+
+ for v in UDFS.values():
+ v.clickhouse_name = augment_function_name(v.clickhouse_name)
+
+HOGQL_CLICKHOUSE_FUNCTIONS.update(UDFS)
+
+
ALL_EXPOSED_FUNCTION_NAMES = [
name for name in chain(HOGQL_CLICKHOUSE_FUNCTIONS.keys(), HOGQL_AGGREGATIONS.keys()) if not name.startswith("_")
]
diff --git a/posthog/hogql/test/test_query.py b/posthog/hogql/test/test_query.py
index a7ffeff936b..93847378720 100644
--- a/posthog/hogql/test/test_query.py
+++ b/posthog/hogql/test/test_query.py
@@ -1,3 +1,5 @@
+import datetime
+
import pytest
from uuid import UUID
@@ -6,7 +8,6 @@ from django.test import override_settings
from django.utils import timezone
from freezegun import freeze_time
-from posthog import datetime
from posthog.hogql import ast
from posthog.hogql.errors import QueryError
from posthog.hogql.property import property_to_expr
diff --git a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_strict_udf.ambr b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_strict_udf.ambr
index 385e5398030..76e50b6d6d9 100644
--- a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_strict_udf.ambr
+++ b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_strict_udf.ambr
@@ -1141,7 +1141,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1209,7 +1209,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'step_1', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'step_1', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1284,7 +1284,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1357,7 +1357,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1437,7 +1437,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1517,7 +1517,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'strict', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
diff --git a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_trends_udf.ambr b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_trends_udf.ambr
index 376a6078fb7..8467ca0eff9 100644
--- a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_trends_udf.ambr
+++ b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_trends_udf.ambr
@@ -7,7 +7,7 @@
if(ifNull(greater(reached_from_step_count, 0), 0), round(multiply(divide(reached_to_step_count, reached_from_step_count), 100), 2), 0) AS conversion_rate,
data.breakdown AS prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array_trends(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfDay(timestamp), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_trends_v0(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfDay(timestamp), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
toTimeZone(af_tuple.1, 'UTC') AS entrance_period_start,
af_tuple.2 AS success_bool,
af_tuple.3 AS breakdown
@@ -58,7 +58,7 @@
if(ifNull(greater(reached_from_step_count, 0), 0), round(multiply(divide(reached_to_step_count, reached_from_step_count), 100), 2), 0) AS conversion_rate,
data.breakdown AS prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array_trends(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfDay(timestamp), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_trends_v0(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfDay(timestamp), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
toTimeZone(af_tuple.1, 'US/Pacific') AS entrance_period_start,
af_tuple.2 AS success_bool,
af_tuple.3 AS breakdown
@@ -109,7 +109,7 @@
if(ifNull(greater(reached_from_step_count, 0), 0), round(multiply(divide(reached_to_step_count, reached_from_step_count), 100), 2), 0) AS conversion_rate,
data.breakdown AS prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array_trends(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfWeek(timestamp, 0), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_trends_v0(0, 3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), toStartOfWeek(timestamp, 0), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
toTimeZone(af_tuple.1, 'UTC') AS entrance_period_start,
af_tuple.2 AS success_bool,
af_tuple.3 AS breakdown
diff --git a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_udf.ambr b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_udf.ambr
index 10d32220dde..c1afb19fb51 100644
--- a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_udf.ambr
+++ b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel_udf.ambr
@@ -19,7 +19,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(3, 15, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(3, 15, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -195,7 +195,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -270,7 +270,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -330,7 +330,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(3, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -759,7 +759,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -814,7 +814,7 @@
rowNumberInBlock() AS row_number,
breakdown AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'ordered', [[]], arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), [], arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -866,7 +866,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -934,7 +934,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'step_1', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'step_1', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1009,7 +1009,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, ['Other']) AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel_array(2, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_array_v0(2, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1082,7 +1082,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1162,7 +1162,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
@@ -1242,7 +1242,7 @@
rowNumberInBlock() AS row_number,
if(ifNull(less(row_number, 25), 0), breakdown, 'Other') AS final_prop
FROM
- (SELECT arrayJoin(aggregate_funnel(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
+ (SELECT arrayJoin(aggregate_funnel_v0(3, 1209600, 'first_touch', 'ordered', groupUniqArray(prop), arraySort(t -> t.1, groupArray(tuple(accurateCastOrNull(timestamp, 'Float64'), prop, arrayFilter(x -> ifNull(notEquals(x, 0), 1), [multiply(1, step_0), multiply(2, step_1), multiply(3, step_2)])))))) AS af_tuple,
af_tuple.1 AS af,
af_tuple.2 AS breakdown,
af_tuple.3 AS timings
diff --git a/posthog/models/utils.py b/posthog/models/utils.py
index 3dc7e83940b..f9d4eca332d 100644
--- a/posthog/models/utils.py
+++ b/posthog/models/utils.py
@@ -1,3 +1,4 @@
+import datetime
import secrets
import string
import uuid
@@ -12,7 +13,6 @@ from django.db.backends.ddl_references import Statement
from django.db.models.constraints import BaseConstraint
from django.utils.text import slugify
-from posthog import datetime
from posthog.constants import MAX_SLUG_LENGTH
if TYPE_CHECKING:
diff --git a/posthog/test/test_datetime.py b/posthog/test/test_datetime.py
index 9365cffb085..3a0807b3889 100644
--- a/posthog/test/test_datetime.py
+++ b/posthog/test/test_datetime.py
@@ -1,6 +1,6 @@
from datetime import datetime, UTC
-from posthog.datetime import (
+from posthog.date_util import (
start_of_hour,
start_of_day,
end_of_day,
diff --git a/posthog/udf_versioner.py b/posthog/udf_versioner.py
new file mode 100644
index 00000000000..a0d96a547b9
--- /dev/null
+++ b/posthog/udf_versioner.py
@@ -0,0 +1,71 @@
+import argparse
+import os
+import shutil
+import glob
+import datetime
+import xml.etree.ElementTree as ET
+from xml import etree
+
+# For revertible cloud deploys:
+# 1. Develop using the python files at the top level of `user_scripts`, with schema defined in `docker/clickhouse/user_defined_function.xml`
+# 2. If you've made breaking changes to UDFs (likely involving changing type definitions), when ready to deploy, increment the version below and run this file
+# 3. Copy the `user_defined_function.xml` file in the newly created version folder (e.g. `user_scripts/v4/user_defined_function.xml`) to the `posthog-cloud-infra` repo and deploy it
+# 4. After that deploy goes out, it is safe to land and deploy the changes to the `posthog` repo
+# If deploys aren't seamless, look into moving the action that copies the `user_scripts` folder to the clickhouse cluster earlier in the deploy process
+UDF_VERSION = 0 # Last modified by: @aspicer, 2024-09-20
+
+CLICKHOUSE_XML_FILENAME = "user_defined_function.xml"
+ACTIVE_XML_CONFIG = "../../docker/clickhouse/user_defined_function.xml"
+
+format_version_string = lambda version: f"v{version}"
+VERSION_STR = format_version_string(UDF_VERSION)
+LAST_VERSION_STR = format_version_string(UDF_VERSION - 1)
+
+augment_function_name = lambda name: f"{name}_{VERSION_STR}"
+
+
+def prepare_version(force=False):
+ os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), "user_scripts"))
+    if force:
+        shutil.rmtree(VERSION_STR, ignore_errors=True)
+ try:
+ os.mkdir(VERSION_STR)
+ except FileExistsError:
+        if not force:
+ raise FileExistsError(
+ f"A directory already exists for this version at posthog/user_scripts/{VERSION_STR}. Did you forget to increment the version? If not, delete the folder and run this again, or run this script with a -f"
+ )
+ for file in glob.glob("*.py"):
+ shutil.copy(file, VERSION_STR)
+
+ base_xml = ET.parse(ACTIVE_XML_CONFIG)
+
+ if os.path.exists(LAST_VERSION_STR):
+ last_version_xml = ET.parse(os.path.join(LAST_VERSION_STR, CLICKHOUSE_XML_FILENAME))
+ else:
+ last_version_xml = ET.parse(ACTIVE_XML_CONFIG)
+
+ last_version_root = last_version_xml.getroot()
+ # We want to update the name and the command to include the version, and add it to last version
+ for function in list(base_xml.getroot()):
+ name = function.find("name")
+ name.text = augment_function_name(name.text)
+ command = function.find("command")
+ command.text = f"{VERSION_STR}/{command.text}"
+ last_version_root.append(function)
+
+ comment = etree.ElementTree.Comment(
+ f" Version: {VERSION_STR}. Generated at: {datetime.datetime.now(datetime.UTC).isoformat()}\nThis file is autogenerated by udf_versioner.py. Do not edit this, only edit the version at docker/clickhouse/user_defined_function.xml"
+ )
+ last_version_root.insert(0, comment)
+
+ last_version_xml.write(os.path.join(VERSION_STR, CLICKHOUSE_XML_FILENAME))
+ last_version_xml.write(f"latest_{CLICKHOUSE_XML_FILENAME}")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description="Create a new version for UDF deployment.")
+ parser.add_argument("-f", "--force", action="store_true", help="override existing directories")
+ args = parser.parse_args()
+
+ prepare_version(args.force)
diff --git a/posthog/user_scripts/latest_user_defined_function.xml b/posthog/user_scripts/latest_user_defined_function.xml
new file mode 100644
index 00000000000..6f8f787da15
--- /dev/null
+++ b/posthog/user_scripts/latest_user_defined_function.xml
@@ -0,0 +1,573 @@
+
+
+ executable
+ aggregate_funnel
+ Array(Tuple(Int8, Nullable(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel.py
+
+
+
+ executable
+ aggregate_funnel_cohort
+ Array(Tuple(Int8, UInt64, Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_cohort.py
+
+
+
+ executable
+ aggregate_funnel_array
+ Array(Tuple(Int8, Array(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array.py
+
+
+
+ executable
+ aggregate_funnel_test
+ String
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_test.py
+
+
+
+ executable
+ aggregate_funnel_trends
+ Array(Tuple(DateTime, Int8, Nullable(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends
+
+ Array(Tuple(DateTime, Int8, Array(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array_trends.py
+
+
+
+ executable
+ aggregate_funnel_cohort_trends
+
+ Array(Tuple(DateTime, Int8, UInt64))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_cohort_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_test
+ String
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array_trends_test.py
+
+
+ executable
+ aggregate_funnel_v0
+ Array(Tuple(Int8, Nullable(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel.py
+
+
+
+ executable
+ aggregate_funnel_cohort_v0
+ Array(Tuple(Int8, UInt64, Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_cohort.py
+
+
+
+ executable
+ aggregate_funnel_array_v0
+ Array(Tuple(Int8, Array(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array.py
+
+
+
+ executable
+ aggregate_funnel_test_v0
+ String
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_test.py
+
+
+
+ executable
+ aggregate_funnel_trends_v0
+ Array(Tuple(DateTime, Int8, Nullable(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_v0
+
+ Array(Tuple(DateTime, Int8, Array(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array_trends.py
+
+
+
+ executable
+ aggregate_funnel_cohort_trends_v0
+
+ Array(Tuple(DateTime, Int8, UInt64))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_cohort_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_test_v0
+ String
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array_trends_test.py
+
+
\ No newline at end of file
diff --git a/posthog/user_scripts/v0/aggregate_funnel.py b/posthog/user_scripts/v0/aggregate_funnel.py
new file mode 100755
index 00000000000..162918a8196
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel.py
@@ -0,0 +1,144 @@
+#!/usr/bin/python3
+import json
+import sys
+from dataclasses import dataclass, replace
+from itertools import groupby, permutations
+from typing import Any, cast
+from collections.abc import Sequence
+
+
def parse_args(line):
    """Decode one JSONEachRow request line into the positional argument list
    expected by calculate_funnel_from_user_events.

    Returns: [num_steps, conversion_window_limit, breakdown_attribution_type,
    funnel_order_type, prop_vals, value] with the scalar fields coerced to
    int/str; prop_vals and value are passed through as parsed JSON.
    """
    payload = json.loads(line)
    scalars = [
        int(payload["num_steps"]),
        int(payload["conversion_window_limit"]),
        str(payload["breakdown_attribution_type"]),
        str(payload["funnel_order_type"]),
    ]
    # prop_vals: Array(Array(String)); value: Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
    return [*scalars, payload["prop_vals"], payload["value"]]
+
+
@dataclass(frozen=True)
class EnteredTimestamp:
    """Immutable record of a user's entry into a funnel step.

    Frozen so instances can be shared safely across the tracking lists;
    updates go through dataclasses.replace().
    """

    # Timestamp at which the user entered this step; 0 means "not entered".
    timestamp: Any
    # Timestamps accumulated at each completed step; used to compute per-step conversion timings.
    timings: Any
+
+
+# each one can be multiple steps here
+# it only matters when they entered the funnel - you can propagate the time from the previous step when you update
+# This function is defined for Clickhouse in user_defined_functions.xml along with types
+# num_steps is the total number of steps in the funnel
+# conversion_window_limit is in seconds
+# events is an array of tuples of (timestamp, breakdown, [steps])
+# steps is an array of integers which represent the steps that this event qualifies for. it looks like [1,3,5,6].
+# negative integers represent an exclusion on that step. each event is either all exclusions or all steps.
def calculate_funnel_from_user_events(
    num_steps: int,
    conversion_window_limit_seconds: int,
    breakdown_attribution_type: str,
    funnel_order_type: str,
    prop_vals: list[Any],
    events: Sequence[tuple[float, list[str] | int | str, list[int]]],
):
    """Compute, per breakdown value, the furthest funnel step each user reached.

    Prints a single JSON row ``{"result": [...]}`` to stdout, where each entry is
    ``(max_step_index_0_based, prop_val, timings)``; ``-1`` marks an excluded user
    and ``timings`` holds the durations between consecutive completed steps.
    Events must arrive sorted by timestamp (required by the groupby below).
    """
    default_entered_timestamp = EnteredTimestamp(0, [])
    # Mutable [furthest step reached, its EnteredTimestamp]; a list so the nested
    # closures can rebind its contents in place.
    max_step = [0, default_entered_timestamp]
    # If the attribution mode is a breakdown step, set this to the integer that represents that step
    breakdown_step = int(breakdown_attribution_type[5:]) if breakdown_attribution_type.startswith("step_") else None

    # This function returns an Array. We build up an array of results to return here.
    results: list[tuple[int, Any, list[float]]] = []

    # Process an event. If this hits an exclusion, return False, else return True.
    def process_event(timestamp, breakdown, steps, *, entered_timestamp, prop_val) -> bool:
        # iterate the steps in reverse so we don't count this event multiple times
        for step in reversed(steps):
            exclusion = False
            if step < 0:
                # Negative step numbers encode exclusion events for that step.
                exclusion = True
                step = -step

            in_match_window = timestamp - entered_timestamp[step - 1].timestamp <= conversion_window_limit_seconds
            already_reached_this_step_with_same_entered_timestamp = (
                entered_timestamp[step].timestamp == entered_timestamp[step - 1].timestamp
            )

            if in_match_window and not already_reached_this_step_with_same_entered_timestamp:
                if exclusion:
                    # Excluded users are reported as step -1 with no timings.
                    results.append((-1, prop_val, []))
                    return False
                is_unmatched_step_attribution = (
                    breakdown_step is not None and step == breakdown_step - 1 and prop_val != breakdown
                )
                if not is_unmatched_step_attribution:
                    # Advance this step, inheriting the entry timestamp of the previous
                    # step and appending this event's timestamp to the timings trail.
                    entered_timestamp[step] = replace(
                        entered_timestamp[step - 1], timings=[*entered_timestamp[step - 1].timings, timestamp]
                    )
                if step > max_step[0]:
                    max_step[:] = (step, entered_timestamp[step])

        if funnel_order_type == "strict":
            # Strict ordering: any step this event did NOT qualify for loses its progress.
            for i in range(len(entered_timestamp)):
                if i not in steps:
                    entered_timestamp[i] = default_entered_timestamp

        return True

    # We call this for each possible breakdown value.
    def loop_prop_val(prop_val):
        # an array of when the user entered the funnel
        # entered_timestamp = [(0, "", [])] * (num_steps + 1)
        max_step[:] = [0, default_entered_timestamp]
        entered_timestamp: list[EnteredTimestamp] = [default_entered_timestamp] * (num_steps + 1)

        def add_max_step():
            # Emit the furthest step reached (converted to 0-based) plus the
            # deltas between consecutive step timestamps along that path.
            i = cast(int, max_step[0])
            final = cast(EnteredTimestamp, max_step[1])
            results.append((i - 1, prop_val, [final.timings[i] - final.timings[i - 1] for i in range(1, i)]))

        filtered_events = (
            ((timestamp, breakdown, steps) for (timestamp, breakdown, steps) in events if breakdown == prop_val)
            if breakdown_attribution_type == "all_events"
            else events
        )
        for timestamp, events_with_same_timestamp_iterator in groupby(filtered_events, key=lambda x: x[0]):
            events_with_same_timestamp = tuple(events_with_same_timestamp_iterator)
            entered_timestamp[0] = EnteredTimestamp(timestamp, [])
            if len(events_with_same_timestamp) == 1:
                if not process_event(
                    *events_with_same_timestamp[0], entered_timestamp=entered_timestamp, prop_val=prop_val
                ):
                    return
            else:
                # This is a special case for events with the same timestamp
                # We play all of their permutations and most generously take the ones that advanced the furthest
                # This has quite bad performance, and can probably be optimized through clever but annoying logic
                # but shouldn't be hit too often
                entered_timestamps = []
                for events_group_perm in permutations(events_with_same_timestamp):
                    entered_timestamps.append(list(entered_timestamp))
                    for event in events_group_perm:
                        if not process_event(*event, entered_timestamp=entered_timestamps[-1], prop_val=prop_val):
                            # If any of the permutations hits an exclusion, we exclude this user.
                            # This isn't an important implementation detail and we could do something smarter here.
                            return
                # Per step, keep whichever permutation advanced that step furthest.
                for i in range(len(entered_timestamp)):
                    entered_timestamp[i] = max((x[i] for x in entered_timestamps), key=lambda x: x.timestamp)

            # If we have hit the goal, we can terminate early
            if entered_timestamp[num_steps].timestamp > 0:
                add_max_step()
                return

        # Find the furthest step we have made it to and print it
        add_max_step()
        return

    [loop_prop_val(prop_val) for prop_val in prop_vals]
    print(json.dumps({"result": results}), end="\n")  # noqa: T201
+
+
+if __name__ == "__main__":
+ for line in sys.stdin:
+ calculate_funnel_from_user_events(*parse_args(line))
+ sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_array.py b/posthog/user_scripts/v0/aggregate_funnel_array.py
new file mode 100755
index 00000000000..17b053bb7d4
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_array.py
@@ -0,0 +1,9 @@
#!/usr/bin/python3
import sys

from aggregate_funnel import parse_args, calculate_funnel_from_user_events

if __name__ == "__main__":
    # Thin UDF entry point: array-breakdown funnels reuse the generic
    # implementation unchanged; only the declared ClickHouse types differ.
    for raw_line in sys.stdin:
        calculate_funnel_from_user_events(*parse_args(raw_line))
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_array_trends.py b/posthog/user_scripts/v0/aggregate_funnel_array_trends.py
new file mode 100755
index 00000000000..15e93f54527
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_array_trends.py
@@ -0,0 +1,9 @@
#!/usr/bin/python3
import sys

from aggregate_funnel_trends import parse_args, calculate_funnel_trends_from_user_events

if __name__ == "__main__":
    # Thin UDF entry point: array-breakdown trends reuse the generic trends
    # implementation unchanged; only the declared ClickHouse types differ.
    for raw_line in sys.stdin:
        calculate_funnel_trends_from_user_events(*parse_args(raw_line))
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_array_trends_test.py b/posthog/user_scripts/v0/aggregate_funnel_array_trends_test.py
new file mode 100755
index 00000000000..44d3cc9b8f0
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_array_trends_test.py
@@ -0,0 +1,13 @@
#!/usr/bin/python3

from aggregate_funnel_trends import calculate_funnel_trends_from_user_events, parse_args
import sys
import json

if __name__ == "__main__":
    # Test harness variant: instead of letting an exception kill the worker,
    # report it as a JSON result row so ClickHouse-side tests can observe it.
    for raw_line in sys.stdin:
        try:
            calculate_funnel_trends_from_user_events(*parse_args(raw_line))
        except Exception as err:
            print(json.dumps({"result": json.dumps(str(err))}), end="\n")  # noqa: T201
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_cohort.py b/posthog/user_scripts/v0/aggregate_funnel_cohort.py
new file mode 100755
index 00000000000..17b053bb7d4
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_cohort.py
@@ -0,0 +1,9 @@
#!/usr/bin/python3
import sys

from aggregate_funnel import parse_args, calculate_funnel_from_user_events

if __name__ == "__main__":
    # Thin UDF entry point: cohort-breakdown funnels reuse the generic
    # implementation unchanged; only the declared ClickHouse types differ.
    for raw_line in sys.stdin:
        calculate_funnel_from_user_events(*parse_args(raw_line))
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_cohort_trends.py b/posthog/user_scripts/v0/aggregate_funnel_cohort_trends.py
new file mode 100755
index 00000000000..15e93f54527
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_cohort_trends.py
@@ -0,0 +1,9 @@
#!/usr/bin/python3
import sys

from aggregate_funnel_trends import parse_args, calculate_funnel_trends_from_user_events

if __name__ == "__main__":
    # Thin UDF entry point: cohort-breakdown trends reuse the generic trends
    # implementation unchanged; only the declared ClickHouse types differ.
    for raw_line in sys.stdin:
        calculate_funnel_trends_from_user_events(*parse_args(raw_line))
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_test.py b/posthog/user_scripts/v0/aggregate_funnel_test.py
new file mode 100755
index 00000000000..e0689b82af2
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_test.py
@@ -0,0 +1,13 @@
#!/usr/bin/python3
import json

from aggregate_funnel import calculate_funnel_from_user_events, parse_args
import sys

if __name__ == "__main__":
    # Test harness variant: instead of letting an exception kill the worker,
    # report it as a JSON result row so ClickHouse-side tests can observe it.
    for raw_line in sys.stdin:
        try:
            calculate_funnel_from_user_events(*parse_args(raw_line))
        except Exception as err:
            print(json.dumps({"result": json.dumps(str(err))}), end="\n")  # noqa: T201
        sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/aggregate_funnel_trends.py b/posthog/user_scripts/v0/aggregate_funnel_trends.py
new file mode 100755
index 00000000000..0aa96b7a19b
--- /dev/null
+++ b/posthog/user_scripts/v0/aggregate_funnel_trends.py
@@ -0,0 +1,131 @@
+#!/usr/bin/python3
+import sys
+from dataclasses import dataclass, replace
+from typing import Any
+from collections.abc import Sequence
+import json
+
+
def parse_args(line):
    """Decode one JSONEachRow request line into the positional argument list
    expected by calculate_funnel_trends_from_user_events.

    Returns: [from_step, num_steps, conversion_window_limit,
    breakdown_attribution_type, funnel_order_type, prop_vals, value] with the
    scalar fields coerced to int/str; prop_vals and value pass through as JSON.
    """
    payload = json.loads(line)
    scalars = [
        int(payload["from_step"]),
        int(payload["num_steps"]),
        int(payload["conversion_window_limit"]),
        str(payload["breakdown_attribution_type"]),
        str(payload["funnel_order_type"]),
    ]
    # prop_vals: Array(Array(String)); value: Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
    return [*scalars, payload["prop_vals"], payload["value"]]
+
+
@dataclass(frozen=True)
class EnteredTimestamp:
    """Immutable record of a funnel-entry attempt at a given step.

    Frozen so instances can be shared across tracking lists; updates go
    through dataclasses.replace().
    """

    # Timestamp at which the user entered this step; 0 means "not entered".
    # Slot 0 of the tracking list repurposes this field to hold the interval start.
    timestamp: Any
    # Timestamps accumulated at each completed step. Slot 0 repurposes this list
    # as a flag ([True] once from_step has been reached).
    timings: Any
+
+
+# each one can be multiple steps here
+# it only matters when they entered the funnel - you can propagate the time from the previous step when you update
+# This function is defined for Clickhouse in user_defined_functions.xml along with types
+# num_steps is the total number of steps in the funnel
+# conversion_window_limit is in seconds
+# events is an array of tuples of (timestamp, breakdown, [steps])
+# steps is an array of integers which represent the steps that this event qualifies for. it looks like [1,3,5,6].
+# negative integers represent an exclusion on that step. each event is either all exclusions or all steps.
def calculate_funnel_trends_from_user_events(
    from_step: int,
    num_steps: int,
    conversion_window_limit_seconds: int,
    breakdown_attribution_type: str,
    funnel_order_type: str,
    prop_vals: list[Any],
    events: Sequence[tuple[float, int, list[str] | int | str, list[int]]],
):
    """Compute per-interval funnel conversion for trends.

    Prints a single JSON row ``{"result": [...]}`` to stdout where each entry is
    ``(interval_start, success_flag, prop_val)`` with success_flag 1 (converted)
    or -1 (entered at/past from_step but never finished). Intervals where the
    user never entered are simply absent.
    """
    default_entered_timestamp = EnteredTimestamp(0, [])
    # If the attribution mode is a breakdown step, set this to the integer that represents that step
    breakdown_step = int(breakdown_attribution_type[5:]) if breakdown_attribution_type.startswith("step_") else None

    # Results is a map of start intervals to success or failure. If an interval isn't here, it means the
    # user didn't enter
    results = {}

    # We call this for each possible breakdown value.
    def loop_prop_val(prop_val):
        # we need to track every distinct entry into the funnel through to the end
        filtered_events = (
            (
                (timestamp, interval_start, breakdown, steps)
                for (timestamp, interval_start, breakdown, steps) in events
                if breakdown == prop_val
            )
            if breakdown_attribution_type == "all_events"
            else events
        )
        # One tracking list per distinct step-1 entry still in flight.
        list_of_entered_timestamps = []

        for timestamp, interval_start, breakdown, steps in filtered_events:
            for step in reversed(steps):
                exclusion = False
                if step < 0:
                    # Negative step numbers encode exclusion events for that step.
                    exclusion = True
                    step = -step
                # Special code to handle the first step
                # Potential Optimization: we could skip tracking here if the user has already completed the funnel for this interval
                if step == 1:
                    entered_timestamp = [default_entered_timestamp] * (num_steps + 1)
                    # Set the interval start at 0, which is what we want to return if this works.
                    # For strict funnels, we need to track if the "from_step" has been hit
                    # Abuse the timings field on the 0th index entered_timestamp to have the elt True if we have
                    entered_timestamp[0] = EnteredTimestamp(interval_start, [True] if from_step == 0 else [])
                    entered_timestamp[1] = EnteredTimestamp(timestamp, [timestamp])
                    list_of_entered_timestamps.append(entered_timestamp)
                else:
                    # Iterate a copy: completed entries are removed from the live list below.
                    for entered_timestamp in list_of_entered_timestamps[:]:
                        in_match_window = (
                            timestamp - entered_timestamp[step - 1].timestamp <= conversion_window_limit_seconds
                        )
                        already_reached_this_step_with_same_entered_timestamp = (
                            entered_timestamp[step].timestamp == entered_timestamp[step - 1].timestamp
                        )
                        if in_match_window and not already_reached_this_step_with_same_entered_timestamp:
                            if exclusion:
                                # this is a complete failure, exclude this person, don't print anything, don't count
                                return False
                            is_unmatched_step_attribution = (
                                breakdown_step is not None and step == breakdown_step - 1 and prop_val != breakdown
                            )
                            if not is_unmatched_step_attribution:
                                entered_timestamp[step] = replace(
                                    entered_timestamp[step - 1],
                                    timings=[*entered_timestamp[step - 1].timings, timestamp],
                                )
                                # check if we have hit the goal. if we have, remove it from the list and add it to the successful_timestamps
                                if entered_timestamp[num_steps].timestamp > 0:
                                    results[entered_timestamp[0].timestamp] = (1, prop_val)
                                    list_of_entered_timestamps.remove(entered_timestamp)
                                # If we have hit the from_step threshold, record it (abuse the timings field)
                                elif step == from_step + 1:
                                    entered_timestamp[0].timings.append(True)

            # At the end of the event, clear all steps that weren't done by that event
            if funnel_order_type == "strict":
                for entered_timestamp in list_of_entered_timestamps[:]:
                    for i in range(1, len(entered_timestamp)):
                        if i not in steps:
                            entered_timestamp[i] = default_entered_timestamp

        # At this point, everything left in entered_timestamps is a failure, if it has made it to from_step
        for entered_timestamp in list_of_entered_timestamps:
            if entered_timestamp[0].timestamp not in results and len(entered_timestamp[0].timings) > 0:
                results[entered_timestamp[0].timestamp] = (-1, prop_val)

    [loop_prop_val(prop_val) for prop_val in prop_vals]
    result = [(interval_start, success_bool, prop_val) for interval_start, (success_bool, prop_val) in results.items()]
    print(json.dumps({"result": result}), end="\n")  # noqa: T201
+
+
+if __name__ == "__main__":
+ for line in sys.stdin:
+ calculate_funnel_trends_from_user_events(*parse_args(line))
+ sys.stdout.flush()
diff --git a/posthog/user_scripts/v0/user_defined_function.xml b/posthog/user_scripts/v0/user_defined_function.xml
new file mode 100644
index 00000000000..6f8f787da15
--- /dev/null
+++ b/posthog/user_scripts/v0/user_defined_function.xml
@@ -0,0 +1,573 @@
+
+
+ executable
+ aggregate_funnel
+ Array(Tuple(Int8, Nullable(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel.py
+
+
+
+ executable
+ aggregate_funnel_cohort
+ Array(Tuple(Int8, UInt64, Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_cohort.py
+
+
+
+ executable
+ aggregate_funnel_array
+ Array(Tuple(Int8, Array(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array.py
+
+
+
+ executable
+ aggregate_funnel_test
+ String
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_test.py
+
+
+
+ executable
+ aggregate_funnel_trends
+ Array(Tuple(DateTime, Int8, Nullable(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends
+
+ Array(Tuple(DateTime, Int8, Array(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array_trends.py
+
+
+
+ executable
+ aggregate_funnel_cohort_trends
+
+ Array(Tuple(DateTime, Int8, UInt64))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_cohort_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_test
+ String
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ aggregate_funnel_array_trends_test.py
+
+
+ executable
+ aggregate_funnel_v0
+ Array(Tuple(Int8, Nullable(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel.py
+
+
+
+ executable
+ aggregate_funnel_cohort_v0
+ Array(Tuple(Int8, UInt64, Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_cohort.py
+
+
+
+ executable
+ aggregate_funnel_array_v0
+ Array(Tuple(Int8, Array(String), Array(Float64)))
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array.py
+
+
+
+ executable
+ aggregate_funnel_test_v0
+ String
+ result
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_test.py
+
+
+
+ executable
+ aggregate_funnel_trends_v0
+ Array(Tuple(DateTime, Int8, Nullable(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Nullable(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Nullable(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_v0
+
+ Array(Tuple(DateTime, Int8, Array(String)))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array_trends.py
+
+
+
+ executable
+ aggregate_funnel_cohort_trends_v0
+
+ Array(Tuple(DateTime, Int8, UInt64))
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(UInt64)
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), UInt64, Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_cohort_trends.py
+
+
+
+ executable
+ aggregate_funnel_array_trends_test_v0
+ String
+ result
+
+ UInt8
+ from_step
+
+
+ UInt8
+ num_steps
+
+
+ UInt64
+ conversion_window_limit
+
+
+ String
+ breakdown_attribution_type
+
+
+ String
+ funnel_order_type
+
+
+ Array(Array(String))
+ prop_vals
+
+
+ Array(Tuple(Nullable(Float64), Nullable(DateTime), Array(String), Array(Int8)))
+ value
+
+ JSONEachRow
+ v0/aggregate_funnel_array_trends_test.py
+
+
\ No newline at end of file
diff --git a/pytest.ini b/pytest.ini
index 25f9b01f5d9..cd616f3cc31 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -3,7 +3,7 @@ env =
DEBUG=1
TEST=1
DJANGO_SETTINGS_MODULE = posthog.settings
-addopts = -p no:warnings --reuse-db
+addopts = -p no:warnings --reuse-db --ignore=posthog/user_scripts
markers =
ee