From 99de66f434b090517ea74293e87a670568042fec Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 16 Apr 2024 17:51:23 +0200 Subject: [PATCH] feat(hogql): in cohort joins with version as well (#21552) --- bin/check_temporal_up | 4 ++-- mypy-baseline.txt | 1 - posthog/hogql/transforms/in_cohort.py | 33 ++++++++++++++++----------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/bin/check_temporal_up b/bin/check_temporal_up index ef9198459e7..4e0f7ec074c 100755 --- a/bin/check_temporal_up +++ b/bin/check_temporal_up @@ -14,8 +14,8 @@ while true; do fi if [ $SECONDS -ge $TIMEOUT ]; then - echo "Timed out waiting for Temporal to be ready" - exit 1 + echo "Timed out ($TIMEOUT sec) waiting for Temporal to be ready. Crossing fingers and trying to run tests anyway." + exit 0 fi echo "Waiting for Temporal to be ready" diff --git a/mypy-baseline.txt b/mypy-baseline.txt index e64dd4ad0ae..f167c5bfe00 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -256,7 +256,6 @@ posthog/hogql/transforms/in_cohort.py:0: error: Incompatible default for argumen posthog/hogql/transforms/in_cohort.py:0: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True posthog/hogql/transforms/in_cohort.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase posthog/hogql/transforms/in_cohort.py:0: error: Argument "is_static" to "_add_join_for_cohort" of "InCohortResolver" has incompatible type "bool | None"; expected "bool" [arg-type] -posthog/hogql/transforms/in_cohort.py:0: error: Incompatible types in assignment (expression has type "ValuesQuerySet[Cohort, tuple[int, bool | None]]", variable has type "ValuesQuerySet[Cohort, tuple[int, bool | None, str | None]]") [assignment] posthog/hogql/transforms/in_cohort.py:0: error: Argument "is_static" to "_add_join_for_cohort" of "InCohortResolver" has incompatible type "bool | None"; expected "bool" [arg-type] posthog/hogql/transforms/in_cohort.py:0: error: Argument "table" to "JoinExpr" has incompatible type "Expr"; expected "SelectQuery | SelectUnionQuery | Field | None" [arg-type] posthog/hogql/transforms/in_cohort.py:0: error: List item 0 has incompatible type "SelectQueryType | None"; expected "SelectQueryType" [list-item] diff --git a/posthog/hogql/transforms/in_cohort.py b/posthog/hogql/transforms/in_cohort.py index 3f1cd16fb14..d10e393f539 100644 --- a/posthog/hogql/transforms/in_cohort.py +++ b/posthog/hogql/transforms/in_cohort.py @@ -287,18 +287,19 @@ class InCohortResolver(TraversingVisitor): if (isinstance(arg.value, int) or isinstance(arg.value, float)) and not isinstance(arg.value, bool): cohorts = Cohort.objects.filter(id=int(arg.value), team_id=self.context.team_id).values_list( - "id", "is_static", "name" + "id", "is_static", "version", "name" ) if len(cohorts) == 1: self.context.add_notice( start=arg.start, end=arg.end, - message=f"Cohort #{cohorts[0][0]} can also be specified as {escape_clickhouse_string(cohorts[0][2])}", - fix=escape_clickhouse_string(cohorts[0][2]), + message=f"Cohort #{cohorts[0][0]} can also be specified as {escape_clickhouse_string(cohorts[0][3])}", + fix=escape_clickhouse_string(cohorts[0][3]), ) self._add_join_for_cohort( cohort_id=cohorts[0][0], is_static=cohorts[0][1], + version=cohorts[0][2], compare=node, select=self.stack[-1], negative=node.op == ast.CompareOperationOp.NotInCohort, @@ -307,25 +308,26 @@ class InCohortResolver(TraversingVisitor): raise QueryError(f"Could not find cohort with ID {arg.value}", node=arg) if isinstance(arg.value, str): - cohorts = Cohort.objects.filter(name=arg.value, team_id=self.context.team_id).values_list( - "id", "is_static" + cohorts2 = Cohort.objects.filter(name=arg.value, team_id=self.context.team_id).values_list( + "id", "is_static", "version" ) - if len(cohorts) == 1: + if len(cohorts2) == 1: self.context.add_notice( start=arg.start, end=arg.end, - message=f"Searching for cohort by name. Replace with numeric ID {cohorts[0][0]} to protect against renaming.", - fix=str(cohorts[0][0]), + message=f"Searching for cohort by name. Replace with numeric ID {cohorts2[0][0]} to protect against renaming.", + fix=str(cohorts2[0][0]), ) self._add_join_for_cohort( - cohort_id=cohorts[0][0], - is_static=cohorts[0][1], + cohort_id=cohorts2[0][0], + is_static=cohorts2[0][1], + version=cohorts2[0][2], compare=node, select=self.stack[-1], negative=node.op == ast.CompareOperationOp.NotInCohort, ) return - elif len(cohorts) > 1: + elif len(cohorts2) > 1: raise QueryError(f"Found multiple cohorts with name '{arg.value}'", node=arg) raise QueryError(f"Could not find a cohort with the name '{arg.value}'", node=arg) else: @@ -336,6 +338,7 @@ class InCohortResolver(TraversingVisitor): self, cohort_id: int, is_static: bool, + version: Optional[int], select: ast.SelectQuery, compare: ast.CompareOperation, negative: bool, @@ -354,11 +357,15 @@ class InCohortResolver(TraversingVisitor): if must_add_join: if is_static: sql = "(SELECT person_id, 1 as matched FROM static_cohort_people WHERE cohort_id = {cohort_id})" + elif version is not None: + sql = "(SELECT person_id, 1 as matched FROM raw_cohort_people WHERE cohort_id = {cohort_id} AND version = {version})" else: sql = "(SELECT person_id, 1 as matched FROM raw_cohort_people WHERE cohort_id = {cohort_id} GROUP BY person_id, cohort_id, version HAVING sum(sign) > 0)" subquery = parse_expr( - sql, {"cohort_id": ast.Constant(value=cohort_id)}, start=None - ) # clear the source start position + sql, + {"cohort_id": ast.Constant(value=cohort_id), "version": ast.Constant(value=version)}, + start=None, # clear the source start position + ) new_join = ast.JoinExpr( alias=f"in_cohort__{cohort_id}",