diff --git a/posthog/api/feature_flag.py b/posthog/api/feature_flag.py
index d24ee4499a4..0ae7331437b 100644
--- a/posthog/api/feature_flag.py
+++ b/posthog/api/feature_flag.py
@@ -14,6 +14,10 @@ from rest_framework import (
     viewsets,
 )
 from posthog.api.utils import action
+from posthog.models.feature_flag.flag_status import FeatureFlagStatusChecker
+from posthog.schema import (
+    FeatureFlagStatusQueryResponse,
+)
 from rest_framework.permissions import SAFE_METHODS, BasePermission
 from rest_framework.request import Request
 from rest_framework.response import Response
@@ -796,6 +800,19 @@ class FeatureFlagViewSet(
 
         return activity_page_response(activity_page, limit, page, request)
 
+    @action(methods=["GET"], url_path="status", detail=True, required_scopes=["feature_flag:read"])
+    @extend_schema(
+        responses={
+            200: FeatureFlagStatusQueryResponse,
+        }
+    )
+    def status(self, request: request.Request, **kwargs):
+        # Delegate to the status checker introduced in this PR; the response body
+        # matches FeatureFlagStatusQueryResponse, e.g. {"status": "stale"}
+        feature_flag = self.get_object()
+        flag_status = FeatureFlagStatusChecker(feature_flag_id=feature_flag.id).get_status()
+        return Response({"status": flag_status})
+
     @action(methods=["GET"], detail=True, required_scopes=["activity_log:read"])
     def activity(self, request: request.Request, **kwargs):
         limit = int(request.query_params.get("limit", "10"))
diff --git a/posthog/hogql_queries/experiments/__init__.py b/posthog/hogql_queries/experiments/__init__.py
new file mode 100644
index 00000000000..0b50bdd16cc
--- /dev/null
+++ b/posthog/hogql_queries/experiments/__init__.py
@@ -0,0 +1,12 @@
+# The FF variant name for control
+CONTROL_VARIANT_KEY = "control"
+
+# Controls the minimum number of people that must be exposed to a variant
+# before the results are deemed significant
+FF_DISTRIBUTION_THRESHOLD = 100
+
+# If the probability of a variant is below this threshold, it is considered
+# insignificant
+MIN_PROBABILITY_FOR_SIGNIFICANCE = 0.9
+
+EXPECTED_LOSS_SIGNIFICANCE_LEVEL = 0.01
diff --git a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py
new file mode 100644
index 00000000000..30247912822
--- /dev/null
+++ b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py
@@ -0,0 +1,370 @@
+import json
+from zoneinfo import ZoneInfo
+from django.conf import settings
+from posthog.constants import ExperimentNoResultsErrorKeys
+from posthog.hogql import ast
+from posthog.hogql_queries.experiments import CONTROL_VARIANT_KEY
+from posthog.hogql_queries.experiments.trends_statistics import (
+    are_results_significant,
+    calculate_credible_intervals,
+    calculate_probabilities,
+)
+from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
+from posthog.hogql_queries.query_runner import QueryRunner
+from posthog.models.experiment import Experiment
+from posthog.queries.trends.util import ALL_SUPPORTED_MATH_FUNCTIONS
+from rest_framework.exceptions import ValidationError
+from posthog.schema import (
+    BaseMathType,
+    BreakdownFilter,
+    CachedExperimentTrendsQueryResponse,
+    ChartDisplayType,
+    EventPropertyFilter,
+    EventsNode,
+    ExperimentSignificanceCode,
+    ExperimentTrendsQuery,
+    ExperimentTrendsQueryResponse,
+    ExperimentVariantTrendsBaseStats,
+    InsightDateRange,
+    PropertyMathType,
+    TrendsFilter,
+    TrendsQuery,
+    TrendsQueryResponse,
+)
+from typing import Any, Optional
+import threading
+
+
+class ExperimentTrendsQueryRunner(QueryRunner):
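+    """
+    Computes trend results for an experiment: runs the experiment's count and
+    exposure trend queries over its date range, then derives per-variant win
+    probabilities, significance, and credible intervals from the results.
+    """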
+    query: ExperimentTrendsQuery
+    response: ExperimentTrendsQueryResponse
+    cached_response: CachedExperimentTrendsQueryResponse
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.experiment = Experiment.objects.get(id=self.query.experiment_id)
+        self.feature_flag = self.experiment.feature_flag
+        self.variants = [variant["key"] for variant in self.feature_flag.variants]
+        if self.experiment.holdout:
+            self.variants.append(f"holdout-{self.experiment.holdout.id}")
+        self.breakdown_key = f"$feature/{self.feature_flag.key}"
+
+        self.prepared_count_query = self._prepare_count_query()
+        self.prepared_exposure_query = self._prepare_exposure_query()
+
+        self.count_query_runner = TrendsQueryRunner(
+            query=self.prepared_count_query, team=self.team, timings=self.timings, limit_context=self.limit_context
+        )
+        self.exposure_query_runner = TrendsQueryRunner(
+            query=self.prepared_exposure_query, team=self.team, timings=self.timings, limit_context=self.limit_context
+        )
+
+    def _uses_math_aggregation_by_user_or_property_value(self, query: TrendsQuery):
+        # Copy so we don't mutate the module-level ALL_SUPPORTED_MATH_FUNCTIONS list
+        math_keys = list(ALL_SUPPORTED_MATH_FUNCTIONS)
+        # "sum" doesn't need special handling: we *can* have custom exposure for sum filters
+        if "sum" in math_keys:
+            math_keys.remove("sum")
+        return any(entity.math in math_keys for entity in query.series)
+
+    def _get_insight_date_range(self) -> InsightDateRange:
+        """
+        Returns an InsightDateRange object based on the experiment's start and end dates,
+        adjusted for the team's timezone if applicable.
+        """
+        if self.team.timezone:
+            tz = ZoneInfo(self.team.timezone)
+            start_date = self.experiment.start_date.astimezone(tz) if self.experiment.start_date else None
+            end_date = self.experiment.end_date.astimezone(tz) if self.experiment.end_date else None
+        else:
+            start_date = self.experiment.start_date
+            end_date = self.experiment.end_date
+
+        return InsightDateRange(
+            date_from=start_date.isoformat() if start_date else None,
+            date_to=end_date.isoformat() if end_date else None,
+            explicitDate=True,
+        )
+
+    def _get_breakdown_filter(self) -> BreakdownFilter:
+        return BreakdownFilter(
+            breakdown=self.breakdown_key,
+            breakdown_type="event",
+        )
+
+    def _prepare_count_query(self) -> TrendsQuery:
+        """
+        This method takes the raw trend query and adapts it
+        for the needs of experiment analysis:
+
+        1. Adjust the series math, depending on whether math aggregation is used, and force a cumulative display.
+        2. Set the date range to match the experiment's duration, using the project's timezone.
+        3. Configure the breakdown to use the feature flag key, which allows us
+           to separate results for different experiment variants.
+        """
+        prepared_count_query = TrendsQuery(**self.query.count_query.model_dump())
+
+        uses_math_aggregation = self._uses_math_aggregation_by_user_or_property_value(prepared_count_query)
+
+        # :TRICKY: for `avg` aggregation, use `sum` data as an approximation
+        if prepared_count_query.series[0].math == PropertyMathType.AVG:
+            prepared_count_query.series[0].math = PropertyMathType.SUM
+        # TODO: revisit this; using the count data for the remaining aggregation types is likely wrong
+        elif uses_math_aggregation:
+            prepared_count_query.series[0].math = None
+
+        prepared_count_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE)
+        prepared_count_query.dateRange = self._get_insight_date_range()
+        prepared_count_query.breakdownFilter = self._get_breakdown_filter()
+        prepared_count_query.properties = [
+            EventPropertyFilter(
+                key=self.breakdown_key,
+                value=self.variants,
+                operator="exact",
+                type="event",
+            )
+        ]
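+        # e.g. for a flag "my-flag" with variants ["control", "test"], this keeps
+        # only events where $feature/my-flag is exactly "control" or "test"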
+
+        return prepared_count_query
+
+    def _prepare_exposure_query(self) -> TrendsQuery:
+        """
+        This method prepares the exposure query for the experiment analysis.
+
+        Exposure is the count of users who have seen the experiment. This is
+        necessary to calculate the statistical significance of the experiment.
+
+        There are 3 possible cases for the exposure query:
+        1. If math aggregation is used, we construct an implicit exposure query
+        2. Otherwise, if an exposure query is provided, we use it as is, adapting it to the experiment's duration and breakdown
+        3. Otherwise, we construct a default exposure query (the count of $feature_flag_called events)
+        """
+
+        # 1. If math aggregation is used, we construct an implicit exposure query: unique users for the count event
+        uses_math_aggregation = self._uses_math_aggregation_by_user_or_property_value(self.query.count_query)
+
+        if uses_math_aggregation:
+            prepared_exposure_query = TrendsQuery(**self.query.count_query.model_dump())
+            count_event = self.query.count_query.series[0]
+
+            if hasattr(count_event, "event"):
+                prepared_exposure_query.dateRange = self._get_insight_date_range()
+                prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
+                prepared_exposure_query.series = [
+                    EventsNode(
+                        event=count_event.event,
+                        math=BaseMathType.DAU,
+                    )
+                ]
+                prepared_exposure_query.properties = [
+                    EventPropertyFilter(
+                        key=self.breakdown_key,
+                        value=self.variants,
+                        operator="exact",
+                        type="event",
+                    )
+                ]
+            else:
+                raise ValueError("Expected first series item to have an 'event' attribute")
+
+        # 2. Otherwise, if an exposure query is provided, we use it as is, adapting the date range and breakdown
+        elif self.query.exposure_query:
+            prepared_exposure_query = TrendsQuery(**self.query.exposure_query.model_dump())
+            prepared_exposure_query.dateRange = self._get_insight_date_range()
+            prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
+            prepared_exposure_query.properties = [
+                EventPropertyFilter(
+                    key=self.breakdown_key,
+                    value=self.variants,
+                    operator="exact",
+                    type="event",
+                )
+            ]
+        # 3. Otherwise, we construct a default exposure query: unique users for the $feature_flag_called event
+        else:
+            prepared_exposure_query = TrendsQuery(
+                dateRange=self._get_insight_date_range(),
+                breakdownFilter=self._get_breakdown_filter(),
+                series=[
+                    EventsNode(
+                        event="$feature_flag_called",
+                        math=BaseMathType.DAU,  # TODO: sync with frontend
+                    )
+                ],
+                properties=[
+                    EventPropertyFilter(
+                        key=self.breakdown_key,
+                        value=self.variants,
+                        operator="exact",
+                        type="event",
+                    ),
+                    EventPropertyFilter(
+                        key="$feature_flag",
+                        value=[self.feature_flag.key],
+                        operator="exact",
+                        type="event",
+                    ),
+                ],
+            )
+
+        return prepared_exposure_query
+
+    def calculate(self) -> ExperimentTrendsQueryResponse:
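+        """
+        Runs the count and exposure queries (in parallel, outside of unit tests)
+        and derives per-variant statistics from the combined results.
+        """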
+        shared_results: dict[str, Optional[Any]] = {"count_result": None, "exposure_result": None}
+        errors = []
+
+        def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool):
+            try:
+                result = query_runner.calculate()
+                shared_results[result_key] = result
+            except Exception as e:
+                errors.append(e)
+            finally:
+                if is_parallel:
+                    from django.db import connection
+
+                    # This will only close the DB connection for the newly spawned thread and not the whole app
+                    connection.close()
+
+        # This exists so that we're not spawning threads during unit tests
+        if settings.IN_UNIT_TESTING:
+            run(self.count_query_runner, "count_result", False)
+            run(self.exposure_query_runner, "exposure_result", False)
+        else:
+            jobs = [
+                threading.Thread(target=run, args=(self.count_query_runner, "count_result", True)),
+                threading.Thread(target=run, args=(self.exposure_query_runner, "exposure_result", True)),
+            ]
+            for job in jobs:
+                job.start()
+            for job in jobs:
+                job.join()
+
+        # Surface any error raised in a worker thread
+        if errors:
+            raise errors[0]
+
+        count_result = shared_results["count_result"]
+        exposure_result = shared_results["exposure_result"]
+        if count_result is None or exposure_result is None:
+            raise ValueError("One or both query runners failed to produce a response")
+
+        self._validate_event_variants(count_result)
+
+        # Statistical analysis
+        control_variant, test_variants = self._get_variants_with_base_stats(count_result, exposure_result)
+        probabilities = calculate_probabilities(control_variant, test_variants)
+        significance_code, p_value = are_results_significant(control_variant, test_variants, probabilities)
+        credible_intervals = calculate_credible_intervals([control_variant, *test_variants])
+
+        return ExperimentTrendsQueryResponse(
+            kind="ExperimentTrendsQuery",
+            insight=count_result.results,
+            count_query=self.prepared_count_query,
+            exposure_query=self.prepared_exposure_query,
+            variants=[variant.model_dump() for variant in [control_variant, *test_variants]],
+            probability={
+                variant.key: probability
+                for variant, probability in zip([control_variant, *test_variants], probabilities)
+            },
+            significant=significance_code == ExperimentSignificanceCode.SIGNIFICANT,
+            significance_code=significance_code,
+            p_value=p_value,
+            credible_intervals=credible_intervals,
+        )
+
+    def _get_variants_with_base_stats(
+        self, count_results: TrendsQueryResponse, exposure_results: TrendsQueryResponse
+    ) -> tuple[ExperimentVariantTrendsBaseStats, list[ExperimentVariantTrendsBaseStats]]:
+        control_variant: Optional[ExperimentVariantTrendsBaseStats] = None
+        test_variants = []
+        exposure_counts = {}
+        exposure_ratios = {}
+
+        for result in exposure_results.results:
+            count = result.get("count", 0)
+            breakdown_value = result.get("breakdown_value")
+            exposure_counts[breakdown_value] = count
+
+        control_exposure = exposure_counts.get(CONTROL_VARIANT_KEY, 0)
+
+        if control_exposure != 0:
+            for key, count in exposure_counts.items():
+                exposure_ratios[key] = count / control_exposure
+
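+        # e.g. with exposures {"control": 100, "test": 150}, the ratios become
+        # {"control": 1.0, "test": 1.5}, used as each test variant's relative exposure below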
+        for result in count_results.results:
+            count = result.get("count", 0)
+            breakdown_value = result.get("breakdown_value")
+            if breakdown_value == CONTROL_VARIANT_KEY:
+                control_variant = ExperimentVariantTrendsBaseStats(
+                    key=breakdown_value,
+                    count=count,
+                    exposure=1,
+                    # TODO: in the absence of exposure data, we should throw rather than default to 1
+                    absolute_exposure=exposure_counts.get(breakdown_value, 1),
+                )
+            else:
+                test_variants.append(
+                    ExperimentVariantTrendsBaseStats(
+                        key=breakdown_value,
+                        count=count,
+                        # TODO: in the absence of exposure data, we should throw rather than default to 1
+                        exposure=exposure_ratios.get(breakdown_value, 1),
+                        absolute_exposure=exposure_counts.get(breakdown_value, 1),
+                    )
+                )
+
+        if control_variant is None:
+            raise ValueError("Control variant not found in count results")
+
+        return control_variant, test_variants
+
+    def _validate_event_variants(self, count_result: TrendsQueryResponse):
+        errors = {
+            ExperimentNoResultsErrorKeys.NO_EVENTS: True,
+            ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True,
+            ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
+            ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
+        }
+
+        if not count_result.results or not count_result.results[0]:
+            raise ValidationError(code="no-results", detail=json.dumps(errors))
+
+        errors[ExperimentNoResultsErrorKeys.NO_EVENTS] = False
+
+        # Check if "control" is present
+        for event in count_result.results:
+            event_variant = event.get("breakdown_value")
+            if event_variant == "control":
+                errors[ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT] = False
+                errors[ExperimentNoResultsErrorKeys.NO_FLAG_INFO] = False
+                break
+
+        # Check if at least one of the test variants is present
+        test_variants = [variant for variant in self.variants if variant != "control"]
+        for event in count_result.results:
+            event_variant = event.get("breakdown_value")
+            if event_variant in test_variants:
+                errors[ExperimentNoResultsErrorKeys.NO_TEST_VARIANT] = False
+                errors[ExperimentNoResultsErrorKeys.NO_FLAG_INFO] = False
+                break
+
+        has_errors = any(errors.values())
+        if has_errors:
+            raise ValidationError(detail=json.dumps(errors))
+
+    def to_query(self) -> ast.SelectQuery:
+        raise ValueError(f"Cannot convert source query of type {self.query.count_query.kind} to query")
diff --git a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py
new file mode 100644
index 00000000000..85a818987db
--- /dev/null
+++ b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py
@@ -0,0 +1,640 @@
+from django.test import override_settings
+from posthog.hogql_queries.experiments.experiment_trends_query_runner import ExperimentTrendsQueryRunner
+from posthog.models.experiment import Experiment, ExperimentHoldout
+from posthog.models.feature_flag.feature_flag import FeatureFlag
+from posthog.schema import (
+    EventsNode,
+    ExperimentSignificanceCode,
+    ExperimentTrendsQuery,
+    ExperimentTrendsQueryResponse,
+    TrendsQuery,
+)
+from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, flush_persons_and_events
+from freezegun import freeze_time
+from typing import cast
+from django.utils import timezone
+from datetime import timedelta
+from posthog.test.test_journeys import journeys_for
+from rest_framework.exceptions import ValidationError
+from posthog.constants import ExperimentNoResultsErrorKeys
+import json
+
+
+@override_settings(IN_UNIT_TESTING=True)
+class TestExperimentTrendsQueryRunner(ClickhouseTestMixin, APIBaseTest):
+    def create_feature_flag(self, key="test-experiment"):
+        return FeatureFlag.objects.create(
name=f"Test experiment flag: {key}", + key=key, + team=self.team, + filters={ + "groups": [{"properties": [], "rollout_percentage": None}], + "multivariate": { + "variants": [ + { + "key": "control", + "name": "Control", + "rollout_percentage": 50, + }, + { + "key": "test", + "name": "Test", + "rollout_percentage": 50, + }, + ] + }, + }, + created_by=self.user, + ) + + def create_experiment(self, name="test-experiment", feature_flag=None): + if feature_flag is None: + feature_flag = self.create_feature_flag(name) + return Experiment.objects.create( + name=name, + team=self.team, + feature_flag=feature_flag, + start_date=timezone.now(), + end_date=timezone.now() + timedelta(days=14), + ) + + def create_holdout_for_experiment(self, experiment: Experiment): + holdout = ExperimentHoldout.objects.create( + team=self.team, + name="Test Experiment holdout", + ) + holdout.filters = [{"properties": [], "rollout_percentage": 20, "variant": f"holdout-{holdout.id}"}] + holdout.save() + experiment.holdout = holdout + experiment.save() + return holdout + + @freeze_time("2020-01-01T12:00:00Z") + def test_query_runner(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + + feature_flag_property = f"$feature/{feature_flag.key}" + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")]) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=exposure_query, + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + experiment.save() + + # Populate experiment events + for variant, count in [("control", 11), ("test", 15)]: + for i in range(count): + _create_event( + team=self.team, + event="$pageview", + distinct_id=f"user_{variant}_{i}", + properties={feature_flag_property: variant}, + ) + + # Populate exposure events + for variant, count in [("control", 7), ("test", 9)]: + for i in range(count): + _create_event( + team=self.team, + event="$feature_flag_called", + distinct_id=f"user_{variant}_{i}", + properties={feature_flag_property: variant}, + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + result = query_runner.calculate() + + self.assertEqual(len(result.variants), 2) + + control_result = next(variant for variant in result.variants if variant.key == "control") + test_result = next(variant for variant in result.variants if variant.key == "test") + + self.assertEqual(control_result.count, 11) + self.assertEqual(test_result.count, 15) + self.assertEqual(control_result.absolute_exposure, 7) + self.assertEqual(test_result.absolute_exposure, 9) + + @freeze_time("2020-01-01T12:00:00Z") + def test_query_runner_with_custom_exposure(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + + ff_property = f"$feature/{feature_flag.key}" + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + exposure_query = TrendsQuery( + series=[EventsNode(event="custom_exposure_event", properties=[{"key": "valid_exposure", "value": "true"}])] + ) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=exposure_query, + ) + + experiment.metrics = [{"type": 
"primary", "query": experiment_query.model_dump()}] + experiment.save() + + journeys_for( + { + "user_control_1": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + { + "event": "custom_exposure_event", + "timestamp": "2020-01-02", + "properties": {ff_property: "control", "valid_exposure": "true"}, + }, + ], + "user_control_2": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + { + "event": "custom_exposure_event", + "timestamp": "2020-01-02", + "properties": {ff_property: "control", "valid_exposure": "true"}, + }, + ], + "user_test_1": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + { + "event": "custom_exposure_event", + "timestamp": "2020-01-02", + "properties": {ff_property: "test", "valid_exposure": "true"}, + }, + ], + "user_test_2": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + { + "event": "custom_exposure_event", + "timestamp": "2020-01-02", + "properties": {ff_property: "test", "valid_exposure": "true"}, + }, + ], + "user_out_of_control": [ + {"event": "$pageview", "timestamp": "2020-01-02"}, + ], + "user_out_of_control_exposure": [ + { + "event": "custom_exposure_event", + "timestamp": "2020-01-02", + "properties": {ff_property: "control", "valid_exposure": "false"}, + }, + ], + "user_out_of_date_range": [ + {"event": "$pageview", "timestamp": "2019-01-01", "properties": {ff_property: "control"}}, + { + "event": "custom_exposure_event", + "timestamp": "2019-01-01", + "properties": {ff_property: "control", "valid_exposure": "true"}, + }, + ], + }, + self.team, + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + result = query_runner.calculate() + + trend_result = cast(ExperimentTrendsQueryResponse, result) + + control_result = next(variant for variant in trend_result.variants if variant.key == "control") + test_result = next(variant for variant in trend_result.variants if variant.key == "test") + + self.assertEqual(control_result.count, 3) + self.assertEqual(test_result.count, 5) + + self.assertEqual(control_result.absolute_exposure, 2) + self.assertEqual(test_result.absolute_exposure, 2) + + @freeze_time("2020-01-01T12:00:00Z") + def test_query_runner_with_default_exposure(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + + ff_property = f"$feature/{feature_flag.key}" + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=None, # No exposure query provided + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + experiment.save() + + journeys_for( + { + "user_control_1": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + { + 
"event": "$feature_flag_called", + "timestamp": "2020-01-02", + "properties": {ff_property: "control", "$feature_flag": feature_flag.key}, + }, + ], + "user_control_2": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}}, + { + "event": "$feature_flag_called", + "timestamp": "2020-01-02", + "properties": {ff_property: "control", "$feature_flag": feature_flag.key}, + }, + ], + "user_test_1": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + { + "event": "$feature_flag_called", + "timestamp": "2020-01-02", + "properties": {ff_property: "test", "$feature_flag": feature_flag.key}, + }, + ], + "user_test_2": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + { + "event": "$feature_flag_called", + "timestamp": "2020-01-02", + "properties": {ff_property: "test", "$feature_flag": feature_flag.key}, + }, + ], + "user_out_of_control": [ + {"event": "$pageview", "timestamp": "2020-01-02"}, + ], + "user_out_of_control_exposure": [ + {"event": "$feature_flag_called", "timestamp": "2020-01-02"}, + ], + "user_out_of_date_range": [ + {"event": "$pageview", "timestamp": "2019-01-01", "properties": {ff_property: "control"}}, + { + "event": "$feature_flag_called", + "timestamp": "2019-01-01", + "properties": {ff_property: "control", "$feature_flag": feature_flag.key}, + }, + ], + }, + self.team, + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + result = query_runner.calculate() + + trend_result = cast(ExperimentTrendsQueryResponse, result) + + control_result = next(variant for variant in trend_result.variants if variant.key == "control") + test_result = next(variant for variant in trend_result.variants if variant.key == "test") + + self.assertEqual(control_result.count, 3) + self.assertEqual(test_result.count, 5) + + self.assertEqual(control_result.absolute_exposure, 2) + self.assertEqual(test_result.absolute_exposure, 2) + + @freeze_time("2020-01-01T12:00:00Z") + def test_query_runner_with_holdout(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + holdout = self.create_holdout_for_experiment(experiment) + + feature_flag_property = f"$feature/{feature_flag.key}" + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")]) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=exposure_query, + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + experiment.save() + + # Populate experiment events + for variant, count in [("control", 11), ("test", 15), (f"holdout-{holdout.id}", 8)]: + for i in range(count): + _create_event( + team=self.team, + event="$pageview", + distinct_id=f"user_{variant}_{i}", + properties={feature_flag_property: variant}, + ) + + # Populate exposure events + for variant, count in [("control", 7), ("test", 9), (f"holdout-{holdout.id}", 4)]: + for i in range(count): + _create_event( + 
+                    team=self.team,
+                    event="$feature_flag_called",
+                    distinct_id=f"user_{variant}_{i}",
+                    properties={feature_flag_property: variant},
+                )
+
+        flush_persons_and_events()
+
+        query_runner = ExperimentTrendsQueryRunner(
+            query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
+        )
+        result = query_runner.calculate()
+
+        self.assertEqual(len(result.variants), 3)
+
+        control_result = next(variant for variant in result.variants if variant.key == "control")
+        test_result = next(variant for variant in result.variants if variant.key == "test")
+        holdout_result = next(variant for variant in result.variants if variant.key == f"holdout-{holdout.id}")
+
+        self.assertEqual(control_result.count, 11)
+        self.assertEqual(test_result.count, 15)
+        self.assertEqual(holdout_result.count, 8)
+        self.assertEqual(control_result.absolute_exposure, 7)
+        self.assertEqual(test_result.absolute_exposure, 9)
+        self.assertEqual(holdout_result.absolute_exposure, 4)
+
+    @freeze_time("2020-01-01T12:00:00Z")
+    def test_query_runner_with_avg_math(self):
+        feature_flag = self.create_feature_flag()
+        experiment = self.create_experiment(feature_flag=feature_flag)
+
+        count_query = TrendsQuery(series=[EventsNode(event="$pageview", math="avg")])
+        exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
+
+        experiment_query = ExperimentTrendsQuery(
+            experiment_id=experiment.id,
+            kind="ExperimentTrendsQuery",
+            count_query=count_query,
+            exposure_query=exposure_query,
+        )
+
+        experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
+        experiment.save()
+
+        query_runner = ExperimentTrendsQueryRunner(
+            query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
+        )
+
+        prepared_count_query = query_runner.prepared_count_query
+        self.assertEqual(prepared_count_query.series[0].math, "sum")
+
+    @freeze_time("2020-01-01T12:00:00Z")
+    def test_query_runner_standard_flow(self):
+        feature_flag = self.create_feature_flag()
+        experiment = self.create_experiment(feature_flag=feature_flag)
+
+        ff_property = f"$feature/{feature_flag.key}"
+        count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
+        exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
+
+        experiment_query = ExperimentTrendsQuery(
+            experiment_id=experiment.id,
+            kind="ExperimentTrendsQuery",
+            count_query=count_query,
+            exposure_query=exposure_query,
+        )
+
+        experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
+        experiment.save()
+
+        journeys_for(
+            {
+                "user_control_1": [
+                    {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
+                    {"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "control"}},
+                    {
+                        "event": "$feature_flag_called",
+                        "timestamp": "2020-01-02",
+                        "properties": {ff_property: "control"},
+                    },
+                ],
+                "user_control_2": [
+                    {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
+                    {
+                        "event": "$feature_flag_called",
+                        "timestamp": "2020-01-02",
+                        "properties": {ff_property: "control"},
+                    },
+                ],
+                "user_test_1": [
+                    {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
+                    {"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "test"}},
+                    {"event": "$pageview", "timestamp": "2020-01-04", "properties": {ff_property: "test"}},
+                    {"event": "$feature_flag_called", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
+                ],
+                "user_test_2": [
"$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + {"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "test"}}, + {"event": "$feature_flag_called", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + ], + }, + self.team, + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + result = query_runner.calculate() + + self.assertEqual(len(result.variants), 2) + for variant in result.variants: + self.assertIn(variant.key, ["control", "test"]) + + control_variant = next(v for v in result.variants if v.key == "control") + test_variant = next(v for v in result.variants if v.key == "test") + + self.assertEqual(control_variant.count, 3) + self.assertEqual(test_variant.count, 5) + self.assertEqual(control_variant.absolute_exposure, 2) + self.assertEqual(test_variant.absolute_exposure, 2) + + self.assertAlmostEqual(result.credible_intervals["control"][0], 0.5449, places=3) + self.assertAlmostEqual(result.credible_intervals["control"][1], 4.3836, places=3) + self.assertAlmostEqual(result.credible_intervals["test"][0], 1.1009, places=3) + self.assertAlmostEqual(result.credible_intervals["test"][1], 5.8342, places=3) + + self.assertAlmostEqual(result.p_value, 1.0, places=3) + + self.assertAlmostEqual(result.probability["control"], 0.2549, places=2) + self.assertAlmostEqual(result.probability["test"], 0.7453, places=2) + + self.assertEqual(result.significance_code, ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE) + + self.assertFalse(result.significant) + + self.assertEqual(len(result.variants), 2) + + self.assertEqual(control_variant.absolute_exposure, 2.0) + self.assertEqual(control_variant.count, 3.0) + self.assertEqual(control_variant.exposure, 1.0) + + self.assertEqual(test_variant.absolute_exposure, 2.0) + self.assertEqual(test_variant.count, 5.0) + self.assertEqual(test_variant.exposure, 1.0) + + @freeze_time("2020-01-01T12:00:00Z") + def test_validate_event_variants_no_events(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + ) + + query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team) + with self.assertRaises(ValidationError) as context: + query_runner.calculate() + + expected_errors = json.dumps( + { + ExperimentNoResultsErrorKeys.NO_EVENTS: True, + ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True, + ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True, + ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True, + } + ) + self.assertEqual(cast(list, context.exception.detail)[0], expected_errors) + + @freeze_time("2020-01-01T12:00:00Z") + def test_validate_event_variants_no_control(self): + feature_flag = self.create_feature_flag() + experiment = self.create_experiment(feature_flag=feature_flag) + + ff_property = f"$feature/{feature_flag.key}" + journeys_for( + { + "user_test": [ + {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}}, + ], + }, + self.team, + ) + + flush_persons_and_events() + + count_query = TrendsQuery(series=[EventsNode(event="$pageview")]) + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + ) + 
+
+        query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
+        with self.assertRaises(ValidationError) as context:
+            query_runner.calculate()
+
+        expected_errors = json.dumps(
+            {
+                ExperimentNoResultsErrorKeys.NO_EVENTS: False,
+                ExperimentNoResultsErrorKeys.NO_FLAG_INFO: False,
+                ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
+                ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: False,
+            }
+        )
+        self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
+
+    @freeze_time("2020-01-01T12:00:00Z")
+    def test_validate_event_variants_no_test(self):
+        feature_flag = self.create_feature_flag()
+        experiment = self.create_experiment(feature_flag=feature_flag)
+
+        ff_property = f"$feature/{feature_flag.key}"
+        journeys_for(
+            {
+                "user_control": [
+                    {"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
+                ],
+            },
+            self.team,
+        )
+
+        flush_persons_and_events()
+
+        count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
+        experiment_query = ExperimentTrendsQuery(
+            experiment_id=experiment.id,
+            kind="ExperimentTrendsQuery",
+            count_query=count_query,
+        )
+
+        query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
+        with self.assertRaises(ValidationError) as context:
+            query_runner.calculate()
+
+        expected_errors = json.dumps(
+            {
+                ExperimentNoResultsErrorKeys.NO_EVENTS: False,
+                ExperimentNoResultsErrorKeys.NO_FLAG_INFO: False,
+                ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: False,
+                ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
+            }
+        )
+        self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
+
+    @freeze_time("2020-01-01T12:00:00Z")
+    def test_validate_event_variants_no_flag_info(self):
+        feature_flag = self.create_feature_flag()
+        experiment = self.create_experiment(feature_flag=feature_flag)
+
+        journeys_for(
+            {
+                "user_no_flag_1": [
+                    {"event": "$pageview", "timestamp": "2020-01-02"},
+                ],
+                "user_no_flag_2": [
+                    {"event": "$pageview", "timestamp": "2020-01-03"},
+                ],
+            },
+            self.team,
+        )
+
+        flush_persons_and_events()
+
+        count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
+        experiment_query = ExperimentTrendsQuery(
+            experiment_id=experiment.id,
+            kind="ExperimentTrendsQuery",
+            count_query=count_query,
+        )
+
+        query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
+        with self.assertRaises(ValidationError) as context:
+            query_runner.calculate()
+
+        expected_errors = json.dumps(
+            {
+                ExperimentNoResultsErrorKeys.NO_EVENTS: True,
+                ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True,
+                ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
+                ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
+            }
+        )
+        self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
diff --git a/posthog/hogql_queries/experiments/trends_statistics.py b/posthog/hogql_queries/experiments/trends_statistics.py
new file mode 100644
index 00000000000..61b19d1486f
--- /dev/null
+++ b/posthog/hogql_queries/experiments/trends_statistics.py
@@ -0,0 +1,202 @@
+from functools import lru_cache
+from math import exp, lgamma, log, ceil
+
+from numpy.random import default_rng
+from rest_framework.exceptions import ValidationError
+import scipy.stats as stats
+from sentry_sdk import capture_exception
+
+from ee.clickhouse.queries.experiments import (
+    FF_DISTRIBUTION_THRESHOLD,
+    MIN_PROBABILITY_FOR_SIGNIFICANCE,
+    P_VALUE_SIGNIFICANCE_LEVEL,
+)
+
+from posthog.schema import ExperimentSignificanceCode, ExperimentVariantTrendsBaseStats
+
+Probability = float
+
+
+def calculate_probabilities(
+    control_variant: ExperimentVariantTrendsBaseStats, test_variants: list[ExperimentVariantTrendsBaseStats]
+) -> list[Probability]:
+    """
+    Calculates the probability that each variant is the winner; the first variant is control, the rest are test variants.
+
+    Supports a maximum of 10 variants today.
+
+    For each variant, we create a Gamma distribution of arrival rates,
+    where alpha (shape parameter) = count of variant + 1
+    and beta (rate parameter) = relative exposure of the variant
+    """
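+    # Illustration (approximate): with equal exposure, control count=100 vs. test
+    # count=120 compares Gamma(101, 1) with Gamma(121, 1) samples; the test variant
+    # wins roughly 90% of draws, i.e. a probability of ~0.9.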
+    if not control_variant:
+        raise ValidationError("No control variant data found", code="no_data")
+
+    if len(test_variants) >= 10:
+        raise ValidationError(
+            "Can't calculate experiment results for more than 10 variants",
+            code="too_much_data",
+        )
+
+    if len(test_variants) < 1:
+        raise ValidationError(
+            "Can't calculate experiment results for fewer than 2 variants",
+            code="no_data",
+        )
+
+    variants = [control_variant, *test_variants]
+    probabilities = []
+
+    # simulate winning for each test variant
+    for index, variant in enumerate(variants):
+        probabilities.append(
+            simulate_winning_variant_for_arrival_rates(variant, variants[:index] + variants[index + 1 :])
+        )
+
+    total_test_probabilities = sum(probabilities[1:])
+
+    return [max(0, 1 - total_test_probabilities), *probabilities[1:]]
+
+
+def simulate_winning_variant_for_arrival_rates(
+    target_variant: ExperimentVariantTrendsBaseStats, variants: list[ExperimentVariantTrendsBaseStats]
+) -> float:
+    random_sampler = default_rng()
+    simulations_count = 100_000
+
+    variant_samples = []
+    for variant in variants:
+        # Get `N=simulations_count` samples from a Gamma distribution with
+        # alpha = variant_success + 1 and beta = relative exposure of the variant
+        samples = random_sampler.gamma(variant.count + 1, 1 / variant.exposure, simulations_count)
+        variant_samples.append(samples)
+
+    target_variant_samples = random_sampler.gamma(
+        target_variant.count + 1, 1 / target_variant.exposure, simulations_count
+    )
+
+    winnings = 0
+    variant_conversions = list(zip(*variant_samples))
+    for i in range(simulations_count):
+        if target_variant_samples[i] > max(variant_conversions[i]):
+            winnings += 1
+
+    return winnings / simulations_count
+
+
+def are_results_significant(
+    control_variant: ExperimentVariantTrendsBaseStats,
+    test_variants: list[ExperimentVariantTrendsBaseStats],
+    probabilities: list[Probability],
+) -> tuple[ExperimentSignificanceCode, Probability]:
+    # TODO: Experiment with Expected Loss calculations for trend experiments
+
+    for variant in test_variants:
+        # We need a feature flag distribution threshold because the distribution of people
+        # can skew wildly when there are few people in the experiment
+        if variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
+            return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 1
+
+    if control_variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
+        return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 1
+
+    if (
+        probabilities[0] < MIN_PROBABILITY_FOR_SIGNIFICANCE
+        and sum(probabilities[1:]) < MIN_PROBABILITY_FOR_SIGNIFICANCE
+    ):
+        # The summed win probability of all variants except control is less than 90%
+        return ExperimentSignificanceCode.LOW_WIN_PROBABILITY, 1
+
+    p_value = calculate_p_value(control_variant, test_variants)
+
+    if p_value >= P_VALUE_SIGNIFICANCE_LEVEL:
+        return ExperimentSignificanceCode.HIGH_P_VALUE, p_value
+
+    return ExperimentSignificanceCode.SIGNIFICANT, p_value
+
+
+@lru_cache(maxsize=100_000)
+def combinationln(n: float, k: float) -> float:
+    """
+    Returns the log of the binomial coefficient.
+    """
+    return lgamma(n + 1) - lgamma(k + 1) - lgamma(n - k + 1)
+
+
+def intermediate_poisson_term(count: float, iterator: float, relative_exposure: float):
+    return exp(
+        combinationln(count, iterator)
+        + iterator * log(relative_exposure)
+        + (count - iterator) * log(1 - relative_exposure)
+    )
+
+
+def poisson_p_value(control_count, control_exposure, test_count, test_exposure):
+    """
+    Calculates the p-value of the experiment.
+    Calculations from: https://www.evanmiller.org/statistical-formulas-for-programmers.html#count_test
+    """
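+    # Conditioned on the total count, the test/control split is
+    # Binomial(total_count, relative_exposure), so this amounts to an exact
+    # two-sided binomial test on test_count.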
+    relative_exposure = test_exposure / (control_exposure + test_exposure)
+    total_count = control_count + test_count
+
+    low_p_value = 0.0
+    high_p_value = 0.0
+
+    for i in range(ceil(test_count) + 1):
+        low_p_value += intermediate_poisson_term(total_count, i, relative_exposure)
+
+    for i in range(ceil(test_count), ceil(total_count) + 1):
+        high_p_value += intermediate_poisson_term(total_count, i, relative_exposure)
+
+    return min(1, 2 * min(low_p_value, high_p_value))
+
+
+def calculate_p_value(
+    control_variant: ExperimentVariantTrendsBaseStats, test_variants: list[ExperimentVariantTrendsBaseStats]
+) -> Probability:
+    best_test_variant = max(test_variants, key=lambda variant: variant.count)
+
+    return poisson_p_value(
+        control_variant.count,
+        control_variant.exposure,
+        best_test_variant.count,
+        best_test_variant.exposure,
+    )
+
+
+def calculate_credible_intervals(variants, lower_bound=0.025, upper_bound=0.975):
+    """
+    Calculate the Bayesian credible intervals for the mean (average events per unit)
+    for a list of variants in a Trend experiment.
+    If no lower/upper bound is provided, the function calculates the 95% credible interval.
+    """
+    intervals = {}
+
+    for variant in variants:
+        try:
+            # Alpha (shape parameter) is count + 1, assuming a Gamma distribution for counts
+            alpha = variant.count + 1
+
+            # Beta (scale parameter) is the inverse of absolute_exposure,
+            # representing the average rate of events per user
+            beta = 1 / variant.absolute_exposure
+
+            # Calculate the credible interval for the mean using the Gamma distribution
+            credible_interval = stats.gamma.ppf([lower_bound, upper_bound], a=alpha, scale=beta)
+
+            intervals[variant.key] = (credible_interval[0], credible_interval[1])
+
+        except Exception as e:
+            capture_exception(
+                Exception(f"Error calculating credible interval for variant {variant.key}"),
+                {"error": str(e)},
+            )
+            return {}
+
+    return intervals
diff --git a/posthog/models/feature_flag/flag_status.py b/posthog/models/feature_flag/flag_status.py
new file mode 100644
index 00000000000..05d791dc1c7
--- /dev/null
+++ b/posthog/models/feature_flag/flag_status.py
@@ -0,0 +1,48 @@
+import structlog
+from django.db.models.query import QuerySet
+from posthog.schema import (
+    FeatureFlagStatus,
+)
+
+from .feature_flag import (
+    FeatureFlag,
+)
+
+logger = structlog.get_logger(__name__)
+
+
+# FeatureFlagStatusChecker is used to determine the status of a given feature flag.
+# Eventually, this may be used to automatically archive old flags that are no longer in use.
+#
+# Status can be one of the following:
+# - ACTIVE: The feature flag is actively evaluated and the evaluations continue to vary.
+# - STALE: The feature flag either has not been evaluated recently, or the evaluation has not changed recently.
+# - DELETED: The feature flag has been soft deleted.
+# - UNKNOWN: The feature flag is not found in the database.
+class FeatureFlagStatusChecker:
+    def __init__(
+        self,
+        feature_flag_id: str,
+        # The amount of time considered "recent" for the purposes of determining staleness.
+        stale_window: str = "-30d",
+    ):
+        self.feature_flag_id = feature_flag_id
+        self.stale_window = stale_window
+
+    def get_status(self) -> FeatureFlagStatus:
+        # filter().first() instead of get(): get() raises DoesNotExist rather than
+        # returning None, so the UNKNOWN branch below would never be reached
+        flag = FeatureFlag.objects.filter(pk=self.feature_flag_id).first()
+
+        if flag is None:
+            return FeatureFlagStatus.UNKNOWN
+        if flag.deleted:
+            return FeatureFlagStatus.DELETED
+
+        # TODO: use get_recent_evaluations() to distinguish STALE from ACTIVE; until
+        # then, any existing, non-deleted flag is reported as ACTIVE
+        return FeatureFlagStatus.ACTIVE
+
+    def get_recent_evaluations(self) -> QuerySet:
+        # TODO: not yet implemented; will fetch recent evaluations of this flag
+        raise NotImplementedError
diff --git a/posthog/schema.py b/posthog/schema.py
index 6680ac5a4b3..14740b491b6 100644
--- a/posthog/schema.py
+++ b/posthog/schema.py
@@ -716,6 +716,13 @@ class Status(StrEnum):
     PENDING_RELEASE = "pending_release"
 
 
+class FeatureFlagStatus(StrEnum):
+    ACTIVE = "active"
+    STALE = "stale"
+    DELETED = "deleted"
+    UNKNOWN = "unknown"
+
+
 class ErrorTrackingGroup(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -6722,6 +6729,13 @@ class ExperimentTrendsQuery(BaseModel):
     response: Optional[ExperimentTrendsQueryResponse] = None
 
 
+class FeatureFlagStatusQueryResponse(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    status: FeatureFlagStatus
+
+
 class FunnelPathsFilter(BaseModel):
     model_config = ConfigDict(
         extra="forbid",