mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

feat(data-warehouse): log entries on data warehouse backend (#23416)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Eric Duong 2024-07-16 12:52:10 -04:00 committed by GitHub
parent e92af80222
commit 0adb5d5857
3 changed files with 161 additions and 2 deletions
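
In practical terms, the change wires the existing log-entry plumbing onto the external data schema API, so each schema gets a /logs sub-route backed by the ClickHouse log_entries table. A minimal sketch of calling it from outside (the host, ids, and the Bearer-token auth below are placeholder assumptions; the path and the level/instance_id filters come from the test added in this commit):

import requests

HOST = "https://app.posthog.com"  # placeholder instance URL
PROJECT_ID = 1  # placeholder project id
SCHEMA_ID = "00000000-0000-0000-0000-000000000000"  # placeholder external data schema id
PERSONAL_API_KEY = "phx_XXXX"  # assumed personal-API-key Bearer auth

response = requests.get(
    f"{HOST}/api/projects/{PROJECT_ID}/external_data_schemas/{SCHEMA_ID}/logs",
    params={"level": "INFO"},  # optional filter, mirroring the test below
    headers={"Authorization": f"Bearer {PERSONAL_API_KEY}"},
)
response.raise_for_status()
for entry in response.json()["results"]:
    print(entry["level"], entry["message"], entry["log_source_id"])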

View File

@@ -198,7 +198,7 @@ def get_temporal_context() -> dict[str, str | int]:
         log_source_id = workflow_id.split("-Backfill")[0]
         log_source = "batch_exports_backfill"
     elif workflow_type == "external-data-job":
-        # This works because the WorkflowID is made up like f"{external_data_source_id}-{data_interval_end}"
+        # This works because the WorkflowID is made up like f"{external_data_schema_id}-{data_interval_end}"
         log_source_id = workflow_id.rsplit("-", maxsplit=3)[0]
         log_source = "external_data_jobs"
     else:
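
To see why rsplit("-", maxsplit=3) recovers the schema id from that WorkflowID, here is a small standalone illustration (the UUID and timestamp are made-up values; the assumption is that data_interval_end is an ISO-8601 timestamp whose date part contributes two hyphens, plus the hyphen joining it to the schema id):

# Hypothetical example values; only the string shape matters here.
external_data_schema_id = "018f1b2c-1111-2222-3333-444455556666"
data_interval_end = "2024-07-16T12:00:00+00:00"
workflow_id = f"{external_data_schema_id}-{data_interval_end}"

# Splitting off the last three "-"-separated chunks strips the timestamp suffix
# without touching the hyphens inside the UUID.
assert workflow_id.rsplit("-", maxsplit=3)[0] == external_data_schema_id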

View File

@@ -11,6 +11,7 @@ from rest_framework.exceptions import ValidationError
 from rest_framework.request import Request
 from rest_framework.response import Response
 from posthog.hogql.database.database import create_hogql_database
+from posthog.api.log_entries import LogEntryMixin
 from posthog.warehouse.data_load.service import (
     external_data_workflow_exists,
@@ -166,13 +167,14 @@ class SimpleExternalDataSchemaSerializer(serializers.ModelSerializer):
         fields = ["id", "name", "should_sync", "last_synced_at"]


-class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
+class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, LogEntryMixin, viewsets.ModelViewSet):
     scope_object = "INTERNAL"
     queryset = ExternalDataSchema.objects.all()
     serializer_class = ExternalDataSchemaSerializer
     filter_backends = [filters.SearchFilter]
     search_fields = ["name"]
     ordering = "-created_at"
+    log_source = "external_data_jobs"

     def get_serializer_context(self) -> dict[str, Any]:
         context = super().get_serializer_context()
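
LogEntryMixin itself is not shown in this diff. Purely as an illustration of the pattern (a hypothetical sketch, not PostHog's actual mixin), a DRF mixin that exposes a /logs detail route keyed on the viewset's log_source could look roughly like this, with the ClickHouse query replaced by a stub:

from typing import Any

from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response


def fetch_log_entries(**filters: Any) -> list[dict]:
    """Stand-in for a query against the ClickHouse log_entries table (not PostHog code)."""
    return []


class LogEntryMixinSketch(viewsets.GenericViewSet):
    # Subclasses declare which log_source their log entries were written under,
    # mirroring the log_source = "external_data_jobs" attribute added above.
    log_source: str = ""

    @action(methods=["GET"], detail=True)
    def logs(self, request, *args, **kwargs):
        obj = self.get_object()
        entries = fetch_log_entries(
            log_source=self.log_source,
            log_source_id=str(obj.pk),
            level=request.query_params.get("level"),
            instance_id=request.query_params.get("instance_id"),
        )
        return Response({"count": len(entries), "results": entries})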

View File

@@ -0,0 +1,157 @@
import datetime as dt

import pytest

from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.client import sync_execute
from django.test.client import Client as TestClient
from posthog.warehouse.models import (
    ExternalDataSchema,
    ExternalDataJob,
    ExternalDataSource,
    DataWarehouseTable,
    DataWarehouseCredential,
)
from posthog.utils import encode_get_request_params


def create_external_data_job_log_entry(
    *,
    team_id: int,
    external_data_schema_id: str,
    run_id: str | None,
    message: str,
    level: str,
):
    from posthog.clickhouse.log_entries import INSERT_LOG_ENTRY_SQL

    sync_execute(
        INSERT_LOG_ENTRY_SQL,
        {
            "team_id": team_id,
            "log_source": "external_data_jobs",
            "log_source_id": external_data_schema_id,
            "instance_id": run_id,
            "timestamp": dt.datetime.now(dt.UTC).strftime("%Y-%m-%d %H:%M:%S.%f"),
            "level": level,
            "message": message,
        },
    )


@pytest.fixture
def organization():
    organization = create_organization("Test Org")
    yield organization
    organization.delete()


@pytest.fixture
def team(organization):
    team = create_team(organization)
    yield team
    team.delete()


@pytest.fixture
def external_data_resources(client, organization, team):
    user = create_user("test@user.com", "Test User", organization)
    client.force_login(user)

    source = ExternalDataSource.objects.create(
        team=team,
        source_id="source_id",
        connection_id="connection_id",
        status=ExternalDataSource.Status.COMPLETED,
        source_type=ExternalDataSource.Type.STRIPE,
    )
    credentials = DataWarehouseCredential.objects.create(access_key="blah", access_secret="blah", team=team)
    warehouse_table = DataWarehouseTable.objects.create(
        name="table_1",
        format="Parquet",
        team=team,
        external_data_source=source,
        external_data_source_id=source.id,
        credential=credentials,
        url_pattern="https://bucket.s3/data/*",
        columns={"id": {"hogql": "StringDatabaseField", "clickhouse": "Nullable(String)", "schema_valid": True}},
    )
    schema = ExternalDataSchema.objects.create(
        team=team,
        name="table_1",
        source=source,
        table=warehouse_table,
        should_sync=True,
        last_synced_at="2024-01-01",
        # No status but should be completed because a data warehouse table already exists
    )
    job = ExternalDataJob.objects.create(
        pipeline=source,
        schema=schema,
        workflow_id="fake_workflow_id",
        team=team,
        status="Running",
        rows_synced=100000,
    )

    return {
        "source": source,
        "schema": schema,
        "job": job,
    }


def get_external_data_schema_run_log_entries(client: TestClient, team_id: int, external_data_schema_id: str, **extra):
    return client.get(
        f"/api/projects/{team_id}/external_data_schemas/{external_data_schema_id}/logs",
        data=encode_get_request_params(extra),
    )


@pytest.mark.django_db
def test_external_data_schema_log_api_with_level_filter(client, external_data_resources, team):
    """Test fetching external data schema run log entries, filtered by level, using the API."""
    run_id = external_data_resources["job"].pk
    schema_id = external_data_resources["schema"].pk

    create_external_data_job_log_entry(
        team_id=team.pk,
        external_data_schema_id=schema_id,
        run_id=run_id,
        message="Test log. Much INFO.",
        level="INFO",
    )
    create_external_data_job_log_entry(
        team_id=team.pk,
        external_data_schema_id=schema_id,
        run_id="fake_workflow_id",
        message="Test log. Much INFO.",
        level="INFO",
    )
    create_external_data_job_log_entry(
        team_id=team.pk,
        external_data_schema_id=schema_id,
        run_id=run_id,
        message="Test log. Much DEBUG.",
        level="DEBUG",
    )

    response = get_external_data_schema_run_log_entries(
        client,
        team_id=team.pk,
        external_data_schema_id=schema_id,
        level="INFO",
        instance_id=run_id,
    )

    json_response = response.json()
    results = json_response["results"]

    assert response.status_code == 200
    assert json_response["count"] == 1
    assert len(results) == 1
    assert results[0]["message"] == "Test log. Much INFO."
    assert results[0]["level"] == "INFO"
    assert results[0]["log_source_id"] == str(schema_id)