diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 161b90ef2c7..9ce64aa97ac 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -21,6 +21,7 @@ from posthog.warehouse.data_load.validate_schema import validate_schema_and_upda from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id from posthog.warehouse.models.external_data_source import ExternalDataSource +from posthog.warehouse.models.table import DataWarehouseTable from posthog.warehouse.s3 import get_s3_client @@ -160,6 +161,12 @@ class DataImportPipeline: if total_counts.total() > 0: delta_tables = get_delta_tables(pipeline) + table_format = DataWarehouseTable.TableFormat.DeltaS3Wrapper + + # Workaround while we fix msising table_format on DLT resource + if len(delta_tables.values()) == 0: + table_format = DataWarehouseTable.TableFormat.Delta + # There should only ever be one table here for table in delta_tables.values(): table.optimize.compact() @@ -174,6 +181,7 @@ class DataImportPipeline: schema_id=self.inputs.schema_id, table_schema=self.source.schema.tables, row_count=total_counts.total(), + table_format=table_format, ) pipeline_runs = pipeline_runs + 1 @@ -208,6 +216,12 @@ class DataImportPipeline: if total_counts.total() > 0: delta_tables = get_delta_tables(pipeline) + table_format = DataWarehouseTable.TableFormat.DeltaS3Wrapper + + # Workaround while we fix msising table_format on DLT resource + if len(delta_tables.values()) == 0: + table_format = DataWarehouseTable.TableFormat.Delta + # There should only ever be one table here for table in delta_tables.values(): table.optimize.compact() @@ -222,6 +236,7 @@ class DataImportPipeline: schema_id=self.inputs.schema_id, table_schema=self.source.schema.tables, row_count=total_counts.total(), + table_format=table_format, ) # Delete local state from the file system diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 4864f761d64..e1b3203a16e 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -72,6 +72,7 @@ async def validate_schema_and_update_table( schema_id: uuid.UUID, table_schema: TSchemaTables, row_count: int, + table_format: DataWarehouseTable.TableFormat, ) -> None: """ @@ -117,7 +118,7 @@ async def validate_schema_and_update_table( table_params = { "credential": credential, "name": table_name, - "format": DataWarehouseTable.TableFormat.DeltaS3Wrapper, + "format": table_format, "url_pattern": new_url_pattern, "team_id": team_id, "row_count": row_count,