0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-30 19:41:46 +01:00
posthog/ee/clickhouse/models/event.py
James Greenhill 1ed6263a71
Create Omni-Person model for managing people in Clickhouse (#1712)
* Create Omni-Person model for managing people in Clickhouse

* type fixes

* rebase all the things

* cleanups

* id -> uuid for events in clickhouse

* cleanups and type checks

* Further cleanups and uuid conversions

* kafka fix

* break out serializer across kafka clients

* fix a few bugs with datetime types

* basic fix for people kafka table

* fix migration errors (copy-paste errors)

* Use KafkaProducer for Omni Person emitting

* setup mock kafka producer

* undo some work for inserting

* Test TestKafkaProducer

* change if order, obvious mistake

* remove unnecessary function arg

* Fix getters for new column

* Test fixes

* mirror columns across element queries

* firm up handling of timestamps

* only return timestamps for handle_timestamp

* Correct heroku config for Kafka
2020-09-25 11:05:50 +01:00

105 lines
3.1 KiB
Python

import json
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Union
import pytz
from dateutil.parser import isoparse
from rest_framework import serializers
from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.element import create_elements
from ee.clickhouse.sql.events import GET_EVENTS_SQL, INSERT_EVENT_SQL
from ee.kafka.client import ClickhouseProducer
from ee.kafka.topics import KAFKA_EVENTS
from posthog.models.element import Element
from posthog.models.team import Team
def create_event(
    event_uuid: uuid.UUID,
    event: str,
    team: Team,
    distinct_id: str,
    timestamp: Optional[Union[datetime, str]],
    properties: Optional[Dict] = None,
    elements_hash: Optional[str] = "",
    elements: Optional[List[Element]] = None,
) -> None:
    """Build a ClickHouse event row and publish it through the Kafka producer.

    :param event_uuid: Unique id of the event; stored as its string form.
    :param event: Event name.
    :param team: Team owning the event; its primary key is stored as ``team_id``.
    :param distinct_id: Distinct id stored alongside the event.
    :param timestamp: When the event happened. A falsy value means "now".
        ISO-8601 strings are parsed: strings without an offset are kept as-is
        (i.e. treated as UTC), strings with an offset are converted to UTC.
        ``datetime`` objects are converted to UTC with ``astimezone`` (naive
        values are interpreted as local time).
    :param properties: Event properties, serialized to JSON. ``None`` (the
        default) is stored as an empty JSON object.
    :param elements_hash: Precomputed elements hash. When empty and
        ``elements`` is non-empty, the elements are created and the hash
        returned by ``create_elements`` is used instead.
    :param elements: Elements attached to the event.
    """
    if not timestamp:
        timestamp = datetime.now(timezone.utc)

    # The value is written without an offset, and this module's serializer
    # reads it back as UTC, so every input has to be normalized to UTC here.
    if isinstance(timestamp, str):
        timestamp = isoparse(timestamp)
        if timestamp.tzinfo is not None:
            timestamp = timestamp.astimezone(pytz.utc)
    else:
        timestamp = timestamp.astimezone(pytz.utc)

    if elements and not elements_hash:
        elements_hash = create_elements(event_uuid=event_uuid, elements=elements, team=team)

    formatted_timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "uuid": str(event_uuid),
        "event": event,
        # Avoid a shared mutable default; missing properties become "{}".
        "properties": json.dumps(properties if properties is not None else {}),
        "timestamp": formatted_timestamp,
        "team_id": team.pk,
        "distinct_id": distinct_id,
        "elements_hash": elements_hash,
        # NOTE(review): created_at mirrors the event timestamp rather than the
        # ingestion time — confirm this is intended.
        "created_at": formatted_timestamp,
    }
    p = ClickhouseProducer()
    p.produce(sql=INSERT_EVENT_SQL, topic=KAFKA_EVENTS, data=data)
def get_events():
    """Run GET_EVENTS_SQL and return the rows serialized as event dicts.

    The serializer is given a context with no preloaded elements or people.
    """
    rows = sync_execute(GET_EVENTS_SQL)
    serializer_context = {"elements": None, "people": None}
    serializer = ClickhouseEventSerializer(rows, many=True, context=serializer_context)
    return serializer.data
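# Serializes raw ClickHouse rows by column position: the indices used below must match the column order of the raw SQL that produced the rows (e.g. GET_EVENTS_SQL).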
class ClickhouseEventSerializer(serializers.Serializer):
    """Serialize raw ClickHouse event rows that are indexed by column position.

    Every field is a ``SerializerMethodField``; each ``get_<field>`` method
    picks its value out of the positional row.
    """

    uuid = serializers.SerializerMethodField()
    properties = serializers.SerializerMethodField()
    event = serializers.SerializerMethodField()
    timestamp = serializers.SerializerMethodField()
    person = serializers.SerializerMethodField()
    elements = serializers.SerializerMethodField()
    elements_hash = serializers.SerializerMethodField()

    def get_uuid(self, event):
        """Return column 0 converted to a string."""
        return str(event[0])

    def get_properties(self, event):
        """Pair the keys in column 8 with the values in column 9."""
        keys, values = event[8], event[9]
        return {key: value for key, value in zip(keys, values)}

    def get_event(self, event):
        """Return column 1 (the event name)."""
        return event[1]

    def get_timestamp(self, event):
        """Treat column 3 as UTC and render it as ISO-8601 in the server's local zone."""
        as_utc = event[3].replace(tzinfo=timezone.utc)
        in_local_zone = as_utc.astimezone()
        return in_local_zone.isoformat()

    def get_person(self, event):
        """Return column 5."""
        # presumably the distinct_id column — confirm against the raw SQL
        return event[5]

    def get_elements(self, event):
        """Elements are not loaded by this serializer; always an empty list."""
        return list()

    def get_elements_hash(self, event):
        """Return column 6."""
        return event[6]
def determine_event_conditions(conditions: Dict[str, str]) -> Tuple[str, Dict]:
    """Translate filter conditions into an SQL fragment plus its parameters.

    Only the ``after`` and ``before`` keys are recognized; any other key is
    ignored. Each recognized key yields one ``AND timestamp ...`` clause that
    references a named ``%(key)s`` placeholder, so the values are passed as
    query parameters instead of being interpolated into the SQL text.

    :param conditions: Mapping of condition name to value.
    :return: ``(sql_fragment, params)``. The fragment is ``""`` when nothing is
        recognized; otherwise it starts with ``AND`` (presumably appended after
        an existing WHERE condition by the caller).
    """
    clause_for_key = {
        "after": "AND timestamp > %(after)s",
        "before": "AND timestamp < %(before)s",
    }
    clauses: List[str] = []
    params: Dict[str, str] = {}
    for key, value in conditions.items():
        clause = clause_for_key.get(key)
        if clause is None:
            continue
        clauses.append(clause)
        params[key] = value
    # Separate clauses with a space; plain concatenation would glue them into
    # invalid SQL such as "%(after)sAND timestamp < ...".
    return " ".join(clauses), params