mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-21 13:39:22 +01:00
fix(data-warehouse): add naming convention (#25429)
This commit is contained in:
parent
fb92840378
commit
22d2aff23c
@ -11,7 +11,6 @@ import asyncio
|
||||
from posthog.settings.base_variables import TEST
|
||||
from structlog.typing import FilteringBoundLogger
|
||||
from dlt.common.libs.deltalake import get_delta_tables
|
||||
from dlt.common.normalizers.naming.snake_case import NamingConvention
|
||||
from dlt.load.exceptions import LoadClientJobRetry
|
||||
from dlt.sources import DltSource
|
||||
from deltalake.exceptions import DeltaError
|
||||
@ -104,9 +103,7 @@ class DataImportPipeline:
|
||||
job: ExternalDataJob = await get_external_data_job(job_id=self.inputs.run_id)
|
||||
schema: ExternalDataSchema = await aget_schema_by_id(self.inputs.schema_id, self.inputs.team_id)
|
||||
|
||||
normalized_schema_name = NamingConvention().normalize_identifier(schema.name)
|
||||
|
||||
prepare_s3_files_for_querying(job.folder_path(), normalized_schema_name, file_uris)
|
||||
prepare_s3_files_for_querying(job.folder_path(), schema.name, file_uris)
|
||||
|
||||
def _run(self) -> dict[str, int]:
|
||||
if self.refresh_dlt:
|
||||
|
@ -1,13 +1,16 @@
|
||||
from posthog.warehouse.s3 import get_s3_client
|
||||
from django.conf import settings
|
||||
from dlt.common.normalizers.naming.snake_case import NamingConvention
|
||||
|
||||
|
||||
def prepare_s3_files_for_querying(folder_path: str, table_name: str, file_uris: list[str]):
|
||||
s3 = get_s3_client()
|
||||
|
||||
normalized_table_name = NamingConvention().normalize_identifier(table_name)
|
||||
|
||||
s3_folder_for_job = f"{settings.BUCKET_URL}/{folder_path}"
|
||||
s3_folder_for_schema = f"{s3_folder_for_job}/{table_name}"
|
||||
s3_folder_for_querying = f"{s3_folder_for_job}/{table_name}__query"
|
||||
s3_folder_for_schema = f"{s3_folder_for_job}/{normalized_table_name}"
|
||||
s3_folder_for_querying = f"{s3_folder_for_job}/{normalized_table_name}__query"
|
||||
|
||||
if s3.exists(s3_folder_for_querying):
|
||||
s3.delete(s3_folder_for_querying, recursive=True)
|
||||
|
@ -330,6 +330,7 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTab
|
||||
table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)
|
||||
|
||||
file_uris = table.file_uris()
|
||||
|
||||
prepare_s3_files_for_querying(saved_query.folder_path, saved_query.name, file_uris)
|
||||
|
||||
key, delta_table = tables.popitem()
|
||||
|
Loading…
Reference in New Issue
Block a user