Mirror of https://github.com/PostHog/posthog.git, synced 2024-11-24 00:47:50 +01:00
fix: Make timeouts variable according to interval (#21467)
* fix: Make timeouts variable according to interval
* chore: Bump maximum attempts and initial retry interval
* fix: Pass the interval along to the execute function
* feat: Remove multiplier
  We'll deal with timeouts in higher frequency batch exports.
* refactor: Add a 10 min floor to timeout
* fix: Test event generation function now batches inserts to clickhouse
* fix: Add missing type hint
* Update query snapshots
* Update query snapshots
* fix: Account for tests that rely on event name generation
* Update query snapshots
* fix: Event generation in test
* chore: Bump batch size

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent 6183f88b67
commit 2f6344ab77
@@ -593,10 +593,10 @@ async def execute_batch_export_insert_activity(
     inputs,
     non_retryable_error_types: list[str],
     finish_inputs: FinishBatchExportRunInputs,
-    start_to_close_timeout_seconds: int = 3600,
+    interval: str,
     heartbeat_timeout_seconds: int | None = 120,
-    maximum_attempts: int = 10,
-    initial_retry_interval_seconds: int = 10,
+    maximum_attempts: int = 15,
+    initial_retry_interval_seconds: int = 30,
     maximum_retry_interval_seconds: int = 120,
 ) -> None:
     """Execute the main insert activity of a batch export handling any errors.
@@ -610,7 +610,7 @@ async def execute_batch_export_insert_activity(
         inputs: The inputs to the activity.
         non_retryable_error_types: A list of errors to not retry on when executing the activity.
         finish_inputs: Inputs to the 'finish_batch_export_run' to run at the end.
-        start_to_close_timeout: A timeout for the 'insert_into_*' activity function.
+        interval: The interval of the batch export used to set the start to close timeout.
         maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function.
             Assuming the error that triggered the retry is not in non_retryable_error_types.
         initial_retry_interval_seconds: When retrying, seconds until the first retry.
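For context, the 'retry_policy' referenced in the next hunk is built from these parameters. A minimal sketch of that construction with Temporal's Python SDK, assuming the parameter names from the signature above (the construction itself sits outside this diff):

    import datetime as dt

    from temporalio.common import RetryPolicy

    # Sketch: assemble the retry policy from the bumped defaults above.
    retry_policy = RetryPolicy(
        initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds),  # now 30s
        maximum_interval=dt.timedelta(seconds=maximum_retry_interval_seconds),  # 120s
        maximum_attempts=maximum_attempts,  # now 15
        non_retryable_error_types=non_retryable_error_types,
    )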
@@ -624,11 +624,23 @@ async def execute_batch_export_insert_activity(
         non_retryable_error_types=non_retryable_error_types,
     )
 
+    if interval == "hour":
+        start_to_close_timeout = dt.timedelta(hours=1)
+    elif interval == "day":
+        start_to_close_timeout = dt.timedelta(days=1)
+    elif interval.startswith("every"):
+        _, value, unit = interval.split(" ")
+        kwargs = {unit: int(value)}
+        # TODO: Consider removing this 10 minute minimum once we are more confident about hitting 5 minute or lower SLAs.
+        start_to_close_timeout = max(dt.timedelta(minutes=10), dt.timedelta(**kwargs))
+    else:
+        raise ValueError(f"Unsupported interval: '{interval}'")
+
     try:
         records_completed = await workflow.execute_activity(
             activity,
             inputs,
-            start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
+            start_to_close_timeout=start_to_close_timeout,
             heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
             retry_policy=retry_policy,
         )
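To make the new mapping concrete, here is the same branching pulled into a hypothetical standalone helper (timeout_for_interval is not part of the diff; its logic simply mirrors the block added above), followed by a few worked cases:

    import datetime as dt

    def timeout_for_interval(interval: str) -> dt.timedelta:
        """Hypothetical helper mirroring the branch added above."""
        if interval == "hour":
            return dt.timedelta(hours=1)
        elif interval == "day":
            return dt.timedelta(days=1)
        elif interval.startswith("every"):
            _, value, unit = interval.split(" ")
            kwargs = {unit: int(value)}
            # 10 minute floor, per the TODO above.
            return max(dt.timedelta(minutes=10), dt.timedelta(**kwargs))
        raise ValueError(f"Unsupported interval: '{interval}'")

    assert timeout_for_interval("hour") == dt.timedelta(hours=1)
    assert timeout_for_interval("day") == dt.timedelta(days=1)
    # "every 5 minutes" would undershoot the floor, so it is raised to 10 minutes:
    assert timeout_for_interval("every 5 minutes") == dt.timedelta(minutes=10)
    assert timeout_for_interval("every 30 minutes") == dt.timedelta(minutes=30)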
@@ -432,6 +432,7 @@ class BigQueryBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_bigquery_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on missing permissions.
                 "Forbidden",
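The same one-line change repeats in each destination workflow below: every caller now forwards its configured interval. Roughly, the call shape is as follows (abridged; finish_inputs is assumed from the signature in the first hunk and does not appear in these hunks):

    # Sketch of the caller contract after this change.
    await execute_batch_export_insert_activity(
        insert_into_bigquery_activity,
        insert_inputs,
        interval=inputs.interval,  # e.g. "hour", "day", "every 5 minutes"
        non_retryable_error_types=["Forbidden"],
        finish_inputs=finish_inputs,  # assumed: required by the signature above
    )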
@@ -373,6 +373,7 @@ class HttpBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_http_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 "NonRetryableResponseError",
             ],
@@ -439,6 +439,7 @@ class PostgresBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_postgres_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on errors that are related to database operation.
                 # For example: unexpected disconnect, database or other object not found.
@@ -469,6 +469,7 @@ class RedshiftBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_redshift_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised on errors that are related to database operation.
                 # For example: unexpected disconnect, database or other object not found.
@@ -687,6 +687,7 @@ class S3BatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_s3_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # S3 parameter validation failed.
                 "ParamValidationError",
@@ -631,6 +631,7 @@ class SnowflakeBatchExportWorkflow(PostHogWorkflow):
         await execute_batch_export_insert_activity(
             insert_into_snowflake_activity,
             insert_inputs,
+            interval=inputs.interval,
             non_retryable_error_types=[
                 # Raised when we cannot connect to Snowflake.
                 "DatabaseError",
@@ -44,6 +44,7 @@ def generate_test_events(
     site_url: str | None = "",
     set_field: dict | None = None,
     set_once: dict | None = None,
+    start: int = 0,
 ):
     """Generate a list of events for testing."""
     _timestamp = random.choice(possible_datetimes)
@@ -77,7 +78,7 @@ def generate_test_events(
             "set": set_field,
             "set_once": set_once,
         }
-        for i in range(count)
+        for i in range(start, count + start)
     ]
 
     return events
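The start offset matters because tests can template event names on the loop index ("Account for tests that rely on event name generation" in the commit message); generating each batch with start=len(events) keeps the index ranges of consecutive batches disjoint. A small sketch of the arithmetic:

    # Indices produced by two consecutive batches of 3 events each.
    batch_1 = list(range(0, 3))      # start=0: i = 0, 1, 2
    batch_2 = list(range(3, 3 + 3))  # start=3: i = 3, 4, 5
    assert set(batch_1).isdisjoint(batch_2)  # no index reuse across batches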
@@ -138,6 +139,7 @@ async def generate_test_events_in_clickhouse(
     person_properties: dict | None = None,
     inserted_at: str | dt.datetime | None = "_timestamp",
     duplicate: bool = False,
+    batch_size: int = 10000,
 ) -> tuple[list[EventValues], list[EventValues], list[EventValues]]:
     """Insert test events into the sharded_events table.
@@ -165,20 +167,27 @@ async def generate_test_events_in_clickhouse(
     possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1)))
 
     # Base events
-    events = generate_test_events(
-        count=count,
-        team_id=team_id,
-        possible_datetimes=possible_datetimes,
-        event_name=event_name,
-        properties=properties,
-        person_properties=person_properties,
-        inserted_at=inserted_at,
-    )
+    events: list[EventValues] = []
+    while len(events) < count:
+        events_to_insert = generate_test_events(
+            count=min(count - len(events), batch_size),
+            team_id=team_id,
+            possible_datetimes=possible_datetimes,
+            event_name=event_name,
+            properties=properties,
+            person_properties=person_properties,
+            inserted_at=inserted_at,
+            start=len(events),
+        )
 
-    # Add duplicates if required
-    duplicate_events = []
-    if duplicate is True:
-        duplicate_events = events
+        # Add duplicates if required
+        duplicate_events = []
+        if duplicate is True:
+            duplicate_events = events_to_insert
+
+        await insert_event_values_in_clickhouse(client=client, events=events_to_insert + duplicate_events)
+
+        events.extend(events_to_insert)
 
     # Events outside original date range
     delta = end_time - start_time
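As a sanity check on the loop's arithmetic, here is a sketch (batch_sizes is a hypothetical helper, not in the diff) of the insert sizes the while-loop produces: full batch_size chunks followed by a remainder, totalling exactly count:

    def batch_sizes(count: int, batch_size: int = 10000) -> list[int]:
        """Hypothetical helper tracing the while-loop above."""
        sizes: list[int] = []
        generated = 0
        while generated < count:
            size = min(count - generated, batch_size)  # same bound as the loop
            sizes.append(size)
            generated += size
        return sizes

    assert batch_sizes(25_000) == [10_000, 10_000, 5_000]
    assert batch_sizes(500) == [500]  # small counts still take a single insert
    assert sum(batch_sizes(123_456)) == 123_456  # every event is generated once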
@@ -207,7 +216,5 @@ async def generate_test_events_in_clickhouse(
         inserted_at=inserted_at,
     )
 
-    await insert_event_values_in_clickhouse(
-        client=client, events=events + events_outside_range + events_from_other_team + duplicate_events
-    )
+    await insert_event_values_in_clickhouse(client=client, events=events_outside_range + events_from_other_team)
     return (events, events_outside_range, events_from_other_team)