diff --git a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr
index a753be81012..fcd34ec79af 100644
--- a/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr
+++ b/ee/clickhouse/models/test/__snapshots__/test_cohort.ambr
@@ -109,7 +109,7 @@
      GROUP BY person_id
      HAVING count(*) = 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.13
@@ -163,8 +163,8 @@
      GROUP BY person_id
      HAVING count(*) = 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.14
@@ -254,7 +254,7 @@
        AND ((event = '$pageview'))
      GROUP BY person_id) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.18
@@ -307,8 +307,8 @@
        AND ((event = '$pageview'))
      GROUP BY person_id) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.19
@@ -369,7 +369,7 @@
      GROUP BY person_id
      HAVING count(*) >= 2) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.3
@@ -423,8 +423,8 @@
      GROUP BY person_id
      HAVING count(*) >= 2) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.4
@@ -515,7 +515,7 @@
      GROUP BY person_id
      HAVING count(*) <= 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.8
@@ -569,8 +569,8 @@
      GROUP BY person_id
      HAVING count(*) <= 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.9
diff --git a/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr b/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr
index 4e32f83f8f4..8fdbc1035c7 100644
--- a/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr
+++ b/ee/clickhouse/queries/funnels/test/__snapshots__/test_funnel.ambr
@@ -173,7 +173,7 @@
          AND (trim(BOTH '"' FROM JSONExtractRaw(properties, 'email')) ILIKE '%.com%') ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.3
@@ -228,8 +228,8 @@
          AND (trim(BOTH '"' FROM JSONExtractRaw(properties, 'email')) ILIKE '%.com%') ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.4
diff --git a/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr b/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr
index 6265e605c99..d04b8f8aec6 100644
--- a/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr
+++ b/ee/clickhouse/queries/session_recordings/test/__snapshots__/test_clickhouse_session_recording_list.ambr
@@ -194,7 +194,7 @@
          AND (has(['some_val'], trim(BOTH '"' FROM JSONExtractRaw(properties, '$some_prop')))) ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.3
@@ -249,8 +249,8 @@
          AND (has(['some_val'], trim(BOTH '"' FROM JSONExtractRaw(properties, '$some_prop')))) ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.4
diff --git a/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr b/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr
index 82e6098af77..c399584121b 100644
--- a/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr
+++ b/ee/clickhouse/queries/test/__snapshots__/test_event_query.ambr
@@ -80,7 +80,7 @@
          AND (has(['Jane'], trim(BOTH '"' FROM JSONExtractRaw(properties, 'name')))) ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestEventQuery.test_account_filters.3
@@ -135,8 +135,8 @@
          AND (has(['Jane'], trim(BOTH '"' FROM JSONExtractRaw(properties, 'name')))) ))
       ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestEventQuery.test_account_filters.4
diff --git a/posthog/models/cohort.py b/posthog/models/cohort.py
index 3c444dedf7b..f146455e931 100644
--- a/posthog/models/cohort.py
+++ b/posthog/models/cohort.py
@@ -109,7 +109,7 @@ class Cohort(models.Model):
             "deleted": self.deleted,
         }
 
-    def calculate_people(self, new_version: int, batch_size=50000, pg_batch_size=1000):
+    def calculate_people(self, new_version: int, batch_size=10000, pg_batch_size=1000):
         if self.is_static:
             return
         try:
@@ -123,6 +123,7 @@ class Cohort(models.Model):
 
                 cursor += batch_size
                 persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor)
+                time.sleep(1)
 
         except Exception as err:
             # Clear the pending version people if there's an error
@@ -133,13 +134,10 @@ class Cohort(models.Model):
     def calculate_people_ch(self, pending_version):
         from ee.clickhouse.models.cohort import recalculate_cohortpeople
 
+        logger.info("cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version)
+        start_time = time.monotonic()
+
         try:
-            start_time = time.time()
-
-            logger.info(
-                "cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version
-            )
-
             count = recalculate_cohortpeople(self)
             self.calculate_people(new_version=pending_version)
 
@@ -151,16 +149,27 @@ class Cohort(models.Model):
             self.last_calculation = timezone.now()
             self.errors_calculating = 0
 
-            logger.info(
-                "cohort_calculation_completed", id=self.pk, version=pending_version, duration=(time.time() - start_time)
-            )
-        except Exception as e:
+        except Exception:
             self.errors_calculating = F("errors_calculating") + 1
-            raise e
+            logger.warning(
+                "cohort_calculation_failed",
+                id=self.pk,
+                current_version=self.version,
+                new_version=pending_version,
+                exc_info=True,
+            )
+            raise
         finally:
             self.is_calculating = False
             self.save()
 
+        logger.info(
+            "cohort_calculation_completed",
+            id=self.pk,
+            version=pending_version,
+            duration=(time.monotonic() - start_time),
+        )
+
     def insert_users_by_list(self, items: List[str]) -> None:
         """
         Items can be distinct_id or email