Mirror of https://github.com/PostHog/posthog.git (synced 2024-11-28 09:16:49 +01:00)
perf(cohorts): reduce select batch size, and sleep 1 between (#8598)
* perf(cohorts): reduce select batch size, and sleep 1 between

  Previously we were hammering the database; see
  https://github.com/PostHog/product-internal/issues/256 for details. Now we:

  1. reduce the batch size by 5 times
  2. have a little sleep(1) between batches

  We also add a little bit of logging, for instance in failure cases.

* include stack trace info in warning
* just reraise so we don't lose stack trace
* use monotonic for timings, not time
* update snapshots
This commit is contained in:
parent 286d8844d7
commit 3be0cda8ce
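The heart of the change is visible in the Cohort model diff below: page through the ClickHouse persons query in smaller batches and pause briefly between pages. As a minimal, self-contained sketch of that pattern — with a hypothetical fetch_batch(limit, offset) standing in for the real query helper, not the actual PostHog API:

    import time

    BATCH_SIZE = 10000  # reduced from 50000: each SELECT pulls a 5x smaller page

    def process_in_batches(fetch_batch, handle):
        """Page through results and rest between pages.

        fetch_batch(limit, offset) is a hypothetical stand-in for the
        ClickHouse persons query; handle(rows) consumes one page.
        """
        cursor = 0
        rows = fetch_batch(BATCH_SIZE, cursor)
        while rows:
            handle(rows)
            cursor += BATCH_SIZE
            rows = fetch_batch(BATCH_SIZE, cursor)
            time.sleep(1)  # give the database a breather between batches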
@@ -109,7 +109,7 @@
   GROUP BY person_id
   HAVING count(*) = 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.13
@@ -163,8 +163,8 @@
   GROUP BY person_id
   HAVING count(*) = 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.14
@@ -254,7 +254,7 @@
   AND ((event = '$pageview'))
   GROUP BY person_id) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.18
@@ -307,8 +307,8 @@
   AND ((event = '$pageview'))
   GROUP BY person_id) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.19
@@ -369,7 +369,7 @@
   GROUP BY person_id
   HAVING count(*) >= 2) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.3
@@ -423,8 +423,8 @@
   GROUP BY person_id
   HAVING count(*) >= 2) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.4
@@ -515,7 +515,7 @@
   GROUP BY person_id
   HAVING count(*) <= 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.8
@@ -569,8 +569,8 @@
   GROUP BY person_id
   HAVING count(*) <= 1) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestCohort.test_cohortpeople_action_count.9
@@ -173,7 +173,7 @@
   AND (trim(BOTH '"'
   FROM JSONExtractRaw(properties, 'email')) ILIKE '%.com%') )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.3
@@ -228,8 +228,8 @@
   AND (trim(BOTH '"'
   FROM JSONExtractRaw(properties, 'email')) ILIKE '%.com%') )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestClickhouseFunnel.test_funnel_with_precalculated_cohort_step_filter.4
@@ -194,7 +194,7 @@
   AND (has(['some_val'], trim(BOTH '"'
   FROM JSONExtractRaw(properties, '$some_prop')))) )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.3
@@ -249,8 +249,8 @@
   AND (has(['some_val'], trim(BOTH '"'
   FROM JSONExtractRaw(properties, '$some_prop')))) )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestClickhouseSessionRecordingsList.test_event_filter_with_cohort_properties.4
@@ -80,7 +80,7 @@
   AND (has(['Jane'], trim(BOTH '"'
   FROM JSONExtractRaw(properties, 'name')))) )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
+  LIMIT 10000
   '
 ---
 # name: TestEventQuery.test_account_filters.3
@@ -135,8 +135,8 @@
   AND (has(['Jane'], trim(BOTH '"'
   FROM JSONExtractRaw(properties, 'name')))) )) ))
   ORDER BY _timestamp ASC
-  LIMIT 50000
-  OFFSET 50000
+  LIMIT 10000
+  OFFSET 10000
   '
 ---
 # name: TestEventQuery.test_account_filters.4
@@ -109,7 +109,7 @@ class Cohort(models.Model):
             "deleted": self.deleted,
         }

-    def calculate_people(self, new_version: int, batch_size=50000, pg_batch_size=1000):
+    def calculate_people(self, new_version: int, batch_size=10000, pg_batch_size=1000):
         if self.is_static:
             return
         try:
@@ -123,6 +123,7 @@ class Cohort(models.Model):

                 cursor += batch_size
                 persons = self._clickhouse_persons_query(batch_size=batch_size, offset=cursor)
+                time.sleep(1)

         except Exception as err:
             # Clear the pending version people if there's an error
@@ -133,13 +134,10 @@ class Cohort(models.Model):
     def calculate_people_ch(self, pending_version):
         from ee.clickhouse.models.cohort import recalculate_cohortpeople

+        logger.info("cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version)
+        start_time = time.monotonic()
+
         try:
-            start_time = time.time()
-
-            logger.info(
-                "cohort_calculation_started", id=self.pk, current_version=self.version, new_version=pending_version
-            )
-
             count = recalculate_cohortpeople(self)
             self.calculate_people(new_version=pending_version)

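Why time.monotonic() rather than time.time(): the wall clock can be adjusted while a calculation runs (NTP slews, manual changes), so time.time() deltas can come out wrong or even negative, whereas time.monotonic() is guaranteed never to go backwards. A tiny runnable illustration of the timing pattern, with sleep standing in for the real work:

    import time

    start = time.monotonic()
    time.sleep(0.1)  # stand-in for the actual cohort recalculation work
    duration = time.monotonic() - start  # correct even if the wall clock jumps mid-run
    print(f"cohort_calculation_completed duration={duration:.3f}s")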
@@ -151,16 +149,27 @@ class Cohort(models.Model):

             self.last_calculation = timezone.now()
             self.errors_calculating = 0
-            logger.info(
-                "cohort_calculation_completed", id=self.pk, version=pending_version, duration=(time.time() - start_time)
-            )
-        except Exception as e:
+        except Exception:
             self.errors_calculating = F("errors_calculating") + 1
-            raise e
+            logger.warning(
+                "cohort_calculation_failed",
+                id=self.pk,
+                current_version=self.version,
+                new_version=pending_version,
+                exc_info=True,
+            )
+            raise
         finally:
             self.is_calculating = False
             self.save()
+
+        logger.info(
+            "cohort_calculation_completed",
+            id=self.pk,
+            version=pending_version,
+            duration=(time.monotonic() - start_time),
+        )

     def insert_users_by_list(self, items: List[str]) -> None:
         """
         Items can be distinct_id or email
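Two details in that exception handler are worth spelling out. A bare raise re-raises the exception currently being handled with its original traceback, which is why the commit drops the "except Exception as e: ... raise e" form; and exc_info=True asks the logger to attach the active exception's stack trace to the log record. A toy example of both, using the stdlib logger as a stand-in for the structlog logger this file actually uses:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("cohorts")  # stand-in for the module's structlog logger

    def recalculate():
        raise ValueError("boom")  # simulate a failing cohort calculation

    def calculate():
        try:
            recalculate()
        except Exception:
            # exc_info=True attaches the full traceback to the warning,
            # so the failure is diagnosable from the log alone.
            logger.warning("cohort_calculation_failed", exc_info=True)
            raise  # bare raise: re-raise with the original traceback intact

    try:
        calculate()
    except ValueError:
        pass  # the caller still sees the original exception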