diff --git a/frontend/public/services/chargebee.png b/frontend/public/services/chargebee.png
new file mode 100644
index 00000000000..eaf01aaa768
Binary files /dev/null and b/frontend/public/services/chargebee.png differ
diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
index 95a525987b8..4656a338d63 100644
--- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
+++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
@@ -628,6 +628,26 @@ export const SOURCE_DETAILS: Record = {
         ],
         caption: '',
     },
+    Chargebee: {
+        name: 'Chargebee',
+        fields: [
+            {
+                name: 'api_key',
+                label: 'API key',
+                type: 'text',
+                required: true,
+                placeholder: '',
+            },
+            {
+                type: 'text',
+                name: 'site_name',
+                label: 'Site name (subdomain)',
+                required: true,
+                placeholder: '',
+            },
+        ],
+        caption: '',
+    },
 }
 
 export const buildKeaFormDefaultFromSourceDetails = (
diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
index 8a95510bac4..9d9f7265dbd 100644
--- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
+++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
@@ -6,6 +6,7 @@ import { LemonTableLink } from 'lib/lemon-ui/LemonTable/LemonTableLink'
 import IconAwsS3 from 'public/services/aws-s3.png'
 import Iconazure from 'public/services/azure.png'
 import IconBigQuery from 'public/services/bigquery.png'
+import IconChargebee from 'public/services/chargebee.png'
 import IconCloudflare from 'public/services/cloudflare.png'
 import IconGoogleCloudStorage from 'public/services/google-cloud-storage.png'
 import IconHubspot from 'public/services/hubspot.png'
@@ -193,6 +194,7 @@ export function RenderDataWarehouseSourceIcon({
         MSSQL: IconMSSQL,
         Vitally: IconVitally,
         BigQuery: IconBigQuery,
+        Chargebee: IconChargebee,
     }[type]
 
     return (
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index 846b63401d2..ec291c496f8 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -4057,6 +4057,7 @@ export const externalDataSources = [
     'Salesforce',
     'Vitally',
     'BigQuery',
+    'Chargebee',
 ] as const
 
 export type ExternalDataSourceType = (typeof externalDataSources)[number]
diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 23c51f37f70..07739d4d910 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
 ee: 0016_rolemembership_organization_member
 otp_static: 0002_throttling
 otp_totp: 0002_auto_20190420_0723
-posthog: 0506_productintent_activated_at_and_more
+posthog: 0507_alter_externaldatasource_source_type
 sessions: 0001_initial
 social_django: 0010_uid_db_index
 two_factor: 0007_auto_20201201_1019
diff --git a/posthog/migrations/0507_alter_externaldatasource_source_type.py b/posthog/migrations/0507_alter_externaldatasource_source_type.py
new file mode 100644
index 00000000000..4e84cc9ee4a
--- /dev/null
+++ b/posthog/migrations/0507_alter_externaldatasource_source_type.py
@@ -0,0 +1,32 @@
+# Generated by Django 4.2.15 on 2024-11-01 16:52
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("posthog", "0506_productintent_activated_at_and_more"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="externaldatasource",
+            name="source_type",
+            field=models.CharField(
+                choices=[
+                    ("Stripe", "Stripe"),
+                    ("Hubspot", "Hubspot"),
+                    ("Postgres", "Postgres"),
+                    ("Zendesk", "Zendesk"),
+                    ("Snowflake", "Snowflake"),
+                    ("Salesforce", "Salesforce"),
+                    ("MySQL", "MySQL"),
+                    ("MSSQL", "MSSQL"),
+                    ("Vitally", "Vitally"),
+                    ("BigQuery", "BigQuery"),
+                    ("Chargebee", "Chargebee"),
+                ],
+                max_length=128,
+            ),
+        ),
+    ]
diff --git a/posthog/temporal/data_imports/pipelines/chargebee/__init__.py b/posthog/temporal/data_imports/pipelines/chargebee/__init__.py
new file mode 100644
index 00000000000..245afb6e5d8
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/chargebee/__init__.py
@@ -0,0 +1,254 @@
+import base64
+from typing import Any, Optional
+
+import dlt
+import requests
+from dlt.sources.helpers.requests import Request, Response
+from dlt.sources.helpers.rest_client.paginators import BasePaginator
+
+from posthog.temporal.data_imports.pipelines.rest_source import (
+    RESTAPIConfig,
+    rest_api_resources,
+)
+from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
+
+
+def get_resource(name: str, is_incremental: bool) -> EndpointResource:
+    resources: dict[str, EndpointResource] = {
+        "Customers": {
+            "name": "Customers",
+            "table_name": "customers",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].customer",
+                "path": "/v2/customers",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "updated_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "updated_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                    # by default, API does not return deleted resources
+                    "include_deleted": "true",
+                },
+            },
+            "table_format": "delta",
+        },
+        # Note: it is possible to filter by event type, but for now we're
+        # fetching all events
+        "Events": {
+            "name": "Events",
+            "table_name": "events",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].event",
+                "path": "/v2/events",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "occurred_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "occurred_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Invoices": {
+            "name": "Invoices",
+            "table_name": "invoices",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].invoice",
+                "path": "/v2/invoices",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "updated_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "updated_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                    # by default, API does not return deleted resources
+                    "include_deleted": "true",
+                },
+            },
+            "table_format": "delta",
+        },
+        "Orders": {
+            "name": "Orders",
+            "table_name": "orders",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].order",
+                "path": "/v2/orders",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "updated_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "updated_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                    # by default, API does not return deleted resources
+                    "include_deleted": "true",
+                },
+            },
+            "table_format": "delta",
+        },
+        "Subscriptions": {
+            "name": "Subscriptions",
+            "table_name": "subscriptions",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].subscription",
+                "path": "/v2/subscriptions",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "updated_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "updated_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                    # by default, API does not return deleted resources
+                    "include_deleted": "true",
+                },
+            },
+            "table_format": "delta",
+        },
+        "Transactions": {
+            "name": "Transactions",
+            "table_name": "transactions",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "list[*].transaction",
+                "path": "/v2/transactions",
+                "params": {
+                    # the parameters below can optionally be configured
+                    "updated_at[after]": {
+                        "type": "incremental",
+                        "cursor_path": "updated_at",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                    "limit": 100,
+                    # by default, API does not return deleted resources
+                    "include_deleted": "true",
+                },
+            },
+            "table_format": "delta",
+        },
+    }
+    return resources[name]
+
+
+class ChargebeePaginator(BasePaginator):
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
+        res = response.json()
+
+        self._next_offset = None
+
+        if not res:
+            self._has_next_page = False
+            return
+
+        if "next_offset" in res:
+            self._has_next_page = True
+            self._next_offset = res["next_offset"]
+        else:
+            self._has_next_page = False
+
+    def update_request(self, request: Request) -> None:
+        if request.params is None:
+            request.params = {}
+
+        request.params["offset"] = self._next_offset
+
+
+@dlt.source(max_table_nesting=0)
+def chargebee_source(
+    api_key: str, site_name: str, endpoint: str, team_id: int, job_id: str, is_incremental: bool = False
+):
+    config: RESTAPIConfig = {
+        "client": {
+            "base_url": f"https://{site_name}.chargebee.com/api",
+            "auth": {
+                "type": "http_basic",
+                "username": api_key,
+                "password": "",
+            },
+            "paginator": ChargebeePaginator(),
+        },
+        "resource_defaults": {
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+        },
+        "resources": [get_resource(endpoint, is_incremental)],
+    }
+
+    yield from rest_api_resources(config, team_id, job_id)
+
+
+def validate_credentials(api_key: str, site_name: str) -> bool:
+    basic_token = base64.b64encode(f"{api_key}:".encode("ascii")).decode("ascii")
+    res = requests.get(
+        f"https://{site_name}.chargebee.com/api/v2/customers?limit=1",
+        headers={"Authorization": f"Basic {basic_token}"},
+    )
+    return res.status_code == 200
diff --git a/posthog/temporal/data_imports/pipelines/chargebee/settings.py b/posthog/temporal/data_imports/pipelines/chargebee/settings.py
new file mode 100644
index 00000000000..fe8e1ad94c5
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/chargebee/settings.py
@@ -0,0 +1,70 @@
+from posthog.warehouse.types import IncrementalField, IncrementalFieldType
+
+ENDPOINTS = (
+    "Customers",
+    "Events",
+    "Invoices",
+    "Orders",
+    "Subscriptions",
+    "Transactions",
+)
+
+INCREMENTAL_ENDPOINTS = (
+    "Customers",
+    "Events",
+    "Invoices",
+    "Orders",
+    "Subscriptions",
+    "Transactions",
+)
+
+INCREMENTAL_FIELDS: dict[str, list[IncrementalField]] = {
+    "Customers": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+    "Events": [
+        {
+            "label": "occurred_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "occurred_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+    "Invoices": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+    "Orders": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+    "Subscriptions": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+    "Transactions": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.Integer,
+        },
+    ],
+}
diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py
index 0813cd7446f..53f9a387035 100644
--- a/posthog/temporal/data_imports/pipelines/schemas.py
+++ b/posthog/temporal/data_imports/pipelines/schemas.py
@@ -1,27 +1,34 @@
-from posthog.warehouse.types import IncrementalField
-from posthog.temporal.data_imports.pipelines.zendesk.settings import (
-    BASE_ENDPOINTS,
-    SUPPORT_ENDPOINTS,
-    INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
-    INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS,
+from posthog.temporal.data_imports.pipelines.chargebee.settings import (
+    ENDPOINTS as CHARGEBEE_ENDPOINTS,
+    INCREMENTAL_ENDPOINTS as CHARGEBEE_INCREMENTAL_ENDPOINTS,
+    INCREMENTAL_FIELDS as CHARGEBEE_INCREMENTAL_FIELDS,
 )
-from posthog.warehouse.models import ExternalDataSource
-from posthog.temporal.data_imports.pipelines.stripe.settings import (
-    ENDPOINTS as STRIPE_ENDPOINTS,
-    INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS,
-    INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS,
+from posthog.temporal.data_imports.pipelines.hubspot.settings import (
+    ENDPOINTS as HUBSPOT_ENDPOINTS,
 )
-from posthog.temporal.data_imports.pipelines.hubspot.settings import ENDPOINTS as HUBSPOT_ENDPOINTS
 from posthog.temporal.data_imports.pipelines.salesforce.settings import (
     ENDPOINTS as SALESFORCE_ENDPOINTS,
     INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS,
     INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS,
 )
+from posthog.temporal.data_imports.pipelines.stripe.settings import (
+    ENDPOINTS as STRIPE_ENDPOINTS,
+    INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS,
+    INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS,
+)
 from posthog.temporal.data_imports.pipelines.vitally.settings import (
     ENDPOINTS as VITALLY_ENDPOINTS,
     INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS,
     INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS,
 )
+from posthog.temporal.data_imports.pipelines.zendesk.settings import (
+    BASE_ENDPOINTS,
+    INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
+    INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS,
+    SUPPORT_ENDPOINTS,
+)
+from posthog.warehouse.models import ExternalDataSource
+from posthog.warehouse.types import IncrementalField
 
 PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
     ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS,
@@ -36,6 +43,7 @@ PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
     ExternalDataSource.Type.MSSQL: (),
     ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS,
     ExternalDataSource.Type.BIGQUERY: (),
+    ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_ENDPOINTS,
 }
 
 PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
@@ -49,6 +57,7 @@ PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
     ExternalDataSource.Type.MSSQL: (),
     ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS,
     ExternalDataSource.Type.BIGQUERY: (),
+    ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_ENDPOINTS,
 }
 
 PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = {
@@ -62,4 +71,5 @@ PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str
     ExternalDataSource.Type.MSSQL: {},
     ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS,
     ExternalDataSource.Type.BIGQUERY: {},
+    ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_FIELDS,
 }
diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py
index a88150fe9bc..50430ded585 100644
--- a/posthog/temporal/data_imports/workflow_activities/import_data.py
+++ b/posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -1,24 +1,32 @@
 import dataclasses
+import uuid
 from datetime import datetime
 from typing import Any
-import uuid
 
+from structlog.typing import FilteringBoundLogger
 from temporalio import activity
 
 from posthog.settings.utils import get_from_env
 from posthog.temporal.common.heartbeat import Heartbeater
+from posthog.temporal.common.logger import bind_temporal_worker_logger
 from posthog.temporal.data_imports.pipelines.bigquery import delete_table
-from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count
-
-from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
+from posthog.temporal.data_imports.pipelines.helpers import (
+    aremove_reset_pipeline,
+    aupdate_job_count,
+)
+from posthog.temporal.data_imports.pipelines.pipeline import (
+    DataImportPipeline,
+    PipelineInputs,
+)
 from posthog.warehouse.models import (
     ExternalDataJob,
     ExternalDataSource,
     get_external_data_job,
 )
-from posthog.temporal.common.logger import bind_temporal_worker_logger
-from structlog.typing import FilteringBoundLogger
-from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
+from posthog.warehouse.models.external_data_schema import (
+    ExternalDataSchema,
+    aget_schema_by_id,
+)
 from posthog.warehouse.models.ssh_tunnel import SSHTunnel
@@ -86,8 +94,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             reset_pipeline=reset_pipeline,
         )
     elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT:
-        from posthog.temporal.data_imports.pipelines.hubspot.auth import hubspot_refresh_access_token
         from posthog.temporal.data_imports.pipelines.hubspot import hubspot
+        from posthog.temporal.data_imports.pipelines.hubspot.auth import (
+            hubspot_refresh_access_token,
+        )
 
         hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None)
         refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None)
@@ -117,9 +127,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
         ExternalDataSource.Type.MSSQL,
     ]:
         if is_posthog_team(inputs.team_id):
-            from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type
+            from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
+                sql_source_for_type,
+            )
         else:
-            from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
+            from posthog.temporal.data_imports.pipelines.sql_database import (
+                sql_source_for_type,
+            )
 
         host = model.pipeline.job_inputs.get("host")
         port = model.pipeline.job_inputs.get("port")
@@ -208,9 +222,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
         )
     elif model.pipeline.source_type == ExternalDataSource.Type.SNOWFLAKE:
         if is_posthog_team(inputs.team_id):
-            from posthog.temporal.data_imports.pipelines.sql_database_v2 import snowflake_source
+            from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
+                snowflake_source,
+            )
         else:
-            from posthog.temporal.data_imports.pipelines.sql_database import snowflake_source
+            from posthog.temporal.data_imports.pipelines.sql_database import (
+                snowflake_source,
+            )
 
         account_id = model.pipeline.job_inputs.get("account_id")
         user = model.pipeline.job_inputs.get("user")
@@ -244,9 +262,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             reset_pipeline=reset_pipeline,
         )
     elif model.pipeline.source_type == ExternalDataSource.Type.SALESFORCE:
-        from posthog.temporal.data_imports.pipelines.salesforce.auth import salesforce_refresh_access_token
-        from posthog.temporal.data_imports.pipelines.salesforce import salesforce_source
         from posthog.models.integration import aget_integration_by_id
+        from posthog.temporal.data_imports.pipelines.salesforce import (
+            salesforce_source,
+        )
+        from posthog.temporal.data_imports.pipelines.salesforce.auth import (
+            salesforce_refresh_access_token,
+        )
 
         salesforce_integration_id = model.pipeline.job_inputs.get("salesforce_integration_id", None)
@@ -328,7 +350,9 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             reset_pipeline=reset_pipeline,
         )
     elif model.pipeline.source_type == ExternalDataSource.Type.BIGQUERY:
-        from posthog.temporal.data_imports.pipelines.sql_database_v2 import bigquery_source
+        from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
+            bigquery_source,
+        )
 
         dataset_id = model.pipeline.job_inputs.get("dataset_id")
         project_id = model.pipeline.job_inputs.get("project_id")
@@ -377,6 +401,28 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
                     token_uri=token_uri,
                 )
                 logger.info(f"Deleting bigquery temp destination table: {destination_table}")
+    elif model.pipeline.source_type == ExternalDataSource.Type.CHARGEBEE:
+        from posthog.temporal.data_imports.pipelines.chargebee import (
+            chargebee_source,
+        )
+
+        source = chargebee_source(
+            api_key=model.pipeline.job_inputs.get("api_key"),
+            site_name=model.pipeline.job_inputs.get("site_name"),
+            endpoint=schema.name,
+            team_id=inputs.team_id,
+            job_id=inputs.run_id,
+            is_incremental=schema.is_incremental,
+        )
+
+        return await _run(
+            job_inputs=job_inputs,
+            source=source,
+            logger=logger,
+            inputs=inputs,
+            schema=schema,
+            reset_pipeline=reset_pipeline,
+        )
     else:
         raise ValueError(f"Source type {model.pipeline.source_type} not supported")
diff --git a/posthog/temporal/tests/data_imports/conftest.py b/posthog/temporal/tests/data_imports/conftest.py
index 2460ca45d0a..55042162379 100644
--- a/posthog/temporal/tests/data_imports/conftest.py
+++ b/posthog/temporal/tests/data_imports/conftest.py
@@ -1,4 +1,5 @@
 import json
+
 import pytest
 
@@ -896,3 +897,74 @@ def zendesk_ticket_metric_events():
     }
     """
     )
+
+
+@pytest.fixture
+def chargebee_customer():
+    # note that chargebee actually return both a customer and a card if one is
+    # attached (we ignore this when ingesting the data)
+    return json.loads(
+        """
+        {
+            "list": [
+                {
+                    "card": {
+                        "card_type": "american_express",
+                        "created_at": 1729612767,
+                        "customer_id": "cbdemo_douglas",
+                        "expiry_month": 5,
+                        "expiry_year": 2028,
+                        "first_name": "Douglas",
+                        "funding_type": "not_known",
+                        "gateway": "chargebee",
+                        "gateway_account_id": "gw_199Ne4URwspru2qp",
+                        "iin": "371449",
+                        "last4": "8431",
+                        "last_name": "Quaid",
+                        "masked_number": "***********8431",
+                        "object": "card",
+                        "payment_source_id": "pm_19A7lVURwsu9pPnQ",
+                        "resource_version": 1729612767061,
+                        "status": "valid",
+                        "updated_at": 1729612767
+                    },
+                    "customer": {
+                        "allow_direct_debit": false,
+                        "auto_collection": "on",
+                        "card_status": "valid",
+                        "channel": "web",
+                        "company": "Greenplus Enterprises",
+                        "created_at": 1729612766,
+                        "deleted": false,
+                        "email": "douglas_AT_test.com@example.com",
+                        "excess_payments": 0,
+                        "first_name": "Douglas",
+                        "id": "cbdemo_douglas",
+                        "last_name": "Quaid",
+                        "mrr": 0,
+                        "net_term_days": 0,
+                        "object": "customer",
+                        "payment_method": {
+                            "gateway": "chargebee",
+                            "gateway_account_id": "gw_199Ne4URwspru2qp",
+                            "object": "payment_method",
+                            "reference_id": "tok_19A7lVURwsu9hPnP",
+                            "status": "valid",
+                            "type": "card"
+                        },
+                        "phone": "2344903756",
+                        "pii_cleared": "active",
+                        "preferred_currency_code": "GBP",
+                        "primary_payment_source_id": "pm_19A7lVURwsu9pPnQ",
+                        "promotional_credits": 0,
+                        "refundable_credits": 0,
+                        "resource_version": 1729612767062,
+                        "taxability": "taxable",
+                        "unbilled_charges": 0,
+                        "updated_at": 1729612767
+                    }
+                }
+            ]
+        }
+        """
+    )
diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 98d81cc153b..b9c74c4efef 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -1,19 +1,29 @@
-from typing import Any, Optional
-from unittest import mock
-import aioboto3
 import functools
 import uuid
+from typing import Any, Optional
+from unittest import mock
+
+import aioboto3
+import posthoganalytics
+import psycopg
+import pytest
+import pytest_asyncio
 from asgiref.sync import sync_to_async
 from django.conf import settings
 from django.test import override_settings
-import posthoganalytics
-import pytest
-import pytest_asyncio
-import psycopg
+from dlt.common.configuration.specs.aws_credentials import AwsCredentials
+from dlt.sources.helpers.rest_client.client import RESTClient
+from temporalio.common import RetryPolicy
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import UnsandboxedWorkflowRunner, Worker
+
+from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE
 from posthog.hogql.modifiers import create_default_modifiers_for_team
 from posthog.hogql.query import execute_hogql_query
 from posthog.hogql_queries.insights.funnels.funnel import Funnel
-from posthog.hogql_queries.insights.funnels.funnel_query_context import FunnelQueryContext
+from posthog.hogql_queries.insights.funnels.funnel_query_context import (
+    FunnelQueryContext,
+)
 from posthog.models.team.team import Team
 from posthog.schema import (
     BreakdownFilter,
@@ -26,23 +36,15 @@ from posthog.schema import (
 from posthog.temporal.data_imports import ACTIVITIES
 from posthog.temporal.data_imports.external_data_job import ExternalDataJobWorkflow
 from posthog.temporal.utils import ExternalDataWorkflowInputs
-from posthog.warehouse.models.external_table_definitions import external_tables
 from posthog.warehouse.models import (
     ExternalDataJob,
-    ExternalDataSource,
     ExternalDataSchema,
+    ExternalDataSource,
 )
-from temporalio.testing import WorkflowEnvironment
-from temporalio.common import RetryPolicy
-from temporalio.worker import UnsandboxedWorkflowRunner, Worker
-from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE
 from posthog.warehouse.models.external_data_job import get_latest_run_if_exists
-from dlt.sources.helpers.rest_client.client import RESTClient
-from dlt.common.configuration.specs.aws_credentials import AwsCredentials
-
+from posthog.warehouse.models.external_table_definitions import external_tables
 from posthog.warehouse.models.join import DataWarehouseJoin
-
 
 BUCKET_NAME = "test-pipeline"
 SESSION = aioboto3.Session()
 create_test_client = functools.partial(SESSION.client, endpoint_url=settings.OBJECT_STORAGE_ENDPOINT)
@@ -461,6 +463,19 @@ async def test_zendesk_ticket_metric_events(team, zendesk_ticket_metric_events):
     )
 
 
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_chargebee_customer(team, chargebee_customer):
+    await _run(
+        team=team,
+        schema_name="Customers",
+        table_name="chargebee_customers",
+        source_type="Chargebee",
+        job_inputs={"api_key": "test-key", "site_name": "site-test"},
+        mock_data_response=[chargebee_customer["list"][0]["customer"]],
+    )
+
+
 @pytest.mark.django_db(transaction=True)
 @pytest.mark.asyncio
 async def test_reset_pipeline(team, stripe_balance_transaction):
diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py
index 249a8e43bc2..8760a48f5fd 100644
--- a/posthog/warehouse/api/external_data_source.py
+++ b/posthog/warehouse/api/external_data_source.py
@@ -1,62 +1,79 @@
 import re
-from dateutil import parser
 import uuid
 from typing import Any
-from psycopg2 import OperationalError
-from sentry_sdk import capture_exception
 import structlog
+import temporalio
+from dateutil import parser
+from django.db.models import Prefetch
+from psycopg2 import OperationalError
 from rest_framework import filters, serializers, status, viewsets
-from posthog.api.utils import action
 from rest_framework.request import Request
 from rest_framework.response import Response
+from sentry_sdk import capture_exception
+from snowflake.connector.errors import DatabaseError, ForbiddenError, ProgrammingError
+from sshtunnel import BaseSSHTunnelForwarderError
 
 from posthog.api.routing import TeamAndOrgViewSetMixin
-from posthog.warehouse.data_load.service import (
-    sync_external_data_job_workflow,
-    delete_external_data_schedule,
-    cancel_external_data_workflow,
-    delete_data_import_folder,
-    is_any_external_data_schema_paused,
-    trigger_external_data_source_workflow,
-)
-from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob
-from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer, SimpleExternalDataSchemaSerializer
+from posthog.api.utils import action
+from posthog.cloud_utils import is_cloud
 from posthog.hogql.database.database import create_hogql_database
-from posthog.temporal.data_imports.pipelines.stripe import validate_credentials as validate_stripe_credentials
-from posthog.temporal.data_imports.pipelines.zendesk import validate_credentials as validate_zendesk_credentials
-from posthog.temporal.data_imports.pipelines.vitally import validate_credentials as validate_vitally_credentials
+from posthog.temporal.data_imports.pipelines.bigquery import (
+    filter_incremental_fields as filter_bigquery_incremental_fields,
+)
+from posthog.temporal.data_imports.pipelines.bigquery import (
+    get_schemas as get_bigquery_schemas,
+)
 from posthog.temporal.data_imports.pipelines.bigquery import (
     validate_credentials as validate_bigquery_credentials,
-    get_schemas as get_bigquery_schemas,
-    filter_incremental_fields as filter_bigquery_incremental_fields,
+)
+from posthog.temporal.data_imports.pipelines.chargebee import (
+    validate_credentials as validate_chargebee_credentials,
+)
+from posthog.temporal.data_imports.pipelines.hubspot.auth import (
+    get_hubspot_access_token_from_code,
 )
 from posthog.temporal.data_imports.pipelines.schemas import (
     PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING,
     PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING,
     PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
 )
-from posthog.temporal.data_imports.pipelines.hubspot.auth import (
-    get_hubspot_access_token_from_code,
+from posthog.temporal.data_imports.pipelines.stripe import (
+    validate_credentials as validate_stripe_credentials,
+)
+from posthog.temporal.data_imports.pipelines.vitally import (
+    validate_credentials as validate_vitally_credentials,
+)
+from posthog.temporal.data_imports.pipelines.zendesk import (
+    validate_credentials as validate_zendesk_credentials,
+)
+from posthog.utils import get_instance_region
+from posthog.warehouse.api.external_data_schema import (
+    ExternalDataSchemaSerializer,
+    SimpleExternalDataSchemaSerializer,
+)
+from posthog.warehouse.data_load.service import (
+    cancel_external_data_workflow,
+    delete_data_import_folder,
+    delete_external_data_schedule,
+    is_any_external_data_schema_paused,
+    sync_external_data_job_workflow,
+    trigger_external_data_source_workflow,
+)
+from posthog.warehouse.models import (
+    ExternalDataJob,
+    ExternalDataSchema,
+    ExternalDataSource,
 )
 from posthog.warehouse.models.external_data_schema import (
     filter_mssql_incremental_fields,
     filter_mysql_incremental_fields,
     filter_postgres_incremental_fields,
     filter_snowflake_incremental_fields,
-    get_sql_schemas_for_source_type,
     get_snowflake_schemas,
+    get_sql_schemas_for_source_type,
 )
-
-import temporalio
-
-from posthog.cloud_utils import is_cloud
-from posthog.utils import get_instance_region
 from posthog.warehouse.models.ssh_tunnel import SSHTunnel
-from sshtunnel import BaseSSHTunnelForwarderError
-from snowflake.connector.errors import ProgrammingError, DatabaseError, ForbiddenError
-from django.db.models import Prefetch
-
 
 logger = structlog.get_logger(__name__)
@@ -310,6 +327,8 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
             new_source_model, snowflake_schemas = self._handle_snowflake_source(request, *args, **kwargs)
         elif source_type == ExternalDataSource.Type.BIGQUERY:
             new_source_model, bigquery_schemas = self._handle_bigquery_source(request, *args, **kwargs)
+        elif source_type == ExternalDataSource.Type.CHARGEBEE:
+            new_source_model = self._handle_chargebee_source(request, *args, **kwargs)
         else:
             raise NotImplementedError(f"Source type {source_type} not implemented")
@@ -434,6 +453,26 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
 
         return new_source_model
 
+    def _handle_chargebee_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
+        payload = request.data["payload"]
+        api_key = payload.get("api_key")
+        site_name = payload.get("site_name")
+        prefix = request.data.get("prefix", None)
+        source_type = request.data["source_type"]
+
+        new_source_model = ExternalDataSource.objects.create(
+            source_id=str(uuid.uuid4()),
+            connection_id=str(uuid.uuid4()),
+            destination_id=str(uuid.uuid4()),
+            team=self.team,
+            status="Running",
+            source_type=source_type,
+            job_inputs={"api_key": api_key, "site_name": site_name},
+            prefix=prefix,
+        )
+
+        return new_source_model
+
     def _handle_zendesk_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
         payload = request.data["payload"]
         api_key = payload.get("api_key")
@@ -837,6 +876,23 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
             ]
 
             return Response(status=status.HTTP_200_OK, data=result_mapped_to_options)
+        elif source_type == ExternalDataSource.Type.CHARGEBEE:
+            api_key = request.data.get("api_key", "")
+            site_name = request.data.get("site_name", "")
+
+            # Chargebee uses the term 'site' but it is effectively the subdomain
+            subdomain_regex = re.compile("^[a-zA-Z-]+$")
+            if not subdomain_regex.match(site_name):
+                return Response(
+                    status=status.HTTP_400_BAD_REQUEST,
+                    data={"message": "Invalid credentials: Chargebee site name is incorrect"},
+                )
+
+            if not validate_chargebee_credentials(api_key=api_key, site_name=site_name):
+                return Response(
+                    status=status.HTTP_400_BAD_REQUEST,
+                    data={"message": "Invalid credentials: Chargebee credentials are incorrect"},
+                )
 
         # Get schemas and validate SQL credentials
         if source_type in [
diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py
index e84da4761d7..22a46bc4cb0 100644
--- a/posthog/warehouse/models/external_data_source.py
+++ b/posthog/warehouse/models/external_data_source.py
@@ -7,7 +7,13 @@ from django.db import models
 
 from posthog.helpers.encrypted_fields import EncryptedJSONField
 from posthog.models.team import Team
-from posthog.models.utils import CreatedMetaFields, DeletedMetaFields, UpdatedMetaFields, UUIDModel, sane_repr
+from posthog.models.utils import (
+    CreatedMetaFields,
+    DeletedMetaFields,
+    UpdatedMetaFields,
+    UUIDModel,
+    sane_repr,
+)
 from posthog.warehouse.util import database_sync_to_async
 
 logger = structlog.get_logger(__name__)
@@ -25,6 +31,7 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete
         MSSQL = "MSSQL", "MSSQL"
         VITALLY = "Vitally", "Vitally"
         BIGQUERY = "BigQuery", "BigQuery"
+        CHARGEBEE = "Chargebee", "Chargebee"
 
     class Status(models.TextChoices):
         RUNNING = "Running", "Running"
@@ -65,7 +72,10 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete
         self.save()
 
     def reload_schemas(self):
-        from posthog.warehouse.data_load.service import sync_external_data_job_workflow, trigger_external_data_workflow
+        from posthog.warehouse.data_load.service import (
+            sync_external_data_job_workflow,
+            trigger_external_data_workflow,
+        )
         from posthog.warehouse.models.external_data_schema import ExternalDataSchema
 
         for schema in (
diff --git a/pyproject.toml b/pyproject.toml
index bcc00106149..0049771f5ae 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,9 @@ select = [
 [tool.ruff.lint.mccabe]
 max-complexity = 10
 
+[tool.ruff.lint.isort]
+combine-as-imports = true
+
 [tool.ruff.lint.per-file-ignores]
 "./posthog/queries/column_optimizer/column_optimizer.py" = ["F401"]
 "./posthog/migrations/0027_move_elements_to_group.py" = ["T201"]
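
Reviewer note (not part of the patch): a minimal sketch of how the new source could be exercised locally, outside of Temporal. It only uses `validate_credentials` and `chargebee_source` as defined above; the site name "test-site", the API key, and the `team_id`/`job_id` values are placeholders, and iterating `source.resources["Customers"]` assumes dlt's standard DltSource/DltResource interface.

    # Illustrative sketch only; credentials and IDs below are dummy values.
    from posthog.temporal.data_imports.pipelines.chargebee import (
        chargebee_source,
        validate_credentials,
    )

    if validate_credentials(api_key="cb_test_api_key", site_name="test-site"):
        # Build the dlt source for a single endpoint, full refresh (non-incremental).
        source = chargebee_source(
            api_key="cb_test_api_key",
            site_name="test-site",
            endpoint="Customers",
            team_id=1,
            job_id="local-test",
            is_incremental=False,
        )
        # Iterating a resource pulls pages via ChargebeePaginator and yields raw records.
        for row in source.resources["Customers"]:
            print(row["id"])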