mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-22 08:40:03 +01:00
109 lines
3.2 KiB
Python
109 lines
3.2 KiB
Python
import re
|
|
from typing import Any
|
|
|
|
|
|
COST_PER_UNIT = 8
|
|
|
|
|
|
class HogVMException(Exception):
|
|
pass
|
|
|
|
|
|
class UncaughtHogVMException(HogVMException):
|
|
type: str
|
|
message: str
|
|
payload: Any
|
|
|
|
def __init__(self, type, message, payload):
|
|
super().__init__(message)
|
|
self.type = type
|
|
self.message = message
|
|
self.payload = payload
|
|
|
|
def __str__(self):
|
|
msg = self.message.replace("'", "\\'")
|
|
return f"{self.type}('{msg}')"
|
|
|
|
|
|
def like(string, pattern, flags=0):
|
|
pattern = re.escape(pattern).replace("%", ".*")
|
|
re_pattern = re.compile(pattern, flags)
|
|
return re_pattern.search(string) is not None
|
|
|
|
|
|
def get_nested_value(obj, chain, nullish=False) -> Any:
|
|
if obj is None:
|
|
return None
|
|
for key in chain:
|
|
if nullish and obj is None:
|
|
return None
|
|
if isinstance(key, int):
|
|
if key <= 0:
|
|
raise HogVMException(f"Hog arrays start from index 1")
|
|
if nullish and key > len(obj):
|
|
return None
|
|
obj = obj[key - 1]
|
|
else:
|
|
obj = obj.get(key, None)
|
|
return obj
|
|
|
|
|
|
def set_nested_value(obj, chain, value) -> Any:
|
|
if obj is None:
|
|
return None
|
|
for key in chain[:-1]:
|
|
if isinstance(key, int):
|
|
obj = obj[key]
|
|
else:
|
|
obj = obj.get(key, None)
|
|
|
|
if isinstance(obj, dict):
|
|
obj[chain[-1]] = value
|
|
elif isinstance(obj, list):
|
|
if not isinstance(chain[-1], int):
|
|
raise HogVMException(f"Invalid index: {chain[-1]}")
|
|
if chain[-1] <= 0:
|
|
raise HogVMException(f"Hog arrays start from index 1")
|
|
obj[chain[-1] - 1] = value
|
|
else:
|
|
raise HogVMException(f'Can not set property "{chain[-1]}" on object of type "{type(obj).__name__}"')
|
|
|
|
return obj
|
|
|
|
|
|
def calculate_cost(object, marked: set | None = None) -> int:
|
|
if marked is None:
|
|
marked = set()
|
|
if isinstance(object, dict) or isinstance(object, list) or isinstance(object, tuple):
|
|
if id(object) in marked:
|
|
return COST_PER_UNIT
|
|
marked.add(id(object))
|
|
try:
|
|
if isinstance(object, dict):
|
|
return COST_PER_UNIT + sum(
|
|
[calculate_cost(key, marked) + calculate_cost(value, marked) for key, value in object.items()]
|
|
)
|
|
elif isinstance(object, list) or isinstance(object, tuple):
|
|
return COST_PER_UNIT + sum([calculate_cost(val, marked) for val in object])
|
|
finally:
|
|
marked.remove(id(object))
|
|
elif isinstance(object, str):
|
|
return COST_PER_UNIT + len(object)
|
|
return COST_PER_UNIT
|
|
|
|
|
|
def unify_comparison_types(left, right):
|
|
if isinstance(left, int | float) and isinstance(right, str):
|
|
return left, float(right)
|
|
if isinstance(left, str) and isinstance(right, int | float):
|
|
return float(left), right
|
|
if isinstance(left, bool) and isinstance(right, str):
|
|
return left, bool(right)
|
|
if isinstance(left, str) and isinstance(right, bool):
|
|
return bool(left), right
|
|
if isinstance(left, int | float) and isinstance(right, bool):
|
|
return left, int(right)
|
|
if isinstance(left, bool) and isinstance(right, int | float):
|
|
return int(left), right
|
|
return left, right
|