2021-06-22 17:44:55 +02:00
|
|
|
from abc import ABCMeta, abstractmethod
|
2021-08-20 20:33:21 +02:00
|
|
|
from typing import Any, Dict, List, Tuple, Union
|
2021-06-10 20:47:33 +02:00
|
|
|
|
2021-09-09 08:42:19 +02:00
|
|
|
from ee.clickhouse.materialized_columns.columns import ColumnName
|
2021-10-08 09:51:11 +02:00
|
|
|
from ee.clickhouse.models.cohort import format_person_query, format_precalculated_cohort_query, is_precalculated_query
|
[groups persons] API for returning groups based on trend results (#7144)
* working for unique_groups math
* fix types
* add null check
* update snapshots
* update payload
* update snapshots
* use constructor
* adjust queries
* introduce base class
* consolidate querying
* shared serializer and typed
* sort imports
* snapshots
* typing
* change name
* Add group model
```sql
BEGIN;
--
-- Create model Group
--
CREATE TABLE "posthog_group" ("id" serial NOT NULL PRIMARY KEY, "group_key" varchar(400) NOT NULL, "group_type_index" integer NOT NULL, "group_properties" jsonb NOT NULL, "created_at" timestamp with time zone NOT NULL, "properties_last_updated_at" jsonb NOT NULL, "properties_last_operation" jsonb NOT NULL, "version" bigint NOT NULL, "team_id" integer NOT NULL);
--
-- Create constraint unique team_id/group_key/group_type_index combo on model group
--
ALTER TABLE "posthog_group" ADD CONSTRAINT "unique team_id/group_key/group_type_index combo" UNIQUE ("team_id", "group_key", "group_type_index");
ALTER TABLE "posthog_group" ADD CONSTRAINT "posthog_group_team_id_b3aed896_fk_posthog_team_id" FOREIGN KEY ("team_id") REFERENCES "posthog_team" ("id") DEFERRABLE INITIALLY DEFERRED;
CREATE INDEX "posthog_group_team_id_b3aed896" ON "posthog_group" ("team_id");
COMMIT;
```
* Remove a dead import
* Improve typing for groups
* Make groups updating more generic, avoid mutation
This simplifies using the same logic for groups
Note there's a behavioral change: We don't produce a new kafka message
if nothing has been updated anymore.
* Rename a function
* WIP: Handle group property updates
... by storing them in postgres
Uses identical pattern to person property updates, except we handle
first-seen case within updates as well.
* Get rid of boolean option
* WIP continued
* fetchGroup() and upsertGroup()
* Test more edge cases
* Add tests for upsertGroup() in properties-updater
* Rename to PropertyUpdateOperation
* Followup
* Solve typing issues
* changed implementation to use pg
* unusd
* update type
* update snapshots
* rename and remove inlining
* restore bad merge code
* adjust types
* add flag
* remove var
* misnamed
* change to uuid
* make sure to use string when passing result
* remove from columnoptimizer logic and have group join logic implemented by event query classes per insight
* remove unnecessary logic
* typing
* remove dead imports
* remove verbosity
* update snapshots
* typos
* remove signals
* remove plugin excess
Co-authored-by: Karl-Aksel Puulmann <oxymaccy@gmail.com>
2021-11-18 17:58:48 +01:00
|
|
|
from ee.clickhouse.models.property import parse_prop_clauses
|
2021-10-13 16:00:47 +02:00
|
|
|
from ee.clickhouse.models.util import PersonPropertiesMode
|
2021-08-26 20:00:49 +02:00
|
|
|
from ee.clickhouse.queries.column_optimizer import ColumnOptimizer
|
2021-11-05 12:47:41 +01:00
|
|
|
from ee.clickhouse.queries.groups_join_query import GroupsJoinQuery
|
2021-08-26 20:00:49 +02:00
|
|
|
from ee.clickhouse.queries.person_query import ClickhousePersonQuery
|
2021-06-22 17:44:55 +02:00
|
|
|
from ee.clickhouse.queries.util import parse_timestamps
|
2021-07-20 04:47:41 +02:00
|
|
|
from ee.clickhouse.sql.person import GET_TEAM_PERSON_DISTINCT_IDS
|
2021-10-13 16:00:47 +02:00
|
|
|
from posthog.models import Cohort, Filter, Property
|
2021-08-20 20:33:21 +02:00
|
|
|
from posthog.models.filters.path_filter import PathFilter
|
2021-09-23 17:16:31 +02:00
|
|
|
from posthog.models.filters.retention_filter import RetentionFilter
|
2021-11-16 01:15:37 +01:00
|
|
|
from posthog.models.filters.session_recordings_filter import SessionRecordingsFilter
|
2021-12-02 17:43:20 +01:00
|
|
|
from posthog.models.filters.stickiness_filter import StickinessFilter
|
2021-06-10 20:47:33 +02:00
|
|
|
|
|
|
|
|
2021-06-22 17:44:55 +02:00
|
|
|
class ClickhouseEventQuery(metaclass=ABCMeta):
    """Abstract base class for objects that assemble a ClickHouse query over events.

    Subclasses implement ``get_query`` and ``_determine_should_join_distinct_ids``.
    This base class provides the shared building blocks visible below: the
    optional distinct-id / person / groups JOIN fragments, the date filter
    fragment and the translation of property filters into SQL conditions.
    """

    # Table aliases interpolated into the generated SQL fragments.
    DISTINCT_ID_TABLE_ALIAS = "pdi"
    PERSON_TABLE_ALIAS = "person"
    EVENT_TABLE_ALIAS = "e"

    _filter: Union[Filter, PathFilter, RetentionFilter, StickinessFilter, SessionRecordingsFilter]
    _team_id: int
    _column_optimizer: ColumnOptimizer
    _person_query: ClickhousePersonQuery
    _should_join_distinct_ids = False
    _should_join_persons = False
    _should_round_interval = False
    _extra_fields: List[ColumnName]
    _extra_person_fields: List[ColumnName]

    def __init__(
        self,
        filter: Union[Filter, PathFilter, RetentionFilter, StickinessFilter, SessionRecordingsFilter],
        team_id: int,
        round_interval=False,
        should_join_distinct_ids=False,
        should_join_persons=False,
        # Extra events/person table columns to fetch since parent query needs them
        extra_fields: Union[List[ColumnName], None] = None,
        extra_person_fields: Union[List[ColumnName], None] = None,
        **kwargs,
    ) -> None:
        """Store the filter/team and decide which joins the query needs.

        Args:
            filter: The filter object the query is built from.
            team_id: Team whose data is queried; also seeded into ``self.params``.
            round_interval: Stored as ``_should_round_interval``.
            should_join_distinct_ids: Force the distinct-id join. When falsy,
                ``_determine_should_join_distinct_ids`` decides.
            should_join_persons: Force the person join. When falsy,
                ``_determine_should_join_persons`` decides.
            extra_fields: Extra events-table columns the parent query needs.
                Defaults to a new empty list.
            extra_person_fields: Extra person-table columns the parent query
                needs; also forwarded to ``ClickhousePersonQuery``. Defaults to
                a new empty list.
            **kwargs: Accepted but not used by this base class.
        """
        # A `[]` default would be one list object shared by every instance
        # created without these arguments; build a fresh list per instance.
        if extra_fields is None:
            extra_fields = []
        if extra_person_fields is None:
            extra_person_fields = []

        self._filter = filter
        self._team_id = team_id
        self._column_optimizer = ColumnOptimizer(self._filter, self._team_id)
        self._person_query = ClickhousePersonQuery(
            self._filter, self._team_id, self._column_optimizer, extra_fields=extra_person_fields
        )
        self.params: Dict[str, Any] = {
            "team_id": self._team_id,
        }

        self._should_join_distinct_ids = should_join_distinct_ids
        self._should_join_persons = should_join_persons
        self._extra_fields = extra_fields
        self._extra_person_fields = extra_person_fields

        # Explicit `True` flags from the caller win; otherwise let the
        # (sub)class inspect the filter and decide.
        if not self._should_join_distinct_ids:
            self._determine_should_join_distinct_ids()

        if not self._should_join_persons:
            self._determine_should_join_persons()

        self._should_round_interval = round_interval

    @abstractmethod
    def get_query(self) -> Tuple[str, Dict[str, Any]]:
        """Return the query string together with its parameters dict."""
        pass

    @abstractmethod
    def _determine_should_join_distinct_ids(self) -> None:
        """Set ``_should_join_distinct_ids`` when the subclass needs that join."""
        pass

    def _get_distinct_id_query(self) -> str:
        """Return the distinct-id INNER JOIN fragment, or ``""`` if not needed."""
        if not self._should_join_distinct_ids:
            return ""

        return f"""
            INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) AS {self.DISTINCT_ID_TABLE_ALIAS}
            ON events.distinct_id = {self.DISTINCT_ID_TABLE_ALIAS}.distinct_id
            """

    def _determine_should_join_persons(self) -> None:
        """Enable the person join (and the distinct-id join it relies on) if needed.

        The join is needed when the person query reports it is used, or when a
        cohort property on the filter or on any of its entities requires
        person data (see ``_should_property_join_persons``).
        """
        # :KLUDGE: The following is mostly making sure if cohorts are included as well.
        # Can be simplified significantly after https://github.com/PostHog/posthog/issues/5854
        # `or` short-circuits, so the checks run in the same order as before
        # and later (potentially DB-hitting) checks are skipped once one matches.
        needs_persons = (
            self._person_query.is_used
            or any(self._should_property_join_persons(prop) for prop in self._filter.properties)
            or any(
                self._should_property_join_persons(prop)
                for entity in self._filter.entities
                for prop in entity.properties
            )
        )

        if needs_persons:
            self._should_join_distinct_ids = True
            self._should_join_persons = True

    def _should_property_join_persons(self, prop: Property) -> bool:
        """Return True for a cohort property whose cohort needs person data."""
        return prop.type == "cohort" and self._does_cohort_need_persons(prop)

    def _does_cohort_need_persons(self, prop: Property) -> bool:
        """Return whether the cohort referenced by ``prop.value`` needs the person join.

        A cohort that does not exist for this team never needs it. Otherwise
        the join is needed for precalculated cohorts, static cohorts, and
        cohorts with at least one group that has a truthy ``"properties"`` entry.
        """
        try:
            cohort: Cohort = Cohort.objects.get(pk=prop.value, team_id=self._team_id)
        except Cohort.DoesNotExist:
            return False

        return bool(
            is_precalculated_query(cohort)
            or cohort.is_static
            or any(group.get("properties") for group in cohort.groups)
        )

    def _get_person_query(self) -> Tuple[str, Dict]:
        """Return the person INNER JOIN fragment and its params, or ``("", {})``."""
        if not self._should_join_persons:
            return "", {}

        person_query, params = self._person_query.get_query()
        return (
            f"""
            INNER JOIN ({person_query}) {self.PERSON_TABLE_ALIAS}
            ON {self.PERSON_TABLE_ALIAS}.id = {self.DISTINCT_ID_TABLE_ALIAS}.person_id
            """,
            params,
        )

    def _get_groups_query(self) -> Tuple[str, Dict]:
        """Return the groups join fragment and params produced by ``GroupsJoinQuery``."""
        return GroupsJoinQuery(self._filter, self._team_id, self._column_optimizer).get_join_query()

    def _get_date_filter(self) -> Tuple[str, Dict]:
        """Return the date-range SQL fragment and params from ``parse_timestamps``."""
        parsed_date_from, parsed_date_to, date_params = parse_timestamps(filter=self._filter, team_id=self._team_id)

        query = f"""
        {parsed_date_from}
        {parsed_date_to}
        """

        return query, date_params

    def _get_props(self, filters: List[Property]) -> Tuple[str, Dict]:
        """Translate property filters into a SQL condition string plus params.

        Cohort properties become ``AND <cohort subquery>``; every other
        property is delegated to ``parse_prop_clauses``. The fragments are
        joined with single spaces in the order the filters were given.
        """
        final = []
        params: Dict[str, Any] = {}

        for idx, prop in enumerate(filters):
            if prop.type == "cohort":
                person_id_query, cohort_filter_params = self._get_cohort_subquery(prop)
                params.update(cohort_filter_params)
                final.append(f"AND {person_id_query}")
            else:
                # `idx` is the position among *all* filters, so the prepend
                # stays unique per property even when cohorts are interleaved.
                filter_query, filter_params = parse_prop_clauses(
                    [prop],
                    prepend=f"global_{idx}",
                    allow_denormalized_props=True,
                    person_properties_mode=PersonPropertiesMode.EXCLUDE,
                )
                final.append(filter_query)
                params.update(filter_params)

        return " ".join(final), params

    def _get_cohort_subquery(self, prop: Property) -> Tuple[str, Dict[str, Any]]:
        """Return the SQL condition and params restricting results to a cohort.

        The condition matches on ``<DISTINCT_ID_TABLE_ALIAS>.person_id``. If no
        cohort with ``pk=prop.value`` exists for this team, an always-false
        condition is returned instead.
        """
        try:
            cohort: Cohort = Cohort.objects.get(pk=prop.value, team_id=self._team_id)
        except Cohort.DoesNotExist:
            return "0 = 11", {}  # If cohort doesn't exist, nothing can match

        match_field = f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id"

        if is_precalculated_query(cohort):
            person_id_query, cohort_filter_params = format_precalculated_cohort_query(
                cohort.pk, 0, custom_match_field=match_field
            )
        else:
            person_id_query, cohort_filter_params = format_person_query(cohort, 0, custom_match_field=match_field)

        return person_id_query, cohort_filter_params
|