diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 02effbb24a3..3846c41a764 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -801,6 +801,8 @@ posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict k posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item] posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item] posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item] +posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined] +posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined] posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment] posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment] posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment] diff --git a/posthog/temporal/data_imports/pipelines/pipeline_sync.py b/posthog/temporal/data_imports/pipelines/pipeline_sync.py index 1994cb48a5d..05ccedf2fdd 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline_sync.py +++ b/posthog/temporal/data_imports/pipelines/pipeline_sync.py @@ -1,11 +1,13 @@ from dataclasses import dataclass -from typing import Literal +from typing import Any, Literal, Optional +from collections.abc import Iterator, Sequence import uuid import dlt from django.conf import settings from django.db.models import Prefetch from dlt.pipeline.exceptions import PipelineStepFailed +from deltalake import DeltaTable from posthog.settings.base_variables import TEST from structlog.typing import FilteringBoundLogger @@ -14,6 +16,21 @@ from dlt.common.normalizers.naming.snake_case import NamingConvention from dlt.common.schema.typing import TSchemaTables from dlt.load.exceptions import LoadClientJobRetry from dlt.sources import DltSource +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration +from dlt.common.destination.reference import ( + FollowupJobRequest, +) +from dlt.common.destination.typing import ( + PreparedTableSchema, +) +from dlt.destinations.job_impl import ( + ReferenceFollowupJobRequest, +) +from dlt.common.storages import FileStorage +from dlt.common.storages.load_package import ( + LoadJobInfo, +) from deltalake.exceptions import DeltaError from collections import Counter from clickhouse_driver.errors import ServerException @@ -25,7 +42,7 @@ from posthog.warehouse.models.external_data_job import ExternalDataJob from posthog.warehouse.models.external_data_schema import ExternalDataSchema from posthog.warehouse.models.external_data_source import ExternalDataSource from posthog.warehouse.models.table import DataWarehouseTable -from posthog.temporal.data_imports.util import prepare_s3_files_for_querying +from posthog.temporal.data_imports.util import is_posthog_team, prepare_s3_files_for_querying @dataclass @@ -100,51 +117,54 @@ class DataImportPipelineSync: pipeline_name = self._get_pipeline_name() destination = self._get_destination() - # def create_table_chain_completed_followup_jobs( - # self: FilesystemClient, - # table_chain: Sequence[PreparedTableSchema], - # completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, - # ) -> list[FollowupJobRequest]: - # assert completed_table_chain_jobs is not None - # jobs = super(FilesystemClient, self).create_table_chain_completed_followup_jobs( - # table_chain, completed_table_chain_jobs - # ) - # if table_chain[0].get("table_format") == "delta": - # for table in table_chain: - # table_job_paths = [ - # job.file_path - # for job in completed_table_chain_jobs - # if job.job_file_info.table_name == table["name"] - # ] - # if len(table_job_paths) == 0: - # # file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name() - # # TODO: if we implement removal od orphaned rows, we may need to propagate such job without files - # # to the delta load job - # pass - # else: - # files_per_job = self.config.delta_jobs_per_write or len(table_job_paths) - # for i in range(0, len(table_job_paths), files_per_job): - # jobs_chunk = table_job_paths[i : i + files_per_job] - # file_name = FileStorage.get_file_name_from_file_path(jobs_chunk[0]) - # jobs.append(ReferenceFollowupJobRequest(file_name, jobs_chunk)) + def create_table_chain_completed_followup_jobs( + self: FilesystemClient, + table_chain: Sequence[PreparedTableSchema], + completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, + ) -> list[FollowupJobRequest]: + assert completed_table_chain_jobs is not None + jobs = super(FilesystemClient, self).create_table_chain_completed_followup_jobs( + table_chain, completed_table_chain_jobs + ) + if table_chain[0].get("table_format") == "delta": + for table in table_chain: + table_job_paths = [ + job.file_path + for job in completed_table_chain_jobs + if job.job_file_info.table_name == table["name"] + ] + if len(table_job_paths) == 0: + # file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name() + # TODO: if we implement removal od orphaned rows, we may need to propagate such job without files + # to the delta load job + pass + else: + files_per_job = self.config.delta_jobs_per_write or len(table_job_paths) + for i in range(0, len(table_job_paths), files_per_job): + jobs_chunk = table_job_paths[i : i + files_per_job] + file_name = FileStorage.get_file_name_from_file_path(jobs_chunk[0]) + jobs.append(ReferenceFollowupJobRequest(file_name, jobs_chunk)) - # return jobs + return jobs - # def _iter_chunks(self, lst: list[Any], n: int) -> Iterator[list[Any]]: - # """Yield successive n-sized chunks from lst.""" - # for i in range(0, len(lst), n): - # yield lst[i : i + n] + def _iter_chunks(self, lst: list[Any], n: int) -> Iterator[list[Any]]: + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i : i + n] # Monkey patch to fix large memory consumption until https://github.com/dlt-hub/dlt/pull/2031 gets merged in - # if self._incremental or is_posthog_team(self.inputs.team_id): - # FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1 - # FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore - # FilesystemClient._iter_chunks = _iter_chunks # type: ignore + if ( + is_posthog_team(self.inputs.team_id) + and str(self.inputs.source_id) == "01932521-63e7-0000-087c-527af4a2bc4d" # toms test source in prod + ): + FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1 + FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore + FilesystemClient._iter_chunks = _iter_chunks # type: ignore - # dlt.config["data_writer.file_max_items"] = 500_000 - # dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB - # dlt.config["loader_parallelism_strategy"] = "table-sequential" - # dlt.config["delta_jobs_per_write"] = 1 + dlt.config["data_writer.file_max_items"] = 500_000 + dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB + dlt.config["loader_parallelism_strategy"] = "table-sequential" + dlt.config["delta_jobs_per_write"] = 1 dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True @@ -173,21 +193,22 @@ class DataImportPipelineSync: pipeline = self._create_pipeline() # Workaround for full refresh schemas while we wait for Rust to fix memory issue - # if is_posthog_team(self.inputs.team_id): - # for name, resource in self.source._resources.items(): - # if resource.write_disposition == "replace": - # try: - # delta_uri = f"{settings.BUCKET_URL}/{self.inputs.dataset_name}/{name}" - # delta_table = DeltaTable(delta_uri, storage_options=self._get_credentials()) - # except TableNotFoundError: - # delta_table = None + if ( + is_posthog_team(self.inputs.team_id) + and str(self.inputs.source_id) == "01932521-63e7-0000-087c-527af4a2bc4d" # toms test source in prod + ): + for name, resource in self.source._resources.items(): + if resource.write_disposition == "replace": + delta_uri = f"{settings.BUCKET_URL}/{self.inputs.dataset_name}/{name}" + storage_options = self._get_credentials() - # if delta_table: - # self.logger.debug("Deleting existing delta table") - # delta_table.delete() + if DeltaTable.is_deltatable(delta_uri, storage_options): + delta_table = DeltaTable(delta_uri, storage_options=self._get_credentials()) + self.logger.debug("Deleting existing delta table") + delta_table.delete() - # self.logger.debug("Updating table write_disposition to append") - # resource.apply_hints(write_disposition="append") + self.logger.debug("Updating table write_disposition to append") + resource.apply_hints(write_disposition="append") total_counts: Counter[str] = Counter({}) diff --git a/posthog/temporal/data_imports/pipelines/test/test_pipeline_sync.py b/posthog/temporal/data_imports/pipelines/test/test_pipeline_sync.py index 3b265f54a35..04f5dedadd2 100644 --- a/posthog/temporal/data_imports/pipelines/test/test_pipeline_sync.py +++ b/posthog/temporal/data_imports/pipelines/test/test_pipeline_sync.py @@ -77,6 +77,7 @@ class TestDataImportPipeline(APIBaseTest): ) as mock_validate_schema_and_update_table, patch("posthog.temporal.data_imports.pipelines.pipeline_sync.get_delta_tables"), patch("posthog.temporal.data_imports.pipelines.pipeline_sync.update_last_synced_at_sync"), + patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False), ): pipeline = self._create_pipeline("Customer", False) res = pipeline.run() @@ -98,6 +99,7 @@ class TestDataImportPipeline(APIBaseTest): ) as mock_validate_schema_and_update_table, patch("posthog.temporal.data_imports.pipelines.pipeline_sync.get_delta_tables"), patch("posthog.temporal.data_imports.pipelines.pipeline_sync.update_last_synced_at_sync"), + patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False), ): pipeline = self._create_pipeline("Customer", True) res = pipeline.run() diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index cb29cbafa5d..3bf744084fc 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -202,6 +202,7 @@ async def _execute_run(workflow_id: str, inputs: ExternalDataWorkflowInputs, moc ), mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials), mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials), + mock.patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False), ): async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker(