0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 12:21:02 +01:00
posthog/ee/clickhouse/test/test_journeys.py
Michael Matloka 5d2ad6c7bb
chore(deps): Update black to 22.8.0 (#11596)
* chore(deps): Update `black` to `22.8.0`

* Format
2022-09-05 14:38:54 +02:00

159 lines
6.1 KiB
Python

import dataclasses
import json
from datetime import datetime
from typing import Any, Dict, List
from uuid import uuid4
from django.utils import timezone
from posthog.client import sync_execute
from posthog.models import Group, Person, PersonDistinctId, Team
from posthog.models.event.sql import EVENTS_DATA_TABLE
from posthog.test.base import _create_event, flush_persons_and_events
def journeys_for(
events_by_person: Dict[str, List[Dict[str, Any]]], team: Team, create_people: bool = True
) -> Dict[str, Person]:
"""
Helper for creating specific events for a team.
Allows tests to be written in a declarative style
# these things happened in the past for these people
events_by_person = {
"person1": [{"some": "events}],
"person2": [{"some": "more events}],
}
journeys_for(events_by_person, team)
# then the application receives them
actual = system_under_test.runs()
# and we can assert on the results of that
...
Writing tests in this way reduces duplication in test setup
And clarifies the preconditions of the test
"""
def _create_event_from_args(**event):
return {**event}
flush_persons_and_events()
people = {}
events_to_create = []
for distinct_id, events in events_by_person.items():
if create_people:
people[distinct_id] = update_or_create_person(distinct_ids=[distinct_id], team_id=team.pk)
else:
people[distinct_id] = Person.objects.get(
persondistinctid__distinct_id=distinct_id, persondistinctid__team_id=team.pk
)
for event in events:
# Populate group properties as well
group_props = {}
for property_key, value in (event.get("properties") or {}).items():
if property_key.startswith("$group_"):
group_type_index = property_key[-1]
try:
group = Group.objects.get(team_id=team.pk, group_type_index=group_type_index, group_key=value)
group_property_key = f"group{group_type_index}_properties"
group_props = {
group_property_key: {**group.group_properties, **event.get(group_property_key, {})}
}
except Group.DoesNotExist:
continue
if "timestamp" not in event:
event["timestamp"] = datetime.now()
events_to_create.append(
_create_event_from_args(
team=team,
distinct_id=distinct_id,
event=event["event"],
timestamp=event["timestamp"],
properties=event.get("properties", {}),
person_id=people[distinct_id].uuid,
person_properties=people[distinct_id].properties or {},
group0_properties=event.get("group0_properties", {}) or group_props.get("group0_properties", {}),
group1_properties=event.get("group1_properties", {}) or group_props.get("group1_properties", {}),
group2_properties=event.get("group2_properties", {}) or group_props.get("group2_properties", {}),
group3_properties=event.get("group3_properties", {}) or group_props.get("group3_properties", {}),
group4_properties=event.get("group4_properties", {}) or group_props.get("group4_properties", {}),
)
)
_create_all_events_raw(events_to_create)
return people
def _create_all_events_raw(all_events: List[Dict]):
parsed = ""
for event in all_events:
data: Dict[str, Any] = {
"properties": {},
"timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
"person_id": str(uuid4()),
"person_properties": {},
"group0_properties": {},
"group1_properties": {},
"group2_properties": {},
"group3_properties": {},
"group4_properties": {},
}
data.update(event)
in_memory_event = InMemoryEvent(**data)
parsed += f"""
('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
"""
sync_execute(
f"""
INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset) VALUES
{parsed}
"""
)
def create_all_events(all_events: List[dict]):
for event in all_events:
_create_event(**event)
# We collect all events per test into an array and batch create the events to reduce creation time
@dataclasses.dataclass
class InMemoryEvent:
event: str
distinct_id: str
team: Team
timestamp: str
properties: Dict
person_id: str
person_properties: Dict
group0_properties: Dict
group1_properties: Dict
group2_properties: Dict
group3_properties: Dict
group4_properties: Dict
def update_or_create_person(distinct_ids: List[str], team_id: int, **kwargs):
(person, _) = Person.objects.update_or_create(
persondistinctid__distinct_id__in=distinct_ids,
persondistinctid__team_id=team_id,
defaults={**kwargs, "team_id": team_id},
)
for distinct_id in distinct_ids:
PersonDistinctId.objects.update_or_create(
distinct_id=distinct_id,
team_id=person.team_id,
defaults={"person_id": person.id, "team_id": team_id, "distinct_id": distinct_id},
)
return person