
Fetch person property values via Clickhouse (#8414)

* fetch person properties via clickhouse

* cleaner queries

* fix some types

* parse up to 100k persons with this prop, add back counts

* ignore something that's bound to error anyway

* no point making it too flat now

* revert flattening of lists

* add statsd

* ignore type

* add back conversion

* mypy

* better mypy

* refactor
Marius Andra 2022-02-09 17:17:37 +01:00 committed by GitHub
parent 1aea20c683
commit ed25618236
6 changed files with 107 additions and 19 deletions

View File

@@ -29,6 +29,8 @@ from ee.clickhouse.sql.person import (
GET_DISTINCT_IDS_BY_PERSON_ID_FILTER,
GET_DISTINCT_IDS_BY_PROPERTY_SQL,
GET_TEAM_PERSON_DISTINCT_IDS,
SELECT_PERSON_PROP_VALUES_SQL,
SELECT_PERSON_PROP_VALUES_SQL_WITH_FILTER,
)
from posthog.models.cohort import Cohort
from posthog.models.event import Selector
@@ -419,6 +421,14 @@ def get_property_values_for_key(key: str, team: Team, value: Optional[str] = None):
)
def get_person_property_values_for_key(key: str, team: Team, value: Optional[str] = None):
if value:
return sync_execute(
SELECT_PERSON_PROP_VALUES_SQL_WITH_FILTER, {"team_id": team.pk, "key": key, "value": "%{}%".format(value)},
)
return sync_execute(SELECT_PERSON_PROP_VALUES_SQL, {"team_id": team.pk, "key": key},)
def filter_element(filters: Dict, *, operator: Optional[OperatorType] = None, prepend: str = "") -> Tuple[str, Dict]:
if not operator:
operator = "exact"

View File

@@ -114,11 +114,29 @@ FROM events WHERE team_id = %(team_id)s
"""
SELECT_PROP_VALUES_SQL = """
SELECT DISTINCT trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) FROM events where JSONHas(properties, %(key)s) AND team_id = %(team_id)s {parsed_date_from} {parsed_date_to} LIMIT 10
SELECT
DISTINCT trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s))
FROM
events
WHERE
team_id = %(team_id)s AND
JSONHas(properties, %(key)s)
{parsed_date_from}
{parsed_date_to}
LIMIT 10
"""
SELECT_PROP_VALUES_SQL_WITH_FILTER = """
SELECT DISTINCT trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) FROM events where team_id = %(team_id)s AND trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) ILIKE %(value)s {parsed_date_from} {parsed_date_to} LIMIT 10
SELECT
DISTINCT trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s))
FROM
events
WHERE
team_id = %(team_id)s AND
trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) ILIKE %(value)s
{parsed_date_from}
{parsed_date_to}
LIMIT 10
"""
SELECT_EVENT_BY_TEAM_AND_CONDITIONS_SQL = """

View File

@@ -384,3 +384,44 @@ GROUP BY actor_id
COMMENT_DISTINCT_ID_COLUMN_SQL = (
lambda: f"ALTER TABLE person_distinct_id ON CLUSTER {CLICKHOUSE_CLUSTER} COMMENT COLUMN distinct_id 'skip_0003_fill_person_distinct_id2'"
)
SELECT_PERSON_PROP_VALUES_SQL = """
SELECT
value,
count(value)
FROM (
SELECT
trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) as value
FROM
person
WHERE
team_id = %(team_id)s AND
JSONHas(properties, %(key)s)
ORDER BY id DESC
LIMIT 100000
)
GROUP BY value
ORDER BY count(value) DESC
LIMIT 20
"""
SELECT_PERSON_PROP_VALUES_SQL_WITH_FILTER = """
SELECT
value,
count(value)
FROM (
SELECT
trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) as value
FROM
person
WHERE
team_id = %(team_id)s AND
trim(BOTH '\"' FROM JSONExtractRaw(properties, %(key)s)) ILIKE %(value)s
ORDER BY id DESC
LIMIT 100000
)
GROUP BY value
ORDER BY count(value) DESC
LIMIT 20
"""

View File

@@ -223,7 +223,6 @@ class EventViewSet(StructuredViewSetMixin, mixins.RetrieveModelMixin, mixins.Lis
def values(self, request: request.Request, **kwargs) -> response.Response:
key = request.GET.get("key")
team = self.team
result = []
flattened = []
if key == "custom_event":
events = sync_execute(GET_CUSTOM_EVENTS, {"team_id": team.pk})

View File

@@ -22,9 +22,11 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.settings import api_settings
from rest_framework.utils.serializer_helpers import ReturnDict
from rest_framework_csv import renderers as csvrenderers
from statshog.defaults.django import statsd
from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.person import delete_person
from ee.clickhouse.models.property import get_person_property_values_for_key
from ee.clickhouse.queries.funnels import ClickhouseFunnelActors, ClickhouseFunnelTrendsActors
from ee.clickhouse.queries.funnels.base import ClickhouseFunnelBase
from ee.clickhouse.queries.funnels.funnel_correlation_persons import FunnelCorrelationActors
@@ -52,11 +54,14 @@ from posthog.models.filters.path_filter import PathFilter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.filters.stickiness_filter import StickinessFilter
from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
from posthog.queries.lifecycle import LifecycleTrend
from posthog.queries.retention import Retention
from posthog.queries.stickiness import Stickiness
from posthog.tasks.split_person import split_person
from posthog.utils import convert_property_value, format_query_params_absolute_url, is_anonymous_id, relative_date_parse
from posthog.utils import (
convert_property_value,
flatten,
format_query_params_absolute_url,
is_anonymous_id,
relative_date_parse,
)
class PersonCursorPagination(CursorPagination):
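The flatten helper added to the posthog.utils import isn't defined anywhere in this diff, so its real definition is assumed here; a typical recursive flattener of the kind the earlier "revert flattening of lists" commits debate would look like:

def flatten(items):
    # yield leaf values out of arbitrarily nested lists
    for item in items:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item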
@@ -353,18 +358,33 @@ class PersonViewSet(StructuredViewSetMixin, viewsets.ModelViewSet):
@action(methods=["GET"], detail=False)
def values(self, request: request.Request, **kwargs) -> response.Response:
people = self.get_queryset()
key = "properties__{}".format(request.GET.get("key"))
people = people.values(key).annotate(count=Count("id")).filter(**{f"{key}__isnull": False}).order_by("-count")
key = request.GET.get("key")
value = request.GET.get("value")
flattened = []
if key:
timer = statsd.timer("get_person_property_values_for_key_timer").start()
try:
result = get_person_property_values_for_key(key, self.team, value)
statsd.incr(
"get_person_property_values_for_key_success",
tags={"key": key, "value": value, "team_id": self.team.id},
)
except Exception as e:
statsd.incr(
"get_person_property_values_for_key_error",
tags={"error": str(e), "key": key, "value": value, "team_id": self.team.id},
)
raise e
finally:
timer.stop()
if request.GET.get("value"):
people = people.extra(
where=["properties ->> %s LIKE %s"], params=[request.GET["key"], "%{}%".format(request.GET["value"])],
)
return response.Response(
[{"name": convert_property_value(event[key]), "count": event["count"]} for event in people[:50]]
)
for (value, count) in result:
try:
# Try loading as json for dicts or arrays
flattened.append({"name": convert_property_value(json.loads(value)), "count": count}) # type: ignore
except json.decoder.JSONDecodeError:
flattened.append({"name": convert_property_value(value), "count": count})
return response.Response(flattened)
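End to end, the reworked endpoint can be exercised as below. The URL is an assumption inferred from the detail=False @action named values and PostHog's usual /api routing; the key, value, and counts are illustrative only:

import requests

resp = requests.get(
    "https://app.posthog.com/api/person/values/",  # hypothetical route, see caveat above
    params={"key": "$browser", "value": "chr"},
    headers={"Authorization": "Bearer <personal_api_key>"},
)
print(resp.json())  # e.g. [{"name": "Chrome", "count": 812}, {"name": "Chromium", "count": 33}]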
@action(methods=["POST"], detail=True)
def merge(self, request: request.Request, pk=None, **kwargs) -> response.Response:

View File

@@ -353,7 +353,7 @@ def dict_from_cursor_fetchall(cursor):
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def convert_property_value(input: Union[str, bool, dict, list, int]) -> str:
def convert_property_value(input: Union[str, bool, dict, list, int, Optional[str]]) -> str:
if isinstance(input, bool):
if input is True:
return "true"