0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-30 19:41:46 +01:00
posthog/ee/clickhouse/models/action.py
Eric Duong cecb7510b8
General Reorganization (#1981)
* small tweaks and reorganize session queries

* move all queries to query folder
2020-10-22 07:35:57 -04:00

130 lines
5.1 KiB
Python

import re
from typing import Dict, Tuple
from django.forms.models import model_to_dict
from ee.clickhouse.sql.actions import ELEMENT_ACTION_FILTER, EVENT_ACTION_FILTER, EVENT_NO_PROP_FILTER
from posthog.constants import AUTOCAPTURE_EVENT
from posthog.models import Action, Filter
from posthog.models.action_step import ActionStep
from posthog.models.event import Selector
def format_action_filter(action: Action, prepend: str = "", index=0) -> Tuple[str, Dict]:
# get action steps
params = {"team_id": action.team.pk}
steps = action.steps.all()
if len(steps) == 0:
return "", {}
or_queries = []
for index, step in enumerate(steps):
# filter element
if step.event == AUTOCAPTURE_EVENT:
query, element_params, index = filter_element(step, "{}{}".format(index, prepend), index)
params = {**params, **element_params}
# filter event
else:
query, event_params, index = filter_event(step, "{}{}".format(index, prepend), index)
params = {**params, **event_params}
if step.properties:
from ee.clickhouse.models.property import parse_prop_clauses
prop_query, prop_params = parse_prop_clauses(
"uuid", Filter(data={"properties": step.properties}).properties, action.team
)
query += prop_query
params = {**params, **prop_params}
or_queries.append(query)
or_separator = "OR uuid IN"
formatted_query = or_separator.join(or_queries)
return formatted_query, params
def filter_event(step: ActionStep, prepend: str = "", index: int = 0) -> Tuple[str, Dict, int]:
params = {}
event_filter = ""
efilter = ""
property_filter = ""
if step.url and step.event:
if step.url_matching == ActionStep.EXACT:
operation = "trim(BOTH '\"' FROM value) = '{}'".format(step.url)
params.update({"prop_val_{}".format(index): step.url})
elif step.url_matching == ActionStep.REGEX:
operation = "match(trim(BOTH '\"' FROM value), '{}')".format(step.url)
params.update({"{}_prop_val_{}".format(prepend, index): step.url})
else:
operation = "trim(BOTH '\"' FROM value) LIKE %({}_prop_val_{idx})s ".format(prepend, idx=index)
params.update({"{}_prop_val_{}".format(prepend, index): "%" + step.url + "%"})
property_filter = "AND key = '$current_url' AND {operation}".format(operation=operation)
efilter = "AND event = '{}'".format(step.event)
event_filter = EVENT_ACTION_FILTER.format(event_filter=efilter, property_filter=property_filter)
elif step.event:
efilter = "AND event = '{}'".format(step.event)
event_filter = EVENT_NO_PROP_FILTER.format(event_filter=efilter)
return event_filter, params, index + 1
def _create_regex(selector: Selector) -> str:
regex = r""
for idx, tag in enumerate(selector.parts):
if tag.data.get("tag_name") and isinstance(tag.data["tag_name"], str):
regex += tag.data["tag_name"]
if tag.data.get("attr_class__contains"):
regex += r".*?\.{}".format(r"\..*?".join(sorted(tag.data["attr_class__contains"])))
if tag.ch_attributes:
regex += ".*?"
for key, value in sorted(tag.ch_attributes.items()):
regex += '{}="{}".*?'.format(key, value)
regex += r"([-_a-zA-Z0-9\.]*?)?($|;|:([^;^\s]*(;|$|\s)))"
if tag.direct_descendant:
regex += ".*"
return regex
def filter_element(step: ActionStep, prepend: str = "", index=0) -> Tuple[str, Dict, int]:
event_filter, params, index = filter_event(step, prepend, index) if step.url else ("", {}, index + 1)
filters = model_to_dict(step)
if filters.get("selector"):
selector = Selector(filters["selector"], escape_slashes=False)
params["{}selector_regex".format(prepend)] = _create_regex(selector)
if filters.get("tag_name"):
params["{}tag_name_regex".format(prepend)] = r"(^|;){}(\.|$|;|:)".format(filters["tag_name"])
attributes: Dict[str, str] = {}
for key in ["href", "text"]:
if filters.get(key):
attributes[key] = re.escape(filters[key])
attributes_regex = False
if len(attributes.keys()) > 0:
attributes_regex = True
params["{}attributes_regex".format(prepend)] = ".*?({}).*?".format(
".*?".join(['{}="{}"'.format(key, value) for key, value in attributes.items()])
)
return (
ELEMENT_ACTION_FILTER.format(
selector_regex="AND match(elements_chain, %({}selector_regex)s)".format(prepend)
if filters.get("selector")
else "",
attributes_regex="AND match(elements_chain, %({}attributes_regex)s)".format(prepend)
if attributes_regex
else "",
tag_name_regex="AND match(elements_chain, %({}tag_name_regex)s)".format(prepend)
if filters.get("tag_name")
else "",
event_filter="AND uuid IN {}".format(event_filter) if event_filter else "",
),
params,
index + 1,
)