0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-30 19:41:46 +01:00
posthog/ee/clickhouse/models/event.py
Tim Glaser 88c54896d0
Fix person querying (#1797)
* convert sessions table logic to TS

* convert rest of sessions to TS

* sessions table logic refactor, store date in the url

* add back/forward buttons

* load sessions based on the URL, not after mount --> avoids duplicate query if opening an url with a filter

* prevent multiple queries

* throw error if failed instead of returning an empty list

* date from filters

* rename offset to nextOffset

* initial limit/offset block

* indent sql

* support limit + offset

* load LIMIT+1 sessions in postgres, pop last and show load more sign. (was: show sign if exactly LIMIT fetched)

* based offset is always 0

* default limit to 50

* events in clickhouse sessions

* add elements to query results

* add person properties to sessions query response

* show seconds with two digits

* fix pagination, timestamp calculation and ordering on pages 2 and beyond

* mypy

* fix test

* add default time to fix test, fix some any(*) filter issues

* remove reverse

* WIP event list

* Events progress

* Finish off event listing, skip live actions for now

* Fix mypy

* Fix mypy again

* Try fixing mypy

* Fix assertnumqueries

* Fix tests

* Fix tests

* fix test

* Fix tests

* Fix tests

* Fix tests again

* Fix person querying

* Fix flake

* Fix person stuff

* Fix test

Co-authored-by: Marius Andra <marius.andra@gmail.com>
Co-authored-by: Eric <eeoneric@gmail.com>
2020-10-02 19:30:05 +02:00

131 lines
4.5 KiB
Python

import json
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Union
import pytz
from dateutil.parser import isoparse
from django.utils.timezone import now
from rest_framework import serializers
from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.element import create_elements
from ee.clickhouse.sql.events import GET_EVENTS_BY_TEAM_SQL, GET_EVENTS_SQL, INSERT_EVENT_SQL
from ee.kafka.client import ClickhouseProducer
from ee.kafka.topics import KAFKA_EVENTS
from posthog.models.element import Element
from posthog.models.team import Team
def create_event(
    event_uuid: uuid.UUID,
    event: str,
    team: Team,
    distinct_id: str,
    timestamp: Optional[Union[datetime, str]] = None,
    properties: Optional[Dict] = {},
    elements_hash: Optional[str] = "",
    elements: Optional[List[Element]] = None,
) -> str:
    """Publish a single event for insertion into ClickHouse.

    The event is not written directly: it is handed to ``ClickhouseProducer``
    together with ``INSERT_EVENT_SQL`` and the ``KAFKA_EVENTS`` topic.

    Args:
        event_uuid: Identifier of the event; also the return value (as ``str``).
        event: Event name.
        team: Team owning the event; only ``team.pk`` is stored here.
        distinct_id: Distinct id the event is attributed to.
        timestamp: When the event happened, as a ``datetime`` or an ISO-8601
            string. Falsy values fall back to ``now()``.
        properties: Event properties; serialized with ``json.dumps``. The
            default dict is only read, never mutated.
        elements_hash: Hash of already-stored elements, if any.
        elements: Elements to store via ``create_elements`` when no
            ``elements_hash`` was supplied.

    Returns:
        ``str(event_uuid)``.
    """
    if not timestamp:
        timestamp = now()

    # clickhouse specific formatting: the value is written as a naive
    # "YYYY-MM-DD HH:MM:SS.ffffff" string, so it has to be in UTC first.
    if isinstance(timestamp, str):
        timestamp = isoparse(timestamp)
        # An ISO string carrying an explicit offset (e.g. "+02:00") must be
        # shifted to UTC just like datetime inputs are; otherwise the stored
        # wall-clock time is off by that offset. Strings without an offset are
        # kept as parsed.
        if timestamp.tzinfo is not None:
            timestamp = timestamp.astimezone(pytz.utc)
    else:
        timestamp = timestamp.astimezone(pytz.utc)

    if elements and not elements_hash:
        elements_hash = create_elements(event_uuid=event_uuid, elements=elements, team=team)

    formatted_timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "uuid": str(event_uuid),
        "event": event,
        "properties": json.dumps(properties),
        "timestamp": formatted_timestamp,
        "team_id": team.pk,
        "distinct_id": distinct_id,
        "elements_hash": elements_hash,
        # created_at is set to the event timestamp, not the insertion time.
        "created_at": formatted_timestamp,
    }
    p = ClickhouseProducer()
    p.produce(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=data)
    return str(event_uuid)
def get_events():
    """Run ``GET_EVENTS_SQL`` and return the rows serialized as event dicts.

    No element or person lookups are supplied to the serializer.
    """
    rows = sync_execute(GET_EVENTS_SQL)
    serializer_context = {"elements": None, "people": None}
    serializer = ClickhouseEventSerializer(rows, many=True, context=serializer_context)
    return serializer.data
def get_events_by_team(team_id: Union[str, int]):
    """Run ``GET_EVENTS_BY_TEAM_SQL`` for one team and return serialized events.

    ``team_id`` is passed to the query as a string. No element or person
    lookups are supplied to the serializer.
    """
    query_params = {"team_id": str(team_id)}
    rows = sync_execute(GET_EVENTS_BY_TEAM_SQL, query_params)
    serializer_context = {"elements": None, "people": None}
    serializer = ClickhouseEventSerializer(rows, many=True, context=serializer_context)
    return serializer.data
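# The serializer below reads rows positionally; refer to the raw event SQL
# (ee.clickhouse.sql.events) for the column order behind each index.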
class ClickhouseEventSerializer(serializers.Serializer):
    """Serialize raw ClickHouse event rows, which are accessed by position.

    Row positions read by this serializer:
        0     event id (returned as ``str``)
        1     event name
        2     properties as a JSON string
        3     timestamp as a ``datetime`` (treated as UTC)
        5     person key (presumably the distinct_id; confirm against the SQL)
        6     elements hash
        8, 9  optional parallel sequences of property keys and values

    Optional serializer context:
        "people"    mapping of person key -> object with a "properties" dict
        "elements"  mapping of elements hash -> serialized elements
    """

    id = serializers.SerializerMethodField()
    properties = serializers.SerializerMethodField()
    event = serializers.SerializerMethodField()
    timestamp = serializers.SerializerMethodField()
    person = serializers.SerializerMethodField()
    elements = serializers.SerializerMethodField()
    elements_hash = serializers.SerializerMethodField()

    def get_id(self, event):
        """Return the event id as a string."""
        return str(event[0])

    def get_properties(self, event):
        """Return the event properties as a dict.

        Prefers the key/value sequences at positions 8 and 9 when the row has
        them and both are non-empty; otherwise decodes the JSON at position 2.
        """
        if len(event) >= 10 and event[8] and event[9]:
            return dict(zip(event[8], event[9]))
        else:
            return json.loads(event[2])

    def get_event(self, event):
        """Return the event name."""
        return event[1]

    def get_timestamp(self, event):
        """Return the timestamp as an ISO-8601 string with a UTC offset."""
        # The stored value is labelled as UTC (not shifted), then converted to
        # the system local timezone by the argument-less astimezone().
        dt = event[3].replace(tzinfo=timezone.utc)
        return dt.astimezone().isoformat()

    def get_person(self, event):
        """Return the person's email if known, else the raw person key."""
        people = self.context.get("people")
        if not people or event[5] not in people:
            return event[5]
        return people[event[5]]["properties"].get("email", event[5])

    def get_elements(self, event):
        """Return the elements for the row's elements hash, or ``[]``."""
        # Use .get() like get_person does, so a context without an "elements"
        # key yields an empty list instead of raising KeyError.
        elements_by_hash = self.context.get("elements")
        if not event[6] or not elements_by_hash or event[6] not in elements_by_hash:
            return []
        return elements_by_hash[event[6]]

    def get_elements_hash(self, event):
        """Return the raw elements hash."""
        return event[6]
def _to_clickhouse_timestamp(value: str) -> str:
    """Parse an ISO-8601 string and format it for a ClickHouse query parameter."""
    parsed = isoparse(value)
    # Offset-aware inputs are shifted to UTC before the offset is dropped by
    # strftime; inputs without an offset are used as parsed.
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.strftime("%Y-%m-%d %H:%M:%S.%f")


def determine_event_conditions(conditions: Dict[str, Union[str, List[str]]]) -> Tuple[str, Dict]:
    """Translate filter conditions into extra SQL ``AND`` clauses and parameters.

    Recognized keys are ``after``, ``before``, ``person_id`` and
    ``distinct_id``. Unrecognized keys and non-string values are ignored.

    Args:
        conditions: Mapping of filter name to value.

    Returns:
        A ``(sql, params)`` tuple. ``sql`` is the clauses joined by single
        spaces (empty string when nothing applies), each starting with
        ``AND``. ``params`` holds the values for the placeholders added here.
        The ``person_id`` clause also references ``%(team_id)s``, which is not
        added to ``params`` and must be supplied by the caller.
    """
    clauses: List[str] = []
    params: Dict[str, str] = {}
    for key, value in conditions.items():
        if not isinstance(value, str):
            continue
        if key == "after":
            clauses.append("AND timestamp > %(after)s")
            params["after"] = _to_clickhouse_timestamp(value)
        elif key == "before":
            clauses.append("AND timestamp < %(before)s")
            params["before"] = _to_clickhouse_timestamp(value)
        elif key == "person_id":
            clauses.append(
                "AND distinct_id IN (\n"
                "SELECT distinct_id FROM person_distinct_id"
                " WHERE person_id = %(person_id)s AND team_id = %(team_id)s\n"
                ")"
            )
            params["person_id"] = value
        elif key == "distinct_id":
            clauses.append("AND distinct_id = %(distinct_id)s")
            params["distinct_id"] = value
    # Join with a space so consecutive clauses never run together
    # (previously "...%(after)sAND timestamp < ...").
    return " ".join(clauses), params