diff --git a/bin/celery-queues.env b/bin/celery-queues.env
index 746280ebb6e..d9dc4d97c33 100644
--- a/bin/celery-queues.env
+++ b/bin/celery-queues.env
@@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.
# NOTE: Keep in sync with posthog/tasks/utils.py
-CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,analytics_limited,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence
+CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,analytics_limited,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence,integrations
diff --git a/frontend/__snapshots__/components-integrations-slack--slack-integration-added--light.png b/frontend/__snapshots__/components-integrations-slack--slack-integration-added--light.png
index efe7f131e58..8f52cf03135 100644
Binary files a/frontend/__snapshots__/components-integrations-slack--slack-integration-added--light.png and b/frontend/__snapshots__/components-integrations-slack--slack-integration-added--light.png differ
diff --git a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown--light.png b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown--light.png
index 9866de0ef51..e708c340d53 100644
Binary files a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown--light.png and b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown--light.png differ
diff --git a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-edit--dark.png b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-edit--dark.png
index 3c5aa1d0ace..d83ed27fe8d 100644
Binary files a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-edit--dark.png and b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-edit--dark.png differ
diff --git a/frontend/src/lib/integrations/IntegrationView.tsx b/frontend/src/lib/integrations/IntegrationView.tsx
index b3322ed02b7..31cd12e82eb 100644
--- a/frontend/src/lib/integrations/IntegrationView.tsx
+++ b/frontend/src/lib/integrations/IntegrationView.tsx
@@ -1,3 +1,5 @@
+import { LemonBanner } from '@posthog/lemon-ui'
+import api from 'lib/api'
import { UserActivityIndicator } from 'lib/components/UserActivityIndicator/UserActivityIndicator'
import { IntegrationType } from '~/types'
@@ -9,26 +11,50 @@ export function IntegrationView({
integration: IntegrationType
suffix?: JSX.Element
}): JSX.Element {
+ const errors = (integration.errors && integration.errors?.split(',')) || []
+
return (
-
-
-
-
+
+
+
+
- Connected to
{integration.name}
+
+ Connected to {integration.display_name}
+
+ {integration.created_by ? (
+
+ ) : null}
- {integration.created_by ? (
-
- ) : null}
+
+ {suffix}
- {suffix}
+ {errors.length > 0 && (
+
+
+ {errors[0] === 'TOKEN_REFRESH_FAILED'
+ ? 'Authentication token could not be refreshed. Please reconnect.'
+ : `There was an error with this integration: ${errors[0]}`}
+
+
+ )}
)
}
diff --git a/frontend/src/lib/integrations/integrationsLogic.ts b/frontend/src/lib/integrations/integrationsLogic.ts
index 15b56991009..63e826bbff9 100644
--- a/frontend/src/lib/integrations/integrationsLogic.ts
+++ b/frontend/src/lib/integrations/integrationsLogic.ts
@@ -4,6 +4,7 @@ import { loaders } from 'kea-loaders'
import { router, urlToAction } from 'kea-router'
import api from 'lib/api'
import { fromParamsGivenUrl } from 'lib/utils'
+import IconHubspot from 'public/services/hubspot.png'
import IconSalesforce from 'public/services/salesforce.png'
import IconSlack from 'public/services/slack.png'
import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic'
@@ -13,9 +14,10 @@ import { IntegrationKind, IntegrationType } from '~/types'
import type { integrationsLogicType } from './integrationsLogicType'
-const ICONS = {
+const ICONS: Record
= {
slack: IconSlack,
salesforce: IconSalesforce,
+ hubspot: IconHubspot,
}
export const integrationsLogic = kea([
@@ -41,12 +43,6 @@ export const integrationsLogic = kea([
return res.results.map((integration) => {
return {
...integration,
- name:
- integration.kind === 'slack'
- ? integration.config.team.name
- : integration.kind === 'salesforce'
- ? integration.config.instance_url
- : 'Unknown',
// TODO: Make the icons endpoint independent of hog functions
icon_url: ICONS[integration.kind],
}
diff --git a/frontend/src/lib/lemon-ui/LemonButton/LemonButton.tsx b/frontend/src/lib/lemon-ui/LemonButton/LemonButton.tsx
index 301f1910048..dd73779802c 100644
--- a/frontend/src/lib/lemon-ui/LemonButton/LemonButton.tsx
+++ b/frontend/src/lib/lemon-ui/LemonButton/LemonButton.tsx
@@ -75,6 +75,7 @@ export type SideAction = Pick<
LemonButtonProps,
| 'onClick'
| 'to'
+ | 'disableClientSideRouting'
| 'disabled'
| 'icon'
| 'type'
diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionInputs.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionInputs.tsx
index 716501847f7..0e74eaefd67 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionInputs.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionInputs.tsx
@@ -273,6 +273,7 @@ function HogFunctionInputSchemaControls({ value, onChange, onDone }: HogFunction
options={[
{ label: 'Slack', value: 'slack' },
{ label: 'Salesforce', value: 'salesforce' },
+ { label: 'Hubspot', value: 'hubspot' },
]}
placeholder="Choose kind"
/>
diff --git a/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegration.tsx b/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegration.tsx
index 52e3754b0a0..b77da718b89 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegration.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegration.tsx
@@ -50,7 +50,7 @@ function HogFunctionIntegrationChoice({
icon: ,
onClick: () => onChange?.(integration.id),
active: integration.id === value,
- label: integration.name,
+ label: integration.display_name,
})) || []),
],
}
diff --git a/frontend/src/test/mocks.ts b/frontend/src/test/mocks.ts
index d85c01f4642..5d2e4f2fff3 100644
--- a/frontend/src/test/mocks.ts
+++ b/frontend/src/test/mocks.ts
@@ -219,7 +219,7 @@ export const mockIntegration: IntegrationType = {
},
},
icon_url: '',
- name: '',
+ display_name: '',
created_at: '2022-01-01T00:09:00',
created_by: mockBasicUser,
}
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index 3873a9ca1df..ebd54615eec 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -3539,16 +3539,17 @@ export enum EventDefinitionType {
EventPostHog = 'event_posthog',
}
-export type IntegrationKind = 'slack' | 'salesforce'
+export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot'
export interface IntegrationType {
id: number
kind: IntegrationKind
- name: string
+ display_name: string
icon_url: string
config: any
created_by?: UserBasicType | null
created_at: string
+ errors?: string
}
export interface SlackChannelType {
diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 07ebd2016f4..d9d360ae6dc 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
-posthog: 0446_annotation_dashboard_alter_annotation_scope
+posthog: 0447_alter_integration_kind
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts
index 59c765651b2..17de1514744 100644
--- a/plugin-server/src/cdp/hog-function-manager.ts
+++ b/plugin-server/src/cdp/hog-function-manager.ts
@@ -28,6 +28,11 @@ export class HogFunctionManager {
const { hogFunctionIds, teamId } = JSON.parse(message)
await this.reloadHogFunctions(teamId, hogFunctionIds)
},
+
+ 'reload-integrations': async (message) => {
+ const { integrationIds, teamId } = JSON.parse(message)
+ await this.reloadIntegrations(teamId, integrationIds)
+ },
})
}
@@ -149,6 +154,15 @@ export class HogFunctionManager {
return items[0] ?? null
}
+ public reloadIntegrations(teamId: Team['id'], ids: IntegrationType['id'][]): Promise {
+ // We need to find all hog functions that depend on these integrations and re-enrich them
+
+ const items: HogFunctionType[] = Object.values(this.cache[teamId] || {})
+ const itemsToReload = items.filter((item) => ids.some((id) => item.depends_on_integration_ids?.has(id)))
+
+ return this.enrichWithIntegrations(itemsToReload)
+ }
+
public async enrichWithIntegrations(items: HogFunctionType[]): Promise {
const integrationIds: number[] = []
@@ -158,6 +172,8 @@ export class HogFunctionManager {
const input = item.inputs?.[schema.key]
if (input && typeof input.value === 'number') {
integrationIds.push(input.value)
+ item.depends_on_integration_ids = item.depends_on_integration_ids || new Set()
+ item.depends_on_integration_ids.add(input.value)
}
}
})
diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts
index 1d74071736f..5b3e75fc028 100644
--- a/plugin-server/src/cdp/types.ts
+++ b/plugin-server/src/cdp/types.ts
@@ -211,6 +211,7 @@ export type HogFunctionType = {
inputs_schema?: HogFunctionInputSchemaType[]
inputs?: Record
filters?: HogFunctionFilters | null
+ depends_on_integration_ids?: Set
}
export type HogFunctionInputType = {
diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts
index 4de599cb9d7..044c4d280d8 100644
--- a/plugin-server/tests/cdp/hog-function-manager.test.ts
+++ b/plugin-server/tests/cdp/hog-function-manager.test.ts
@@ -113,6 +113,7 @@ describe('HogFunctionManager', () => {
value: integrations[0].id,
},
},
+ depends_on_integration_ids: new Set([integrations[0].id]),
},
])
diff --git a/posthog/api/integration.py b/posthog/api/integration.py
index e595b412782..db9a9f92cbb 100644
--- a/posthog/api/integration.py
+++ b/posthog/api/integration.py
@@ -20,15 +20,8 @@ class IntegrationSerializer(serializers.ModelSerializer):
class Meta:
model = Integration
- fields = [
- "id",
- "kind",
- "config",
- "created_at",
- "created_by",
- "errors",
- ]
- read_only_fields = ["id", "created_at", "created_by", "errors"]
+ fields = ["id", "kind", "config", "created_at", "created_by", "errors", "display_name"]
+ read_only_fields = ["id", "created_at", "created_by", "errors", "display_name"]
def create(self, validated_data: Any) -> Any:
request = self.context["request"]
diff --git a/posthog/cdp/templates/hubspot/template_hubspot.py b/posthog/cdp/templates/hubspot/template_hubspot.py
index 15c000c4057..cf0f370cafa 100644
--- a/posthog/cdp/templates/hubspot/template_hubspot.py
+++ b/posthog/cdp/templates/hubspot/template_hubspot.py
@@ -21,7 +21,7 @@ let body := {
}
let headers := {
- 'Authorization': f'Bearer {inputs.access_token}',
+ 'Authorization': f'Bearer {inputs.oauth.access_token}',
'Content-Type': 'application/json'
}
@@ -51,16 +51,14 @@ if (res.status == 409) {
} else {
print('Contact created successfully!')
}
-
-
""".strip(),
inputs_schema=[
{
- "key": "access_token",
- "type": "string",
- "label": "Access token",
- "description": "Can be acquired under Profile Preferences -> Integrations -> Private Apps",
- "secret": True,
+ "key": "oauth",
+ "type": "integration",
+ "integration": "hubspot",
+ "label": "Hubspot connection",
+ "secret": False,
"required": True,
},
{
diff --git a/posthog/cdp/templates/hubspot/test_template_hubspot.py b/posthog/cdp/templates/hubspot/test_template_hubspot.py
index d9c2133a2e1..98adbe86f5d 100644
--- a/posthog/cdp/templates/hubspot/test_template_hubspot.py
+++ b/posthog/cdp/templates/hubspot/test_template_hubspot.py
@@ -7,7 +7,7 @@ class TestTemplateHubspot(BaseHogFunctionTemplateTest):
def _inputs(self, **kwargs):
inputs = {
- "access_token": "TOKEN",
+ "oauth": {"access_token": "TOKEN"},
"email": "example@posthog.com",
"properties": {
"company": "PostHog",
diff --git a/posthog/migrations/0447_alter_integration_kind.py b/posthog/migrations/0447_alter_integration_kind.py
new file mode 100644
index 00000000000..c21aaa27c10
--- /dev/null
+++ b/posthog/migrations/0447_alter_integration_kind.py
@@ -0,0 +1,19 @@
+# Generated by Django 4.2.14 on 2024-07-18 14:26
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("posthog", "0446_annotation_dashboard_alter_annotation_scope"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="integration",
+ name="kind",
+ field=models.CharField(
+ choices=[("slack", "Slack"), ("salesforce", "Salesforce"), ("hubspot", "Hubspot")], max_length=10
+ ),
+ ),
+ ]
diff --git a/posthog/models/integration.py b/posthog/models/integration.py
index 6238a5fa738..675fff704c0 100644
--- a/posthog/models/integration.py
+++ b/posthog/models/integration.py
@@ -3,7 +3,7 @@ import hashlib
import hmac
import time
from datetime import timedelta
-from typing import Any, Literal
+from typing import Any, Literal, Optional
from urllib.parse import urlencode
from django.db import models
@@ -15,12 +15,29 @@ from django.conf import settings
from posthog.cache_utils import cache_for
from posthog.models.instance_setting import get_instance_settings
from posthog.models.user import User
+import structlog
+
+from posthog.plugins.plugin_server_api import reload_integrations_on_workers
+
+logger = structlog.get_logger(__name__)
+
+
+def dot_get(d: Any, path: str, default: Any = None) -> Any:
+ for key in path.split("."):
+ if not isinstance(d, dict):
+ return default
+ d = d.get(key, default)
+ return d
+
+
+ERROR_TOKEN_REFRESH_FAILED = "TOKEN_REFRESH_FAILED"
class Integration(models.Model):
class IntegrationKind(models.TextChoices):
SLACK = "slack"
SALESFORCE = "salesforce"
+ HUBSPOT = "hubspot"
class Meta:
constraints = [
@@ -46,6 +63,14 @@ class Integration(models.Model):
created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True, blank=True)
created_by: models.ForeignKey = models.ForeignKey("User", on_delete=models.SET_NULL, null=True, blank=True)
+ @property
+ def display_name(self) -> str:
+ if self.kind in OauthIntegration.supported_kinds:
+ oauth_config = OauthIntegration.oauth_config_for_kind(self.kind)
+ return dot_get(self.config, oauth_config.name_path, self.integration_id)
+
+ return f"ID: {self.integration_id}"
+
@dataclass
class OauthConfig:
@@ -55,16 +80,17 @@ class OauthConfig:
client_secret: str
scope: str
id_path: str
+ name_path: str
+ token_info_url: Optional[str] = None
+ token_info_config_fields: Optional[list[str]] = None
class OauthIntegration:
- supported_kinds = ["slack", "salesforce"]
+ supported_kinds = ["slack", "salesforce", "hubspot"]
integration: Integration
- kind: str
- def __init__(self, integration: Integration, kind: str) -> None:
+ def __init__(self, integration: Integration) -> None:
self.integration = integration
- self.kind = kind
@classmethod
@cache_for(timedelta(minutes=5))
@@ -88,6 +114,7 @@ class OauthIntegration:
client_secret=from_settings["SLACK_APP_CLIENT_SECRET"],
scope="channels:read,groups:read,chat:write,chat:write.customize",
id_path="team.id",
+ name_path="team.name",
)
elif kind == "salesforce":
if not settings.SALESFORCE_CONSUMER_KEY or not settings.SALESFORCE_CONSUMER_SECRET:
@@ -100,6 +127,22 @@ class OauthIntegration:
client_secret=settings.SALESFORCE_CONSUMER_SECRET,
scope="full",
id_path="instance_url",
+ name_path="instance_url",
+ )
+ elif kind == "hubspot":
+ if not settings.HUBSPOT_APP_CLIENT_ID or not settings.HUBSPOT_APP_CLIENT_SECRET:
+ raise NotImplementedError("Hubspot app not configured")
+
+ return OauthConfig(
+ authorize_url="https://app.hubspot.com/oauth/authorize",
+ token_url="https://api.hubapi.com/oauth/v1/token",
+ token_info_url="https://api.hubapi.com/oauth/v1/access-tokens/:access_token",
+ token_info_config_fields=["hub_id", "hub_domain", "user", "user_id"],
+ client_id=settings.HUBSPOT_APP_CLIENT_ID,
+ client_secret=settings.HUBSPOT_APP_CLIENT_SECRET,
+ scope="tickets crm.objects.contacts.write sales-email-read crm.objects.companies.read crm.objects.deals.read crm.objects.contacts.read crm.objects.quotes.read",
+ id_path="hub_id",
+ name_path="hub_domain",
)
raise NotImplementedError(f"Oauth config for kind {kind} not implemented")
@@ -125,7 +168,7 @@ class OauthIntegration:
@classmethod
def integration_from_oauth_response(
- cls, kind: str, team_id: str, created_by: User, params: dict[str, str]
+ cls, kind: str, team_id: int, created_by: User, params: dict[str, str]
) -> Integration:
oauth_config = cls.oauth_config_for_kind(kind)
@@ -145,10 +188,23 @@ class OauthIntegration:
if res.status_code != 200 or not config.get("access_token"):
raise Exception("Oauth error")
- integration_id: Any = config
+ if oauth_config.token_info_url:
+ # If token info url is given we call it and check the integration id from there
+ token_info_res = requests.get(
+ oauth_config.token_info_url.replace(":access_token", config["access_token"]),
+ headers={"Authorization": f"Bearer {config['access_token']}"},
+ )
- for key in oauth_config.id_path.split("."):
- integration_id = integration_id.get(key)
+ if token_info_res.status_code == 200:
+ data = token_info_res.json()
+ if oauth_config.token_info_config_fields:
+ for field in oauth_config.token_info_config_fields:
+ config[field] = dot_get(data, field)
+
+ integration_id = dot_get(config, oauth_config.id_path)
+
+ if isinstance(integration_id, int):
+ integration_id = str(integration_id)
if not isinstance(integration_id, str):
raise Exception("Oauth error")
@@ -161,6 +217,8 @@ class OauthIntegration:
"id_token": config.pop("id_token", None),
}
+ config["refreshed_at"] = int(time.time())
+
integration, created = Integration.objects.update_or_create(
team_id=team_id,
kind=kind,
@@ -172,8 +230,56 @@ class OauthIntegration:
},
)
+ if integration.errors:
+ integration.errors = ""
+ integration.save()
+
return integration
+ def access_token_expired(self, time_threshold: Optional[timedelta] = None) -> bool:
+ # Not all integrations have refresh tokens or expiries, so we just return False if we can't check
+
+ refresh_token = self.integration.sensitive_config.get("refresh_token")
+ expires_in = self.integration.config.get("expires_in")
+ refreshed_at = self.integration.config.get("refreshed_at")
+ if not refresh_token or not expires_in or not refreshed_at:
+ return False
+
+ # To be really safe we refresh if its half way through the expiry
+ time_threshold = time_threshold or timedelta(seconds=expires_in / 2)
+
+ return time.time() > refreshed_at + expires_in - time_threshold.total_seconds()
+
+ def refresh_access_token(self):
+ """
+ Refresh the access token for the integration if necessary
+ """
+
+ oauth_config = self.oauth_config_for_kind(self.integration.kind)
+
+ res = requests.post(
+ oauth_config.token_url,
+ data={
+ "client_id": oauth_config.client_id,
+ "client_secret": oauth_config.client_secret,
+ "refresh_token": self.integration.sensitive_config["refresh_token"],
+ "grant_type": "refresh_token",
+ },
+ )
+
+ config: dict = res.json()
+
+ if res.status_code != 200 or not config.get("access_token"):
+ logger.warning(f"Failed to refresh token for {self}", response=res.text)
+ self.integration.errors = ERROR_TOKEN_REFRESH_FAILED
+ else:
+ logger.info(f"Refreshed access token for {self}")
+ self.integration.sensitive_config["access_token"] = config["access_token"]
+ self.integration.config["expires_in"] = config.get("expires_in")
+ self.integration.config["refreshed_at"] = int(time.time())
+ reload_integrations_on_workers(self.integration.team_id, [self.integration.id])
+ self.integration.save()
+
class SlackIntegrationError(Exception):
pass
diff --git a/posthog/models/test/test_integration_model.py b/posthog/models/test/test_integration_model.py
index 57c87807304..501f6f940e0 100644
--- a/posthog/models/test/test_integration_model.py
+++ b/posthog/models/test/test_integration_model.py
@@ -1,5 +1,12 @@
+from datetime import datetime, timedelta
+import time
+from typing import Optional
+from unittest.mock import patch
+
+from freezegun import freeze_time
+import pytest
from posthog.models.instance_setting import set_instance_setting
-from posthog.models.integration import SlackIntegration
+from posthog.models.integration import Integration, OauthIntegration, SlackIntegration
from posthog.test.base import BaseTest
@@ -20,3 +27,207 @@ class TestIntegrationModel(BaseTest):
"SLACK_APP_CLIENT_SECRET": "client-secret",
"SLACK_APP_SIGNING_SECRET": "not-so-secret",
}
+
+
+class TestOauthIntegrationModel(BaseTest):
+ mock_settings = {
+ "SALESFORCE_CONSUMER_KEY": "salesforce-client-id",
+ "SALESFORCE_CONSUMER_SECRET": "salesforce-client-secret",
+ "HUBSPOT_APP_CLIENT_ID": "hubspot-client-id",
+ "HUBSPOT_APP_CLIENT_SECRET": "hubspot-client-secret",
+ }
+
+ def create_integration(
+ self, kind: str, config: Optional[dict] = None, sensitive_config: Optional[dict] = None
+ ) -> Integration:
+ _config = {"refreshed_at": int(time.time()), "expires_in": 3600}
+ _sensitive_config = {"refresh_token": "REFRESH"}
+ _config.update(config or {})
+ _sensitive_config.update(sensitive_config or {})
+
+ return Integration.objects.create(team=self.team, kind=kind, config=_config, sensitive_config=_sensitive_config)
+
+ def test_authorize_url_raises_if_not_configured(self):
+ with pytest.raises(NotImplementedError):
+ OauthIntegration.authorize_url("salesforce", next="/projects/test")
+
+ def test_authorize_url(self):
+ with self.settings(**self.mock_settings):
+ url = OauthIntegration.authorize_url("salesforce", next="/projects/test")
+ assert (
+ url
+ == "https://login.salesforce.com/services/oauth2/authorize?client_id=salesforce-client-id&scope=full&redirect_uri=https%3A%2F%2Flocalhost%3A8000%2Fintegrations%2Fsalesforce%2Fcallback&response_type=code&state=next%3D%252Fprojects%252Ftest"
+ )
+
+ @patch("posthog.models.integration.requests.post")
+ def test_integration_from_oauth_response(self, mock_post):
+ with self.settings(**self.mock_settings):
+ mock_post.return_value.status_code = 200
+ mock_post.return_value.json.return_value = {
+ "access_token": "FAKES_ACCESS_TOKEN",
+ "refresh_token": "FAKE_REFRESH_TOKEN",
+ "instance_url": "https://fake.salesforce.com",
+ "expires_in": 3600,
+ }
+
+ with freeze_time("2024-01-01T12:00:00Z"):
+ integration = OauthIntegration.integration_from_oauth_response(
+ "salesforce",
+ self.team.id,
+ self.user,
+ {
+ "code": "code",
+ "state": "next=/projects/test",
+ },
+ )
+
+ assert integration.team == self.team
+ assert integration.created_by == self.user
+
+ assert integration.config == {
+ "instance_url": "https://fake.salesforce.com",
+ "refreshed_at": 1704110400,
+ "expires_in": 3600,
+ }
+ assert integration.sensitive_config == {
+ "access_token": "FAKES_ACCESS_TOKEN",
+ "refresh_token": "FAKE_REFRESH_TOKEN",
+ "id_token": None,
+ }
+
+ @patch("posthog.models.integration.requests.post")
+ def test_integration_errors_if_id_cannot_be_generated(self, mock_post):
+ with self.settings(**self.mock_settings):
+ mock_post.return_value.status_code = 200
+ mock_post.return_value.json.return_value = {
+ "access_token": "FAKES_ACCESS_TOKEN",
+ "refresh_token": "FAKE_REFRESH_TOKEN",
+ "not_instance_url": "https://fake.salesforce.com",
+ "expires_in": 3600,
+ }
+
+ with pytest.raises(Exception):
+ OauthIntegration.integration_from_oauth_response(
+ "salesforce",
+ self.team.id,
+ self.user,
+ {
+ "code": "code",
+ "state": "next=/projects/test",
+ },
+ )
+
+ @patch("posthog.models.integration.requests.post")
+ @patch("posthog.models.integration.requests.get")
+ def test_integration_fetches_info_from_token_info_url(self, mock_get, mock_post):
+ with self.settings(**self.mock_settings):
+ mock_post.return_value.status_code = 200
+ mock_post.return_value.json.return_value = {
+ "access_token": "FAKES_ACCESS_TOKEN",
+ "refresh_token": "FAKE_REFRESH_TOKEN",
+ "expires_in": 3600,
+ }
+
+ mock_get.return_value.status_code = 200
+ mock_get.return_value.json.return_value = {
+ "hub_id": "hub_id",
+ "hub_domain": "hub_domain",
+ "user": "user",
+ "user_id": "user_id",
+ "should_not": "be_saved",
+ }
+
+ with freeze_time("2024-01-01T12:00:00Z"):
+ integration = OauthIntegration.integration_from_oauth_response(
+ "hubspot",
+ self.team.id,
+ self.user,
+ {
+ "code": "code",
+ "state": "next=/projects/test",
+ },
+ )
+
+ assert integration.config == {
+ "expires_in": 3600,
+ "hub_id": "hub_id",
+ "hub_domain": "hub_domain",
+ "user": "user",
+ "user_id": "user_id",
+ "refreshed_at": 1704110400,
+ }
+ assert integration.sensitive_config == {
+ "access_token": "FAKES_ACCESS_TOKEN",
+ "refresh_token": "FAKE_REFRESH_TOKEN",
+ "id_token": None,
+ }
+
+ def test_integration_access_token_expired(self):
+ now = datetime.now()
+ with freeze_time(now):
+ integration = self.create_integration(kind="hubspot", config={"expires_in": 1000})
+
+ with freeze_time(now):
+ # Access token is not expired
+ assert not OauthIntegration(integration).access_token_expired()
+
+ with freeze_time(now + timedelta(seconds=1000) - timedelta(seconds=501)):
+ # After the expiry but before the threshold it is not expired
+ assert not OauthIntegration(integration).access_token_expired()
+
+ with freeze_time(now + timedelta(seconds=1000) - timedelta(seconds=499)):
+ # After the threshold it is expired
+ assert OauthIntegration(integration).access_token_expired()
+
+ with freeze_time(now + timedelta(seconds=1000)):
+ # After the threshold it is expired
+ assert OauthIntegration(integration).access_token_expired()
+
+ @patch("posthog.models.integration.reload_integrations_on_workers")
+ @patch("posthog.models.integration.requests.post")
+ def test_refresh_access_token(self, mock_post, mock_reload):
+ mock_post.return_value.status_code = 200
+ mock_post.return_value.json.return_value = {
+ "access_token": "REFRESHED_ACCESS_TOKEN",
+ "expires_in": 1000,
+ }
+
+ integration = self.create_integration(kind="hubspot", config={"expires_in": 1000})
+
+ with freeze_time("2024-01-01T14:00:00Z"):
+ with self.settings(**self.mock_settings):
+ OauthIntegration(integration).refresh_access_token()
+
+ mock_post.assert_called_with(
+ "https://api.hubapi.com/oauth/v1/token",
+ data={
+ "grant_type": "refresh_token",
+ "client_id": "hubspot-client-id",
+ "client_secret": "hubspot-client-secret",
+ "refresh_token": "REFRESH",
+ },
+ )
+
+ assert integration.config["expires_in"] == 1000
+ assert integration.config["refreshed_at"] == 1704117600
+ assert integration.sensitive_config["access_token"] == "REFRESHED_ACCESS_TOKEN"
+
+ mock_reload.assert_called_once_with(self.team.id, [integration.id])
+
+ @patch("posthog.models.integration.reload_integrations_on_workers")
+ @patch("posthog.models.integration.requests.post")
+ def test_refresh_access_token_handles_errors(self, mock_post, mock_reload):
+ mock_post.return_value.status_code = 401
+ mock_post.return_value.json.return_value = {"error": "BROKEN"}
+
+ integration = self.create_integration(kind="hubspot", config={"expires_in": 1000, "refreshed_at": 1700000000})
+
+ with freeze_time("2024-01-01T14:00:00Z"):
+ with self.settings(**self.mock_settings):
+ OauthIntegration(integration).refresh_access_token()
+
+ assert integration.config["expires_in"] == 1000
+ assert integration.config["refreshed_at"] == 1700000000
+ assert integration.errors == "TOKEN_REFRESH_FAILED"
+
+ mock_reload.assert_not_called()
diff --git a/posthog/plugins/plugin_server_api.py b/posthog/plugins/plugin_server_api.py
index eb02b0caede..45b53a71bf1 100644
--- a/posthog/plugins/plugin_server_api.py
+++ b/posthog/plugins/plugin_server_api.py
@@ -37,6 +37,11 @@ def reload_hog_functions_on_workers(team_id: int, hog_function_ids: list[str]):
publish_message("reload-hog-functions", {"teamId": team_id, "hogFunctionIds": hog_function_ids})
+def reload_integrations_on_workers(team_id: int, integration_ids: list[int]):
+ logger.info(f"Reloading integrations {integration_ids} on workers")
+ publish_message("reload-integrations", {"teamId": team_id, "integrationIds": integration_ids})
+
+
def reset_available_product_features_cache_on_workers(organization_id: str):
logger.info(f"Resetting available product features cache for organization {organization_id} on workers")
publish_message(
diff --git a/posthog/tasks/__init__.py b/posthog/tasks/__init__.py
index 8e3e665eb77..22dc1bb28d6 100644
--- a/posthog/tasks/__init__.py
+++ b/posthog/tasks/__init__.py
@@ -9,6 +9,7 @@ from . import (
email,
exporter,
hog_functions,
+ integrations,
process_scheduled_changes,
split_person,
sync_all_organization_available_product_features,
@@ -28,6 +29,7 @@ __all__ = [
"email",
"exporter",
"hog_functions",
+ "integrations",
"process_scheduled_changes",
"split_person",
"sync_all_organization_available_product_features",
diff --git a/posthog/tasks/integrations.py b/posthog/tasks/integrations.py
new file mode 100644
index 00000000000..e0f543874bf
--- /dev/null
+++ b/posthog/tasks/integrations.py
@@ -0,0 +1,31 @@
+from celery import shared_task
+
+from posthog.tasks.utils import CeleryQueue
+
+
+@shared_task(ignore_result=True, queue=CeleryQueue.INTEGRATIONS.value)
+def refresh_integrations() -> int:
+ from posthog.models.integration import Integration, OauthIntegration
+
+ oauth_integrations = Integration.objects.filter(kind__in=OauthIntegration.supported_kinds).all()
+
+ for integration in oauth_integrations:
+ oauth_integration = OauthIntegration(integration)
+
+ if oauth_integration.access_token_expired():
+ refresh_integration.delay(integration.id)
+
+ return 0
+
+
+@shared_task(ignore_result=True, queue=CeleryQueue.INTEGRATIONS.value)
+def refresh_integration(id: int) -> int:
+ from posthog.models.integration import Integration, OauthIntegration
+
+ integration = Integration.objects.get(id=id)
+
+ if integration.kind in OauthIntegration.supported_kinds:
+ oauth_integration = OauthIntegration(integration)
+ oauth_integration.refresh_access_token()
+
+ return 0
diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py
index 779fa229a3a..d94dadf0c69 100644
--- a/posthog/tasks/scheduled.py
+++ b/posthog/tasks/scheduled.py
@@ -8,6 +8,7 @@ from django.conf import settings
from posthog.caching.warming import schedule_warming_for_teams_task
from posthog.celery import app
+from posthog.tasks.integrations import refresh_integrations
from posthog.tasks.tasks import (
calculate_cohort,
calculate_decide_usage,
@@ -330,3 +331,11 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
calculate_external_data_rows_synced.s(),
name="calculate external data rows synced",
)
+
+ # Check integrations to refresh every minute
+ add_periodic_task_with_expiry(
+ sender,
+ 60,
+ refresh_integrations.s(),
+ name="refresh integrations",
+ )
diff --git a/posthog/tasks/test/test_integrations.py b/posthog/tasks/test/test_integrations.py
new file mode 100644
index 00000000000..14af3f1d6e2
--- /dev/null
+++ b/posthog/tasks/test/test_integrations.py
@@ -0,0 +1,38 @@
+import time
+from typing import Optional
+from unittest.mock import patch
+
+
+from posthog.models.integration import Integration
+from posthog.tasks.integrations import refresh_integrations
+from posthog.test.base import APIBaseTest
+
+
+class TestIntegrationsTasks(APIBaseTest):
+ integrations: list[Integration] = []
+
+ def setUp(self) -> None:
+ super().setUp()
+
+ def create_integration(
+ self, kind: str, config: Optional[dict] = None, sensitive_config: Optional[dict] = None
+ ) -> Integration:
+ _config = {"refreshed_at": int(time.time()), "expires_in": 3600}
+ _sensitive_config = {"refresh_token": "REFRESH"}
+ _config.update(config or {})
+ _sensitive_config.update(sensitive_config or {})
+
+ return Integration.objects.create(team=self.team, kind=kind, config=_config, sensitive_config=_sensitive_config)
+
+ def test_refresh_integrations_schedules_refreshes_for_expired(self) -> None:
+ _integration_1 = self.create_integration("other") # not an oauth one
+ _integration_2 = self.create_integration("slack") # not expired
+ integration_3 = self.create_integration("slack", config={"refreshed_at": time.time() - 3600}) # expired
+ integration_4 = self.create_integration(
+ "slack", config={"refreshed_at": time.time() - 3600 + 170}
+ ) # expired with buffer
+
+ with patch("posthog.tasks.integrations.refresh_integration.delay") as refresh_integration_mock:
+ refresh_integrations()
+ # Both 3 and 4 should be refreshed
+ assert refresh_integration_mock.call_args_list == [((integration_3.id,),), ((integration_4.id,),)]
diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py
index 2bd26a38503..34c4dd525af 100644
--- a/posthog/tasks/utils.py
+++ b/posthog/tasks/utils.py
@@ -38,3 +38,4 @@ class CeleryQueue(Enum):
SESSION_REPLAY_EMBEDDINGS = "session_replay_embeddings"
SESSION_REPLAY_PERSISTENCE = "session_replay_persistence"
SESSION_REPLAY_GENERAL = "session_replay_general"
+ INTEGRATIONS = "integrations"