0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-28 18:26:15 +01:00
posthog/ee/clickhouse/process_event.py
Marius Andra 84d853748d
Earlier IP anonymization (#3464)
* Clarify what anonymize_ips does

* Hide IP before event ingestion, not during (hides it also from the task queues and plugins)

* flip it around

* add IPs to tests that expected them to be there

* "Discard"

* test that everything works when ip is none
2021-02-25 10:31:33 +01:00

207 lines
6.4 KiB
Python

import datetime
import json
from typing import Dict, Optional, Sequence
from uuid import UUID
import statsd
from dateutil import parser
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.db.utils import IntegrityError
from sentry_sdk import capture_exception
from ee.clickhouse.models.event import create_event
from ee.clickhouse.models.session_recording_event import create_session_recording_event
from ee.kafka_client.client import KafkaProducer
from posthog.ee import is_ee_enabled
from posthog.models.element import Element
from posthog.models.person import Person
from posthog.models.team import Team
from posthog.models.utils import UUIDT
from posthog.tasks.process_event import (
handle_identify_or_alias,
sanitize_event_name,
store_names_and_properties,
update_person_properties,
)
if settings.STATSD_HOST is not None:
statsd.Connection.set_defaults(host=settings.STATSD_HOST, port=settings.STATSD_PORT)
def _capture_ee(
event_uuid: UUID,
person_uuid: UUID,
ip: Optional[str],
site_url: str,
team_id: int,
event: str,
distinct_id: str,
properties: Dict,
timestamp: datetime.datetime,
) -> None:
elements = properties.get("$elements")
elements_list = []
if elements:
del properties["$elements"]
elements_list = [
Element(
text=el["$el_text"][0:400] if el.get("$el_text") else None,
tag_name=el["tag_name"],
href=el["attr__href"][0:2048] if el.get("attr__href") else None,
attr_class=el["attr__class"].split(" ") if el.get("attr__class") else None,
attr_id=el.get("attr__id"),
nth_child=el.get("nth_child"),
nth_of_type=el.get("nth_of_type"),
attributes={key: value for key, value in el.items() if key.startswith("attr__")},
)
for index, el in enumerate(elements)
]
team = Team.objects.select_related("organization").get(pk=team_id)
if ip and not team.anonymize_ips and "$ip" not in properties:
properties["$ip"] = ip
event = sanitize_event_name(event)
store_names_and_properties(team=team, event=event, properties=properties)
if not Person.objects.distinct_ids_exist(team_id=team_id, distinct_ids=[str(distinct_id)]):
# Catch race condition where in between getting and creating,
# another request already created this user
try:
Person.objects.create(team_id=team_id, distinct_ids=[str(distinct_id)])
except IntegrityError:
pass
if properties.get("$set"):
update_person_properties(team_id=team_id, distinct_id=distinct_id, properties=properties["$set"])
if properties.get("$set_once"):
update_person_properties(
team_id=team_id, distinct_id=distinct_id, properties=properties["$set_once"], set_once=True
)
# # determine create events
create_event(
event_uuid=event_uuid,
event=event,
properties=properties,
timestamp=timestamp,
team=team,
distinct_id=distinct_id,
elements=elements_list,
site_url=site_url,
)
def handle_timestamp(data: dict, now: datetime.datetime, sent_at: Optional[datetime.datetime]) -> datetime.datetime:
if data.get("timestamp"):
if sent_at:
# sent_at - timestamp == now - x
# x = now + (timestamp - sent_at)
try:
# timestamp and sent_at must both be in the same format: either both with or both without timezones
# otherwise we can't get a diff to add to now
return now + (parser.isoparse(data["timestamp"]) - sent_at)
except TypeError as e:
capture_exception(e)
return parser.isoparse(data["timestamp"])
now_datetime = now
if data.get("offset"):
return now_datetime - relativedelta(microseconds=data["offset"] * 1000)
return now_datetime
if is_ee_enabled():
def process_event_ee(
distinct_id: str,
ip: Optional[str],
site_url: str,
data: dict,
team_id: int,
now: datetime.datetime,
sent_at: Optional[datetime.datetime],
event_uuid: UUIDT,
) -> None:
timer = statsd.Timer("%s_posthog_cloud" % (settings.STATSD_PREFIX,))
timer.start()
properties = data.get("properties", {})
if data.get("$set"):
properties["$set"] = data["$set"]
if data.get("$set_once"):
properties["$set_once"] = data["$set_once"]
person_uuid = UUIDT()
ts = handle_timestamp(data, now, sent_at)
handle_identify_or_alias(data["event"], properties, distinct_id, team_id)
if data["event"] == "$snapshot":
create_session_recording_event(
uuid=event_uuid,
team_id=team_id,
distinct_id=distinct_id,
session_id=properties["$session_id"],
snapshot_data=properties["$snapshot_data"],
timestamp=ts,
)
return
_capture_ee(
event_uuid=event_uuid,
person_uuid=person_uuid,
ip=ip,
site_url=site_url,
team_id=team_id,
event=data["event"],
distinct_id=distinct_id,
properties=properties,
timestamp=ts,
)
timer.stop("process_event_ee")
else:
def process_event_ee(
distinct_id: str,
ip: Optional[str],
site_url: str,
data: dict,
team_id: int,
now: datetime.datetime,
sent_at: Optional[datetime.datetime],
event_uuid: UUIDT,
) -> None:
# Noop if ee is not enabled
return
def log_event(
distinct_id: str,
ip: Optional[str],
site_url: str,
data: dict,
team_id: int,
now: datetime.datetime,
sent_at: Optional[datetime.datetime],
event_uuid: UUIDT,
*,
topics: Sequence[str],
) -> None:
if settings.DEBUG:
print(f'Logging event {data["event"]} to Kafka topics {" and ".join(topics)}')
producer = KafkaProducer()
data = {
"uuid": str(event_uuid),
"distinct_id": distinct_id,
"ip": ip,
"site_url": site_url,
"data": json.dumps(data),
"team_id": team_id,
"now": now.isoformat(),
"sent_at": sent_at.isoformat() if sent_at else "",
}
for topic in topics:
producer.produce(topic=topic, data=data)