From 76626f1285978ee7bf661c8bb8d5eaf7e74a916b Mon Sep 17 00:00:00 2001
From: Ross
Date: Fri, 1 Nov 2024 17:19:04 +0000
Subject: [PATCH] feat(data-warehouse): Added Chargebee data source (#25818)

---
 frontend/public/services/chargebee.png        | Bin 0 -> 8364 bytes
 .../data-warehouse/new/sourceWizardLogic.tsx  |  20 ++
 .../DataWarehouseManagedSourcesTable.tsx      |   2 +
 frontend/src/types.ts                         |   1 +
 latest_migrations.manifest                    |   2 +-
 ...07_alter_externaldatasource_source_type.py |  32 +++
 .../pipelines/chargebee/__init__.py           | 254 ++++++++++++++++++
 .../pipelines/chargebee/settings.py           |  70 +++++
 .../data_imports/pipelines/schemas.py         |  34 ++-
 .../workflow_activities/import_data.py        |  76 ++++--
 .../temporal/tests/data_imports/conftest.py   |  72 +++++
 .../tests/data_imports/test_end_to_end.py     |  51 ++--
 posthog/warehouse/api/external_data_source.py | 118 +++++---
 .../warehouse/models/external_data_source.py  |  14 +-
 pyproject.toml                                |   3 +
 15 files changed, 670 insertions(+), 79 deletions(-)
 create mode 100644 frontend/public/services/chargebee.png
 create mode 100644 posthog/migrations/0507_alter_externaldatasource_source_type.py
 create mode 100644 posthog/temporal/data_imports/pipelines/chargebee/__init__.py
 create mode 100644 posthog/temporal/data_imports/pipelines/chargebee/settings.py

diff --git a/frontend/public/services/chargebee.png b/frontend/public/services/chargebee.png
new file mode 100644
index 0000000000000000000000000000000000000000..eaf01aaa76842de25f13ec52634ea9d59cc3a633
GIT binary patch
literal 8364
[8364-byte base85-encoded PNG literal omitted]

literal 0
HcmV?d00001

diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx index 95a525987b8..4656a338d63 100644 --- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx +++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx @@ -628,6 +628,26 @@ export const SOURCE_DETAILS: Record = { ], caption: '', }, + Chargebee: { + name: 'Chargebee', + fields: [ + { + name: 'api_key', + label: 'API key', + type: 'text', + required: true, + placeholder: '', + }, + { + type: 'text', + name: 'site_name', + label: 'Site name (subdomain)', + required: true, + placeholder: '', + }, + ], + caption: '', + }, } export const buildKeaFormDefaultFromSourceDetails = ( diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx index 8a95510bac4..9d9f7265dbd 100644 --- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx +++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx @@ -6,6 +6,7 @@ import { LemonTableLink } from 'lib/lemon-ui/LemonTable/LemonTableLink' import IconAwsS3 from 'public/services/aws-s3.png' import Iconazure from 'public/services/azure.png' import IconBigQuery from 'public/services/bigquery.png' +import IconChargebee from 'public/services/chargebee.png' import IconCloudflare from 'public/services/cloudflare.png' import IconGoogleCloudStorage from 'public/services/google-cloud-storage.png' import IconHubspot from 'public/services/hubspot.png' @@ -193,6 +194,7 @@ export function RenderDataWarehouseSourceIcon({ MSSQL: IconMSSQL, Vitally: IconVitally, BigQuery: IconBigQuery, + Chargebee: IconChargebee, }[type] return ( diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 846b63401d2..ec291c496f8 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -4057,6 +4057,7 @@ export const externalDataSources = [ 'Salesforce', 'Vitally', 'BigQuery', + 'Chargebee', ] as const export type ExternalDataSourceType = (typeof externalDataSources)[number] diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 23c51f37f70..07739d4d910 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0506_productintent_activated_at_and_more +posthog: 0507_alter_externaldatasource_source_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0507_alter_externaldatasource_source_type.py b/posthog/migrations/0507_alter_externaldatasource_source_type.py new file mode 100644 index 00000000000..4e84cc9ee4a --- /dev/null +++ b/posthog/migrations/0507_alter_externaldatasource_source_type.py @@ -0,0 +1,32 @@ +# Generated by Django 4.2.15 on 2024-11-01 16:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0506_productintent_activated_at_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="externaldatasource", + name="source_type", + field=models.CharField( + choices=[ + ("Stripe", "Stripe"), + ("Hubspot", "Hubspot"), + ("Postgres", "Postgres"), + ("Zendesk", "Zendesk"), + ("Snowflake", "Snowflake"), + ("Salesforce", "Salesforce"), + ("MySQL", "MySQL"), + ("MSSQL", "MSSQL"), + ("Vitally", "Vitally"), + ("BigQuery", "BigQuery"), + ("Chargebee", 
"Chargebee"), + ], + max_length=128, + ), + ), + ] diff --git a/posthog/temporal/data_imports/pipelines/chargebee/__init__.py b/posthog/temporal/data_imports/pipelines/chargebee/__init__.py new file mode 100644 index 00000000000..245afb6e5d8 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/chargebee/__init__.py @@ -0,0 +1,254 @@ +import base64 +from typing import Any, Optional + +import dlt +import requests +from dlt.sources.helpers.requests import Request, Response +from dlt.sources.helpers.rest_client.paginators import BasePaginator + +from posthog.temporal.data_imports.pipelines.rest_source import ( + RESTAPIConfig, + rest_api_resources, +) +from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource + + +def get_resource(name: str, is_incremental: bool) -> EndpointResource: + resources: dict[str, EndpointResource] = { + "Customers": { + "name": "Customers", + "table_name": "customers", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].customer", + "path": "/v2/customers", + "params": { + # the parameters below can optionally be configured + "updated_at[after]": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + # by default, API does not return deleted resources + "include_deleted": "true", + }, + }, + "table_format": "delta", + }, + # Note: it is possible to filter by event type, but for now we're + # fetching all events + "Events": { + "name": "Events", + "table_name": "events", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].event", + "path": "/v2/events", + "params": { + # the parameters below can optionally be configured + "occurred_at[after]": { + "type": "incremental", + "cursor_path": "occurred_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + }, + }, + "table_format": "delta", + }, + "Invoices": { + "name": "Invoices", + "table_name": "invoices", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].invoice", + "path": "/v2/invoices", + "params": { + # the parameters below can optionally be configured + "updated_at[after]": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + # by default, API does not return deleted resources + "include_deleted": "true", + }, + }, + "table_format": "delta", + }, + "Orders": { + "name": "Orders", + "table_name": "orders", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].order", + "path": "/v2/orders", + "params": { + # the parameters below can optionally be configured + "updated_at[after]": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + # by default, API does not return deleted resources + "include_deleted": "true", + }, + }, + "table_format": "delta", + }, + "Subscriptions": { + "name": "Subscriptions", + "table_name": "subscriptions", + 
"primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].subscription", + "path": "/v2/subscriptions", + "params": { + # the parameters below can optionally be configured + "updated_at[after]": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + # by default, API does not return deleted resources + "include_deleted": "true", + }, + }, + "table_format": "delta", + }, + "Transactions": { + "name": "Transactions", + "table_name": "transactions", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "list[*].transaction", + "path": "/v2/transactions", + "params": { + # the parameters below can optionally be configured + "updated_at[after]": { + "type": "incremental", + "cursor_path": "updated_at", + "initial_value": 0, # type: ignore + } + if is_incremental + else None, + "limit": 100, + # by default, API does not return deleted resources + "include_deleted": "true", + }, + }, + "table_format": "delta", + }, + } + return resources[name] + + +class ChargebeePaginator(BasePaginator): + def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None: + res = response.json() + + self._next_offset = None + + if not res: + self._has_next_page = False + return + + if "next_offset" in res: + self._has_next_page = True + self._next_offset = res["next_offset"] + else: + self._has_next_page = False + + def update_request(self, request: Request) -> None: + if request.params is None: + request.params = {} + + request.params["offset"] = self._next_offset + + +@dlt.source(max_table_nesting=0) +def chargebee_source( + api_key: str, site_name: str, endpoint: str, team_id: int, job_id: str, is_incremental: bool = False +): + config: RESTAPIConfig = { + "client": { + "base_url": f"https://{site_name}.chargebee.com/api", + "auth": { + "type": "http_basic", + "username": api_key, + "password": "", + }, + "paginator": ChargebeePaginator(), + }, + "resource_defaults": { + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + }, + "resources": [get_resource(endpoint, is_incremental)], + } + + yield from rest_api_resources(config, team_id, job_id) + + +def validate_credentials(api_key: str, site_name: str) -> bool: + basic_token = base64.b64encode(f"{api_key}:".encode("ascii")).decode("ascii") + res = requests.get( + f"https://{site_name}.chargebee.com/api/v2/customers?limit=1", + headers={"Authorization": f"Basic {basic_token}"}, + ) + return res.status_code == 200 diff --git a/posthog/temporal/data_imports/pipelines/chargebee/settings.py b/posthog/temporal/data_imports/pipelines/chargebee/settings.py new file mode 100644 index 00000000000..fe8e1ad94c5 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/chargebee/settings.py @@ -0,0 +1,70 @@ +from posthog.warehouse.types import IncrementalField, IncrementalFieldType + +ENDPOINTS = ( + "Customers", + "Events", + "Invoices", + "Orders", + "Subscriptions", + "Transactions", +) + +INCREMENTAL_ENDPOINTS = ( + "Customers", + "Events", + "Invoices", + "Orders", + "Subscriptions", + "Transactions", +) + +INCREMENTAL_FIELDS: dict[str, list[IncrementalField]] = { + "Customers": [ + { + "label": "updated_at", + "type": 
IncrementalFieldType.DateTime, + "field": "updated_at", + "field_type": IncrementalFieldType.Integer, + }, + ], + "Events": [ + { + "label": "occurred_at", + "type": IncrementalFieldType.DateTime, + "field": "occurred_at", + "field_type": IncrementalFieldType.Integer, + }, + ], + "Invoices": [ + { + "label": "updated_at", + "type": IncrementalFieldType.DateTime, + "field": "updated_at", + "field_type": IncrementalFieldType.Integer, + }, + ], + "Orders": [ + { + "label": "updated_at", + "type": IncrementalFieldType.DateTime, + "field": "updated_at", + "field_type": IncrementalFieldType.Integer, + }, + ], + "Subscriptions": [ + { + "label": "updated_at", + "type": IncrementalFieldType.DateTime, + "field": "updated_at", + "field_type": IncrementalFieldType.Integer, + }, + ], + "Transactions": [ + { + "label": "updated_at", + "type": IncrementalFieldType.DateTime, + "field": "updated_at", + "field_type": IncrementalFieldType.Integer, + }, + ], +} diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py index 0813cd7446f..53f9a387035 100644 --- a/posthog/temporal/data_imports/pipelines/schemas.py +++ b/posthog/temporal/data_imports/pipelines/schemas.py @@ -1,27 +1,34 @@ -from posthog.warehouse.types import IncrementalField -from posthog.temporal.data_imports.pipelines.zendesk.settings import ( - BASE_ENDPOINTS, - SUPPORT_ENDPOINTS, - INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS, - INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS, +from posthog.temporal.data_imports.pipelines.chargebee.settings import ( + ENDPOINTS as CHARGEBEE_ENDPOINTS, + INCREMENTAL_ENDPOINTS as CHARGEBEE_INCREMENTAL_ENDPOINTS, + INCREMENTAL_FIELDS as CHARGEBEE_INCREMENTAL_FIELDS, ) -from posthog.warehouse.models import ExternalDataSource -from posthog.temporal.data_imports.pipelines.stripe.settings import ( - ENDPOINTS as STRIPE_ENDPOINTS, - INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS, - INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS, +from posthog.temporal.data_imports.pipelines.hubspot.settings import ( + ENDPOINTS as HUBSPOT_ENDPOINTS, ) -from posthog.temporal.data_imports.pipelines.hubspot.settings import ENDPOINTS as HUBSPOT_ENDPOINTS from posthog.temporal.data_imports.pipelines.salesforce.settings import ( ENDPOINTS as SALESFORCE_ENDPOINTS, INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS, INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS, ) +from posthog.temporal.data_imports.pipelines.stripe.settings import ( + ENDPOINTS as STRIPE_ENDPOINTS, + INCREMENTAL_ENDPOINTS as STRIPE_INCREMENTAL_ENDPOINTS, + INCREMENTAL_FIELDS as STRIPE_INCREMENTAL_FIELDS, +) from posthog.temporal.data_imports.pipelines.vitally.settings import ( ENDPOINTS as VITALLY_ENDPOINTS, INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS, INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS, ) +from posthog.temporal.data_imports.pipelines.zendesk.settings import ( + BASE_ENDPOINTS, + INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS, + INCREMENTAL_FIELDS as ZENDESK_INCREMENTAL_FIELDS, + SUPPORT_ENDPOINTS, +) +from posthog.warehouse.models import ExternalDataSource +from posthog.warehouse.types import IncrementalField PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = { ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS, @@ -36,6 +43,7 @@ PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = { ExternalDataSource.Type.MSSQL: (), ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS, ExternalDataSource.Type.BIGQUERY: (), + ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_ENDPOINTS, } 
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = { @@ -49,6 +57,7 @@ PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = { ExternalDataSource.Type.MSSQL: (), ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS, ExternalDataSource.Type.BIGQUERY: (), + ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_ENDPOINTS, } PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = { @@ -62,4 +71,5 @@ PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str ExternalDataSource.Type.MSSQL: {}, ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS, ExternalDataSource.Type.BIGQUERY: {}, + ExternalDataSource.Type.CHARGEBEE: CHARGEBEE_INCREMENTAL_FIELDS, } diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index a88150fe9bc..50430ded585 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -1,24 +1,32 @@ import dataclasses +import uuid from datetime import datetime from typing import Any -import uuid +from structlog.typing import FilteringBoundLogger from temporalio import activity from posthog.settings.utils import get_from_env from posthog.temporal.common.heartbeat import Heartbeater +from posthog.temporal.common.logger import bind_temporal_worker_logger from posthog.temporal.data_imports.pipelines.bigquery import delete_table -from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count - -from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs +from posthog.temporal.data_imports.pipelines.helpers import ( + aremove_reset_pipeline, + aupdate_job_count, +) +from posthog.temporal.data_imports.pipelines.pipeline import ( + DataImportPipeline, + PipelineInputs, +) from posthog.warehouse.models import ( ExternalDataJob, ExternalDataSource, get_external_data_job, ) -from posthog.temporal.common.logger import bind_temporal_worker_logger -from structlog.typing import FilteringBoundLogger -from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id +from posthog.warehouse.models.external_data_schema import ( + ExternalDataSchema, + aget_schema_by_id, +) from posthog.warehouse.models.ssh_tunnel import SSHTunnel @@ -86,8 +94,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs): reset_pipeline=reset_pipeline, ) elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: - from posthog.temporal.data_imports.pipelines.hubspot.auth import hubspot_refresh_access_token from posthog.temporal.data_imports.pipelines.hubspot import hubspot + from posthog.temporal.data_imports.pipelines.hubspot.auth import ( + hubspot_refresh_access_token, + ) hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) @@ -117,9 +127,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs): ExternalDataSource.Type.MSSQL, ]: if is_posthog_team(inputs.team_id): - from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type + from posthog.temporal.data_imports.pipelines.sql_database_v2 import ( + sql_source_for_type, + ) else: - from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type + from posthog.temporal.data_imports.pipelines.sql_database import ( + 
sql_source_for_type, + ) host = model.pipeline.job_inputs.get("host") port = model.pipeline.job_inputs.get("port") @@ -208,9 +222,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs): ) elif model.pipeline.source_type == ExternalDataSource.Type.SNOWFLAKE: if is_posthog_team(inputs.team_id): - from posthog.temporal.data_imports.pipelines.sql_database_v2 import snowflake_source + from posthog.temporal.data_imports.pipelines.sql_database_v2 import ( + snowflake_source, + ) else: - from posthog.temporal.data_imports.pipelines.sql_database import snowflake_source + from posthog.temporal.data_imports.pipelines.sql_database import ( + snowflake_source, + ) account_id = model.pipeline.job_inputs.get("account_id") user = model.pipeline.job_inputs.get("user") @@ -244,9 +262,13 @@ async def import_data_activity(inputs: ImportDataActivityInputs): reset_pipeline=reset_pipeline, ) elif model.pipeline.source_type == ExternalDataSource.Type.SALESFORCE: - from posthog.temporal.data_imports.pipelines.salesforce.auth import salesforce_refresh_access_token - from posthog.temporal.data_imports.pipelines.salesforce import salesforce_source from posthog.models.integration import aget_integration_by_id + from posthog.temporal.data_imports.pipelines.salesforce import ( + salesforce_source, + ) + from posthog.temporal.data_imports.pipelines.salesforce.auth import ( + salesforce_refresh_access_token, + ) salesforce_integration_id = model.pipeline.job_inputs.get("salesforce_integration_id", None) @@ -328,7 +350,9 @@ async def import_data_activity(inputs: ImportDataActivityInputs): reset_pipeline=reset_pipeline, ) elif model.pipeline.source_type == ExternalDataSource.Type.BIGQUERY: - from posthog.temporal.data_imports.pipelines.sql_database_v2 import bigquery_source + from posthog.temporal.data_imports.pipelines.sql_database_v2 import ( + bigquery_source, + ) dataset_id = model.pipeline.job_inputs.get("dataset_id") project_id = model.pipeline.job_inputs.get("project_id") @@ -377,6 +401,28 @@ async def import_data_activity(inputs: ImportDataActivityInputs): token_uri=token_uri, ) logger.info(f"Deleting bigquery temp destination table: {destination_table}") + elif model.pipeline.source_type == ExternalDataSource.Type.CHARGEBEE: + from posthog.temporal.data_imports.pipelines.chargebee import ( + chargebee_source, + ) + + source = chargebee_source( + api_key=model.pipeline.job_inputs.get("api_key"), + site_name=model.pipeline.job_inputs.get("site_name"), + endpoint=schema.name, + team_id=inputs.team_id, + job_id=inputs.run_id, + is_incremental=schema.is_incremental, + ) + + return await _run( + job_inputs=job_inputs, + source=source, + logger=logger, + inputs=inputs, + schema=schema, + reset_pipeline=reset_pipeline, + ) else: raise ValueError(f"Source type {model.pipeline.source_type} not supported") diff --git a/posthog/temporal/tests/data_imports/conftest.py b/posthog/temporal/tests/data_imports/conftest.py index 2460ca45d0a..55042162379 100644 --- a/posthog/temporal/tests/data_imports/conftest.py +++ b/posthog/temporal/tests/data_imports/conftest.py @@ -1,4 +1,5 @@ import json + import pytest @@ -896,3 +897,74 @@ def zendesk_ticket_metric_events(): } """ ) + + +@pytest.fixture +def chargebee_customer(): + # note that chargebee actually return both a customer and a card if one is + # attached (we ignore this when ingesting the data) + return json.loads( + """ + { + "list": [ + { + "card": { + "card_type": "american_express", + "created_at": 1729612767, + "customer_id": "cbdemo_douglas", + 
"expiry_month": 5, + "expiry_year": 2028, + "first_name": "Douglas", + "funding_type": "not_known", + "gateway": "chargebee", + "gateway_account_id": "gw_199Ne4URwspru2qp", + "iin": "371449", + "last4": "8431", + "last_name": "Quaid", + "masked_number": "***********8431", + "object": "card", + "payment_source_id": "pm_19A7lVURwsu9pPnQ", + "resource_version": 1729612767061, + "status": "valid", + "updated_at": 1729612767 + }, + "customer": { + "allow_direct_debit": false, + "auto_collection": "on", + "card_status": "valid", + "channel": "web", + "company": "Greenplus Enterprises", + "created_at": 1729612766, + "deleted": false, + "email": "douglas_AT_test.com@example.com", + "excess_payments": 0, + "first_name": "Douglas", + "id": "cbdemo_douglas", + "last_name": "Quaid", + "mrr": 0, + "net_term_days": 0, + "object": "customer", + "payment_method": { + "gateway": "chargebee", + "gateway_account_id": "gw_199Ne4URwspru2qp", + "object": "payment_method", + "reference_id": "tok_19A7lVURwsu9hPnP", + "status": "valid", + "type": "card" + }, + "phone": "2344903756", + "pii_cleared": "active", + "preferred_currency_code": "GBP", + "primary_payment_source_id": "pm_19A7lVURwsu9pPnQ", + "promotional_credits": 0, + "refundable_credits": 0, + "resource_version": 1729612767062, + "taxability": "taxable", + "unbilled_charges": 0, + "updated_at": 1729612767 + } + } + ] + } + """ + ) diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 98d81cc153b..b9c74c4efef 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -1,19 +1,29 @@ -from typing import Any, Optional -from unittest import mock -import aioboto3 import functools import uuid +from typing import Any, Optional +from unittest import mock + +import aioboto3 +import posthoganalytics +import psycopg +import pytest +import pytest_asyncio from asgiref.sync import sync_to_async from django.conf import settings from django.test import override_settings -import posthoganalytics -import pytest -import pytest_asyncio -import psycopg +from dlt.common.configuration.specs.aws_credentials import AwsCredentials +from dlt.sources.helpers.rest_client.client import RESTClient +from temporalio.common import RetryPolicy +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import UnsandboxedWorkflowRunner, Worker + +from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.hogql.modifiers import create_default_modifiers_for_team from posthog.hogql.query import execute_hogql_query from posthog.hogql_queries.insights.funnels.funnel import Funnel -from posthog.hogql_queries.insights.funnels.funnel_query_context import FunnelQueryContext +from posthog.hogql_queries.insights.funnels.funnel_query_context import ( + FunnelQueryContext, +) from posthog.models.team.team import Team from posthog.schema import ( BreakdownFilter, @@ -26,23 +36,15 @@ from posthog.schema import ( from posthog.temporal.data_imports import ACTIVITIES from posthog.temporal.data_imports.external_data_job import ExternalDataJobWorkflow from posthog.temporal.utils import ExternalDataWorkflowInputs -from posthog.warehouse.models.external_table_definitions import external_tables from posthog.warehouse.models import ( ExternalDataJob, - ExternalDataSource, ExternalDataSchema, + ExternalDataSource, ) -from temporalio.testing import WorkflowEnvironment -from temporalio.common import RetryPolicy -from temporalio.worker import 
UnsandboxedWorkflowRunner, Worker -from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE from posthog.warehouse.models.external_data_job import get_latest_run_if_exists -from dlt.sources.helpers.rest_client.client import RESTClient -from dlt.common.configuration.specs.aws_credentials import AwsCredentials - +from posthog.warehouse.models.external_table_definitions import external_tables from posthog.warehouse.models.join import DataWarehouseJoin - BUCKET_NAME = "test-pipeline" SESSION = aioboto3.Session() create_test_client = functools.partial(SESSION.client, endpoint_url=settings.OBJECT_STORAGE_ENDPOINT) @@ -461,6 +463,19 @@ async def test_zendesk_ticket_metric_events(team, zendesk_ticket_metric_events): ) +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_chargebee_customer(team, chargebee_customer): + await _run( + team=team, + schema_name="Customers", + table_name="chargebee_customers", + source_type="Chargebee", + job_inputs={"api_key": "test-key", "site_name": "site-test"}, + mock_data_response=[chargebee_customer["list"][0]["customer"]], + ) + + @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_reset_pipeline(team, stripe_balance_transaction): diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 249a8e43bc2..8760a48f5fd 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -1,62 +1,79 @@ import re -from dateutil import parser import uuid from typing import Any -from psycopg2 import OperationalError -from sentry_sdk import capture_exception import structlog +import temporalio +from dateutil import parser +from django.db.models import Prefetch +from psycopg2 import OperationalError from rest_framework import filters, serializers, status, viewsets -from posthog.api.utils import action from rest_framework.request import Request from rest_framework.response import Response +from sentry_sdk import capture_exception +from snowflake.connector.errors import DatabaseError, ForbiddenError, ProgrammingError +from sshtunnel import BaseSSHTunnelForwarderError from posthog.api.routing import TeamAndOrgViewSetMixin -from posthog.warehouse.data_load.service import ( - sync_external_data_job_workflow, - delete_external_data_schedule, - cancel_external_data_workflow, - delete_data_import_folder, - is_any_external_data_schema_paused, - trigger_external_data_source_workflow, -) -from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob -from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer, SimpleExternalDataSchemaSerializer +from posthog.api.utils import action +from posthog.cloud_utils import is_cloud from posthog.hogql.database.database import create_hogql_database -from posthog.temporal.data_imports.pipelines.stripe import validate_credentials as validate_stripe_credentials -from posthog.temporal.data_imports.pipelines.zendesk import validate_credentials as validate_zendesk_credentials -from posthog.temporal.data_imports.pipelines.vitally import validate_credentials as validate_vitally_credentials +from posthog.temporal.data_imports.pipelines.bigquery import ( + filter_incremental_fields as filter_bigquery_incremental_fields, +) +from posthog.temporal.data_imports.pipelines.bigquery import ( + get_schemas as get_bigquery_schemas, +) from posthog.temporal.data_imports.pipelines.bigquery import ( validate_credentials as validate_bigquery_credentials, - 
get_schemas as get_bigquery_schemas, - filter_incremental_fields as filter_bigquery_incremental_fields, +) +from posthog.temporal.data_imports.pipelines.chargebee import ( + validate_credentials as validate_chargebee_credentials, +) +from posthog.temporal.data_imports.pipelines.hubspot.auth import ( + get_hubspot_access_token_from_code, ) from posthog.temporal.data_imports.pipelines.schemas import ( PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING, PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING, PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, ) -from posthog.temporal.data_imports.pipelines.hubspot.auth import ( - get_hubspot_access_token_from_code, +from posthog.temporal.data_imports.pipelines.stripe import ( + validate_credentials as validate_stripe_credentials, +) +from posthog.temporal.data_imports.pipelines.vitally import ( + validate_credentials as validate_vitally_credentials, +) +from posthog.temporal.data_imports.pipelines.zendesk import ( + validate_credentials as validate_zendesk_credentials, +) +from posthog.utils import get_instance_region +from posthog.warehouse.api.external_data_schema import ( + ExternalDataSchemaSerializer, + SimpleExternalDataSchemaSerializer, +) +from posthog.warehouse.data_load.service import ( + cancel_external_data_workflow, + delete_data_import_folder, + delete_external_data_schedule, + is_any_external_data_schema_paused, + sync_external_data_job_workflow, + trigger_external_data_source_workflow, +) +from posthog.warehouse.models import ( + ExternalDataJob, + ExternalDataSchema, + ExternalDataSource, ) from posthog.warehouse.models.external_data_schema import ( filter_mssql_incremental_fields, filter_mysql_incremental_fields, filter_postgres_incremental_fields, filter_snowflake_incremental_fields, - get_sql_schemas_for_source_type, get_snowflake_schemas, + get_sql_schemas_for_source_type, ) - -import temporalio - -from posthog.cloud_utils import is_cloud -from posthog.utils import get_instance_region from posthog.warehouse.models.ssh_tunnel import SSHTunnel -from sshtunnel import BaseSSHTunnelForwarderError -from snowflake.connector.errors import ProgrammingError, DatabaseError, ForbiddenError -from django.db.models import Prefetch - logger = structlog.get_logger(__name__) @@ -310,6 +327,8 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet): new_source_model, snowflake_schemas = self._handle_snowflake_source(request, *args, **kwargs) elif source_type == ExternalDataSource.Type.BIGQUERY: new_source_model, bigquery_schemas = self._handle_bigquery_source(request, *args, **kwargs) + elif source_type == ExternalDataSource.Type.CHARGEBEE: + new_source_model = self._handle_chargebee_source(request, *args, **kwargs) else: raise NotImplementedError(f"Source type {source_type} not implemented") @@ -434,6 +453,26 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet): return new_source_model + def _handle_chargebee_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: + payload = request.data["payload"] + api_key = payload.get("api_key") + site_name = payload.get("site_name") + prefix = request.data.get("prefix", None) + source_type = request.data["source_type"] + + new_source_model = ExternalDataSource.objects.create( + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + team=self.team, + status="Running", + source_type=source_type, + job_inputs={"api_key": api_key, "site_name": site_name}, + prefix=prefix, + ) + + return new_source_model + def 
_handle_zendesk_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: payload = request.data["payload"] api_key = payload.get("api_key") @@ -837,6 +876,23 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet): ] return Response(status=status.HTTP_200_OK, data=result_mapped_to_options) + elif source_type == ExternalDataSource.Type.CHARGEBEE: + api_key = request.data.get("api_key", "") + site_name = request.data.get("site_name", "") + + # Chargebee uses the term 'site' but it is effectively the subdomain + subdomain_regex = re.compile("^[a-zA-Z-]+$") + if not subdomain_regex.match(site_name): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": "Invalid credentials: Chargebee site name is incorrect"}, + ) + + if not validate_chargebee_credentials(api_key=api_key, site_name=site_name): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": "Invalid credentials: Chargebee credentials are incorrect"}, + ) # Get schemas and validate SQL credentials if source_type in [ diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index e84da4761d7..22a46bc4cb0 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -7,7 +7,13 @@ from django.db import models from posthog.helpers.encrypted_fields import EncryptedJSONField from posthog.models.team import Team -from posthog.models.utils import CreatedMetaFields, DeletedMetaFields, UpdatedMetaFields, UUIDModel, sane_repr +from posthog.models.utils import ( + CreatedMetaFields, + DeletedMetaFields, + UpdatedMetaFields, + UUIDModel, + sane_repr, +) from posthog.warehouse.util import database_sync_to_async logger = structlog.get_logger(__name__) @@ -25,6 +31,7 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete MSSQL = "MSSQL", "MSSQL" VITALLY = "Vitally", "Vitally" BIGQUERY = "BigQuery", "BigQuery" + CHARGEBEE = "Chargebee", "Chargebee" class Status(models.TextChoices): RUNNING = "Running", "Running" @@ -65,7 +72,10 @@ class ExternalDataSource(CreatedMetaFields, UpdatedMetaFields, UUIDModel, Delete self.save() def reload_schemas(self): - from posthog.warehouse.data_load.service import sync_external_data_job_workflow, trigger_external_data_workflow + from posthog.warehouse.data_load.service import ( + sync_external_data_job_workflow, + trigger_external_data_workflow, + ) from posthog.warehouse.models.external_data_schema import ExternalDataSchema for schema in ( diff --git a/pyproject.toml b/pyproject.toml index bcc00106149..0049771f5ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,9 @@ select = [ [tool.ruff.lint.mccabe] max-complexity = 10 +[tool.ruff.lint.isort] +combine-as-imports = true + [tool.ruff.lint.per-file-ignores] "./posthog/queries/column_optimizer/column_optimizer.py" = ["F401"] "./posthog/migrations/0027_move_elements_to_group.py" = ["T201"]
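
The new source relies on two Chargebee API conventions that the ChargebeePaginator and validate_credentials code above depend on: HTTP basic auth with the API key as the username and an empty password, and offset pagination driven by an opaque "next_offset" token returned in each list response. A minimal standalone sketch of that request loop follows; it is not part of the patch, and fetch_all_customers, its arguments, and the example values are illustrative placeholders only.

import requests


def fetch_all_customers(api_key: str, site_name: str) -> list[dict]:
    """Page through Chargebee's customer list the way the dlt source above is configured to."""
    base_url = f"https://{site_name}.chargebee.com/api/v2/customers"
    params: dict = {"limit": 100, "include_deleted": "true"}
    rows: list[dict] = []

    while True:
        # API key as the basic-auth username with an empty password, as in validate_credentials
        res = requests.get(base_url, params=params, auth=(api_key, ""), timeout=30)
        res.raise_for_status()
        body = res.json()

        # Each list entry wraps the record under its resource name, which is why the
        # resource config uses data_selector "list[*].customer"
        rows.extend(item["customer"] for item in body.get("list", []))

        # Chargebee signals more pages with a "next_offset" token; ChargebeePaginator
        # stores this value and replays it as the "offset" query parameter
        next_offset = body.get("next_offset")
        if not next_offset:
            return rows
        params["offset"] = next_offset

The paginator treats "next_offset" as an opaque token: it stores the value as-is and replays it on the next request, checking only for the key's presence to decide whether another page exists rather than computing page numbers itself.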