0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 12:21:02 +01:00
posthog/ee/clickhouse/queries/retention/retention_event_query.py
Marius Andra 1696ed5950
Update mypy and typed-ast, fail early if db version not in range (#7599)
* error if unsupported db version

* upgrade mypy

* fix various types for mypy

* you can have it in any color you want, as long as it's black

* fix mypy

* Update `kafka-python`

* Format with Black

* Fix mypy after merge

Co-authored-by: Michael Matloka <dev@twixes.com>
2021-12-10 09:29:04 +01:00

154 lines
6.9 KiB
Python

from typing import Any, Dict, Literal, Tuple, Union, cast
from ee.clickhouse.models.action import format_action_filter
from ee.clickhouse.models.group import get_aggregation_target_field
from ee.clickhouse.models.property import get_single_or_multi_property_string_expr
from ee.clickhouse.queries.event_query import ClickhouseEventQuery
from ee.clickhouse.queries.util import get_trunc_func_ch
from posthog.constants import (
PAGEVIEW_EVENT,
TREND_FILTER_TYPE_ACTIONS,
TREND_FILTER_TYPE_EVENTS,
TRENDS_LINEAR,
RetentionQueryType,
)
from posthog.models import Entity
from posthog.models.action import Action
from posthog.models.filters.retention_filter import RetentionFilter
class RetentionEventsQuery(ClickhouseEventQuery):
    """Build the events sub-query for one side of a retention query.

    ``event_query_type`` selects which entity is matched and how rows are
    projected:

    * ``TARGET`` -- matches ``filter.target_entity``; selects the DISTINCT
      period-truncated timestamp as ``event_date``.
    * ``TARGET_FIRST_TIME`` -- matches ``filter.target_entity``; aggregates
      per ``target`` (``min``/``argMin``) and applies the date range in a
      ``HAVING`` clause instead of ``WHERE``.
    * any other value (``RETURNING``) -- matches ``filter.returning_entity``;
      selects the raw event timestamp as ``event_date``.
    """

    _filter: RetentionFilter
    _event_query_type: RetentionQueryType
    # Name of the ClickHouse truncation function for the filter's period.
    _trunc_func: str

    def __init__(self, filter: RetentionFilter, event_query_type: RetentionQueryType, *args, **kwargs):
        """Store the query type, then delegate to the base event query.

        :param filter: retention filter describing entities, period and dates.
        :param event_query_type: which side of the retention query to build.
        :param args: forwarded to the base initializer.
        :param kwargs: forwarded to the base initializer.
        """
        # Assigned before the base initializer runs -- presumably so that
        # hooks it invokes can already read the query type; TODO confirm.
        self._event_query_type = event_query_type
        # NOTE(review): `filter` is passed by keyword ahead of `*args`, so any
        # positional extras bind to the base initializer's leading
        # parameters. Callers presumably pass extras by keyword -- confirm.
        super().__init__(
            filter=filter, *args, **kwargs,
        )  # type: ignore
        self._trunc_func = get_trunc_func_ch(self._filter.period)

    def _retention_period_timestamp_expr(self) -> str:
        """Return the SQL expression truncating the event timestamp to the period."""
        return f"{self._trunc_func}({self.EVENT_TABLE_ALIAS}.timestamp)"

    def get_query(self) -> Tuple[str, Dict[str, Any]]:
        """Assemble the SQL text and its parameters.

        :returns: ``(query, params)`` where ``params`` is ``self.params``
            after being updated with the date, property, entity, person and
            group parameters (in that order).
        """
        alias = self.EVENT_TABLE_ALIAS
        is_first_time = self._event_query_type == RetentionQueryType.TARGET_FIRST_TIME

        _fields = [self.get_timestamp_field()]
        target_field = get_aggregation_target_field(
            self._filter.aggregation_group_type_index, alias, self.DISTINCT_ID_TABLE_ALIAS
        )
        _fields.append(f"{target_field} as target")

        if is_first_time:
            # First-time queries are grouped by target, so every other column
            # has to be an aggregate: pick the values of the earliest period.
            period_timestamp = self._retention_period_timestamp_expr()
            _fields.append(f"argMin({alias}.uuid, {period_timestamp}) as min_uuid")
            _fields.append(f"argMin({alias}.event, {period_timestamp}) as min_event")
        else:
            _fields.append(f"{alias}.uuid AS uuid")
            _fields.append(f"{alias}.event AS event")

        if (
            self._event_query_type != RetentionQueryType.RETURNING
            and self._filter.breakdowns
            and self._filter.breakdown_type
        ):
            # NOTE: `get_single_or_multi_property_string_expr` doesn't
            # support breakdowns with different types e.g. a person property
            # then an event property, so for now we just take the type of
            # the self._filter.breakdown_type.
            # TODO: update `get_single_or_multi_property_string_expr` to take
            # `Breakdown` type
            table = "person" if self._filter.breakdown_type == "person" else "events"
            breakdown_values_expression = get_single_or_multi_property_string_expr(
                breakdown=[breakdown["property"] for breakdown in self._filter.breakdowns],
                table=cast(Union[Literal["events"], Literal["person"]], table),
                query_alias=None,
            )
            # NOTE(review): this aggregate is also added for the TARGET type,
            # for which no GROUP BY is emitted below -- verify that this is
            # intended.
            _fields.append(
                f"argMin({breakdown_values_expression}, {self._retention_period_timestamp_expr()}) AS breakdown_values"
            )

        date_query, date_params = self._get_date_filter()
        self.params.update(date_params)

        prop_query, prop_params = self._get_props(list(self._filter.properties))
        self.params.update(prop_params)

        if self._event_query_type in (RetentionQueryType.TARGET, RetentionQueryType.TARGET_FIRST_TIME):
            entity = self._filter.target_entity
        else:
            entity = self._filter.returning_entity
        entity_query, entity_params = self._get_entity_query(entity=entity)
        self.params.update(entity_params)

        person_query, person_params = self._get_person_query()
        self.params.update(person_params)

        groups_query, groups_params = self._get_groups_query()
        self.params.update(groups_params)

        # The date range filters rows directly, except for first-time queries
        # where it constrains the aggregated `event_date` via HAVING.
        where_date_clause = "" if is_first_time else f"AND {date_query}"
        first_time_clause = f"GROUP BY target HAVING {date_query}" if is_first_time else ""

        query = f"""
        SELECT {','.join(_fields)} FROM events {alias}
        {self._get_distinct_id_query()}
        {person_query}
        {groups_query}
        WHERE team_id = %(team_id)s
        AND {entity_query}
        {where_date_clause}
        {prop_query}
        {first_time_clause}
        """

        return query, self.params

    def get_timestamp_field(self) -> str:
        """Return the ``event_date`` select expression for the query type."""
        if self._event_query_type == RetentionQueryType.TARGET:
            return f"DISTINCT {self._retention_period_timestamp_expr()} AS event_date"
        if self._event_query_type == RetentionQueryType.TARGET_FIRST_TIME:
            return f"min({self._retention_period_timestamp_expr()}) as event_date"
        return f"{self.EVENT_TABLE_ALIAS}.timestamp AS event_date"

    def _determine_should_join_distinct_ids(self) -> None:
        """Join distinct ids only when no aggregation group type is set."""
        self._should_join_distinct_ids = self._filter.aggregation_group_type_index is None

    def _get_entity_query(self, entity: Entity) -> Tuple[str, Dict[str, Any]]:
        """Return the SQL condition and parameters matching ``entity``.

        Actions are looked up by primary key and rendered through
        ``format_action_filter``; events match on the event name; any other
        entity type falls back to matching ``PAGEVIEW_EVENT``.
        """
        # NOTE(review): the placeholder name relies on how RetentionQueryType
        # renders inside an f-string -- confirm against the enum definition.
        prepend = self._event_query_type

        if entity.type == TREND_FILTER_TYPE_ACTIONS:
            action = Action.objects.get(pk=entity.id)
            condition, params = format_action_filter(action, prepend=prepend, use_loop=False)
            return condition, params

        event_name = entity.id if entity.type == TREND_FILTER_TYPE_EVENTS else PAGEVIEW_EVENT
        condition = f"{self.EVENT_TABLE_ALIAS}.event = %({prepend}_event)s"
        params = {f"{prepend}_event": event_name}
        return condition, params

    def _get_date_filter(self) -> Tuple[str, Dict[str, Any]]:
        """Return the date-range condition and its start/end parameters.

        Parameter names are prefixed with the query type. The end date is
        ``date_from + period_increment`` for a linear display of the TARGET
        type, and ``date_to`` otherwise.
        """
        prefix = self._event_query_type

        # First-time queries compare the aggregated `event_date` alias (the
        # condition ends up in HAVING); the others compare the raw timestamp.
        if self._event_query_type == RetentionQueryType.TARGET_FIRST_TIME:
            compared = "event_date"
        else:
            compared = f"toDateTime({self.EVENT_TABLE_ALIAS}.timestamp)"
        query = (
            f"{compared} >= toDateTime(%({prefix}_start_date)s)"
            f" AND {compared} <= toDateTime(%({prefix}_end_date)s)"
        )

        # Hourly periods keep the time of day; every other period is pinned
        # to midnight.
        if self._filter.period == "Hour":
            timestamp_format = "%Y-%m-%d %H:%M:%S"
        else:
            timestamp_format = "%Y-%m-%d 00:00:00"

        if self._filter.display == TRENDS_LINEAR and self._event_query_type == RetentionQueryType.TARGET:
            end_date = self._filter.date_from + self._filter.period_increment
        else:
            end_date = self._filter.date_to

        params = {
            f"{prefix}_start_date": self._filter.date_from.strftime(timestamp_format),
            f"{prefix}_end_date": end_date.strftime(timestamp_format),
        }
        return query, params