0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-23 16:37:52 +01:00

feat(hog): Hog bytecode function STL (#24653)

This commit is contained in:
Marius Andra 2024-08-29 14:28:50 +02:00 committed by GitHub
parent f5d92e0157
commit 00bab5eb34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 398 additions and 95 deletions

View File

@ -122,6 +122,12 @@ jobs:
# as apt-get is quite out of date. The same version must be set in hogql_parser/pyproject.toml
ANTLR_VERSION: '4.13.2'
- name: Check if STL bytecode is up to date
if: needs.changes.outputs.hog == 'true'
run: |
python -m hogvm.stl.compile
git diff --exit-code
- name: Run HogVM Python tests
if: needs.changes.outputs.hog == 'true'
run: |

View File

@ -17,3 +17,4 @@ dist/
node_modules/
pnpm-lock.yaml
posthog/templates/email/*
hogvm/typescript/src/stl/bytecode.ts

View File

@ -22,7 +22,8 @@ if [[ "$@" == *".hoge"* ]]; then
fi
exec node $CLI_PATH "$@"
fi
elif [[ "$@" == *"--out"* ]]; then
exec python3 -m posthog.hogql.cli --out "$@"
elif [[ "$@" == *".hog"* ]]; then
exec python3 -m posthog.hogql.cli --run "$@"
else

View File

@ -1,10 +1,10 @@
# HogVM
A HogVM is a 🦔 that runs HogQL bytecode. It's purpose is to locally evaluate HogQL expressions against any object.
A HogVM is a 🦔 that runs Hog bytecode. Its purpose is to locally evaluate Hog/QL expressions against any object.
## HogQL bytecode
## Hog bytecode
HogQL Bytecode is a compact representation of a subset of the HogQL AST nodes. It follows a certain structure:
Hog Bytecode is a compact representation of a subset of the Hog AST nodes. It follows a certain structure:
```
1 + 2 # [_H, op.INTEGER, 2, op.INTEGER, 1, op.PLUS]
@ -23,11 +23,11 @@ The `python/execute.py` function in this folder acts as the reference implementa
### Operations
To be considered a PostHog HogQL Bytecode Certified Parser, you must implement the following operations:
Here's a sample list of Hog bytecode operations, missing about half of them and likely out of date:
```bash
FIELD = 1 # [arg3, arg2, arg1, FIELD, 3] # arg1.arg2.arg3
CALL = 2 # [arg2, arg1, CALL, 'concat', 2] # concat(arg1, arg2)
CALL_GLOBAL = 2 # [arg2, arg1, CALL_GLOBAL, 'concat', 2] # concat(arg1, arg2)
AND = 3 # [val3, val2, val1, AND, 3] # val1 and val2 and val3
OR = 4 # [val3, val2, val1, OR, 3] # val1 or val2 or val3
NOT = 5 # [val, NOT] # not val
@ -60,29 +60,9 @@ INTEGER = 33 # [INTEGER, 123] # 123
FLOAT = 34 # [FLOAT, 123.12] # 123.12
```
### Async Operations
Some operations can't be computed directly, and are thus asked back to the caller. These include:
```bash
IN_COHORT = 27 # [val2, val1, IREGEX] # val1 in cohort val2
NOT_IN_COHORT = 28 # [val2, val1, NOT_IREGEX] # val1 not in cohort val2
```
The arguments for these instructions will be passed on to the provided `async_operation(*args)` in reverse:
```python
def async_operation(*args):
if args[0] == op.IN_COHORT:
return db.queryInCohort(args[1], args[2])
return False
execute_bytecode(to_bytecode("'user_id' in cohort 2"), {}, async_operation).result
```
### Functions
A PostHog HogQL Bytecode Certified Parser must also implement the following function calls:
A Hog Certified Parser must also implement the following function calls:
```bash
concat(...) # concat('test: ', 1, null, '!') == 'test: 1!'
@ -96,7 +76,7 @@ ifNull(val, alternative) # ifNull('string', false) == 'string'
### Null handling
In HogQL equality comparisons, `null` is treated as any other variable. Its presence will not make functions automatically return `null`, as is the ClickHouse default.
In Hog/QL equality comparisons, `null` is treated as any other variable. Its presence will not make functions automatically return `null`, as is the ClickHouse default.
```sql
1 == null # false
@ -104,11 +84,3 @@ In HogQL equality comparisons, `null` is treated as any other variable. Its pres
```
Nulls are just ignored in `concat`
## Known broken features
- **Regular Expression** support is implemented, but is NOT GUARANTEED to work the same way across platforms. Different implementations (ClickHouse, Python, Node) use different Regexp engines. ClickHouse uses `re2`, the others use `pcre`. Use the case-insensitive regex operators instead of passing in modifier flags through the expression.
- **DateTime** comparisons are not supported.
- **Cohort Matching** operations are not implemented.
- Only a small subset of functions is enabled. This list is bound to expand.

View File

@ -0,0 +1,9 @@
["_H", 1, 32, "--- arrayMap ----", 2, "print", 1, 35, 52, "lambda", 1, 0, 6, 33, 2, 36, 0, 8, 38, 53, 0, 33, 1, 33, 2,
33, 3, 43, 3, 2, "arrayMap", 2, 2, "print", 1, 35, 32, "--- arrayExists ----", 2, "print", 1, 35, 52, "lambda", 1, 0, 6,
32, "%nana%", 36, 0, 17, 38, 53, 0, 32, "apple", 32, "banana", 32, "cherry", 43, 3, 2, "arrayExists", 2, 2, "print", 1,
35, 52, "lambda", 1, 0, 6, 32, "%boom%", 36, 0, 17, 38, 53, 0, 32, "apple", 32, "banana", 32, "cherry", 43, 3, 2,
"arrayExists", 2, 2, "print", 1, 35, 52, "lambda", 1, 0, 6, 32, "%boom%", 36, 0, 17, 38, 53, 0, 43, 0, 2, "arrayExists",
2, 2, "print", 1, 35, 32, "--- arrayFilter ----", 2, "print", 1, 35, 52, "lambda", 1, 0, 6, 32, "%nana%", 36, 0, 17, 38,
53, 0, 32, "apple", 32, "banana", 32, "cherry", 43, 3, 2, "arrayFilter", 2, 2, "print", 1, 35, 52, "lambda", 1, 0, 6,
32, "%e%", 36, 0, 17, 38, 53, 0, 32, "apple", 32, "banana", 32, "cherry", 43, 3, 2, "arrayFilter", 2, 2, "print", 1, 35,
52, "lambda", 1, 0, 6, 32, "%boom%", 36, 0, 17, 38, 53, 0, 43, 0, 2, "arrayFilter", 2, 2, "print", 1, 35]

View File

@ -0,0 +1,10 @@
--- arrayMap ----
[2, 4, 6]
--- arrayExists ----
true
false
false
--- arrayFilter ----
['banana']
['apple', 'cherry']
[]

View File

@ -4,4 +4,6 @@
33, 2, 52, "lambda", 1, 0, 6, 33, 2, 36, 0, 8, 38, 53, 0, 54, 1, 2, "print", 1, 35, 32, "--------", 2, "print", 1, 35,
52, "lambda", 1, 0, 20, 36, 0, 2, "print", 1, 35, 32, "moo", 2, "print", 1, 35, 32, "cow", 2, "print", 1, 35, 31, 38,
53, 0, 33, 2, 36, 3, 54, 1, 35, 32, "--------", 2, "print", 1, 35, 52, "lambda", 0, 0, 14, 32, "moo", 2, "print", 1, 35,
32, "cow", 2, "print", 1, 35, 31, 38, 53, 0, 36, 4, 54, 0, 35, 35, 35, 35, 35, 35]
32, "cow", 2, "print", 1, 35, 31, 38, 53, 0, 36, 4, 54, 0, 35, 32, "-------- lambdas do not survive json --------", 2,
"print", 1, 35, 36, 0, 2, "print", 1, 35, 36, 0, 2, "jsonStringify", 1, 2, "print", 1, 35, 36, 0, 2, "jsonStringify", 1,
2, "jsonParse", 1, 36, 5, 2, "print", 1, 35, 35, 35, 35, 35, 35, 35]

View File

@ -12,3 +12,7 @@ cow
--------
moo
cow
-------- lambdas do not survive json --------
fn<lambda(1)>
"fn<lambda(1)>"
fn<lambda(1)>

View File

@ -0,0 +1,12 @@
// Exercises the array STL functions arrayMap, arrayExists and arrayFilter,
// passing a lambda as the first argument and an array literal as the second.
// Each section prints a header line followed by one result per call.
print('--- arrayMap ----')
print(arrayMap(x -> x * 2, [1,2,3]))
print('--- arrayExists ----')
print(arrayExists(x -> x like '%nana%', ['apple', 'banana', 'cherry']))
print(arrayExists(x -> x like '%boom%', ['apple', 'banana', 'cherry']))
print(arrayExists(x -> x like '%boom%', []))
print('--- arrayFilter ----')
print(arrayFilter(x -> x like '%nana%', ['apple', 'banana', 'cherry']))
print(arrayFilter(x -> x like '%e%', ['apple', 'banana', 'cherry']))
print(arrayFilter(x -> x like '%boom%', []))

View File

@ -28,3 +28,10 @@ let noArg := () -> {
print('cow')
}
noArg()
print('-------- lambdas do not survive json --------')
print(b)
print(jsonStringify(b)) // just a json string "fn<lambda(1)>"
let c := jsonParse(jsonStringify(b))
print(c) // prints a string, can't be called

View File

@ -9,6 +9,7 @@ from hogvm.python.debugger import debugger, color_bytecode
from hogvm.python.objects import is_hog_error, new_hog_closure, CallFrame, ThrowFrame, new_hog_callable, is_hog_upvalue
from hogvm.python.operation import Operation, HOGQL_BYTECODE_IDENTIFIER, HOGQL_BYTECODE_IDENTIFIER_V0
from hogvm.python.stl import STL
from hogvm.python.stl.bytecode import BYTECODE_STL
from dataclasses import dataclass
from hogvm.python.utils import (
@ -67,6 +68,7 @@ def execute_bytecode(
call_stack.append(
CallFrame(
ip=2 if bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 1,
chunk="root",
stack_start=0,
arg_len=0,
closure=new_hog_closure(
@ -75,15 +77,30 @@ def execute_bytecode(
arg_count=0,
upvalue_count=0,
ip=2 if bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 1,
chunk="root",
name="",
)
),
)
)
frame = call_stack[-1]
chunk_bytecode: list[Any] = bytecode
def stack_keep_first_elements(count: int):
def set_chunk_bytecode():
nonlocal chunk_bytecode, last_op
if not frame.chunk or frame.chunk == "root":
chunk_bytecode = bytecode
last_op = len(bytecode) - 1
elif frame.chunk.startswith("stl/") and frame.chunk[4:] in BYTECODE_STL:
chunk_bytecode = BYTECODE_STL[frame.chunk[4:]][1]
last_op = len(bytecode) - 1
else:
raise HogVMException(f"Unknown chunk: {frame.chunk}")
def stack_keep_first_elements(count: int) -> list[Any]:
nonlocal stack, mem_stack, mem_used
if count < 0 or len(stack) < count:
raise HogVMException("Stack underflow")
for upvalue in reversed(upvalues):
if upvalue["location"] >= count:
if not upvalue["closed"]:
@ -91,16 +108,18 @@ def execute_bytecode(
upvalue["value"] = stack[upvalue["location"]]
else:
break
removed = stack[count:]
stack = stack[0:count]
mem_used -= sum(mem_stack[count:])
mem_stack = mem_stack[0:count]
return removed
def next_token():
nonlocal frame
nonlocal frame, chunk_bytecode
if frame.ip >= last_op:
raise HogVMException("Unexpected end of bytecode")
frame.ip += 1
return bytecode[frame.ip]
return chunk_bytecode[frame.ip]
def pop_stack():
if not stack:
@ -145,7 +164,7 @@ def execute_bytecode(
symbol: Any = None
while frame.ip <= last_op:
ops += 1
symbol = bytecode[frame.ip]
symbol = chunk_bytecode[frame.ip]
if (ops & 127) == 0: # every 128th operation
check_timeout()
elif debug:
@ -232,6 +251,7 @@ def execute_bytecode(
arg_count=0,
upvalue_count=0,
ip=-1,
chunk="stl",
)
)
)
@ -244,6 +264,20 @@ def execute_bytecode(
arg_count=STL[chain[0]].maxArgs or 0,
upvalue_count=0,
ip=-1,
chunk="stl",
)
)
)
elif chain[0] in BYTECODE_STL and len(chain) == 1:
push_stack(
new_hog_closure(
new_hog_callable(
type="stl",
name=chain[0],
arg_count=len(BYTECODE_STL[chain[0]][0]),
upvalue_count=0,
ip=0,
chunk=f"stl/{chain[0]}",
)
)
)
@ -262,6 +296,7 @@ def execute_bytecode(
stack_keep_first_elements(stack_start)
push_stack(response)
frame = call_stack[-1]
set_chunk_bytecode()
continue # resume the loop without incrementing frame.ip
case Operation.GET_LOCAL:
@ -343,6 +378,7 @@ def execute_bytecode(
new_hog_callable(
type="local",
name=name,
chunk=frame.chunk,
arg_count=arg_count,
upvalue_count=upvalue_count,
ip=frame.ip + 1,
@ -402,30 +438,59 @@ def execute_bytecode(
push_stack(None)
frame = CallFrame(
ip=func_ip,
chunk=frame.chunk,
stack_start=len(stack) - arg_len,
arg_len=arg_len,
closure=new_hog_closure(
new_hog_callable(
type="stl",
type="local",
name=name,
arg_count=arg_len,
upvalue_count=0,
ip=-1,
ip=func_ip,
chunk=frame.chunk,
)
),
)
call_stack.append(frame)
continue # resume the loop without incrementing frame.ip
else:
# Shortcut for calling STL functions (can also be done with an STL function closure)
if version == 0:
args = [pop_stack() for _ in range(arg_count)]
else:
args = list(reversed([pop_stack() for _ in range(arg_count)]))
if functions is not None and name in functions:
if version == 0:
args = [pop_stack() for _ in range(arg_count)]
else:
args = stack_keep_first_elements(len(stack) - arg_count)
push_stack(functions[name](*args))
elif name in STL:
if version == 0:
args = [pop_stack() for _ in range(arg_count)]
else:
args = stack_keep_first_elements(len(stack) - arg_count)
push_stack(STL[name].fn(args, team, stdout, timeout.total_seconds()))
elif name in BYTECODE_STL:
arg_names = BYTECODE_STL[name][0]
if len(arg_names) != arg_count:
raise HogVMException(f"Function {name} requires exactly {len(arg_names)} arguments")
frame.ip += 1 # advance for when we return
frame = CallFrame(
ip=0,
chunk=f"stl/{name}",
stack_start=len(stack) - arg_count,
arg_len=arg_count,
closure=new_hog_closure(
new_hog_callable(
type="stl",
name=name,
arg_count=arg_count,
upvalue_count=0,
ip=0,
chunk=f"stl/{name}",
)
),
)
set_chunk_bytecode()
call_stack.append(frame)
continue # resume the loop without incrementing frame.ip
else:
raise HogVMException(f"Unsupported function call: {name}")
case Operation.CALL_LOCAL:
@ -452,10 +517,12 @@ def execute_bytecode(
frame.ip += 1 # advance for when we return
frame = CallFrame(
ip=callable["ip"],
chunk=callable["chunk"],
stack_start=len(stack) - callable["argCount"],
arg_len=callable["argCount"],
closure=closure,
)
set_chunk_bytecode()
call_stack.append(frame)
continue # resume the loop without incrementing frame.ip
@ -509,6 +576,7 @@ def execute_bytecode(
call_stack = call_stack[0:call_stack_len]
push_stack(exception)
frame = call_stack[-1]
set_chunk_bytecode()
frame.ip = catch_ip
continue
else:
@ -517,6 +585,10 @@ def execute_bytecode(
message=exception.get("message"),
payload=exception.get("payload"),
)
case _:
raise HogVMException(
f'Unexpected node while running bytecode in chunk "{frame.chunk}": {chunk_bytecode[frame.ip]}'
)
frame.ip += 1
if debug:

View File

@ -5,6 +5,7 @@ from typing import Any, Optional
@dataclass
class CallFrame:
ip: int
chunk: str
stack_start: int
arg_len: int
closure: dict
@ -44,6 +45,7 @@ def is_hog_callable(obj: Any) -> bool:
and "__hogCallable__" in obj
and "argCount" in obj
and "ip" in obj
# and "chunk" in obj # TODO: enable after this has been live for some hours
and "upvalueCount" in obj
)
@ -60,10 +62,11 @@ def new_hog_closure(callable: dict, upvalues: Optional[list] = None) -> dict:
}
def new_hog_callable(type: str, arg_count: int, upvalue_count: int, ip: int, name: str) -> dict:
def new_hog_callable(type: str, arg_count: int, upvalue_count: int, ip: int, name: str, chunk: str) -> dict:
return {
"__hogCallable__": type,
"name": name,
"chunk": chunk,
"argCount": arg_count,
"upvalueCount": upvalue_count,
"ip": ip,

View File

@ -23,7 +23,7 @@ from .date import (
is_hog_date,
)
from .crypto import sha256Hex, md5Hex, sha256HmacChainHex
from ..objects import is_hog_error, new_hog_error
from ..objects import is_hog_error, new_hog_error, is_hog_callable, is_hog_closure
from ..utils import like
if TYPE_CHECKING:
@ -130,12 +130,16 @@ def jsonStringify(args: list[Any], team: Optional["Team"], stdout: Optional[list
def json_safe(obj):
if isinstance(obj, dict) or isinstance(obj, list) or isinstance(obj, tuple):
if id(obj) in marked:
if id(obj) in marked and not is_hog_callable(obj) and not is_hog_closure(obj):
return None
else:
marked.add(id(obj))
try:
if isinstance(obj, dict):
if is_hog_callable(obj):
return f"fn<{obj['name']}({obj['argCount']})>"
if is_hog_closure(obj):
return f"fn<{obj['callable']['name']}({obj['callable']['argCount']})>"
return {json_safe(k): json_safe(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [json_safe(v) for v in obj]

View File

@ -0,0 +1,8 @@
# This file is generated by hogvm/stl/compile.py
# fmt: off
BYTECODE_STL: dict[str, tuple[list[str], list]] = {
"arrayExists": (["func", "arr"], [36, 1, 36, 2, 2, "values", 1, 33, 1, 36, 3, 2, "length", 1, 31, 36, 5, 36, 4, 16, 40, 26, 36, 3, 36, 4, 45, 37, 6, 36, 6, 36, 0, 54, 1, 40, 2, 29, 38, 36, 4, 33, 1, 6, 37, 4, 39, -33, 35, 35, 35, 35, 35, 30, 38]),
"arrayFilter": (["func", "arr"], [43, 0, 36, 1, 36, 3, 2, "values", 1, 33, 1, 36, 4, 2, "length", 1, 31, 36, 6, 36, 5, 16, 40, 33, 36, 4, 36, 5, 45, 37, 7, 36, 7, 36, 0, 54, 1, 40, 9, 36, 2, 36, 7, 2, "arrayPushBack", 2, 37, 2, 36, 5, 33, 1, 6, 37, 5, 39, -40, 35, 35, 35, 35, 35, 36, 2, 38, 35]),
"arrayMap": (["func", "arr"], [43, 0, 36, 1, 36, 3, 2, "values", 1, 33, 1, 36, 4, 2, "length", 1, 31, 36, 6, 36, 5, 16, 40, 29, 36, 4, 36, 5, 45, 37, 7, 36, 2, 36, 7, 36, 0, 54, 1, 2, "arrayPushBack", 2, 37, 2, 36, 5, 33, 1, 6, 37, 5, 39, -36, 35, 35, 35, 35, 35, 36, 2, 38, 35]),
}
# fmt: on

View File

@ -126,7 +126,7 @@ class TestBytecodeExecute:
raise AssertionError("Expected Exception not raised")
try:
execute_bytecode([_H, VERSION, op.CALL_GLOBAL, "notAFunction", 1], {})
execute_bytecode([_H, VERSION, op.CALL_GLOBAL, "replaceOne", 1], {})
except Exception as e:
assert str(e) == "Stack underflow"
else:

0
hogvm/stl/__init__.py Normal file
View File

45
hogvm/stl/compile.py Executable file
View File

@ -0,0 +1,45 @@
# Run from project root (cd ../..)
# python3 -m hogvm.stl.compile
#
# Compiles every Hog source file matching `source` into bytecode and writes the
# result as a `BYTECODE_STL` table to both the TypeScript and the Python HogVM.
# Each source file must declare a function named after the file itself
# (e.g. arrayMap.hog must contain `fn arrayMap(...)`); that function's body is
# what gets compiled, and its parameter names are stored next to the bytecode.
import glob
import json
import os
import sys
from typing import Any

from posthog.hogql import ast
from posthog.hogql.bytecode import create_bytecode, parse_program

source = "hogvm/stl/src/*.hog"
target_ts = "hogvm/typescript/src/stl/bytecode.ts"
target_py = "hogvm/python/stl/bytecode.py"

# function name -> (parameter names, compiled bytecode of the function body)
bytecodes: dict[str, tuple[list[str], list[Any]]] = {}

for filename in glob.glob(source):
    with open(filename, encoding="utf-8") as file:
        code = file.read()
    # The function to compile is named after the file, without directory or extension.
    basename = os.path.basename(filename).split(".")[0]
    program = parse_program(code)
    found = False
    for declaration in program.declarations:
        if isinstance(declaration, ast.Function) and declaration.name == basename:
            found = True
            bytecode = create_bytecode(declaration.body, args=declaration.params)
            bytecodes[basename] = (declaration.params, bytecode)
    if not found:
        print(f"Error: no function called {basename} was found in {filename}!")  # noqa: T201
        # sys.exit instead of the `exit` helper, which is only injected by the `site` module
        sys.exit(1)

# Entries are written sorted by name so the generated files are stable across runs.
with open(target_ts, "w") as output:
    output.write("// This file is generated by hogvm/stl/compile.py\n")
    output.write("export const BYTECODE_STL: Record<string, [string[], any[]]> = {\n")
    for name, (params, bytecode) in sorted(bytecodes.items()):
        output.write(f' "{name}": [{json.dumps(params)}, {json.dumps(bytecode)}],\n')
    output.write("}\n")

with open(target_py, "w") as output:
    output.write("# This file is generated by hogvm/stl/compile.py\n")
    output.write("# fmt: off\n")
    output.write("BYTECODE_STL: dict[str, tuple[list[str], list]] = {\n")
    for name, (params, bytecode) in sorted(bytecodes.items()):
        output.write(f' "{name}": ({json.dumps(params)}, {json.dumps(bytecode)}),\n')
    output.write("}\n")
    output.write("# fmt: on\n")

View File

@ -0,0 +1,8 @@
// arrayExists(func, arr): calls `func` on the items of `arr` in turn and returns
// true as soon as one call passes the `if` check. Returns false when no item
// matches, which includes the case where `arr` is empty.
fn arrayExists(func, arr) {
for (let i in arr) {
if (func(i)) {
// stop at the first match; the remaining items are not visited
return true
}
}
return false
}

View File

@ -0,0 +1,9 @@
// arrayFilter(func, arr): returns an array holding the items of `arr` for which
// `func(item)` passes the `if` check, in iteration order. Starts from an empty
// array, so an empty `arr` (or no matches) yields [].
fn arrayFilter(func, arr) {
let result := []
for (let i in arr) {
if (func(i)) {
// the value returned by arrayPushBack is assigned back to `result`
result := arrayPushBack(result, i)
}
}
return result
}

View File

@ -0,0 +1,7 @@
// arrayMap(func, arr): returns an array holding `func(item)` for every item of
// `arr`, in iteration order. Starts from an empty array, so an empty `arr`
// yields [].
fn arrayMap(func, arr) {
let result := []
for (let i in arr) {
// the value returned by arrayPushBack is assigned back to `result`
result := arrayPushBack(result, func(i))
}
return result
}

View File

@ -1,15 +1,16 @@
{
"name": "@posthog/hogvm",
"version": "1.0.38",
"version": "1.0.39",
"description": "PostHog Hog Virtual Machine",
"types": "dist/index.d.ts",
"main": "dist/index.js",
"packageManager": "pnpm@8.3.1",
"scripts": {
"test": "jest --runInBand --forceExit",
"build": "pnpm clean && pnpm compile",
"build": "pnpm clean && pnpm build:stl && pnpm build:compile",
"build:stl": "cd ../.. && python3 -m hogvm.stl.compile",
"build:compile": "tsc -p tsconfig.build.json",
"clean": "rm -rf dist/*",
"compile": "tsc -p tsconfig.build.json",
"check": "tsc -p tsconfig.build.json --noEmit",
"prettier": "prettier --write src",
"prettier:check": "prettier --check src",

View File

@ -93,7 +93,7 @@ describe('hogvm execute', () => {
await expect(execAsync([], options)).rejects.toThrow("Invalid HogQL bytecode, must start with '_H'")
expect(() => execSync(['_h', op.INTEGER, 2, op.INTEGER, 1, 'InvalidOp'], options)).toThrow(
'Unexpected node while running bytecode: InvalidOp'
'Unexpected node while running bytecode in chunk "root": InvalidOp'
)
expect(() =>
execSync(['_h', op.STRING, 'another', op.STRING, 'arg', op.CALL_GLOBAL, 'invalidFunc', 2], options)
@ -105,7 +105,7 @@ describe('hogvm execute', () => {
)
expect(() => execSync(['_H', 1, op.INTEGER, 2, op.INTEGER, 1, 'InvalidOp'], options)).toThrow(
'Unexpected node while running bytecode: InvalidOp'
'Unexpected node while running bytecode in chunk "root": InvalidOp'
)
expect(() =>
execSync(['_H', 1, op.STRING, 'another', op.STRING, 'arg', op.CALL_GLOBAL, 'invalidFunc', 2], options)
@ -549,12 +549,14 @@ describe('hogvm execute', () => {
ip: 8,
stackStart: 0,
argCount: 0,
chunk: 'root',
closure: {
__hogClosure__: true,
callable: {
__hogCallable__: 'main',
name: '',
argCount: 0,
chunk: 'root',
upvalueCount: 0,
ip: 1,
},
@ -1930,12 +1932,14 @@ describe('hogvm execute', () => {
ip: 12,
stackStart: 0,
argCount: 0,
chunk: 'root',
closure: {
__hogClosure__: true,
callable: {
__hogCallable__: 'main',
name: '',
argCount: 0,
chunk: 'root',
upvalueCount: 0,
ip: 1,
},
@ -2029,6 +2033,7 @@ describe('hogvm execute', () => {
argCount: 2,
upvalueCount: 1,
ip: 9,
chunk: 'root',
},
upvalues: [1],
},
@ -2045,6 +2050,7 @@ describe('hogvm execute', () => {
callStack: [
{
ip: 27,
chunk: 'root',
stackStart: 0,
argCount: 0,
closure: {
@ -2055,6 +2061,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 0,
ip: 1,
chunk: 'root',
},
upvalues: [],
},
@ -2065,7 +2072,7 @@ describe('hogvm execute', () => {
ops: 5,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 242,
maxMemUsed: 267,
},
})
result.state!.stack.push(null)
@ -2083,7 +2090,7 @@ describe('hogvm execute', () => {
ops: 19,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 476,
maxMemUsed: 526,
},
})
})
@ -2171,6 +2178,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 0,
ip: 7,
chunk: 'root',
},
upvalues: [],
},
@ -2182,6 +2190,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 1,
ip: 14,
chunk: 'root',
},
upvalues: [1],
},
@ -2198,6 +2207,7 @@ describe('hogvm execute', () => {
callStack: [
{
ip: 37,
chunk: 'root',
stackStart: 0,
argCount: 0,
closure: {
@ -2208,6 +2218,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 0,
ip: 1,
chunk: 'root',
},
upvalues: [],
},
@ -2218,7 +2229,7 @@ describe('hogvm execute', () => {
ops: 11,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 682,
maxMemUsed: 757,
},
})
result.state!.stack.push(null)
@ -2236,7 +2247,7 @@ describe('hogvm execute', () => {
ops: 17,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 682,
maxMemUsed: 757,
},
})
})
@ -2330,6 +2341,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 0,
ip: 7,
chunk: 'root',
},
upvalues: [],
},
@ -2341,6 +2353,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 1,
ip: 14,
chunk: 'root',
},
upvalues: [1],
},
@ -2357,6 +2370,7 @@ describe('hogvm execute', () => {
callStack: [
{
ip: 48,
chunk: 'root',
stackStart: 0,
argCount: 0,
closure: {
@ -2367,12 +2381,14 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 0,
ip: 1,
chunk: 'root',
},
upvalues: [],
},
},
{
ip: 25,
chunk: 'root',
stackStart: 2,
argCount: 0,
closure: {
@ -2383,6 +2399,7 @@ describe('hogvm execute', () => {
argCount: 0,
upvalueCount: 1,
ip: 14,
chunk: 'root',
},
upvalues: [1],
},
@ -2393,7 +2410,7 @@ describe('hogvm execute', () => {
ops: 16,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 682,
maxMemUsed: 757,
},
})
result.state!.stack.push(null)
@ -2411,7 +2428,7 @@ describe('hogvm execute', () => {
ops: 20,
asyncSteps: 1,
syncDuration: expect.any(Number),
maxMemUsed: 682,
maxMemUsed: 757,
},
})
})

View File

@ -12,6 +12,7 @@ import {
ThrowFrame,
} from './objects'
import { Operation } from './operation'
import { BYTECODE_STL } from './stl/bytecode'
import { ASYNC_STL, STL } from './stl/stl'
import {
calculateCost,
@ -37,7 +38,7 @@ export interface VMState {
/** Values hoisted from the stack */
upvalues: HogUpValue[]
/** Call stack of the VM */
callStack: CallFrame[] // [number, number, number][]
callStack: CallFrame[]
/** Throw stack of the VM */
throwStack: ThrowFrame[]
/** Declared functions of the VM (deprecated) */
@ -111,7 +112,7 @@ export async function execAsync(bytecode: any[], options?: ExecOptions): Promise
export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
let vmState: VMState | undefined = undefined
let bytecode: any[] | undefined = undefined
let bytecode: any[]
if (!Array.isArray(code)) {
vmState = code
bytecode = vmState.bytecode
@ -155,6 +156,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
if (callStack.length === 0) {
callStack.push({
ip: bytecode[0] === '_H' ? 2 : 1,
chunk: 'root',
stackStart: 0,
argCount: 0,
closure: newHogClosure(
@ -163,11 +165,22 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
argCount: 0,
upvalueCount: 0,
ip: 1,
chunk: 'root',
})
),
} satisfies CallFrame)
}
let frame: CallFrame = callStack[callStack.length - 1]
let chunkBytecode: any[] = bytecode
/**
 * Points `chunkBytecode` at the bytecode array of the current frame's chunk.
 *
 * - an empty chunk name or `'root'` selects the program's own `bytecode`;
 * - `'stl/<name>'` selects the compiled body stored in `BYTECODE_STL[<name>]`.
 *
 * Throws a HogVMException for any other chunk name.
 */
const setChunkBytecode = (): void => {
    if (!frame.chunk || frame.chunk === 'root') {
        chunkBytecode = bytecode
        return
    }
    if (frame.chunk.startsWith('stl/')) {
        const stlName = frame.chunk.substring(4)
        // Own-property check only: with `in`, names inherited from Object.prototype
        // (e.g. "constructor", "toString") would pass and leave chunkBytecode undefined.
        if (Object.hasOwn(BYTECODE_STL, stlName)) {
            // BYTECODE_STL values are [argument names, bytecode]; index 1 is the bytecode.
            chunkBytecode = BYTECODE_STL[stlName][1]
            return
        }
    }
    throw new HogVMException(`Unknown chunk: ${frame.chunk}`)
}
function popStack(): any {
if (stack.length === 0) {
@ -192,6 +205,9 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
return stack.splice(start, deleteCount)
}
function stackKeepFirstElements(count: number): any[] {
if (count < 0 || stack.length < count) {
throw new HogVMException('Stack underflow')
}
for (let i = sortedUpValues.length - 1; i >= 0; i--) {
if (sortedUpValues[i].location >= count) {
if (!sortedUpValues[i].closed) {
@ -208,10 +224,10 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
}
function next(): any {
if (frame.ip >= bytecode!.length - 1) {
if (frame.ip >= chunkBytecode.length - 1) {
throw new HogVMException('Unexpected end of bytecode')
}
return bytecode![++frame.ip]
return chunkBytecode[++frame.ip]
}
function checkTimeout(): void {
@ -255,12 +271,12 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
return createdUpValue
}
while (frame.ip < bytecode.length) {
while (frame.ip < chunkBytecode.length) {
ops += 1
if ((ops & 127) === 0) {
checkTimeout()
}
switch (bytecode[frame.ip]) {
switch (chunkBytecode[frame.ip]) {
case null:
break
case Operation.STRING:
@ -391,6 +407,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
argCount: 0, // TODO
upvalueCount: 0,
ip: -1,
chunk: 'async',
})
)
)
@ -402,6 +419,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
argCount: ASYNC_STL[chain[0]].maxArgs ?? 0,
upvalueCount: 0,
ip: -1,
chunk: 'async',
})
)
)
@ -413,6 +431,19 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
argCount: STL[chain[0]].maxArgs ?? 0,
upvalueCount: 0,
ip: -1,
chunk: 'stl',
})
)
)
} else if (chain.length == 1 && chain[0] in BYTECODE_STL && Object.hasOwn(BYTECODE_STL, chain[0])) {
pushStack(
newHogClosure(
newHogCallable('stl', {
name: chain[0],
argCount: BYTECODE_STL[chain[0]][0].length,
upvalueCount: 0,
ip: 0,
chunk: `stl/${chain[0]}`,
})
)
)
@ -437,6 +468,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
stackKeepFirstElements(stackStart)
pushStack(result)
frame = callStack[callStack.length - 1]
setChunkBytecode()
continue // resume the loop without incrementing frame.ip
}
case Operation.GET_LOCAL:
@ -519,6 +551,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
argCount,
upvalueCount,
ip: frame.ip + 1,
chunk: frame.chunk,
})
pushStack(callable)
frame.ip += bodyLength
@ -595,17 +628,20 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
}
frame = {
ip: funcIp,
chunk: frame.chunk,
stackStart: stack.length - argLen,
argCount: argLen,
closure: newHogClosure(
newHogCallable('stl', {
newHogCallable('local', {
name: name,
argCount: argLen,
upvalueCount: 0,
ip: -1,
ip: funcIp,
chunk: frame.chunk,
})
),
} satisfies CallFrame
setChunkBytecode()
callStack.push(frame)
continue // resume the loop without incrementing frame.ip
} else {
@ -616,14 +652,13 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
throw new HogVMException('Too many arguments')
}
const args =
version === 0
? Array(temp)
.fill(null)
.map(() => popStack())
: stackKeepFirstElements(stack.length - temp)
if (options?.functions && Object.hasOwn(options.functions, name) && options.functions[name]) {
const args =
version === 0
? Array(temp)
.fill(null)
.map(() => popStack())
: stackKeepFirstElements(stack.length - temp)
pushStack(convertJSToHog(options.functions[name](...args.map(convertHogToJS))))
} else if (
name !== 'toString' &&
@ -636,6 +671,13 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
throw new HogVMException(`Exceeded maximum number of async steps: ${maxAsyncSteps}`)
}
const args =
version === 0
? Array(temp)
.fill(null)
.map(() => popStack())
: stackKeepFirstElements(stack.length - temp)
frame.ip += 1 // resume at the next address after async returns
return {
@ -660,7 +702,37 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
},
} satisfies ExecResult
} else if (name in STL) {
const args =
version === 0
? Array(temp)
.fill(null)
.map(() => popStack())
: stackKeepFirstElements(stack.length - temp)
pushStack(STL[name].fn(args, name, timeout))
} else if (name in BYTECODE_STL) {
const argNames = BYTECODE_STL[name][0]
if (argNames.length !== temp) {
throw new HogVMException(`Function ${name} requires exactly ${argNames.length} arguments`)
}
frame.ip += 1 // advance for when we return
frame = {
ip: 0,
chunk: `stl/${name}`,
stackStart: stack.length - temp,
argCount: temp,
closure: newHogClosure(
newHogCallable('stl', {
name,
argCount: temp,
upvalueCount: 0,
ip: 0,
chunk: `stl/${name}`,
})
),
} satisfies CallFrame
setChunkBytecode()
callStack.push(frame)
continue // resume the loop without incrementing frame.ip
} else {
throw new HogVMException(`Unsupported function call: ${name}`)
}
@ -696,10 +768,12 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
frame.ip += 1 // advance for when we return
frame = {
ip: closure.callable.ip,
chunk: closure.callable.chunk,
stackStart: stack.length - closure.callable.argCount,
argCount: closure.callable.argCount,
closure,
} satisfies CallFrame
setChunkBytecode()
callStack.push(frame)
continue // resume the loop without incrementing frame.ip
} else if (closure.callable.__hogCallable__ === 'stl') {
@ -788,6 +862,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
callStack.splice(callStackLen)
pushStack(exception)
frame = callStack[callStack.length - 1]
setChunkBytecode()
frame.ip = catchIp
continue // resume the loop without incrementing frame.ip
} else {
@ -795,7 +870,9 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult {
}
}
default:
throw new HogVMException(`Unexpected node while running bytecode: ${bytecode[frame.ip]}`)
throw new HogVMException(
`Unexpected node while running bytecode in chunk "${frame.chunk}": ${chunkBytecode[frame.ip]}`
)
}
// use "continue" to skip incrementing frame.ip each iteration

View File

@ -1,6 +1,7 @@
export interface CallFrame {
closure: HogClosure
ip: number
chunk: string
stackStart: number
argCount: number
}
@ -38,6 +39,7 @@ export interface HogCallable {
argCount: number
upvalueCount: number
ip: number
chunk: string
}
export interface HogUpValue {
@ -82,6 +84,7 @@ export function isHogCallable(obj: any): obj is HogCallable {
'__hogCallable__' in obj &&
'argCount' in obj &&
'ip' in obj &&
// 'chunk' in obj && // TODO: enable after this has been live for some hours
'upvalueCount' in obj
)
}
@ -102,11 +105,13 @@ export function newHogCallable(
type: HogCallable['__hogCallable__'],
{
name,
chunk,
argCount,
upvalueCount,
ip,
}: {
name: string
chunk: string
argCount: number
upvalueCount: number
ip: number
@ -115,6 +120,7 @@ export function newHogCallable(
return {
__hogCallable__: type,
name,
chunk: chunk,
argCount,
upvalueCount,
ip,

View File

@ -0,0 +1,6 @@
// This file is generated by hogvm/stl/compile.py
export const BYTECODE_STL: Record<string, [string[], any[]]> = {
"arrayExists": [["func", "arr"], [36, 1, 36, 2, 2, "values", 1, 33, 1, 36, 3, 2, "length", 1, 31, 36, 5, 36, 4, 16, 40, 26, 36, 3, 36, 4, 45, 37, 6, 36, 6, 36, 0, 54, 1, 40, 2, 29, 38, 36, 4, 33, 1, 6, 37, 4, 39, -33, 35, 35, 35, 35, 35, 30, 38]],
"arrayFilter": [["func", "arr"], [43, 0, 36, 1, 36, 3, 2, "values", 1, 33, 1, 36, 4, 2, "length", 1, 31, 36, 6, 36, 5, 16, 40, 33, 36, 4, 36, 5, 45, 37, 7, 36, 7, 36, 0, 54, 1, 40, 9, 36, 2, 36, 7, 2, "arrayPushBack", 2, 37, 2, 36, 5, 33, 1, 6, 37, 5, 39, -40, 35, 35, 35, 35, 35, 36, 2, 38, 35]],
"arrayMap": [["func", "arr"], [43, 0, 36, 1, 36, 3, 2, "values", 1, 33, 1, 36, 4, 2, "length", 1, 31, 36, 6, 36, 5, 16, 40, 29, 36, 4, 36, 5, 45, 37, 7, 36, 2, 36, 7, 36, 0, 54, 1, 2, "arrayPushBack", 2, 37, 2, 36, 5, 33, 1, 6, 37, 5, 39, -36, 35, 35, 35, 35, 35, 36, 2, 38, 35]],
}

View File

@ -45,8 +45,15 @@ export function printHogValue(obj: any, marked: Set<any> | undefined = undefined
if (!marked) {
marked = new Set()
}
if (typeof obj === 'object' && obj !== null) {
if (marked.has(obj) && !isHogDateTime(obj) && !isHogDate(obj) && !isHogError(obj)) {
if (typeof obj === 'object' && obj !== null && obj !== undefined) {
if (
marked.has(obj) &&
!isHogDateTime(obj) &&
!isHogDate(obj) &&
!isHogError(obj) &&
!isHogClosure(obj) &&
!isHogCallable(obj)
) {
return 'null'
}
marked.add(obj)
@ -91,7 +98,7 @@ export function printHogValue(obj: any, marked: Set<any> | undefined = undefined
}
} else if (typeof obj === 'boolean') {
return obj ? 'true' : 'false'
} else if (obj === null) {
} else if (obj === null || obj === undefined) {
return 'null'
} else if (typeof obj === 'string') {
return escapeString(obj)

View File

@ -1,6 +1,6 @@
import { DateTime } from 'luxon'
import { isHogDate, isHogDateTime, isHogError, newHogError } from '../objects'
import { isHogCallable, isHogClosure, isHogDate, isHogDateTime, isHogError, newHogError } from '../objects'
import { md5Hex, sha256Hex, sha256HmacChainHex } from './crypto'
import {
formatDateTime,
@ -230,6 +230,11 @@ export const STL: Record<string, STLFunction> = {
if (isHogDateTime(x) || isHogDate(x) || isHogError(x)) {
return x
}
if (isHogCallable(x) || isHogClosure(x)) {
// we don't support serializing callables
const callable = isHogCallable(x) ? x : x.callable
return `fn<${callable.name || 'lambda'}(${callable.argCount})>`
}
const obj: Record<string, any> = {}
for (const key in x) {
obj[key] = convert(x[key], marked)

View File

@ -52,7 +52,7 @@
"@google-cloud/storage": "^5.8.5",
"@maxmind/geoip2-node": "^3.4.0",
"@posthog/clickhouse": "^1.7.0",
"@posthog/hogvm": "^1.0.38",
"@posthog/hogvm": "^1.0.39",
"@posthog/plugin-scaffold": "1.4.4",
"@sentry/node": "^7.49.0",
"@sentry/profiling-node": "^0.3.0",

View File

@ -47,8 +47,8 @@ dependencies:
specifier: file:../rust/cyclotron-node
version: file:../rust/cyclotron-node
'@posthog/hogvm':
specifier: ^1.0.38
version: 1.0.38(luxon@3.4.4)(re2@1.20.3)
specifier: ^1.0.39
version: 1.0.39(luxon@3.4.4)(re2@1.20.3)
'@posthog/plugin-scaffold':
specifier: 1.4.4
version: 1.4.4
@ -3116,8 +3116,8 @@ packages:
engines: {node: '>=12'}
dev: false
/@posthog/hogvm@1.0.38(luxon@3.4.4)(re2@1.20.3):
resolution: {integrity: sha512-UB0mFVUCG2CJC+bQX8rzvo/zG1Mu8oZcdrxRtasZetZfOffRVBvj6ESNPEsTNlh/hNV9u+aOvSppRomiFEvjlg==}
/@posthog/hogvm@1.0.39(luxon@3.4.4)(re2@1.20.3):
resolution: {integrity: sha512-PUv8rr01PyA5FqrZTiV5OIXVfXeQpxpJuUKn37ij2JPWRnLA2U97GUHordkZkykpd3ksk2Td3SH0VV/KournFw==}
peerDependencies:
luxon: ^3.4.4
re2: ^1.21.3

View File

@ -169,7 +169,7 @@ describe('CDP Processed Events Consuner', () => {
},
{
level: 'debug',
message: "Suspending function due to async function call 'fetch'. Payload: 1639 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 1689 bytes",
},
{
level: 'info',
@ -217,7 +217,7 @@ describe('CDP Processed Events Consuner', () => {
},
{
level: 'debug',
message: "Suspending function due to async function call 'fetch'. Payload: 1639 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 1689 bytes",
},
{
level: 'debug',

View File

@ -211,7 +211,7 @@ describe('CDP Processed Events Consuner', () => {
topic: 'log_entries_test',
value: {
log_source: 'hog_function',
message: "Suspending function due to async function call 'fetch'. Payload: 1805 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 1855 bytes",
team_id: 2,
},
})

View File

@ -87,7 +87,7 @@ describe('Hog Executor', () => {
{
timestamp: expect.any(DateTime),
level: 'debug',
message: "Suspending function due to async function call 'fetch'. Payload: 1764 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 1814 bytes",
},
])
})
@ -199,7 +199,7 @@ describe('Hog Executor', () => {
expect(asyncExecResult.finished).toBe(true)
expect(logs.map((log) => log.message)).toEqual([
'Executing function',
"Suspending function due to async function call 'fetch'. Payload: 1764 bytes",
"Suspending function due to async function call 'fetch'. Payload: 1814 bytes",
'Resuming function',
'Fetch response:, {"status":200,"body":"success"}',
'Function completed in 100ms. Sync: 0ms. Mem: 746 bytes. Ops: 22.',
@ -227,7 +227,7 @@ describe('Hog Executor', () => {
expect(asyncExecResult.finished).toBe(true)
expect(logs.map((log) => log.message)).toEqual([
'Executing function',
"Suspending function due to async function call 'fetch'. Payload: 1764 bytes",
"Suspending function due to async function call 'fetch'. Payload: 1814 bytes",
'Resuming function',
'Fetch response:, {"status":200,"body":{"foo":"bar"}}', // The body is parsed
'Function completed in 100ms. Sync: 0ms. Mem: 746 bytes. Ops: 22.',

View File

@ -107,7 +107,6 @@ class BytecodeCompiler(Visitor):
self.supported_functions = supported_functions or set()
self.locals: list[Local] = []
self.upvalues: list[UpValue] = []
# self.functions: dict[str, HogFunction] = {}
self.scope_depth = 0
self.args = args
# we're in a function definition

View File

@ -27,6 +27,11 @@ if "--run" in modifiers:
for line in response.stdout:
print(line) # noqa: T201
elif "--out" in modifiers:
if len(args) != 2:
raise ValueError("Must specify exactly one filename")
print(json.dumps(bytecode)) # noqa: T201
elif "--compile" in modifiers:
if len(args) == 3:
target = args[2]