diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index 66279ccd718..c522a75bce2 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -593,10 +593,10 @@ async def execute_batch_export_insert_activity(
     inputs,
     non_retryable_error_types: list[str],
     finish_inputs: FinishBatchExportRunInputs,
-    start_to_close_timeout_seconds: int = 3600,
+    interval: str,
     heartbeat_timeout_seconds: int | None = 120,
-    maximum_attempts: int = 10,
-    initial_retry_interval_seconds: int = 10,
+    maximum_attempts: int = 15,
+    initial_retry_interval_seconds: int = 30,
     maximum_retry_interval_seconds: int = 120,
 ) -> None:
     """Execute the main insert activity of a batch export handling any errors.
@@ -610,7 +610,7 @@ async def execute_batch_export_insert_activity(
         inputs: The inputs to the activity.
         non_retryable_error_types: A list of errors to not retry on when executing the activity.
         finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end.
-        start_to_close_timeout: A timeout for the 'insert_into_*' activity function.
+        interval: The interval of the batch export used to set the start to close timeout.
         maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function. Assuming the error that
             triggered the retry is not in non_retryable_error_types.
         initial_retry_interval_seconds: When retrying, seconds until the first retry.
@@ -624,11 +624,23 @@ async def execute_batch_export_insert_activity(
         non_retryable_error_types=non_retryable_error_types,
     )
 
+    if interval == "hour":
+        start_to_close_timeout = dt.timedelta(hours=1)
+    elif interval == "day":
+        start_to_close_timeout = dt.timedelta(days=1)
+    elif interval.startswith("every"):
+        _, value, unit = interval.split(" ")
+        kwargs = {unit: int(value)}
+        # TODO: Consider removing this 10 minute minimum once we are more confident about hitting 5 minute or lower SLAs.
+        start_to_close_timeout = max(dt.timedelta(minutes=10), dt.timedelta(**kwargs))
+    else:
+        raise ValueError(f"Unsupported interval: '{interval}'")
+
     try:
         records_completed = await workflow.execute_activity(
             activity,
             inputs,
-            start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
+            start_to_close_timeout=start_to_close_timeout,
             heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
             retry_policy=retry_policy,
         )
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index 93a2e522e1e..9e81aafe138 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -432,6 +432,7 @@ class BigQueryBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_bigquery_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on missing permissions.
                 "Forbidden",
diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py
index f86703f3cf7..623cc53bed6 100644
--- a/posthog/temporal/batch_exports/http_batch_export.py
+++ b/posthog/temporal/batch_exports/http_batch_export.py
@@ -373,6 +373,7 @@ class HttpBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_http_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 "NonRetryableResponseError",
             ],
diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py
index 6281862a72f..6ebede565bc 100644
--- a/posthog/temporal/batch_exports/postgres_batch_export.py
+++ b/posthog/temporal/batch_exports/postgres_batch_export.py
@@ -439,6 +439,7 @@ class PostgresBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_postgres_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on errors that are related to database operation.
                 # For example: unexpected disconnect, database or other object not found.
diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py
index e98fa9106c1..61e5ad7c026 100644
--- a/posthog/temporal/batch_exports/redshift_batch_export.py
+++ b/posthog/temporal/batch_exports/redshift_batch_export.py
@@ -469,6 +469,7 @@ class RedshiftBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_redshift_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on errors that are related to database operation.
                 # For example: unexpected disconnect, database or other object not found.
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index febdac88b45..39a2755a721 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -687,6 +687,7 @@ class S3BatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_s3_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # S3 parameter validation failed.
                 "ParamValidationError",
diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py
index 2d782c1f94d..c769862af96 100644
--- a/posthog/temporal/batch_exports/snowflake_batch_export.py
+++ b/posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -631,6 +631,7 @@ class SnowflakeBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_snowflake_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised when we cannot connect to Snowflake.
                 "DatabaseError",
diff --git a/posthog/temporal/tests/utils/events.py b/posthog/temporal/tests/utils/events.py
index 71ce7f7f616..ce482573818 100644
--- a/posthog/temporal/tests/utils/events.py
+++ b/posthog/temporal/tests/utils/events.py
@@ -44,6 +44,7 @@ def generate_test_events(
     site_url: str | None = "",
     set_field: dict | None = None,
     set_once: dict | None = None,
+    start: int = 0,
 ):
     """Generate a list of events for testing."""
     _timestamp = random.choice(possible_datetimes)
@@ -77,7 +78,7 @@ def generate_test_events(
             "set": set_field,
             "set_once": set_once,
         }
-        for i in range(count)
+        for i in range(start, count + start)
     ]
 
     return events
@@ -138,6 +139,7 @@ async def generate_test_events_in_clickhouse(
     person_properties: dict | None = None,
     inserted_at: str | dt.datetime | None = "_timestamp",
     duplicate: bool = False,
+    batch_size: int = 10000,
 ) -> tuple[list[EventValues], list[EventValues], list[EventValues]]:
     """Insert test events into the sharded_events table.
 
@@ -165,20 +167,27 @@ async def generate_test_events_in_clickhouse(
     possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1)))
 
     # Base events
-    events = generate_test_events(
-        count=count,
-        team_id=team_id,
-        possible_datetimes=possible_datetimes,
-        event_name=event_name,
-        properties=properties,
-        person_properties=person_properties,
-        inserted_at=inserted_at,
-    )
+    events: list[EventValues] = []
+    while len(events) < count:
+        events_to_insert = generate_test_events(
+            count=min(count - len(events), batch_size),
+            team_id=team_id,
+            possible_datetimes=possible_datetimes,
+            event_name=event_name,
+            properties=properties,
+            person_properties=person_properties,
+            inserted_at=inserted_at,
+            start=len(events),
+        )
 
-    # Add duplicates if required
-    duplicate_events = []
-    if duplicate is True:
-        duplicate_events = events
+        # Add duplicates if required
+        duplicate_events = []
+        if duplicate is True:
+            duplicate_events = events_to_insert
+
+        await insert_event_values_in_clickhouse(client=client, events=events_to_insert + duplicate_events)
+
+        events.extend(events_to_insert)
 
     # Events outside original date range
     delta = end_time - start_time
@@ -207,7 +216,5 @@ async def generate_test_events_in_clickhouse(
         inserted_at=inserted_at,
     )
 
-    await insert_event_values_in_clickhouse(
-        client=client, events=events + events_outside_range + events_from_other_team + duplicate_events
-    )
+    await insert_event_values_in_clickhouse(client=client, events=events_outside_range + events_from_other_team)
     return (events, events_outside_range, events_from_other_team)