Mirror of https://github.com/PostHog/posthog.git (synced 2024-12-01 12:21:02 +01:00)
fix: write out async deletion verification in chunks (#20697)
parent 3df7e87186
commit 10e6003867
@@ -11,7 +11,8 @@ logger = structlog.get_logger(__name__)
 
 class AsyncDeletionProcess(ABC):
     CLICKHOUSE_CHUNK_SIZE = 100_000
     CLICKHOUSE_MUTATION_CHUNK_SIZE = 1_000_000
+    CLICKHOUSE_VERIFY_CHUNK_SIZE = 1_000
     DELETION_TYPES: List[DeletionType] = []
 
     def __init__(self) -> None:
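The constants above set batch sizes for the range-step slicing idiom that both the processing and verification paths below rely on. A minimal, self-contained sketch of that idiom, assuming nothing from PostHog (the chunks helper is illustrative, not part of the PR):

# Minimal sketch of the range-step chunking idiom used throughout this
# diff; `chunks` is an illustrative helper, not part of the PR.
from typing import List, TypeVar

T = TypeVar("T")

def chunks(items: List[T], size: int) -> List[List[T]]:
    # Consecutive slices of at most `size` elements each.
    return [items[i : i + size] for i in range(0, len(items), size)]

assert chunks([1, 2, 3, 4, 5], 2) == [[1, 2], [3, 4], [5]]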
@@ -21,31 +22,31 @@ class AsyncDeletionProcess(ABC):
         queued_deletions = list(
             AsyncDeletion.objects.filter(delete_verified_at__isnull=True, deletion_type__in=self.DELETION_TYPES)
         )
-        for i in range(0, len(queued_deletions), self.CLICKHOUSE_CHUNK_SIZE):
-            chunk = queued_deletions[i : i + self.CLICKHOUSE_CHUNK_SIZE]
+        for i in range(0, len(queued_deletions), self.CLICKHOUSE_MUTATION_CHUNK_SIZE):
+            chunk = queued_deletions[i : i + self.CLICKHOUSE_MUTATION_CHUNK_SIZE]
             self.process(chunk)
 
     def mark_deletions_done(self):
         """
         Checks and updates `delete_verified_at` for deletions
         """
-        to_verify = []
         unverified = self._fetch_unverified_deletions_grouped()
 
         for (deletion_type, _), async_deletions in unverified.items():
-            for i in range(0, len(async_deletions), self.CLICKHOUSE_CHUNK_SIZE):
-                chunk = async_deletions[i : i + self.CLICKHOUSE_CHUNK_SIZE]
-                to_verify.extend(self._verify_by_group(deletion_type, chunk))
-
-        if len(to_verify) > 0:
-            AsyncDeletion.objects.filter(pk__in=[row.pk for row in to_verify]).update(delete_verified_at=timezone.now())
-            logger.warn(
-                "Updated `delete_verified_at` for AsyncDeletion",
-                {
-                    "count": len(to_verify),
-                    "team_ids": list(set(row.team_id for row in to_verify)),
-                },
-            )
+            for i in range(0, len(async_deletions), self.CLICKHOUSE_VERIFY_CHUNK_SIZE):
+                chunk = async_deletions[i : i + self.CLICKHOUSE_VERIFY_CHUNK_SIZE]
+                to_verify = self._verify_by_group(deletion_type, chunk)
+                if len(to_verify) > 0:
+                    AsyncDeletion.objects.filter(pk__in=[row.pk for row in to_verify]).update(
+                        delete_verified_at=timezone.now()
+                    )
+                    logger.warn(
+                        "Updated `delete_verified_at` for AsyncDeletion",
+                        {
+                            "count": len(to_verify),
+                            "team_ids": list(set(row.team_id for row in to_verify)),
+                        },
+                    )
 
     def _fetch_unverified_deletions_grouped(self):
         result = defaultdict(list)
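The change to mark_deletions_done above is behavioural, not just cosmetic: previously every verified row was accumulated into one list and written back in a single UPDATE; now each chunk of CLICKHOUSE_VERIFY_CHUNK_SIZE rows is verified and written out immediately, which is what the PR title means by writing verification out in chunks. A hedged before/after sketch, with verify and mark_done as stand-in callables rather than PostHog APIs:

# Before/after shape of mark_deletions_done; `verify` and `mark_done`
# are stand-in callables, not PostHog APIs.
from typing import Callable, List

def verify_then_write_once(pending: List, verify: Callable, mark_done: Callable, size: int) -> None:
    # Old behaviour: accumulate every verified row, then one big write.
    to_verify: List = []
    for i in range(0, len(pending), size):
        to_verify.extend(verify(pending[i : i + size]))
    if to_verify:
        mark_done(to_verify)  # a single, potentially huge UPDATE

def verify_and_write_per_chunk(pending: List, verify: Callable, mark_done: Callable, size: int) -> None:
    # New behaviour: write out each chunk immediately, bounding both the
    # in-memory list and the size of any single UPDATE.
    for i in range(0, len(pending), size):
        verified = verify(pending[i : i + size])
        if verified:
            mark_done(verified)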
@@ -439,12 +439,28 @@ def clickhouse_clear_removed_data() -> None:
     from posthog.models.async_deletion.delete_events import AsyncEventDeletion
 
     runner = AsyncEventDeletion()
-    runner.mark_deletions_done()
-    runner.run()
+
+    try:
+        runner.mark_deletions_done()
+    except Exception as e:
+        logger.error("Failed to mark deletions done", error=e, exc_info=True)
+
+    try:
+        runner.run()
+    except Exception as e:
+        logger.error("Failed to run deletions", error=e, exc_info=True)
 
     cohort_runner = AsyncCohortDeletion()
-    cohort_runner.mark_deletions_done()
-    cohort_runner.run()
+
+    try:
+        cohort_runner.mark_deletions_done()
+    except Exception as e:
+        logger.error("Failed to mark cohort deletions done", error=e, exc_info=True)
+
+    try:
+        cohort_runner.run()
+    except Exception as e:
+        logger.error("Failed to run cohort deletions", error=e, exc_info=True)
 
 
 @shared_task(ignore_result=True)
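The scheduled task above now isolates failures per stage: an exception while marking event deletions done is logged rather than aborting the whole task, so the event-deletion run and both cohort-deletion stages still execute. A generic sketch of that failure-isolation pattern, using only the standard library (run_stages and the stage names are illustrative, not PostHog code):

# Generic failure-isolation pattern: each stage gets its own try/except
# so one failure is logged instead of aborting later stages.
import logging

logger = logging.getLogger(__name__)

def run_stages(stages) -> None:
    for name, fn in stages:
        try:
            fn()
        except Exception:
            # Log and continue: previously the first exception would
            # have aborted the entire scheduled task.
            logger.exception("Stage failed: %s", name)

# Illustrative usage against the runner from the diff above:
# run_stages([("mark_done", runner.mark_deletions_done), ("run", runner.run)])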