From 0adb5d5857b7407800d55e4f7941b656c92be592 Mon Sep 17 00:00:00 2001
From: Eric Duong
Date: Tue, 16 Jul 2024 12:52:10 -0400
Subject: [PATCH] feat(data-warehouse): log entries on data warehouse backend (#23416)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
---
 posthog/temporal/common/logger.py             |   2 +-
 posthog/warehouse/api/external_data_schema.py |   4 +-
 posthog/warehouse/api/test/test_log_entry.py  | 157 ++++++++++++++++++
 3 files changed, 161 insertions(+), 2 deletions(-)
 create mode 100644 posthog/warehouse/api/test/test_log_entry.py

diff --git a/posthog/temporal/common/logger.py b/posthog/temporal/common/logger.py
index 2291232ecad..c769116921f 100644
--- a/posthog/temporal/common/logger.py
+++ b/posthog/temporal/common/logger.py
@@ -198,7 +198,7 @@ def get_temporal_context() -> dict[str, str | int]:
         log_source_id = workflow_id.split("-Backfill")[0]
         log_source = "batch_exports_backfill"
     elif workflow_type == "external-data-job":
-        # This works because the WorkflowID is made up like f"{external_data_source_id}-{data_interval_end}"
+        # This works because the WorkflowID is made up like f"{external_data_schema_id}-{data_interval_end}"
         log_source_id = workflow_id.rsplit("-", maxsplit=3)[0]
         log_source = "external_data_jobs"
     else:
diff --git a/posthog/warehouse/api/external_data_schema.py b/posthog/warehouse/api/external_data_schema.py
index 0ee95d8f6bd..eaba69507f3 100644
--- a/posthog/warehouse/api/external_data_schema.py
+++ b/posthog/warehouse/api/external_data_schema.py
@@ -11,6 +11,7 @@ from rest_framework.exceptions import ValidationError
 from rest_framework.request import Request
 from rest_framework.response import Response
 from posthog.hogql.database.database import create_hogql_database
+from posthog.api.log_entries import LogEntryMixin
 
 from posthog.warehouse.data_load.service import (
     external_data_workflow_exists,
@@ -166,13 +167,14 @@ class SimpleExternalDataSchemaSerializer(serializers.ModelSerializer):
         fields = ["id", "name", "should_sync", "last_synced_at"]
 
 
-class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
+class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, LogEntryMixin, viewsets.ModelViewSet):
     scope_object = "INTERNAL"
     queryset = ExternalDataSchema.objects.all()
     serializer_class = ExternalDataSchemaSerializer
     filter_backends = [filters.SearchFilter]
     search_fields = ["name"]
     ordering = "-created_at"
+    log_source = "external_data_jobs"
 
     def get_serializer_context(self) -> dict[str, Any]:
         context = super().get_serializer_context()
diff --git a/posthog/warehouse/api/test/test_log_entry.py b/posthog/warehouse/api/test/test_log_entry.py
new file mode 100644
index 00000000000..c7ed98c572f
--- /dev/null
+++ b/posthog/warehouse/api/test/test_log_entry.py
@@ -0,0 +1,157 @@
+import datetime as dt
+import pytest
+
+from posthog.api.test.test_organization import create_organization
+from posthog.api.test.test_team import create_team
+from posthog.api.test.test_user import create_user
+from posthog.client import sync_execute
+from django.test.client import Client as TestClient
+from posthog.warehouse.models import (
+    ExternalDataSchema,
+    ExternalDataJob,
+    ExternalDataSource,
+    DataWarehouseTable,
+    DataWarehouseCredential,
+)
+from posthog.utils import encode_get_request_params
+
+
+def create_external_data_job_log_entry(
+    *,
+    team_id: int,
+    external_data_schema_id: str,
+    run_id: str | None,
+    message: str,
+    level: str,
+):
+    from posthog.clickhouse.log_entries import INSERT_LOG_ENTRY_SQL
+
+    sync_execute(
+        INSERT_LOG_ENTRY_SQL,
+        {
+            "team_id": team_id,
+            "log_source": "external_data_jobs",
+            "log_source_id": external_data_schema_id,
+            "instance_id": run_id,
+            "timestamp": dt.datetime.now(dt.UTC).strftime("%Y-%m-%d %H:%M:%S.%f"),
+            "level": level,
+            "message": message,
+        },
+    )
+
+
+@pytest.fixture
+def organization():
+    organization = create_organization("Test Org")
+
+    yield organization
+
+    organization.delete()
+
+
+@pytest.fixture
+def team(organization):
+    team = create_team(organization)
+
+    yield team
+
+    team.delete()
+
+
+@pytest.fixture
+def external_data_resources(client, organization, team):
+    user = create_user("test@user.com", "Test User", organization)
+    client.force_login(user)
+
+    source = ExternalDataSource.objects.create(
+        team=team,
+        source_id="source_id",
+        connection_id="connection_id",
+        status=ExternalDataSource.Status.COMPLETED,
+        source_type=ExternalDataSource.Type.STRIPE,
+    )
+    credentials = DataWarehouseCredential.objects.create(access_key="blah", access_secret="blah", team=team)
+    warehouse_table = DataWarehouseTable.objects.create(
+        name="table_1",
+        format="Parquet",
+        team=team,
+        external_data_source=source,
+        external_data_source_id=source.id,
+        credential=credentials,
+        url_pattern="https://bucket.s3/data/*",
+        columns={"id": {"hogql": "StringDatabaseField", "clickhouse": "Nullable(String)", "schema_valid": True}},
+    )
+    schema = ExternalDataSchema.objects.create(
+        team=team,
+        name="table_1",
+        source=source,
+        table=warehouse_table,
+        should_sync=True,
+        last_synced_at="2024-01-01",
+        # No status, but it should be treated as completed because a data warehouse table already exists
+    )
+    job = ExternalDataJob.objects.create(
+        pipeline=source, schema=schema, workflow_id="fake_workflow_id", team=team, status="Running", rows_synced=100000
+    )
+
+    return {
+        "source": source,
+        "schema": schema,
+        "job": job,
+    }
+
+
+def get_external_data_schema_run_log_entries(client: TestClient, team_id: int, external_data_schema_id: str, **extra):
+    return client.get(
+        f"/api/projects/{team_id}/external_data_schemas/{external_data_schema_id}/logs",
+        data=encode_get_request_params(extra),
+    )
+
+
+@pytest.mark.django_db
+def test_external_data_schema_log_api_with_level_filter(client, external_data_resources, team):
+    """Test fetching external data schema run log entries via the API, filtered by level."""
+    run_id = external_data_resources["job"].pk
+    schema_id = external_data_resources["schema"].pk
+
+    create_external_data_job_log_entry(
+        team_id=team.pk,
+        external_data_schema_id=schema_id,
+        run_id=run_id,
+        message="Test log. Much INFO.",
+        level="INFO",
+    )
+
+    create_external_data_job_log_entry(
+        team_id=team.pk,
+        external_data_schema_id=schema_id,
+        run_id="fake_workflow_id",
+        message="Test log. Much INFO.",
+        level="INFO",
+    )
+
+    create_external_data_job_log_entry(
+        team_id=team.pk,
+        external_data_schema_id=schema_id,
+        run_id=run_id,
+        message="Test log. Much DEBUG.",
+        level="DEBUG",
+    )
+
+    response = get_external_data_schema_run_log_entries(
+        client,
+        team_id=team.pk,
+        external_data_schema_id=schema_id,
+        level="INFO",
+        instance_id=run_id,
+    )
+
+    json_response = response.json()
+    results = json_response["results"]
+
+    assert response.status_code == 200
+    assert json_response["count"] == 1
+    assert len(results) == 1
+    assert results[0]["message"] == "Test log. Much INFO."
+    assert results[0]["level"] == "INFO"
+    assert results[0]["log_source_id"] == str(schema_id)
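
Reviewer note: the logger.py hunk attributes external data job logs by peeling
the schema ID off the Temporal workflow ID. A minimal sketch of why
rsplit("-", maxsplit=3) recovers it, assuming (per the updated comment) that
the workflow ID is built as f"{external_data_schema_id}-{data_interval_end}"
with a UUID schema ID and a UTC ISO-8601 interval end (the exact timestamp
format and the sample values below are assumptions for illustration):

    # Hypothetical schema ID and interval end, for illustration only.
    schema_id = "a1b2c3d4-e5f6-47a8-9c0d-1e2f3a4b5c6d"
    workflow_id = f"{schema_id}-2024-07-16T12:00:00"

    # The joining dash plus the date's own two dashes contribute exactly
    # three "-" characters to the right of the UUID, so splitting off the
    # last three dash-separated pieces leaves the schema ID intact.
    log_source_id = workflow_id.rsplit("-", maxsplit=3)[0]
    assert log_source_id == schema_id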
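
Reviewer note: with LogEntryMixin and log_source = "external_data_jobs" on the
viewset, each schema gains a /logs endpoint, which the test above exercises
through the Django test client. For completeness, a hedged sketch of calling
the same endpoint from outside: the host and personal API key are hypothetical
and the Bearer authorization scheme is an assumption, while the path and the
level/instance_id query parameters come straight from the test:

    import requests  # third-party HTTP client

    host = "https://posthog.example.com"  # hypothetical instance URL
    api_key = "phx_..."  # hypothetical personal API key
    team_id = 1
    schema_id = "a1b2c3d4-e5f6-47a8-9c0d-1e2f3a4b5c6d"  # hypothetical schema UUID

    response = requests.get(
        f"{host}/api/projects/{team_id}/external_data_schemas/{schema_id}/logs",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"level": "INFO", "instance_id": "some-run-id"},
    )
    response.raise_for_status()

    # Paginated payload: {"count": ..., "results": [...]}, where each result
    # carries at least "message", "level", and "log_source_id".
    for entry in response.json()["results"]:
        print(entry["level"], entry["message"])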