0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 12:21:02 +01:00
posthog/ee/clickhouse/models/event.py
Tim Glaser 21d240df30
[Clickhouse] Fix grabbing by person (#1960)
* Use postgres to grab person

* Fix type

* Fix grabbing by person id

* Fix type

* WIP people from clickhouse

* Fix person delete etc

* fix test

* Fix tests
2020-10-23 17:30:12 +02:00

161 lines
5.4 KiB
Python

import json
import uuid
from typing import Dict, List, Optional, Tuple, Union
import pytz
from dateutil.parser import isoparse
from django.utils import timezone
from rest_framework import serializers
from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.element import chain_to_elements, elements_to_string
from ee.clickhouse.sql.events import GET_EVENTS_BY_TEAM_SQL, GET_EVENTS_SQL, INSERT_EVENT_SQL
from ee.kafka_client.client import ClickhouseProducer
from ee.kafka_client.topics import KAFKA_EVENTS
from posthog.models.element import Element
from posthog.models.person import Person
from posthog.models.team import Team
def create_event(
    event_uuid: uuid.UUID,
    event: str,
    team: Team,
    distinct_id: str,
    timestamp: Optional[Union[timezone.datetime, str]] = None,
    properties: Optional[Dict] = None,
    elements: Optional[List[Element]] = None,
) -> str:
    """Produce an event row to the KAFKA_EVENTS topic via ClickhouseProducer.

    Args:
        event_uuid: Identifier of the event; stored in its string form.
        event: Event name.
        team: Owning team; its primary key is stored as ``team_id``.
        distinct_id: Distinct id of the actor that triggered the event.
        timestamp: Event time as a datetime or an ISO 8601 string; defaults to
            ``timezone.now()``. Datetimes are converted to UTC. Strings carrying an
            explicit offset are converted to UTC; naive strings are used as-is.
        properties: Event properties, stored JSON-encoded. ``None`` (the default)
            is stored as an empty object.
        elements: Optional elements, serialized with ``elements_to_string`` into
            ``elements_chain`` (empty string when there are none).

    Returns:
        ``event_uuid`` as a string.
    """
    # Avoid a shared mutable default; None is stored as an empty JSON object.
    if properties is None:
        properties = {}

    if not timestamp:
        timestamp = timezone.now()
    assert timestamp is not None

    # ClickHouse-specific formatting: the timestamp is written without an offset,
    # so it must already be expressed in UTC.
    if isinstance(timestamp, str):
        timestamp = isoparse(timestamp)
        # An explicit offset (e.g. "+02:00") must be normalized, otherwise the local
        # wall-clock time would be stored as if it were UTC. Naive strings keep the
        # previous behaviour and are written unchanged.
        if timestamp.tzinfo is not None:
            timestamp = timestamp.astimezone(pytz.utc)
    else:
        timestamp = timestamp.astimezone(pytz.utc)

    elements_chain = elements_to_string(elements=elements) if elements else ""
    formatted_timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")

    data = {
        "uuid": str(event_uuid),
        "event": event,
        "properties": json.dumps(properties),
        "timestamp": formatted_timestamp,
        "team_id": team.pk,
        "distinct_id": distinct_id,
        # created_at is set to the event timestamp, not the insertion time.
        "created_at": formatted_timestamp,
        "elements_chain": elements_chain,
    }
    p = ClickhouseProducer()
    p.produce(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=data)
    return str(event_uuid)
def get_events():
    """Run GET_EVENTS_SQL and return the rows serialized by ClickhouseEventSerializer.

    No element or person context is supplied, so ``person`` falls back to the
    row's distinct id.
    """
    rows = sync_execute(GET_EVENTS_SQL)
    serializer = ClickhouseEventSerializer(
        rows,
        many=True,
        context={"elements": None, "people": None},
    )
    return serializer.data
def get_events_by_team(team_id: Union[str, int]):
    """Run GET_EVENTS_BY_TEAM_SQL for one team and return the serialized rows.

    Args:
        team_id: Team identifier; passed to the query as a string.
    """
    query_params = {"team_id": str(team_id)}
    rows = sync_execute(GET_EVENTS_BY_TEAM_SQL, query_params)
    serializer = ClickhouseEventSerializer(
        rows,
        many=True,
        context={"elements": None, "people": None},
    )
    return serializer.data
class ElementSerializer(serializers.ModelSerializer):
    """Serialize ``Element`` objects, e.g. those rebuilt from an elements chain by ``chain_to_elements``."""

    # Declared explicitly and listed in Meta.fields; presumably ``event`` is not a
    # plain model field on Element — TODO confirm against the model definition.
    event = serializers.CharField()

    class Meta:
        model = Element
        # Output keys, emitted in this order.
        fields = [
            "event",
            "text",
            "tag_name",
            "attr_class",
            "href",
            "attr_id",
            "nth_child",
            "nth_of_type",
            "attributes",
            "order",
        ]
# Serializer for raw ClickHouse event rows (positional sequences returned by the events SQL queries)
class ClickhouseEventSerializer(serializers.Serializer):
    """Serialize raw ClickHouse event rows into API dictionaries.

    Each row is a positional sequence; the getters below read:
        0: id, 1: event name, 2: properties JSON, 3: timestamp,
        5: distinct_id, 6: elements chain,
        8 / 9 (optional): parallel arrays of property keys and values.

    Context:
        people: optional mapping of distinct_id -> object with a ``properties``
            dict; when present, ``person`` shows that person's email.
    """

    id = serializers.SerializerMethodField()
    distinct_id = serializers.SerializerMethodField()
    properties = serializers.SerializerMethodField()
    event = serializers.SerializerMethodField()
    timestamp = serializers.SerializerMethodField()
    person = serializers.SerializerMethodField()
    elements = serializers.SerializerMethodField()
    elements_chain = serializers.SerializerMethodField()

    def get_id(self, event):
        return str(event[0])

    def get_distinct_id(self, event):
        return event[5]

    def get_properties(self, event):
        """Return the event properties with surrounding double quotes stripped from string values.

        Prefer the key/value arrays at indices 8 and 9 when the row has them and
        both are non-empty; otherwise decode the JSON object at index 2.
        """
        if len(event) >= 10 and event[8] and event[9]:
            prop_vals = [res.strip('"') for res in event[9]]
            return dict(zip(event[8], prop_vals))
        else:
            props = json.loads(event[2])
            unpadded = {key: value.strip('"') if isinstance(value, str) else value for key, value in props.items()}
            return unpadded

    def get_event(self, event):
        return event[1]

    def get_timestamp(self, event):
        """Return the row timestamp as an ISO 8601 string in UTC."""
        # The stored value is interpreted as UTC (any tzinfo is overwritten). Render it
        # in UTC explicitly: a bare ``astimezone()`` converts to the host's local
        # zone, which made the emitted offset depend on the server's TZ setting.
        dt = event[3].replace(tzinfo=timezone.utc)
        return dt.isoformat()

    def get_person(self, event):
        """Return the person's email if known via context, else the distinct id."""
        if not self.context.get("people") or event[5] not in self.context["people"]:
            return event[5]
        return self.context["people"][event[5]].properties.get("email", event[5])

    def get_elements(self, event):
        """Return serialized elements parsed from the elements chain, or [] if there is none."""
        if not event[6]:
            return []
        return ElementSerializer(chain_to_elements(event[6]), many=True).data

    def get_elements_chain(self, event):
        return event[6]
def determine_event_conditions(conditions: Dict[str, Union[str, List[str]]]) -> Tuple[str, Dict]:
    """Build an SQL filter fragment and its named parameters from request conditions.

    Each recognised key whose value is a string appends one space-prefixed
    ``AND ...`` clause; unrecognised keys and non-string values are ignored.

    Recognised keys:
        after / before: ISO 8601 timestamps; exclusive bounds on ``timestamp``.
        person_id: primary key of a Person; matches any of that person's distinct ids.
        distinct_id: exact ``distinct_id`` match.
        event: exact ``event`` name match.

    Returns:
        ``(sql, params)``: ``sql`` is the concatenated clauses ("" when none apply)
        and ``params`` maps placeholder names to their values.
    """
    result = ""
    params: Dict[str, Union[str, List[str]]] = {}
    for k, v in conditions.items():
        if not isinstance(v, str):
            continue
        # Every clause starts with a space so consecutive clauses never run together.
        if k == "after":
            result += " AND timestamp > %(after)s"
            params["after"] = isoparse(v).strftime("%Y-%m-%d %H:%M:%S.%f")
        elif k == "before":
            result += " AND timestamp < %(before)s"
            params["before"] = isoparse(v).strftime("%Y-%m-%d %H:%M:%S.%f")
        elif k == "person_id":
            result += " AND distinct_id IN (%(distinct_ids)s)"
            # NOTE(review): this lookup is not scoped to a team — confirm callers
            # restrict which person ids can be requested.
            person = Person.objects.filter(pk=v).first()
            # An unknown person yields an empty id list instead of raising IndexError.
            distinct_ids = person.distinct_ids if person is not None else []
            params["distinct_ids"] = [str(distinct_id) for distinct_id in distinct_ids]
        elif k == "distinct_id":
            result += " AND distinct_id = %(distinct_id)s"
            params["distinct_id"] = v
        elif k == "event":
            result += " AND event = %(event)s"
            params["event"] = v
    return result, params