mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

feat(data-warehouse): Added Chargebee data source (#25818)

Ross 2024-11-01 17:19:04 +00:00 committed by GitHub
parent 54efcdcbc0
commit 76626f1285
15 changed files with 670 additions and 79 deletions

Binary file added (8.2 KiB, not shown): the chargebee.png service icon referenced by the imports below.

View File

@@ -628,6 +628,26 @@ export const SOURCE_DETAILS: Record<ExternalDataSourceType, SourceConfig> = {
],
caption: '',
},
Chargebee: {
name: 'Chargebee',
fields: [
{
name: 'api_key',
label: 'API key',
type: 'text',
required: true,
placeholder: '',
},
{
type: 'text',
name: 'site_name',
label: 'Site name (subdomain)',
required: true,
placeholder: '',
},
],
caption: '',
},
}
export const buildKeaFormDefaultFromSourceDetails = (

View File

@@ -6,6 +6,7 @@ import { LemonTableLink } from 'lib/lemon-ui/LemonTable/LemonTableLink'
import IconAwsS3 from 'public/services/aws-s3.png'
import Iconazure from 'public/services/azure.png'
import IconBigQuery from 'public/services/bigquery.png'
import IconChargebee from 'public/services/chargebee.png'
import IconCloudflare from 'public/services/cloudflare.png'
import IconGoogleCloudStorage from 'public/services/google-cloud-storage.png'
import IconHubspot from 'public/services/hubspot.png'
@@ -193,6 +194,7 @@ export function RenderDataWarehouseSourceIcon({
MSSQL: IconMSSQL,
Vitally: IconVitally,
BigQuery: IconBigQuery,
Chargebee: IconChargebee,
}[type]
return (

View File

@@ -4057,6 +4057,7 @@ export const externalDataSources = [
'Salesforce',
'Vitally',
'BigQuery',
'Chargebee',
] as const
export type ExternalDataSourceType = (typeof externalDataSources)[number]

View File

@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0506_productintent_activated_at_and_more
posthog: 0507_alter_externaldatasource_source_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019

View File

@@ -0,0 +1,32 @@
# Generated by Django 4.2.15 on 2024-11-01 16:52
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("posthog", "0506_productintent_activated_at_and_more"),
]
operations = [
migrations.AlterField(
model_name="externaldatasource",
name="source_type",
field=models.CharField(
choices=[
("Stripe", "Stripe"),
("Hubspot", "Hubspot"),
("Postgres", "Postgres"),
("Zendesk", "Zendesk"),
("Snowflake", "Snowflake"),
("Salesforce", "Salesforce"),
("MySQL", "MySQL"),
("MSSQL", "MSSQL"),
("Vitally", "Vitally"),
("BigQuery", "BigQuery"),
("Chargebee", "Chargebee"),
],
max_length=128,
),
),
]

View File

@@ -0,0 +1,254 @@
import base64
from typing import Any, Optional
import dlt
import requests
from dlt.sources.helpers.requests import Request, Response
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from posthog.temporal.data_imports.pipelines.rest_source import (
RESTAPIConfig,
rest_api_resources,
)
from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
def get_resource(name: str, is_incremental: bool) -> EndpointResource:
resources: dict[str, EndpointResource] = {
"Customers": {
"name": "Customers",
"table_name": "customers",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].customer",
"path": "/v2/customers",
"params": {
# the parameters below can optionally be configured
"updated_at[after]": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
# by default, API does not return deleted resources
"include_deleted": "true",
},
},
"table_format": "delta",
},
# Note: it is possible to filter by event type, but for now we're
# fetching all events
"Events": {
"name": "Events",
"table_name": "events",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].event",
"path": "/v2/events",
"params": {
# the parameters below can optionally be configured
"occurred_at[after]": {
"type": "incremental",
"cursor_path": "occurred_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
},
},
"table_format": "delta",
},
"Invoices": {
"name": "Invoices",
"table_name": "invoices",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].invoice",
"path": "/v2/invoices",
"params": {
# the parameters below can optionally be configured
"updated_at[after]": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
# by default, API does not return deleted resources
"include_deleted": "true",
},
},
"table_format": "delta",
},
"Orders": {
"name": "Orders",
"table_name": "orders",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].order",
"path": "/v2/orders",
"params": {
# the parameters below can optionally be configured
"updated_at[after]": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
# by default, API does not return deleted resources
"include_deleted": "true",
},
},
"table_format": "delta",
},
"Subscriptions": {
"name": "Subscriptions",
"table_name": "subscriptions",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].subscription",
"path": "/v2/subscriptions",
"params": {
# the parameters below can optionally be configured
"updated_at[after]": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
# by default, API does not return deleted resources
"include_deleted": "true",
},
},
"table_format": "delta",
},
"Transactions": {
"name": "Transactions",
"table_name": "transactions",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "list[*].transaction",
"path": "/v2/transactions",
"params": {
# the parameters below can optionally be configured
"updated_at[after]": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": 0, # type: ignore
}
if is_incremental
else None,
"limit": 100,
# by default, API does not return deleted resources
"include_deleted": "true",
},
},
"table_format": "delta",
},
}
return resources[name]
class ChargebeePaginator(BasePaginator):
def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()
self._next_offset = None
if not res:
self._has_next_page = False
return
if "next_offset" in res:
self._has_next_page = True
self._next_offset = res["next_offset"]
else:
self._has_next_page = False
def update_request(self, request: Request) -> None:
if request.params is None:
request.params = {}
request.params["offset"] = self._next_offset
@dlt.source(max_table_nesting=0)
def chargebee_source(
api_key: str, site_name: str, endpoint: str, team_id: int, job_id: str, is_incremental: bool = False
):
config: RESTAPIConfig = {
"client": {
"base_url": f"https://{site_name}.chargebee.com/api",
"auth": {
"type": "http_basic",
"username": api_key,
"password": "",
},
"paginator": ChargebeePaginator(),
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
},
"resources": [get_resource(endpoint, is_incremental)],
}
yield from rest_api_resources(config, team_id, job_id)
def validate_credentials(api_key: str, site_name: str) -> bool:
basic_token = base64.b64encode(f"{api_key}:".encode("ascii")).decode("ascii")
res = requests.get(
f"https://{site_name}.chargebee.com/api/v2/customers?limit=1",
headers={"Authorization": f"Basic {basic_token}"},
)
return res.status_code == 200
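
A minimal sketch, not part of the diff, of how the ChargebeePaginator above advances through Chargebee's offset-based list endpoints. It assumes ChargebeePaginator is importable from this module and uses plain requests objects, which the dlt Request/Response helpers wrap; the token and customer values are made up:

import json
import requests

def fake_response(payload: dict) -> requests.Response:
    # Build an in-memory response so update_state() can call .json() on it.
    resp = requests.Response()
    resp.status_code = 200
    resp._content = json.dumps(payload).encode("utf-8")
    return resp

paginator = ChargebeePaginator()

# Page 1: Chargebee includes a "next_offset" token, so another page exists and
# the token is echoed back as the "offset" query parameter of the next request.
paginator.update_state(fake_response({"list": [{"customer": {"id": "cbdemo_douglas"}}], "next_offset": "opaque-token"}))
next_request = requests.Request("GET", "https://example-site.chargebee.com/api/v2/customers", params={"limit": 100})
paginator.update_request(next_request)
assert next_request.params["offset"] == "opaque-token"

# Final page: no "next_offset" key, so pagination stops.
paginator.update_state(fake_response({"list": []}))
assert paginator._has_next_page is False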

View File

@@ -0,0 +1,70 @@
from posthog.warehouse.types import IncrementalField, IncrementalFieldType
ENDPOINTS = (
"Customers",
"Events",
"Invoices",
"Orders",
"Subscriptions",
"Transactions",
)
INCREMENTAL_ENDPOINTS = (
"Customers",
"Events",
"Invoices",
"Orders",
"Subscriptions",
"Transactions",
)
INCREMENTAL_FIELDS: dict[str, list[IncrementalField]] = {
"Customers": [
{
"label": "updated_at",
"type": IncrementalFieldType.DateTime,
"field": "updated_at",
"field_type": IncrementalFieldType.Integer,
},
],
"Events": [
{
"label": "occurred_at",
"type": IncrementalFieldType.DateTime,
"field": "occurred_at",
"field_type": IncrementalFieldType.Integer,
},
],
"Invoices": [
{
"label": "updated_at",
"type": IncrementalFieldType.DateTime,
"field": "updated_at",
"field_type": IncrementalFieldType.Integer,
},
],
"Orders": [
{
"label": "updated_at",
"type": IncrementalFieldType.DateTime,
"field": "updated_at",
"field_type": IncrementalFieldType.Integer,
},
],
"Subscriptions": [
{
"label": "updated_at",
"type": IncrementalFieldType.DateTime,
"field": "updated_at",
"field_type": IncrementalFieldType.Integer,
},
],
"Transactions": [
{
"label": "updated_at",
"type": IncrementalFieldType.DateTime,
"field": "updated_at",
"field_type": IncrementalFieldType.Integer,
},
],
}
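
Every endpoint above is incrementally syncable on a single timestamp column. The fields are labelled DateTime for the UI but typed Integer underneath because Chargebee returns Unix epoch seconds (the chargebee_customer fixture later in this diff shows updated_at: 1729612767, and the resource params default to initial_value 0). A rough sketch, not from the diff, of converting a datetime cutoff into the integer an updated_at[after] filter expects:

from datetime import datetime, timezone

# Chargebee timestamps are Unix epoch seconds, so a DateTime cutoff has to be
# sent to the API as an integer; initial_value=0 simply means "from the epoch".
cutoff = datetime(2024, 10, 22, tzinfo=timezone.utc)
updated_at_after = int(cutoff.timestamp())
assert updated_at_after == 1729555200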

View File

@@ -1,27 +1,34 @@
from posthog.warehouse.types import IncrementalField
from posthog.temporal.data_imports.pipelines.zendesk.settings import (
BASE_ENDPOINTS,
SUPPORT_ENDPOINTS,
INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS,
from posthog.temporal.data_imports.pipelines.chargebee.settings import (
ENDPOINTS as CHARGEBEE_ENDPOINTS,
INCREMENTAL_ENDPOINTS as CHARGEBEE_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as CHARGEBEE_INCREMENTAL_FIELDS,
)
from posthog.warehouse.models import ExternalDataSource
from posthog.temporal.data_imports.pipelines.stripe.settings import (
ENDPOINTS as STRIPE_ENDPOINTS,
INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS,
from posthog.temporal.data_imports.pipelines.hubspot.settings import (
ENDPOINTS as HUBSPOT_ENDPOINTS,
)
from posthog.temporal.data_imports.pipelines.hubspot.settings import ENDPOINTS as HUBSPOT_ENDPOINTS
from posthog.temporal.data_imports.pipelines.salesforce.settings import (
ENDPOINTS as SALESFORCE_ENDPOINTS,
INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS,
)
from posthog.temporal.data_imports.pipelines.stripe.settings import (
ENDPOINTS as STRIPE_ENDPOINTS,
INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS,
)
from posthog.temporal.data_imports.pipelines.vitally.settings import (
ENDPOINTS as VITALLY_ENDPOINTS,
INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS,
)
from posthog.temporal.data_imports.pipelines.zendesk.settings import (
BASE_ENDPOINTS,
INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS,
SUPPORT_ENDPOINTS,
)
from posthog.warehouse.models import ExternalDataSource
from posthog.warehouse.types import IncrementalField
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS,
@@ -36,6 +43,7 @@ PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
ExternalDataSource.Type.MSSQL: (),
ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS,
ExternalDataSource.Type.BIGQUERY: (),
ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_ENDPOINTS,
}
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
@@ -49,6 +57,7 @@ PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
ExternalDataSource.Type.MSSQL: (),
ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.BIGQUERY: (),
ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_ENDPOINTS,
}
PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = {
@@ -62,4 +71,5 @@ PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str
ExternalDataSource.Type.MSSQL: {},
ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS,
ExternalDataSource.Type.BIGQUERY: {},
ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_FIELDS,
}

View File

@@ -1,24 +1,32 @@
import dataclasses
import uuid
from datetime import datetime
from typing import Any
import uuid
from structlog.typing import FilteringBoundLogger
from temporalio import activity
from posthog.settings.utils import get_from_env
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.temporal.data_imports.pipelines.bigquery import delete_table
from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
from posthog.temporal.data_imports.pipelines.helpers import (
aremove_reset_pipeline,
aupdate_job_count,
)
from posthog.temporal.data_imports.pipelines.pipeline import (
DataImportPipeline,
PipelineInputs,
)
from posthog.warehouse.models import (
ExternalDataJob,
ExternalDataSource,
get_external_data_job,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from structlog.typing import FilteringBoundLogger
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
from posthog.warehouse.models.external_data_schema import (
ExternalDataSchema,
aget_schema_by_id,
)
from posthog.warehouse.models.ssh_tunnel import SSHTunnel
@@ -86,8 +94,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
reset_pipeline=reset_pipeline,
)
elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT:
from posthog.temporal.data_imports.pipelines.hubspot.auth import hubspot_refresh_access_token
from posthog.temporal.data_imports.pipelines.hubspot import hubspot
from posthog.temporal.data_imports.pipelines.hubspot.auth import (
hubspot_refresh_access_token,
)
hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None)
refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None)
@@ -117,9 +127,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
ExternalDataSource.Type.MSSQL,
]:
if is_posthog_team(inputs.team_id):
from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type
from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
sql_source_for_type,
)
else:
from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
from posthog.temporal.data_imports.pipelines.sql_database import (
sql_source_for_type,
)
host = model.pipeline.job_inputs.get("host")
port = model.pipeline.job_inputs.get("port")
@@ -208,9 +222,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
)
elif model.pipeline.source_type == ExternalDataSource.Type.SNOWFLAKE:
if is_posthog_team(inputs.team_id):
from posthog.temporal.data_imports.pipelines.sql_database_v2 import snowflake_source
from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
snowflake_source,
)
else:
from posthog.temporal.data_imports.pipelines.sql_database import snowflake_source
from posthog.temporal.data_imports.pipelines.sql_database import (
snowflake_source,
)
account_id = model.pipeline.job_inputs.get("account_id")
user = model.pipeline.job_inputs.get("user")
@@ -244,9 +262,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
reset_pipeline=reset_pipeline,
)
elif model.pipeline.source_type == ExternalDataSource.Type.SALESFORCE:
from posthog.temporal.data_imports.pipelines.salesforce.auth import salesforce_refresh_access_token
from posthog.temporal.data_imports.pipelines.salesforce import salesforce_source
from posthog.models.integration import aget_integration_by_id
from posthog.temporal.data_imports.pipelines.salesforce import (
salesforce_source,
)
from posthog.temporal.data_imports.pipelines.salesforce.auth import (
salesforce_refresh_access_token,
)
salesforce_integration_id = model.pipeline.job_inputs.get("salesforce_integration_id", None)
@@ -328,7 +350,9 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
reset_pipeline=reset_pipeline,
)
elif model.pipeline.source_type == ExternalDataSource.Type.BIGQUERY:
from posthog.temporal.data_imports.pipelines.sql_database_v2 import bigquery_source
from posthog.temporal.data_imports.pipelines.sql_database_v2 import (
bigquery_source,
)
dataset_id = model.pipeline.job_inputs.get("dataset_id")
project_id = model.pipeline.job_inputs.get("project_id")
@@ -377,6 +401,28 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
token_uri=token_uri,
)
logger.info(f"Deleting bigquery temp destination table: {destination_table}")
elif model.pipeline.source_type == ExternalDataSource.Type.CHARGEBEE:
from posthog.temporal.data_imports.pipelines.chargebee import (
chargebee_source,
)
source = chargebee_source(
api_key=model.pipeline.job_inputs.get("api_key"),
site_name=model.pipeline.job_inputs.get("site_name"),
endpoint=schema.name,
team_id=inputs.team_id,
job_id=inputs.run_id,
is_incremental=schema.is_incremental,
)
return await _run(
job_inputs=job_inputs,
source=source,
logger=logger,
inputs=inputs,
schema=schema,
reset_pipeline=reset_pipeline,
)
else:
raise ValueError(f"Source type {model.pipeline.source_type} not supported")

View File

@@ -1,4 +1,5 @@
import json
import pytest
@@ -896,3 +897,74 @@ def zendesk_ticket_metric_events():
}
"""
)
@pytest.fixture
def chargebee_customer():
# note that Chargebee actually returns both a customer and a card if one is
# attached (we ignore this when ingesting the data)
return json.loads(
"""
{
"list": [
{
"card": {
"card_type": "american_express",
"created_at": 1729612767,
"customer_id": "cbdemo_douglas",
"expiry_month": 5,
"expiry_year": 2028,
"first_name": "Douglas",
"funding_type": "not_known",
"gateway": "chargebee",
"gateway_account_id": "gw_199Ne4URwspru2qp",
"iin": "371449",
"last4": "8431",
"last_name": "Quaid",
"masked_number": "***********8431",
"object": "card",
"payment_source_id": "pm_19A7lVURwsu9pPnQ",
"resource_version": 1729612767061,
"status": "valid",
"updated_at": 1729612767
},
"customer": {
"allow_direct_debit": false,
"auto_collection": "on",
"card_status": "valid",
"channel": "web",
"company": "Greenplus Enterprises",
"created_at": 1729612766,
"deleted": false,
"email": "douglas_AT_test.com@example.com",
"excess_payments": 0,
"first_name": "Douglas",
"id": "cbdemo_douglas",
"last_name": "Quaid",
"mrr": 0,
"net_term_days": 0,
"object": "customer",
"payment_method": {
"gateway": "chargebee",
"gateway_account_id": "gw_199Ne4URwspru2qp",
"object": "payment_method",
"reference_id": "tok_19A7lVURwsu9hPnP",
"status": "valid",
"type": "card"
},
"phone": "2344903756",
"pii_cleared": "active",
"preferred_currency_code": "GBP",
"primary_payment_source_id": "pm_19A7lVURwsu9pPnQ",
"promotional_credits": 0,
"refundable_credits": 0,
"resource_version": 1729612767062,
"taxability": "taxable",
"unbilled_charges": 0,
"updated_at": 1729612767
}
}
]
}
"""
)
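
The Chargebee resources defined earlier use data_selector "list[*].customer", so only the nested customer objects from a page shaped like this fixture are ingested and the sibling card record is dropped, as the fixture comment notes. A small sketch, not from the diff, of the equivalent extraction in plain Python; the next_offset token is invented:

# What the "list[*].customer" data_selector pulls out of a Chargebee list page.
page = {
    "list": [
        {
            "card": {"last4": "8431"},  # ignored during ingestion
            "customer": {"id": "cbdemo_douglas", "updated_at": 1729612767},
        },
    ],
    "next_offset": "opaque-token",  # invented; consumed by ChargebeePaginator
}

customers = [entry["customer"] for entry in page["list"] if "customer" in entry]
assert customers == [{"id": "cbdemo_douglas", "updated_at": 1729612767}]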

View File

@@ -1,19 +1,29 @@
from typing import Any, Optional
from unittest import mock
import aioboto3
import functools
import uuid
from typing import Any, Optional
from unittest import mock
import aioboto3
import posthoganalytics
import psycopg
import pytest
import pytest_asyncio
from asgiref.sync import sync_to_async
from django.conf import settings
from django.test import override_settings
import posthoganalytics
import pytest
import pytest_asyncio
import psycopg
from dlt.common.configuration.specs.aws_credentials import AwsCredentials
from dlt.sources.helpers.rest_client.client import RESTClient
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE
from posthog.hogql.modifiers import create_default_modifiers_for_team
from posthog.hogql.query import execute_hogql_query
from posthog.hogql_queries.insights.funnels.funnel import Funnel
from posthog.hogql_queries.insights.funnels.funnel_query_context import FunnelQueryContext
from posthog.hogql_queries.insights.funnels.funnel_query_context import (
FunnelQueryContext,
)
from posthog.models.team.team import Team
from posthog.schema import (
BreakdownFilter,
@@ -26,23 +36,15 @@ from posthog.schema import (
from posthog.temporal.data_imports import ACTIVITIES
from posthog.temporal.data_imports.external_data_job import ExternalDataJobWorkflow
from posthog.temporal.utils import ExternalDataWorkflowInputs
from posthog.warehouse.models.external_table_definitions import external_tables
from posthog.warehouse.models import (
ExternalDataJob,
ExternalDataSource,
ExternalDataSchema,
ExternalDataSource,
)
from temporalio.testing import WorkflowEnvironment
from temporalio.common import RetryPolicy
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE
from posthog.warehouse.models.external_data_job import get_latest_run_if_exists
from dlt.sources.helpers.rest_client.client import RESTClient
from dlt.common.configuration.specs.aws_credentials import AwsCredentials
from posthog.warehouse.models.external_table_definitions import external_tables
from posthog.warehouse.models.join import DataWarehouseJoin
BUCKET_NAME = "test-pipeline"
SESSION = aioboto3.Session()
create_test_client = functools.partial(SESSION.client, endpoint_url=settings.OBJECT_STORAGE_ENDPOINT)
@@ -461,6 +463,19 @@ async def test_zendesk_ticket_metric_events(team, zendesk_ticket_metric_events):
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_chargebee_customer(team, chargebee_customer):
await _run(
team=team,
schema_name="Customers",
table_name="chargebee_customers",
source_type="Chargebee",
job_inputs={"api_key": "test-key", "site_name": "site-test"},
mock_data_response=[chargebee_customer["list"][0]["customer"]],
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_reset_pipeline(team, stripe_balance_transaction):

View File

@@ -1,62 +1,79 @@
import re
from dateutil import parser
import uuid
from typing import Any
from psycopg2 import OperationalError
from sentry_sdk import capture_exception
import structlog
import temporalio
from dateutil import parser
from django.db.models import Prefetch
from psycopg2 import OperationalError
from rest_framework import filters, serializers, status, viewsets
from posthog.api.utils import action
from rest_framework.request import Request
from rest_framework.response import Response
from sentry_sdk import capture_exception
from snowflake.connector.errors import DatabaseError, ForbiddenError, ProgrammingError
from sshtunnel import BaseSSHTunnelForwarderError
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.warehouse.data_load.service import (
sync_external_data_job_workflow,
delete_external_data_schedule,
cancel_external_data_workflow,
delete_data_import_folder,
is_any_external_data_schema_paused,
trigger_external_data_source_workflow,
)
from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob
from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer, SimpleExternalDataSchemaSerializer
from posthog.api.utils import action
from posthog.cloud_utils import is_cloud
from posthog.hogql.database.database import create_hogql_database
from posthog.temporal.data_imports.pipelines.stripe import validate_credentials as validate_stripe_credentials
from posthog.temporal.data_imports.pipelines.zendesk import validate_credentials as validate_zendesk_credentials
from posthog.temporal.data_imports.pipelines.vitally import validate_credentials as validate_vitally_credentials
from posthog.temporal.data_imports.pipelines.bigquery import (
filter_incremental_fields as filter_bigquery_incremental_fields,
)
from posthog.temporal.data_imports.pipelines.bigquery import (
get_schemas as get_bigquery_schemas,
)
from posthog.temporal.data_imports.pipelines.bigquery import (
validate_credentials as validate_bigquery_credentials,
get_schemas as get_bigquery_schemas,
filter_incremental_fields as filter_bigquery_incremental_fields,
)
from posthog.temporal.data_imports.pipelines.chargebee import (
validate_credentials as validate_chargebee_credentials,
)
from posthog.temporal.data_imports.pipelines.hubspot.auth import (
get_hubspot_access_token_from_code,
)
from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING,
PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING,
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
)
from posthog.temporal.data_imports.pipelines.hubspot.auth import (
get_hubspot_access_token_from_code,
from posthog.temporal.data_imports.pipelines.stripe import (
validate_credentials as validate_stripe_credentials,
)
from posthog.temporal.data_imports.pipelines.vitally import (
validate_credentials as validate_vitally_credentials,
)
from posthog.temporal.data_imports.pipelines.zendesk import (
validate_credentials as validate_zendesk_credentials,
)
from posthog.utils import get_instance_region
from posthog.warehouse.api.external_data_schema import (
ExternalDataSchemaSerializer,
SimpleExternalDataSchemaSerializer,
)
from posthog.warehouse.data_load.service import (
cancel_external_data_workflow,
delete_data_import_folder,
delete_external_data_schedule,
is_any_external_data_schema_paused,
sync_external_data_job_workflow,
trigger_external_data_source_workflow,
)
from posthog.warehouse.models import (
ExternalDataJob,
ExternalDataSchema,
ExternalDataSource,
)
from posthog.warehouse.models.external_data_schema import (
filter_mssql_incremental_fields,
filter_mysql_incremental_fields,
filter_postgres_incremental_fields,
filter_snowflake_incremental_fields,
get_sql_schemas_for_source_type,
get_snowflake_schemas,
get_sql_schemas_for_source_type,
)
import temporalio
from posthog.cloud_utils import is_cloud
from posthog.utils import get_instance_region
from posthog.warehouse.models.ssh_tunnel import SSHTunnel
from sshtunnel import BaseSSHTunnelForwarderError
from snowflake.connector.errors import ProgrammingError, DatabaseError, ForbiddenError
from django.db.models import Prefetch
logger = structlog.get_logger(__name__)
@@ -310,6 +327,8 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
new_source_model, snowflake_schemas = self._handle_snowflake_source(request, *args, **kwargs)
elif source_type == ExternalDataSource.Type.BIGQUERY:
new_source_model, bigquery_schemas = self._handle_bigquery_source(request, *args, **kwargs)
elif source_type == ExternalDataSource.Type.CHARGEBEE:
new_source_model = self._handle_chargebee_source(request, *args, **kwargs)
else:
raise NotImplementedError(f"Source type {source_type} not implemented")
@@ -434,6 +453,26 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
return new_source_model
def _handle_chargebee_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
payload = request.data["payload"]
api_key = payload.get("api_key")
site_name = payload.get("site_name")
prefix = request.data.get("prefix", None)
source_type = request.data["source_type"]
new_source_model = ExternalDataSource.objects.create(
source_id=str(uuid.uuid4()),
connection_id=str(uuid.uuid4()),
destination_id=str(uuid.uuid4()),
team=self.team,
status="Running",
source_type=source_type,
job_inputs={"api_key": api_key, "site_name": site_name},
prefix=prefix,
)
return new_source_model
def _handle_zendesk_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
payload = request.data["payload"]
api_key = payload.get("api_key")
@@ -837,6 +876,23 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
]
return Response(status=status.HTTP_200_OK, data=result_mapped_to_options)
elif source_type == ExternalDataSource.Type.CHARGEBEE:
api_key = request.data.get("api_key", "")
site_name = request.data.get("site_name", "")
# Chargebee uses the term 'site' but it is effectively the subdomain
subdomain_regex = re.compile("^[a-zA-Z-]+$")
if not subdomain_regex.match(site_name):
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": "Invalid credentials: Chargebee site name is incorrect"},
)
if not validate_chargebee_credentials(api_key=api_key, site_name=site_name):
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": "Invalid credentials: Chargebee credentials are incorrect"},
)
# Get schemas and validate SQL credentials
if source_type in [

View File
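
A rough sketch, not part of the diff, of the request body that _handle_chargebee_source above reads from request.data, plus the subdomain check the viewset applies before calling validate_chargebee_credentials; the key, prefix, and site values are placeholders:

import re

# Shape of request.data expected when creating a Chargebee source.
create_source_request = {
    "source_type": "Chargebee",
    "prefix": "chargebee_",  # optional table prefix
    "payload": {
        "api_key": "test_chargebee_api_key",  # placeholder
        "site_name": "example-site",  # the subdomain in https://example-site.chargebee.com
    },
}

# Chargebee's "site" is effectively a subdomain, so only letters and hyphens pass;
# full URLs or domains are rejected before any call to the Chargebee API is made.
subdomain_regex = re.compile("^[a-zA-Z-]+$")
if not subdomain_regex.match(create_source_request["payload"]["site_name"]):
    raise ValueError("Invalid credentials: Chargebee site name is incorrect")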

@@ -7,7 +7,13 @@ from django.db import models
from posthog.helpers.encrypted_fields import EncryptedJSONField
from posthog.models.team import Team
from posthog.models.utils import CreatedMetaFields, DeletedMetaFields, UpdatedMetaFields, UUIDModel, sane_repr
from posthog.models.utils import (
CreatedMetaFields,
DeletedMetaFields,
UpdatedMetaFields,
UUIDModel,
sane_repr,
)
from posthog.warehouse.util import database_sync_to_async
logger = structlog.get_logger(__name__)
@@ -25,6 +31,7 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete
MSSQL = "MSSQL", "MSSQL"
VITALLY = "Vitally", "Vitally"
BIGQUERY = "BigQuery", "BigQuery"
CHARGEBEE = "Chargebee", "Chargebee"
class Status(models.TextChoices):
RUNNING = "Running", "Running"
@@ -65,7 +72,10 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete
self.save()
def reload_schemas(self):
from posthog.warehouse.data_load.service import sync_external_data_job_workflow, trigger_external_data_workflow
from posthog.warehouse.data_load.service import (
sync_external_data_job_workflow,
trigger_external_data_workflow,
)
from posthog.warehouse.models.external_data_schema import ExternalDataSchema
for schema in (

View File

@@ -55,6 +55,9 @@ select = [
[tool.ruff.lint.mccabe]
max-complexity = 10
[tool.ruff.lint.isort]
combine-as-imports = true
[tool.ruff.lint.per-file-ignores]
"./posthog/queries/column_optimizer/column_optimizer.py" = ["F401"]
"./posthog/migrations/0027_move_elements_to_group.py" = ["T201"]