
(WIP) feat(flags): add GET feature_flags/:id/status for getting staleness

Haven Barnes 2024-11-15 15:08:30 -08:00
parent 39659d7103
commit c9c0e69e8f
7 changed files with 1269 additions and 0 deletions
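For context, the new endpoint is exposed as a detail action on the feature flags API, so exercising it looks roughly like the sketch below (written in the project's APIBaseTest style; the team and flag objects are placeholders). In this WIP commit the action still returns an empty list, but the FeatureFlagStatusQueryResponse schema added at the bottom of this diff suggests the eventual body carries a single status field.

# Hypothetical usage sketch, not part of this commit:
response = self.client.get(f"/api/projects/{self.team.id}/feature_flags/{feature_flag.id}/status")
self.assertEqual(response.status_code, 200)
# Once implemented, the body is expected to match FeatureFlagStatusQueryResponse,
# e.g. {"status": "stale"}; the stub currently returns an empty list.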

View File

@@ -14,6 +14,9 @@ from rest_framework import (
viewsets,
)
from posthog.api.utils import action
from posthog.schema import (
FeatureFlagStatusQueryResponse,
)
from rest_framework.permissions import SAFE_METHODS, BasePermission
from rest_framework.request import Request
from rest_framework.response import Response
@@ -796,6 +799,15 @@ class FeatureFlagViewSet(
return activity_page_response(activity_page, limit, page, request)
@action(methods=["GET"], url_path="status", detail=True, required_scopes=["feature_flag:read"])
@extend_schema(
responses={
200: FeatureFlagStatusQueryResponse,
}
)
def status(self, request: request.Request, **kwargs):
    # WIP: staleness computation is not wired up yet (see FeatureFlagStatusChecker later in this diff).
    return Response([])
@action(methods=["GET"], detail=True, required_scopes=["activity_log:read"])
def activity(self, request: request.Request, **kwargs):
limit = int(request.query_params.get("limit", "10"))

View File

@@ -0,0 +1,12 @@
# The FF variant name for control
CONTROL_VARIANT_KEY = "control"
# Controls the minimum number of people who must be exposed to a variant
# before the results can be deemed significant
FF_DISTRIBUTION_THRESHOLD = 100
# If the probability of a variant is below this threshold, it is considered
# insignificant
MIN_PROBABILITY_FOR_SIGNIFICANCE = 0.9
EXPECTED_LOSS_SIGNIFICANCE_LEVEL = 0.01

View File

@@ -0,0 +1,354 @@
import json
from zoneinfo import ZoneInfo
from django.conf import settings
from posthog.constants import ExperimentNoResultsErrorKeys
from posthog.hogql import ast
from posthog.hogql_queries.experiments import CONTROL_VARIANT_KEY
from posthog.hogql_queries.experiments.trends_statistics import (
are_results_significant,
calculate_credible_intervals,
calculate_probabilities,
)
from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.models.experiment import Experiment
from posthog.queries.trends.util import ALL_SUPPORTED_MATH_FUNCTIONS
from rest_framework.exceptions import ValidationError
from posthog.schema import (
BaseMathType,
BreakdownFilter,
CachedExperimentTrendsQueryResponse,
ChartDisplayType,
EventPropertyFilter,
EventsNode,
ExperimentSignificanceCode,
ExperimentTrendsQuery,
ExperimentTrendsQueryResponse,
ExperimentVariantTrendsBaseStats,
InsightDateRange,
PropertyMathType,
TrendsFilter,
TrendsQuery,
TrendsQueryResponse,
)
from typing import Any, Optional
import threading
class ExperimentTrendsQueryRunner(QueryRunner):
query: ExperimentTrendsQuery
response: ExperimentTrendsQueryResponse
cached_response: CachedExperimentTrendsQueryResponse
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.experiment = Experiment.objects.get(id=self.query.experiment_id)
self.feature_flag = self.experiment.feature_flag
self.variants = [variant["key"] for variant in self.feature_flag.variants]
if self.experiment.holdout:
self.variants.append(f"holdout-{self.experiment.holdout.id}")
self.breakdown_key = f"$feature/{self.feature_flag.key}"
self.prepared_count_query = self._prepare_count_query()
self.prepared_exposure_query = self._prepare_exposure_query()
self.count_query_runner = TrendsQueryRunner(
query=self.prepared_count_query, team=self.team, timings=self.timings, limit_context=self.limit_context
)
self.exposure_query_runner = TrendsQueryRunner(
query=self.prepared_exposure_query, team=self.team, timings=self.timings, limit_context=self.limit_context
)
def _uses_math_aggregation_by_user_or_property_value(self, query: TrendsQuery):
# "sum" doesn't need special handling, we *can* have custom exposure for sum filters.
# Copy the list so the module-level ALL_SUPPORTED_MATH_FUNCTIONS constant is not mutated in place.
math_keys = [key for key in ALL_SUPPORTED_MATH_FUNCTIONS if key != "sum"]
return any(entity.math in math_keys for entity in query.series)
def _get_insight_date_range(self) -> InsightDateRange:
"""
Returns an InsightDateRange object based on the experiment's start and end dates,
adjusted for the team's timezone if applicable.
"""
if self.team.timezone:
tz = ZoneInfo(self.team.timezone)
start_date = self.experiment.start_date.astimezone(tz) if self.experiment.start_date else None
end_date = self.experiment.end_date.astimezone(tz) if self.experiment.end_date else None
else:
start_date = self.experiment.start_date
end_date = self.experiment.end_date
return InsightDateRange(
date_from=start_date.isoformat() if start_date else None,
date_to=end_date.isoformat() if end_date else None,
explicitDate=True,
)
def _get_breakdown_filter(self) -> BreakdownFilter:
return BreakdownFilter(
breakdown=self.breakdown_key,
breakdown_type="event",
)
def _prepare_count_query(self) -> TrendsQuery:
"""
This method takes the raw trend query and adapts it
for the needs of experiment analysis:
1. Set the trend display type based on whether math aggregation is used
2. Set the date range to match the experiment's duration, using the project's timezone.
3. Configure the breakdown to use the feature flag key, which allows us
to separate results for different experiment variants.
"""
prepared_count_query = TrendsQuery(**self.query.count_query.model_dump())
uses_math_aggregation = self._uses_math_aggregation_by_user_or_property_value(prepared_count_query)
# :TRICKY: for `avg` aggregation, use `sum` data as an approximation
if prepared_count_query.series[0].math == PropertyMathType.AVG:
prepared_count_query.series[0].math = PropertyMathType.SUM
# TODO: revisit this; using the count data for the remaining aggregation types is likely wrong
elif uses_math_aggregation:
prepared_count_query.series[0].math = None
prepared_count_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE)
prepared_count_query.dateRange = self._get_insight_date_range()
prepared_count_query.breakdownFilter = self._get_breakdown_filter()
prepared_count_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
type="event",
)
]
return prepared_count_query
def _prepare_exposure_query(self) -> TrendsQuery:
"""
This method prepares the exposure query for the experiment analysis.
Exposure is the count of users who have seen the experiment. This is necessary to calculate the statistical
significance of the experiment.
There are 3 possible cases for the exposure query:
1. If math aggregation is used, we construct an implicit exposure query
2. Otherwise, if an exposure query is provided, we use it as is, adapting it to the experiment's duration and breakdown
3. Otherwise, we construct a default exposure query (the count of $feature_flag_called events)
"""
# 1. If math aggregation is used, we construct an implicit exposure query: unique users for the count event
uses_math_aggregation = self._uses_math_aggregation_by_user_or_property_value(self.query.count_query)
if uses_math_aggregation:
prepared_exposure_query = TrendsQuery(**self.query.count_query.model_dump())
count_event = self.query.count_query.series[0]
if hasattr(count_event, "event"):
prepared_exposure_query.dateRange = self._get_insight_date_range()
prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
prepared_exposure_query.series = [
EventsNode(
event=count_event.event,
math=BaseMathType.DAU,
)
]
prepared_exposure_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
type="event",
)
]
else:
raise ValueError("Expected first series item to have an 'event' attribute")
# 2. Otherwise, if an exposure query is provided, we use it as is, adapting the date range and breakdown
elif self.query.exposure_query:
prepared_exposure_query = TrendsQuery(**self.query.exposure_query.model_dump())
prepared_exposure_query.dateRange = self._get_insight_date_range()
prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
prepared_exposure_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
type="event",
)
]
# 3. Otherwise, we construct a default exposure query: unique users for the $feature_flag_called event
else:
prepared_exposure_query = TrendsQuery(
dateRange=self._get_insight_date_range(),
breakdownFilter=self._get_breakdown_filter(),
series=[
EventsNode(
event="$feature_flag_called",
math=BaseMathType.DAU, # TODO sync with frontend!!!
)
],
properties=[
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
type="event",
),
EventPropertyFilter(
key="$feature_flag",
value=[self.feature_flag.key],
operator="exact",
type="event",
),
],
)
return prepared_exposure_query
def calculate(self) -> ExperimentTrendsQueryResponse:
shared_results: dict[str, Optional[Any]] = {"count_result": None, "exposure_result": None}
errors = []
def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool):
try:
result = query_runner.calculate()
shared_results[result_key] = result
except Exception as e:
errors.append(e)
finally:
if is_parallel:
from django.db import connection
# This will only close the DB connection for the newly spawned thread and not the whole app
connection.close()
# This exists so that we're not spawning threads during unit tests
if settings.IN_UNIT_TESTING:
run(self.count_query_runner, "count_result", False)
run(self.exposure_query_runner, "exposure_result", False)
else:
jobs = [
threading.Thread(target=run, args=(self.count_query_runner, "count_result", True)),
threading.Thread(target=run, args=(self.exposure_query_runner, "exposure_result", True)),
]
[j.start() for j in jobs] # type: ignore
[j.join() for j in jobs] # type: ignore
# Raise any errors raised in a separate thread
if errors:
raise errors[0]
count_result = shared_results["count_result"]
exposure_result = shared_results["exposure_result"]
if count_result is None or exposure_result is None:
raise ValueError("One or both query runners failed to produce a response")
self._validate_event_variants(count_result)
# Statistical analysis
control_variant, test_variants = self._get_variants_with_base_stats(count_result, exposure_result)
probabilities = calculate_probabilities(control_variant, test_variants)
significance_code, p_value = are_results_significant(control_variant, test_variants, probabilities)
credible_intervals = calculate_credible_intervals([control_variant, *test_variants])
return ExperimentTrendsQueryResponse(
kind="ExperimentTrendsQuery",
insight=count_result.results,
count_query=self.prepared_count_query,
exposure_query=self.prepared_exposure_query,
variants=[variant.model_dump() for variant in [control_variant, *test_variants]],
probability={
variant.key: probability
for variant, probability in zip([control_variant, *test_variants], probabilities)
},
significant=significance_code == ExperimentSignificanceCode.SIGNIFICANT,
significance_code=significance_code,
p_value=p_value,
credible_intervals=credible_intervals,
)
def _get_variants_with_base_stats(
self, count_results: TrendsQueryResponse, exposure_results: TrendsQueryResponse
) -> tuple[ExperimentVariantTrendsBaseStats, list[ExperimentVariantTrendsBaseStats]]:
control_variant: Optional[ExperimentVariantTrendsBaseStats] = None
test_variants = []
exposure_counts = {}
exposure_ratios = {}
for result in exposure_results.results:
count = result.get("count", 0)
breakdown_value = result.get("breakdown_value")
exposure_counts[breakdown_value] = count
control_exposure = exposure_counts.get(CONTROL_VARIANT_KEY, 0)
if control_exposure != 0:
for key, count in exposure_counts.items():
exposure_ratios[key] = count / control_exposure
for result in count_results.results:
count = result.get("count", 0)
breakdown_value = result.get("breakdown_value")
if breakdown_value == CONTROL_VARIANT_KEY:
control_variant = ExperimentVariantTrendsBaseStats(
key=breakdown_value,
count=count,
exposure=1,
# TODO: in the absence of exposure data, we should throw rather than default to 1
absolute_exposure=exposure_counts.get(breakdown_value, 1),
)
else:
test_variants.append(
ExperimentVariantTrendsBaseStats(
key=breakdown_value,
count=count,
# TODO: in the absence of exposure data, we should throw rather than default to 1
exposure=exposure_ratios.get(breakdown_value, 1),
absolute_exposure=exposure_counts.get(breakdown_value, 1),
)
)
if control_variant is None:
raise ValueError("Control variant not found in count results")
return control_variant, test_variants
def _validate_event_variants(self, count_result: TrendsQueryResponse):
errors = {
ExperimentNoResultsErrorKeys.NO_EVENTS: True,
ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True,
ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
}
if not count_result.results or not count_result.results[0]:
raise ValidationError(code="no-results", detail=json.dumps(errors))
errors[ExperimentNoResultsErrorKeys.NO_EVENTS] = False
# Check if "control" is present
for event in count_result.results:
event_variant = event.get("breakdown_value")
if event_variant == "control":
errors[ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT] = False
errors[ExperimentNoResultsErrorKeys.NO_FLAG_INFO] = False
break
# Check if at least one of the test variants is present
test_variants = [variant for variant in self.variants if variant != "control"]
for event in count_result.results:
event_variant = event.get("breakdown_value")
if event_variant in test_variants:
errors[ExperimentNoResultsErrorKeys.NO_TEST_VARIANT] = False
errors[ExperimentNoResultsErrorKeys.NO_FLAG_INFO] = False
break
has_errors = any(errors.values())
if has_errors:
raise ValidationError(detail=json.dumps(errors))
def to_query(self) -> ast.SelectQuery:
raise ValueError(f"Cannot convert source query of type {self.query.count_query.kind} to query")
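One detail of _get_variants_with_base_stats above worth spelling out: exposure for test variants is stored as a ratio relative to the control variant's absolute exposure, while absolute_exposure keeps the raw count from the exposure query. A toy sketch with hypothetical numbers:

# Hypothetical exposure counts keyed by breakdown value, as returned by the exposure query:
exposure_counts = {"control": 100, "test": 120}
control_exposure = exposure_counts["control"]  # 100
exposure_ratios = {key: count / control_exposure for key, count in exposure_counts.items()}
# control -> exposure 1.0 (by definition), test -> exposure 1.2;
# these ratios feed the Gamma model in trends_statistics.py as the variant's relative exposure.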

View File

@@ -0,0 +1,640 @@
from django.test import override_settings
from posthog.hogql_queries.experiments.experiment_trends_query_runner import ExperimentTrendsQueryRunner
from posthog.models.experiment import Experiment, ExperimentHoldout
from posthog.models.feature_flag.feature_flag import FeatureFlag
from posthog.schema import (
EventsNode,
ExperimentSignificanceCode,
ExperimentTrendsQuery,
ExperimentTrendsQueryResponse,
TrendsQuery,
)
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, flush_persons_and_events
from freezegun import freeze_time
from typing import cast
from django.utils import timezone
from datetime import timedelta
from posthog.test.test_journeys import journeys_for
from rest_framework.exceptions import ValidationError
from posthog.constants import ExperimentNoResultsErrorKeys
import json
@override_settings(IN_UNIT_TESTING=True)
class TestExperimentTrendsQueryRunner(ClickhouseTestMixin, APIBaseTest):
def create_feature_flag(self, key="test-experiment"):
return FeatureFlag.objects.create(
name=f"Test experiment flag: {key}",
key=key,
team=self.team,
filters={
"groups": [{"properties": [], "rollout_percentage": None}],
"multivariate": {
"variants": [
{
"key": "control",
"name": "Control",
"rollout_percentage": 50,
},
{
"key": "test",
"name": "Test",
"rollout_percentage": 50,
},
]
},
},
created_by=self.user,
)
def create_experiment(self, name="test-experiment", feature_flag=None):
if feature_flag is None:
feature_flag = self.create_feature_flag(name)
return Experiment.objects.create(
name=name,
team=self.team,
feature_flag=feature_flag,
start_date=timezone.now(),
end_date=timezone.now() + timedelta(days=14),
)
def create_holdout_for_experiment(self, experiment: Experiment):
holdout = ExperimentHoldout.objects.create(
team=self.team,
name="Test Experiment holdout",
)
holdout.filters = [{"properties": [], "rollout_percentage": 20, "variant": f"holdout-{holdout.id}"}]
holdout.save()
experiment.holdout = holdout
experiment.save()
return holdout
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
feature_flag_property = f"$feature/{feature_flag.key}"
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=exposure_query,
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
# Populate experiment events
for variant, count in [("control", 11), ("test", 15)]:
for i in range(count):
_create_event(
team=self.team,
event="$pageview",
distinct_id=f"user_{variant}_{i}",
properties={feature_flag_property: variant},
)
# Populate exposure events
for variant, count in [("control", 7), ("test", 9)]:
for i in range(count):
_create_event(
team=self.team,
event="$feature_flag_called",
distinct_id=f"user_{variant}_{i}",
properties={feature_flag_property: variant},
)
flush_persons_and_events()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
result = query_runner.calculate()
self.assertEqual(len(result.variants), 2)
control_result = next(variant for variant in result.variants if variant.key == "control")
test_result = next(variant for variant in result.variants if variant.key == "test")
self.assertEqual(control_result.count, 11)
self.assertEqual(test_result.count, 15)
self.assertEqual(control_result.absolute_exposure, 7)
self.assertEqual(test_result.absolute_exposure, 9)
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner_with_custom_exposure(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
ff_property = f"$feature/{feature_flag.key}"
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
exposure_query = TrendsQuery(
series=[EventsNode(event="custom_exposure_event", properties=[{"key": "valid_exposure", "value": "true"}])]
)
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=exposure_query,
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
journeys_for(
{
"user_control_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{
"event": "custom_exposure_event",
"timestamp": "2020-01-02",
"properties": {ff_property: "control", "valid_exposure": "true"},
},
],
"user_control_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{
"event": "custom_exposure_event",
"timestamp": "2020-01-02",
"properties": {ff_property: "control", "valid_exposure": "true"},
},
],
"user_test_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{
"event": "custom_exposure_event",
"timestamp": "2020-01-02",
"properties": {ff_property: "test", "valid_exposure": "true"},
},
],
"user_test_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{
"event": "custom_exposure_event",
"timestamp": "2020-01-02",
"properties": {ff_property: "test", "valid_exposure": "true"},
},
],
"user_out_of_control": [
{"event": "$pageview", "timestamp": "2020-01-02"},
],
"user_out_of_control_exposure": [
{
"event": "custom_exposure_event",
"timestamp": "2020-01-02",
"properties": {ff_property: "control", "valid_exposure": "false"},
},
],
"user_out_of_date_range": [
{"event": "$pageview", "timestamp": "2019-01-01", "properties": {ff_property: "control"}},
{
"event": "custom_exposure_event",
"timestamp": "2019-01-01",
"properties": {ff_property: "control", "valid_exposure": "true"},
},
],
},
self.team,
)
flush_persons_and_events()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
result = query_runner.calculate()
trend_result = cast(ExperimentTrendsQueryResponse, result)
control_result = next(variant for variant in trend_result.variants if variant.key == "control")
test_result = next(variant for variant in trend_result.variants if variant.key == "test")
self.assertEqual(control_result.count, 3)
self.assertEqual(test_result.count, 5)
self.assertEqual(control_result.absolute_exposure, 2)
self.assertEqual(test_result.absolute_exposure, 2)
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner_with_default_exposure(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
ff_property = f"$feature/{feature_flag.key}"
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=None, # No exposure query provided
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
journeys_for(
{
"user_control_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "control", "$feature_flag": feature_flag.key},
},
],
"user_control_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "control", "$feature_flag": feature_flag.key},
},
],
"user_test_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "test", "$feature_flag": feature_flag.key},
},
],
"user_test_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "test", "$feature_flag": feature_flag.key},
},
],
"user_out_of_control": [
{"event": "$pageview", "timestamp": "2020-01-02"},
],
"user_out_of_control_exposure": [
{"event": "$feature_flag_called", "timestamp": "2020-01-02"},
],
"user_out_of_date_range": [
{"event": "$pageview", "timestamp": "2019-01-01", "properties": {ff_property: "control"}},
{
"event": "$feature_flag_called",
"timestamp": "2019-01-01",
"properties": {ff_property: "control", "$feature_flag": feature_flag.key},
},
],
},
self.team,
)
flush_persons_and_events()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
result = query_runner.calculate()
trend_result = cast(ExperimentTrendsQueryResponse, result)
control_result = next(variant for variant in trend_result.variants if variant.key == "control")
test_result = next(variant for variant in trend_result.variants if variant.key == "test")
self.assertEqual(control_result.count, 3)
self.assertEqual(test_result.count, 5)
self.assertEqual(control_result.absolute_exposure, 2)
self.assertEqual(test_result.absolute_exposure, 2)
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner_with_holdout(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
holdout = self.create_holdout_for_experiment(experiment)
feature_flag_property = f"$feature/{feature_flag.key}"
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=exposure_query,
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
# Populate experiment events
for variant, count in [("control", 11), ("test", 15), (f"holdout-{holdout.id}", 8)]:
for i in range(count):
_create_event(
team=self.team,
event="$pageview",
distinct_id=f"user_{variant}_{i}",
properties={feature_flag_property: variant},
)
# Populate exposure events
for variant, count in [("control", 7), ("test", 9), (f"holdout-{holdout.id}", 4)]:
for i in range(count):
_create_event(
team=self.team,
event="$feature_flag_called",
distinct_id=f"user_{variant}_{i}",
properties={feature_flag_property: variant},
)
flush_persons_and_events()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
result = query_runner.calculate()
self.assertEqual(len(result.variants), 3)
control_result = next(variant for variant in result.variants if variant.key == "control")
test_result = next(variant for variant in result.variants if variant.key == "test")
holdout_result = next(variant for variant in result.variants if variant.key == f"holdout-{holdout.id}")
self.assertEqual(control_result.count, 11)
self.assertEqual(test_result.count, 15)
self.assertEqual(holdout_result.count, 8)
self.assertEqual(control_result.absolute_exposure, 7)
self.assertEqual(test_result.absolute_exposure, 9)
self.assertEqual(holdout_result.absolute_exposure, 4)
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner_with_avg_math(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
count_query = TrendsQuery(series=[EventsNode(event="$pageview", math="avg")])
exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=exposure_query,
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
prepared_count_query = query_runner.prepared_count_query
self.assertEqual(prepared_count_query.series[0].math, "sum")
@freeze_time("2020-01-01T12:00:00Z")
def test_query_runner_standard_flow(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
ff_property = f"$feature/{feature_flag.key}"
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
exposure_query=exposure_query,
)
experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}]
experiment.save()
journeys_for(
{
"user_control_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "control"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "control"},
},
],
"user_control_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
{
"event": "$feature_flag_called",
"timestamp": "2020-01-02",
"properties": {ff_property: "control"},
},
],
"user_test_1": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-04", "properties": {ff_property: "test"}},
{"event": "$feature_flag_called", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
],
"user_test_2": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
{"event": "$pageview", "timestamp": "2020-01-03", "properties": {ff_property: "test"}},
{"event": "$feature_flag_called", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
],
},
self.team,
)
flush_persons_and_events()
query_runner = ExperimentTrendsQueryRunner(
query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team
)
result = query_runner.calculate()
self.assertEqual(len(result.variants), 2)
for variant in result.variants:
self.assertIn(variant.key, ["control", "test"])
control_variant = next(v for v in result.variants if v.key == "control")
test_variant = next(v for v in result.variants if v.key == "test")
self.assertEqual(control_variant.count, 3)
self.assertEqual(test_variant.count, 5)
self.assertEqual(control_variant.absolute_exposure, 2)
self.assertEqual(test_variant.absolute_exposure, 2)
self.assertAlmostEqual(result.credible_intervals["control"][0], 0.5449, places=3)
self.assertAlmostEqual(result.credible_intervals["control"][1], 4.3836, places=3)
self.assertAlmostEqual(result.credible_intervals["test"][0], 1.1009, places=3)
self.assertAlmostEqual(result.credible_intervals["test"][1], 5.8342, places=3)
self.assertAlmostEqual(result.p_value, 1.0, places=3)
self.assertAlmostEqual(result.probability["control"], 0.2549, places=2)
self.assertAlmostEqual(result.probability["test"], 0.7453, places=2)
self.assertEqual(result.significance_code, ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE)
self.assertFalse(result.significant)
self.assertEqual(len(result.variants), 2)
self.assertEqual(control_variant.absolute_exposure, 2.0)
self.assertEqual(control_variant.count, 3.0)
self.assertEqual(control_variant.exposure, 1.0)
self.assertEqual(test_variant.absolute_exposure, 2.0)
self.assertEqual(test_variant.count, 5.0)
self.assertEqual(test_variant.exposure, 1.0)
@freeze_time("2020-01-01T12:00:00Z")
def test_validate_event_variants_no_events(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
)
query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
with self.assertRaises(ValidationError) as context:
query_runner.calculate()
expected_errors = json.dumps(
{
ExperimentNoResultsErrorKeys.NO_EVENTS: True,
ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True,
ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
}
)
self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
@freeze_time("2020-01-01T12:00:00Z")
def test_validate_event_variants_no_control(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
ff_property = f"$feature/{feature_flag.key}"
journeys_for(
{
"user_test": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "test"}},
],
},
self.team,
)
flush_persons_and_events()
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
)
query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
with self.assertRaises(ValidationError) as context:
query_runner.calculate()
expected_errors = json.dumps(
{
ExperimentNoResultsErrorKeys.NO_EVENTS: False,
ExperimentNoResultsErrorKeys.NO_FLAG_INFO: False,
ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: False,
}
)
self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
@freeze_time("2020-01-01T12:00:00Z")
def test_validate_event_variants_no_test(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
ff_property = f"$feature/{feature_flag.key}"
journeys_for(
{
"user_control": [
{"event": "$pageview", "timestamp": "2020-01-02", "properties": {ff_property: "control"}},
],
},
self.team,
)
flush_persons_and_events()
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
)
query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
with self.assertRaises(ValidationError) as context:
query_runner.calculate()
expected_errors = json.dumps(
{
ExperimentNoResultsErrorKeys.NO_EVENTS: False,
ExperimentNoResultsErrorKeys.NO_FLAG_INFO: False,
ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: False,
ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
}
)
self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)
@freeze_time("2020-01-01T12:00:00Z")
def test_validate_event_variants_no_flag_info(self):
feature_flag = self.create_feature_flag()
experiment = self.create_experiment(feature_flag=feature_flag)
journeys_for(
{
"user_no_flag_1": [
{"event": "$pageview", "timestamp": "2020-01-02"},
],
"user_no_flag_2": [
{"event": "$pageview", "timestamp": "2020-01-03"},
],
},
self.team,
)
flush_persons_and_events()
count_query = TrendsQuery(series=[EventsNode(event="$pageview")])
experiment_query = ExperimentTrendsQuery(
experiment_id=experiment.id,
kind="ExperimentTrendsQuery",
count_query=count_query,
)
query_runner = ExperimentTrendsQueryRunner(query=experiment_query, team=self.team)
with self.assertRaises(ValidationError) as context:
query_runner.calculate()
expected_errors = json.dumps(
{
ExperimentNoResultsErrorKeys.NO_EVENTS: True,
ExperimentNoResultsErrorKeys.NO_FLAG_INFO: True,
ExperimentNoResultsErrorKeys.NO_CONTROL_VARIANT: True,
ExperimentNoResultsErrorKeys.NO_TEST_VARIANT: True,
}
)
self.assertEqual(cast(list, context.exception.detail)[0], expected_errors)

View File

@@ -0,0 +1,196 @@
from functools import lru_cache
from math import exp, lgamma, log, ceil
from numpy.random import default_rng
from rest_framework.exceptions import ValidationError
import scipy.stats as stats
from sentry_sdk import capture_exception
from ee.clickhouse.queries.experiments import (
FF_DISTRIBUTION_THRESHOLD,
MIN_PROBABILITY_FOR_SIGNIFICANCE,
P_VALUE_SIGNIFICANCE_LEVEL,
)
from posthog.schema import ExperimentSignificanceCode, ExperimentVariantTrendsBaseStats
Probability = float
def calculate_probabilities(
control_variant: ExperimentVariantTrendsBaseStats, test_variants: list[ExperimentVariantTrendsBaseStats]
) -> list[Probability]:
"""
Calculates probability that A is better than B. First variant is control, rest are test variants.
Supports a maximum of 10 variants today.
For each variant, we create a Gamma distribution of arrival rates,
where alpha (shape parameter) = count of variant + 1
beta (exposure parameter) = 1
"""
if not control_variant:
raise ValidationError("No control variant data found", code="no_data")
if len(test_variants) >= 10:
raise ValidationError(
"Can't calculate experiment results for more than 10 variants",
code="too_much_data",
)
if len(test_variants) < 1:
raise ValidationError(
"Can't calculate experiment results for less than 2 variants",
code="no_data",
)
variants = [control_variant, *test_variants]
probabilities = []
# simulate winning for each test variant
for index, variant in enumerate(variants):
probabilities.append(
simulate_winning_variant_for_arrival_rates(variant, variants[:index] + variants[index + 1 :])
)
total_test_probabilities = sum(probabilities[1:])
return [max(0, 1 - total_test_probabilities), *probabilities[1:]]
def simulate_winning_variant_for_arrival_rates(
target_variant: ExperimentVariantTrendsBaseStats, variants: list[ExperimentVariantTrendsBaseStats]
) -> float:
random_sampler = default_rng()
simulations_count = 100_000
variant_samples = []
for variant in variants:
# Get `simulations_count` samples from a Gamma distribution with shape = variant count + 1
# and scale = 1 / relative exposure of the variant
samples = random_sampler.gamma(variant.count + 1, 1 / variant.exposure, simulations_count)
variant_samples.append(samples)
target_variant_samples = random_sampler.gamma(
target_variant.count + 1, 1 / target_variant.exposure, simulations_count
)
winnings = 0
variant_conversions = list(zip(*variant_samples))
for i in range(ceil(simulations_count)):
if target_variant_samples[i] > max(variant_conversions[i]):
winnings += 1
return winnings / simulations_count
def are_results_significant(
control_variant: ExperimentVariantTrendsBaseStats,
test_variants: list[ExperimentVariantTrendsBaseStats],
probabilities: list[Probability],
) -> tuple[ExperimentSignificanceCode, Probability]:
# TODO: Experiment with Expected Loss calculations for trend experiments
for variant in test_variants:
# We need a feature flag distribution threshold because distribution of people
# can skew wildly when there are few people in the experiment
if variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 1
if control_variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 1
if (
probabilities[0] < MIN_PROBABILITY_FOR_SIGNIFICANCE
and sum(probabilities[1:]) < MIN_PROBABILITY_FOR_SIGNIFICANCE
):
# Sum of probability of winning for all variants except control is less than 90%
return ExperimentSignificanceCode.LOW_WIN_PROBABILITY, 1
p_value = calculate_p_value(control_variant, test_variants)
if p_value >= P_VALUE_SIGNIFICANCE_LEVEL:
return ExperimentSignificanceCode.HIGH_P_VALUE, p_value
return ExperimentSignificanceCode.SIGNIFICANT, p_value
@lru_cache(maxsize=100_000)
def combinationln(n: float, k: float) -> float:
"""
Returns the log of the binomial coefficient.
"""
return lgamma(n + 1) - lgamma(k + 1) - lgamma(n - k + 1)
def intermediate_poisson_term(count: float, iterator: float, relative_exposure: float):
return exp(
combinationln(count, iterator)
+ iterator * log(relative_exposure)
+ (count - iterator) * log(1 - relative_exposure)
)
def poisson_p_value(control_count, control_exposure, test_count, test_exposure):
"""
Calculates the p-value of the experiment.
Calculations from: https://www.evanmiller.org/statistical-formulas-for-programmers.html#count_test
"""
relative_exposure = test_exposure / (control_exposure + test_exposure)
total_count = control_count + test_count
low_p_value = 0.0
high_p_value = 0.0
for i in range(ceil(test_count) + 1):
low_p_value += intermediate_poisson_term(total_count, i, relative_exposure)
for i in range(ceil(test_count), ceil(total_count) + 1):
high_p_value += intermediate_poisson_term(total_count, i, relative_exposure)
return min(1, 2 * min(low_p_value, high_p_value))
def calculate_p_value(
control_variant: ExperimentVariantTrendsBaseStats, test_variants: list[ExperimentVariantTrendsBaseStats]
) -> Probability:
best_test_variant = max(test_variants, key=lambda variant: variant.count)
return poisson_p_value(
control_variant.count,
control_variant.exposure,
best_test_variant.count,
best_test_variant.exposure,
)
def calculate_credible_intervals(variants, lower_bound=0.025, upper_bound=0.975):
"""
Calculate the Bayesian credible intervals for the mean (average events per unit)
for a list of variants in a Trend experiment.
If no lower/upper bound is provided, the function calculates the 95% credible interval.
"""
intervals = {}
for variant in variants:
try:
# Alpha (shape parameter) is count + 1, assuming a Gamma distribution for counts
alpha = variant.count + 1
# Beta (scale parameter) is the inverse of absolute_exposure,
# representing the average rate of events per user
beta = 1 / variant.absolute_exposure
# Calculate the credible interval for the mean using Gamma distribution
credible_interval = stats.gamma.ppf([lower_bound, upper_bound], a=alpha, scale=beta)
intervals[variant.key] = (credible_interval[0], credible_interval[1])
except Exception as e:
capture_exception(
Exception(f"Error calculating credible interval for variant {variant.key}"),
{"error": str(e)},
)
return {}
return intervals
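A minimal end-to-end sketch of how these functions fit together, using two hypothetical variants (the counts and exposures are made up; in the runner they come from ExperimentTrendsQueryRunner._get_variants_with_base_stats):

control = ExperimentVariantTrendsBaseStats(key="control", count=120, exposure=1, absolute_exposure=400)
test = ExperimentVariantTrendsBaseStats(key="test", count=150, exposure=1.1, absolute_exposure=440)
# Monte Carlo probability that each variant has the highest underlying arrival rate
probabilities = calculate_probabilities(control, [test])
# Gates on absolute exposure, win probability, then the Poisson p-value
significance_code, p_value = are_results_significant(control, [test], probabilities)
# 95% credible interval on events per exposed user for each variant
credible_intervals = calculate_credible_intervals([control, test])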

View File

@@ -0,0 +1,41 @@
import structlog
from django.db.models.query import QuerySet
from posthog.schema import (
FeatureFlagStatus,
)
from .feature_flag import (
FeatureFlag,
)
logger = structlog.get_logger(__name__)
# FeatureFlagStatusChecker is used to determine the status of a given feature flag.
# Eventually, this may be used to automatically archive old flags that are no longer in use.
#
# Status can be one of the following:
# - ACTIVE: The feature flag is actively evaluated and the evaluations continue to vary.
# - STALE: The feature flag either has not been evaluated recently, or the evaluation has not changed recently.
# - DELETED: The feature flag has been soft deleted.
# - UNKNOWN: The feature flag is not found in the database.
class FeatureFlagStatusChecker:
def __init__(
self,
feature_flag_id: str,
# The amount of time considered "recent" for the purposes of determining staleness.
stale_window: str = "-30d",
):
self.feature_flag_id = feature_flag_id
self.stale_window = stale_window
def get_status(self) -> FeatureFlagStatus:
# FeatureFlag.objects.get() raises DoesNotExist rather than returning None, so catch it explicitly
try:
    flag: FeatureFlag = FeatureFlag.objects.get(pk=self.feature_flag_id)
except FeatureFlag.DoesNotExist:
    return FeatureFlagStatus.UNKNOWN
if flag.deleted:
return FeatureFlagStatus.DELETED
def get_recent_evaluations(self) -> QuerySet:
    # WIP: querying recent flag evaluations to detect staleness is not implemented yet.
    return
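The status action added to FeatureFlagViewSet earlier in this diff is still a stub; a plausible next step, sketched below (not part of this commit, and the "pk" kwarg name is an assumption about the DRF route), is to delegate to this checker and wrap the result in the FeatureFlagStatusQueryResponse schema:

# Hypothetical wiring for the WIP viewset action, assuming FeatureFlagStatusChecker above:
def status(self, request: request.Request, **kwargs):
    checker = FeatureFlagStatusChecker(feature_flag_id=kwargs["pk"])
    flag_status = checker.get_status()
    return Response(FeatureFlagStatusQueryResponse(status=flag_status).model_dump())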

View File

@@ -716,6 +716,13 @@ class Status(StrEnum):
PENDING_RELEASE = "pending_release"
class FeatureFlagStatus(StrEnum):
ACTIVE = "active"
STALE = "stale"
DELETED = "deleted"
UNKNOWN = "unknown"
class ErrorTrackingGroup(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -6722,6 +6729,13 @@ class ExperimentTrendsQuery(BaseModel):
response: Optional[ExperimentTrendsQueryResponse] = None
class FeatureFlagStatusQueryResponse(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
status: FeatureFlagStatus
class FunnelPathsFilter(BaseModel):
model_config = ConfigDict(
extra="forbid",