mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

feat: make rusty-hook Hog-aware, sending responses back via kafka (#23619)

Co-authored-by: Ben White <ben@posthog.com>
Brett Hoerner 2024-07-16 11:12:48 -06:00 committed by GitHub
parent 0adb5d5857
commit 8f4df45984
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 1630 additions and 185 deletions

.vscode/launch.json vendored (4 changed lines)

@ -105,7 +105,9 @@
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"KAFKA_HOSTS": "localhost:9092",
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True"
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "*"
},
"presentation": {
"group": "main"


@ -7,6 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
export DEBUG=${DEBUG:-1}
export SKIP_SERVICE_VERSION_REQUIREMENTS=1
export BILLING_SERVICE_URL=${BILLING_SERVICE_URL:-https://billing.dev.posthog.dev}
export HOG_HOOK_URL=${HOG_HOOK_URL:-http://localhost:3300/hoghook}
service_warning() {
echo -e "\033[0;31m$1 isn't ready. You can run the stack with:\ndocker compose -f docker-compose.dev.yml up\nIf you have already run that, just make sure that services are starting properly, and sit back.\nWaiting for $1 to start...\033[0m"


@ -1,4 +1,5 @@
import { PluginsServerConfig } from '../types'
import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import { RustyHook } from '../worker/rusty-hook'
@ -9,7 +10,11 @@ export type AsyncFunctionExecutorOptions = {
}
export class AsyncFunctionExecutor {
constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {}
hogHookEnabledForTeams: ValueMatcher<number>
constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {
this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true)
}
async execute(
request: HogFunctionInvocationResult,
@ -74,8 +79,10 @@ export class AsyncFunctionExecutor {
// Finally overwrite the args with the sanitized ones
request.asyncFunctionRequest.args = [url, { method, headers, body }]
if (!options?.sync === false) {
// TODO: Add rusty hook support
// If the caller hasn't forced it to be synchronous and the team has the rustyhook enabled, enqueue it
if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) {
await this.rustyHook.enqueueForHog(request)
return
}
status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`)
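For context on the new CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS setting gating the enqueue above: buildIntegerMatcher(value, true) treats '*' as matching every team and otherwise matches a comma-separated list of team IDs. A minimal, hypothetical Rust sketch of that matching behaviour (not the plugin-server implementation itself):

use std::collections::HashSet;

// Hypothetical stand-in for the plugin-server's buildIntegerMatcher(value, allowStar).
fn build_integer_matcher(value: &str, allow_star: bool) -> Box<dyn Fn(i64) -> bool> {
    if allow_star && value.trim() == "*" {
        // '*' enables the feature for every team.
        return Box::new(|_: i64| true);
    }
    // Otherwise match an explicit comma-separated list of team IDs.
    let ids: HashSet<i64> = value
        .split(',')
        .filter_map(|part| part.trim().parse().ok())
        .collect();
    Box::new(move |team_id: i64| ids.contains(&team_id))
}

fn main() {
    let some_teams = build_integer_matcher("2,42", true);
    assert!(some_teams(42) && !some_teams(7));
    assert!(build_integer_matcher("*", true)(7));
}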


@ -133,6 +133,7 @@ export function getDefaultConfig(): PluginsServerConfig {
RUSTY_HOOK_FOR_TEAMS: '',
RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0,
RUSTY_HOOK_URL: '',
HOG_HOOK_URL: '',
CAPTURE_CONFIG_REDIS_HOST: null,
STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
@ -180,6 +181,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_MIN_OBSERVATIONS: 3,
CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: 0.8,
CDP_WATCHER_DISABLED_RATING_THRESHOLD: 0.5,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
}
}


@ -102,6 +102,7 @@ export type CdpConfig = {
CDP_WATCHER_MIN_OBSERVATIONS: number
CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: number
CDP_WATCHER_DISABLED_RATING_THRESHOLD: number
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
}
export interface PluginsServerConfig extends CdpConfig {
@ -219,6 +220,7 @@ export interface PluginsServerConfig extends CdpConfig {
RUSTY_HOOK_FOR_TEAMS: string
RUSTY_HOOK_ROLLOUT_PERCENTAGE: number
RUSTY_HOOK_URL: string
HOG_HOOK_URL: string
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean
PIPELINE_STEP_STALLED_LOG_TIMEOUT: number
CAPTURE_CONFIG_REDIS_HOST: string | null // Redis cluster to use to coordinate with capture (overflow, routing)


@ -2,6 +2,7 @@ import { Webhook } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import fetch from 'node-fetch'
import { HogFunctionInvocationResult } from '../cdp/types'
import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { isProdEnv } from '../utils/env-utils'
@ -29,7 +30,11 @@ export class RustyHook {
constructor(
private serverConfig: Pick<
PluginsServerConfig,
'RUSTY_HOOK_URL' | 'RUSTY_HOOK_FOR_TEAMS' | 'RUSTY_HOOK_ROLLOUT_PERCENTAGE' | 'EXTERNAL_REQUEST_TIMEOUT_MS'
| 'RUSTY_HOOK_URL'
| 'HOG_HOOK_URL'
| 'RUSTY_HOOK_FOR_TEAMS'
| 'RUSTY_HOOK_ROLLOUT_PERCENTAGE'
| 'EXTERNAL_REQUEST_TIMEOUT_MS'
>
) {
this.enabledForTeams = buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true)
@ -122,4 +127,49 @@ export class RustyHook {
return true
}
public async enqueueForHog(payload: HogFunctionInvocationResult): Promise<boolean> {
// This is a temporary copy of `enqueueIfEnabledForTeam` above for Hog fetches because the
// API differs. It will likely be replaced with a Kafka topic soon.
const body = JSON.stringify(payload)
// We attempt to enqueue into the rusty-hook service until we succeed. This is deliberately
// designed to block up the consumer if rusty-hook is down or if we deploy code that
// sends malformed requests. The entire purpose of rusty-hook is to reliably deliver webhooks,
// so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook.
let attempt = 0
while (true) {
try {
attempt += 1
const response = await fetch(this.serverConfig.HOG_HOOK_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
// Sure, it's not an external request, but we should have a timeout and this is as
// good as any.
timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS,
})
if (response.ok) {
// Success, exit the loop.
break
}
// Throw to unify error handling below.
throw new Error(
`rusty-hook for Hog returned ${response.status} ${response.statusText}: ${await response.text()}`
)
} catch (error) {
status.error('🔴', 'Webhook enqueue to rusty-hook for Hog failed', { error, attempt })
Sentry.captureException(error)
}
const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS)
await sleep(delayMs)
}
return true
}
}
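The loop above retries enqueueing forever with capped exponential backoff. A minimal sketch of the delay schedule, assuming hypothetical base and cap values (the actual RUSTY_HOOK_BASE_DELAY_MS and MAX_RUSTY_HOOK_DELAY_MS constants are defined elsewhere in rusty-hook.ts):

// Capped exponential backoff: min(2^(attempt - 1) * base, max), as in the loop above.
fn backoff_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    2u64.saturating_pow(attempt.saturating_sub(1))
        .saturating_mul(base_ms)
        .min(max_ms)
}

fn main() {
    // With base_ms = 50 and max_ms = 5_000 (illustrative values only),
    // the delays are 50, 100, 200, 400, ... capped at 5_000 ms.
    for attempt in 1..=10 {
        println!("attempt {attempt}: sleep {} ms", backoff_ms(attempt, 50, 5_000));
    }
}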


@ -200,7 +200,10 @@ describe('CDP Processed Events Consuner', () => {
},
})
expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({
const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])
// Parse body so it can match by object equality rather than exact string equality
msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body)
expect(msg).toEqual({
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
@ -225,7 +228,7 @@ describe('CDP Processed Events Consuner', () => {
'https://example.com/posthog-webhook',
{
headers: { version: 'v=1.0.0' },
body: JSON.stringify({
body: {
event: {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
name: '$pageview',
@ -241,7 +244,7 @@ describe('CDP Processed Events Consuner', () => {
person: null,
event_url:
'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test',
}),
},
method: 'POST',
},
],

rust/Cargo.lock generated (743 changed lines)

File diff suppressed because it is too large.


@ -42,10 +42,11 @@ futures = { version = "0.3.29" }
governor = { version = "0.5.1", features = ["dashmap"] }
http = { version = "1.1.0" }
http-body-util = "0.1.0"
httpmock = "0.7.0"
metrics = "0.22.0"
metrics-exporter-prometheus = "0.14.0"
once_cell = "1.18.0"
opentelemetry = { version = "0.22.0", features = ["trace"]}
opentelemetry = { version = "0.22.0", features = ["trace"] }
opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] }
rand = "0.8.5"
@ -76,6 +77,6 @@ tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["cors", "limit", "trace"] }
tracing = "0.1.40"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = { version="0.3.18", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }

rust/bin/start-hoghooks (new executable file, 25 lines)

@ -0,0 +1,25 @@
#!/bin/bash
set -ex
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
cargo build
export RUST_LOG=${DEBUG:-debug}
SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn}
export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL
export HOG_MODE=true
DATABASE_NAME=${DEBUG:-hoghooks}
export DATABASE_URL=postgres://posthog:posthog@localhost:5432/$DATABASE_NAME
export ALLOW_INTERNAL_IPS=true
sqlx database create -D "$DATABASE_URL"
sqlx migrate run -D "$DATABASE_URL"
./target/debug/hook-api &
./target/debug/hook-worker &
./target/debug/hook-janitor &
wait
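With the stack above running, the hook-api service accepts Hog fetch jobs on /hoghook (the HOG_HOOK_URL configured earlier). A hedged sketch of such a request, assuming the reqwest crate with the blocking and json features; payload fields other than asyncFunctionRequest are illustrative:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Minimal Hoghook payload: everything besides `asyncFunctionRequest` is passed
    // through untouched and comes back on the Kafka response topic later.
    let payload = serde_json::json!({
        "teamId": 2,
        "asyncFunctionRequest": {
            "name": "fetch",
            "args": ["http://example.com/posthog-webhook", { "method": "POST", "body": "{}" }]
        }
    });

    let response = reqwest::blocking::Client::new()
        .post("http://localhost:3300/hoghook")
        .json(&payload)
        .send()?;

    // hook-api replies with `{}` on success and `{"error": "..."}` for bad requests.
    let status = response.status();
    let body = response.text()?;
    println!("{status} {body}");
    Ok(())
}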


@ -2,7 +2,7 @@ use envconfig::Envconfig;
#[derive(Envconfig)]
pub struct Config {
#[envconfig(from = "BIND_HOST", default = "0.0.0.0")]
#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,
#[envconfig(from = "BIND_PORT", default = "3300")]
@ -19,6 +19,9 @@ pub struct Config {
#[envconfig(default = "5000000")]
pub max_body_size: usize,
#[envconfig(default = "false")]
pub hog_mode: bool,
}
impl Config {


@ -5,17 +5,32 @@ use hook_common::pgqueue::PgQueue;
use super::webhook;
pub fn add_routes(router: Router, pg_pool: PgQueue, max_body_size: usize) -> Router {
router
pub fn add_routes(
router: Router,
pg_pool: PgQueue,
hog_mode: bool,
max_body_size: usize,
) -> Router {
let router = router
.route("/", routing::get(index))
.route("/_readiness", routing::get(index))
.route("/_liveness", routing::get(index)) // No async loop for now, just check axum health
.route(
"/webhook",
routing::post(webhook::post)
.route("/_liveness", routing::get(index)); // No async loop for now, just check axum health
if hog_mode {
router.route(
"/hoghook",
routing::post(webhook::post_hoghook)
.with_state(pg_pool)
.layer(RequestBodyLimitLayer::new(max_body_size)),
)
} else {
router.route(
"/webhook",
routing::post(webhook::post_webhook)
.with_state(pg_pool)
.layer(RequestBodyLimitLayer::new(max_body_size)),
)
}
}
pub async fn index() -> &'static str {
@ -37,8 +52,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn index(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, 1_000_000);
let app = add_routes(Router::new(), pg_queue, hog_mode, 1_000_000);
let response = app
.oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())


@ -1,11 +1,14 @@
use std::collections::HashMap;
use std::time::Instant;
use axum::{extract::State, http::StatusCode, Json};
use hook_common::webhook::{WebhookJobMetadata, WebhookJobParameters};
use serde_derive::Deserialize;
use serde_json::Value;
use url::Url;
use hook_common::pgqueue::{NewJob, PgQueue};
use hook_common::webhook::HttpMethod;
use serde::Serialize;
use tracing::{debug, error};
@ -29,7 +32,7 @@ fn default_max_attempts() -> u32 {
3
}
pub async fn post(
pub async fn post_webhook(
State(pg_queue): State<PgQueue>,
Json(payload): Json<WebhookPostRequestBody>,
) -> Result<Json<WebhookPostResponse>, (StatusCode, Json<WebhookPostResponse>)> {
@ -62,6 +65,106 @@ pub async fn post(
Ok(Json(WebhookPostResponse { error: None }))
}
#[derive(Debug, Deserialize)]
pub struct HogFetchParameters {
pub body: Option<String>,
pub headers: Option<HashMap<String, String>>,
pub method: Option<HttpMethod>,
}
// Hoghook expects a JSON payload in the format of `HogFunctionInvocationResult` (as seen in
// plugin-server), but we accept a plain `Json<Value>` via Axum here, and this is why:
// * The reason we don't decode that into a `HogFunctionInvocationResult`-shaped Rust struct is that
// there's no benefit in mirroring the exact shape of that type (and keeping it sync with the
// plugin-server type).
// * Hoghook only cares about a small subset of the payload (the `asyncFunctionRequest` field), and
// the reason we don't decode *that* into a Rust struct is because the function args are a simple
// array (because this type is used for more than just `fetch` requests), and so we would need to
// manually validate and destructure the array elements anyway.
// * Additionally, we don't want to discard the rest of the payload because we pass it back to the
// plugin-server after receiving the response body from the remote server. By accepting a plain
// `Json<Value>` we only decode the JSON once, we can do our minimal validation/extraction, and we
// can save the rest of the payload for later.
pub async fn post_hoghook(
State(pg_queue): State<PgQueue>,
Json(mut payload): Json<Value>,
) -> Result<Json<WebhookPostResponse>, (StatusCode, Json<WebhookPostResponse>)> {
debug!("received payload: {:?}", payload);
let parameters: WebhookJobParameters = match &mut payload {
Value::Object(object) => {
let async_fn_request = object
.get("asyncFunctionRequest")
.ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest'"))?;
let name = async_fn_request
.get("name")
.ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest.name'"))?;
if name != "fetch" {
return Err(bad_request("asyncFunctionRequest.name must be 'fetch'"));
}
let args = async_fn_request
.get("args")
.ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest.args'"))?;
// Note that the URL is parsed (and thus validated as a valid URL) as part of
// `get_hostname` below.
let url = args.get(0).ok_or_else(|| {
bad_request("missing required field 'asyncFunctionRequest.args[0]'")
})?;
let fetch_options: HogFetchParameters = if let Some(value) = args.get(1) {
serde_json::from_value(value.clone()).map_err(|_| {
bad_request("failed to deserialize asyncFunctionRequest.args[1]")
})?
} else {
HogFetchParameters {
body: None,
headers: None,
method: None,
}
};
WebhookJobParameters {
body: fetch_options.body.unwrap_or("".to_owned()),
headers: fetch_options.headers.unwrap_or_default(),
method: fetch_options.method.unwrap_or(HttpMethod::POST),
url: url
.as_str()
.ok_or_else(|| bad_request("url must be a string"))?
.to_owned(),
}
}
_ => return Err(bad_request("expected JSON object")),
};
let url_hostname = get_hostname(&parameters.url)?;
let max_attempts = default_max_attempts() as i32;
let job = NewJob::new(max_attempts, payload, parameters, url_hostname.as_str());
let start_time = Instant::now();
pg_queue.enqueue(job).await.map_err(internal_error)?;
let elapsed_time = start_time.elapsed().as_secs_f64();
metrics::histogram!("webhook_api_enqueue").record(elapsed_time);
Ok(Json(WebhookPostResponse { error: None }))
}
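To illustrate the minimal-decoding approach described in the comment above: the only typed struct is HogFetchParameters for the optional args[1] object, while everything else stays a serde_json::Value. A self-contained sketch (assuming serde with the derive feature; the String method field stands in for the real HttpMethod enum):

use serde::Deserialize;
use serde_json::{json, Value};
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
struct HogFetchParameters {
    body: Option<String>,
    headers: Option<HashMap<String, String>>,
    method: Option<String>, // the real field is hook_common::webhook::HttpMethod
}

fn main() {
    // A payload in the shape sent by the plugin-server; extra fields are kept as-is.
    let payload: Value = json!({
        "asyncFunctionRequest": {
            "name": "fetch",
            "args": ["http://example.com", { "method": "GET", "headers": { "k": "v" } }]
        },
        "otherField": true
    });

    let args = &payload["asyncFunctionRequest"]["args"];
    let options: HogFetchParameters = serde_json::from_value(args[1].clone()).unwrap();

    // Missing options fall back to defaults, mirroring the unwrap_or calls above.
    println!(
        "url={} method={:?} body={:?}",
        args[0],
        options.method,
        options.body.unwrap_or_default()
    );
}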
fn bad_request(msg: &str) -> (StatusCode, Json<WebhookPostResponse>) {
error!(msg);
(
StatusCode::BAD_REQUEST,
Json(WebhookPostResponse {
error: Some(msg.to_owned()),
}),
)
}
fn internal_error<E>(err: E) -> (StatusCode, Json<WebhookPostResponse>)
where
E: std::error::Error,
@ -76,23 +179,11 @@ where
}
fn get_hostname(url_str: &str) -> Result<String, (StatusCode, Json<WebhookPostResponse>)> {
let url = Url::parse(url_str).map_err(|_| {
(
StatusCode::BAD_REQUEST,
Json(WebhookPostResponse {
error: Some("could not parse url".to_owned()),
}),
)
})?;
let url = Url::parse(url_str).map_err(|_| bad_request("could not parse url"))?;
match url.host_str() {
Some(hostname) => Ok(hostname.to_owned()),
None => Err((
StatusCode::BAD_REQUEST,
Json(WebhookPostResponse {
error: Some("couldn't extract hostname from url".to_owned()),
}),
)),
None => Err(bad_request("couldn't extract hostname from url")),
}
}
@ -119,8 +210,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn webhook_success(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE);
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let mut headers = collections::HashMap::new();
headers.insert("Content-Type".to_owned(), "application/json".to_owned());
@ -161,8 +253,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn webhook_bad_url(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE);
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let response = app
.oneshot(
@ -198,8 +291,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn webhook_payload_missing_fields(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE);
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let response = app
.oneshot(
@ -219,8 +313,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn webhook_payload_not_json(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE);
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let response = app
.oneshot(
@ -240,8 +335,9 @@ mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn webhook_payload_body_too_large(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;
let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE);
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let bytes: Vec<u8> = vec![b'a'; MAX_BODY_SIZE + 1];
let long_string = String::from_utf8_lossy(&bytes);
@ -276,4 +372,126 @@ mod tests {
assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
}
#[derive(sqlx::FromRow, Debug)]
struct TestJobRow {
parameters: Value,
metadata: Value,
target: String,
}
#[sqlx::test(migrations = "../migrations")]
async fn hoghook_success(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await;
let hog_mode = true;
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let valid_payloads = vec![
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}}"#,
r#"{"body": "", "headers": {}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}}"#,
r#"{"body": "", "headers": {}, "method": "GET", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}}"#,
r#"{"body": "hello, world", "headers": {}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}}"#,
r#"{"body": "", "headers": {"k": "v"}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true}"#,
r#"{"body": "hello, world", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com"}"#,
),
];
for (payload, expected_parameters) in valid_payloads {
let mut headers = collections::HashMap::new();
headers.insert("Content-Type".to_owned(), "application/json".to_owned());
let response = app
.clone()
.oneshot(
Request::builder()
.method(http::Method::POST)
.uri("/hoghook")
.header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(payload.to_owned()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(&body[..], b"{}");
let mut conn = db.acquire().await.unwrap();
let row = sqlx::query_as::<_, TestJobRow>(
"SELECT parameters, metadata, target FROM job_queue;",
)
.fetch_one(&mut *conn)
.await
.unwrap();
assert_eq!(
row.parameters,
serde_json::from_str::<Value>(expected_parameters).unwrap()
);
assert_eq!(
row.metadata,
serde_json::from_str::<Value>(payload).unwrap()
);
assert_eq!(row.target, "example.com");
sqlx::query("DELETE FROM job_queue")
.execute(&mut *conn)
.await
.unwrap();
}
}
#[sqlx::test(migrations = "../migrations")]
async fn hoghook_bad_requests(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await;
let hog_mode = true;
let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let valid_payloads = vec![
r#"{}"#,
r#"{"asyncFunctionRequest":{}"#,
r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch"}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}}"#,
];
for payload in valid_payloads {
let mut headers = collections::HashMap::new();
headers.insert("Content-Type".to_owned(), "application/json".to_owned());
let response = app
.clone()
.oneshot(
Request::builder()
.method(http::Method::POST)
.uri("/hoghook")
.header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(payload.to_owned()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
}
}


@ -34,7 +34,12 @@ async fn main() {
.await
.expect("failed to initialize queue");
let app = handlers::add_routes(Router::new(), pg_queue, config.max_body_size);
let app = handlers::add_routes(
Router::new(),
pg_queue,
config.hog_mode,
config.max_body_size,
);
let app = setup_metrics_routes(app);
match listen(app, config.bind()).await {


@ -10,9 +10,12 @@ workspace = true
async-trait = { workspace = true }
axum = { workspace = true, features = ["http2"] }
chrono = { workspace = true }
envconfig = { workspace = true }
health = { path = "../common/health" }
http = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
rdkafka = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }


@ -0,0 +1,22 @@
use envconfig::Envconfig;
#[derive(Envconfig, Clone)]
pub struct KafkaConfig {
#[envconfig(default = "20")]
pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
#[envconfig(default = "400")]
pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
#[envconfig(default = "20000")]
pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
#[envconfig(default = "false")]
pub kafka_tls: bool,
#[envconfig(default = "localhost:9092")]
pub kafka_hosts: String,
}
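The shared KafkaConfig above is pulled into each service's Config via envconfig's nested support, with values read from environment variables named after the fields. A minimal, self-contained sketch of that pattern (assuming the envconfig crate's derive and init_from_env, as used by the existing Config structs):

use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct KafkaConfig {
    #[envconfig(default = "localhost:9092")]
    pub kafka_hosts: String,
    #[envconfig(default = "false")]
    pub kafka_tls: bool,
}

#[derive(Envconfig)]
pub struct Config {
    #[envconfig(default = "false")]
    pub hog_mode: bool,
    // Nested structs read their own variables (KAFKA_HOSTS, KAFKA_TLS, ...).
    #[envconfig(nested = true)]
    pub kafka: KafkaConfig,
}

fn main() {
    // e.g. HOG_MODE=true KAFKA_HOSTS=broker:9092 ./hook-worker
    let config = Config::init_from_env().expect("failed to load config from env");
    println!("hog_mode={} kafka_hosts={}", config.hog_mode, config.kafka.kafka_hosts);
}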


@ -1,5 +1,8 @@
pub mod config;
pub mod kafka_messages;
pub mod kafka_producer;
pub mod metrics;
pub mod pgqueue;
pub mod retry;
pub mod test;
pub mod webhook;


@ -0,0 +1,33 @@
use health::HealthRegistry;
use rdkafka::mocking::MockCluster;
use rdkafka::producer::{DefaultProducerContext, FutureProducer};
use crate::config::KafkaConfig;
use crate::kafka_producer::{create_kafka_producer, KafkaContext};
pub async fn create_mock_kafka() -> (
MockCluster<'static, DefaultProducerContext>,
FutureProducer<KafkaContext>,
) {
let registry = HealthRegistry::new("liveness");
let handle = registry
.register("one".to_string(), time::Duration::seconds(30))
.await;
let cluster = MockCluster::new(1).expect("failed to create mock brokers");
let config = KafkaConfig {
kafka_producer_linger_ms: 0,
kafka_producer_queue_mib: 50,
kafka_message_timeout_ms: 5000,
kafka_compression_codec: "none".to_string(),
kafka_hosts: cluster.bootstrap_servers(),
kafka_tls: false,
};
(
cluster,
create_kafka_producer(&config, handle)
.await
.expect("failed to create mocked kafka producer"),
)
}


@ -1,8 +1,10 @@
use envconfig::Envconfig;
use hook_common::config::KafkaConfig;
#[derive(Envconfig)]
pub struct Config {
#[envconfig(from = "BIND_HOST", default = "0.0.0.0")]
#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,
#[envconfig(from = "BIND_PORT", default = "3302")]
@ -20,34 +22,14 @@ pub struct Config {
#[envconfig(default = "webhooks")]
pub mode: String,
#[envconfig(nested = true)]
pub kafka: KafkaConfig,
}
#[derive(Envconfig, Clone)]
pub struct KafkaConfig {
#[envconfig(default = "20")]
pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
#[envconfig(default = "400")]
pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
#[envconfig(default = "20000")]
pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
#[envconfig(default = "false")]
pub kafka_tls: bool,
pub hog_mode: bool,
#[envconfig(default = "clickhouse_app_metrics")]
pub app_metrics_topic: String,
#[envconfig(default = "plugin_log_entries")]
pub plugin_log_entries_topic: String,
pub kafka_hosts: String,
#[envconfig(nested = true)]
pub kafka: KafkaConfig,
}
impl Config {


@ -5,17 +5,16 @@ use envconfig::Envconfig;
use eyre::Result;
use futures::future::{select, Either};
use health::{HealthHandle, HealthRegistry};
use kafka_producer::create_kafka_producer;
use std::{str::FromStr, time::Duration};
use tokio::sync::Semaphore;
use webhooks::WebhookCleaner;
use hook_common::kafka_producer::create_kafka_producer;
use hook_common::metrics::setup_metrics_routes;
mod cleanup;
mod config;
mod handlers;
mod kafka_producer;
mod webhooks;
async fn listen(app: Router, bind: String) -> Result<()> {
@ -63,7 +62,8 @@ async fn main() {
WebhookCleaner::new(
&config.database_url,
kafka_producer,
config.kafka.app_metrics_topic.to_owned(),
config.app_metrics_topic.to_owned(),
config.hog_mode,
)
.expect("unable to create webhook cleaner"),
)


@ -15,9 +15,9 @@ use thiserror::Error;
use tracing::{debug, error, info};
use crate::cleanup::Cleaner;
use crate::kafka_producer::KafkaContext;
use hook_common::kafka_messages::app_metrics::{AppMetric, AppMetricCategory};
use hook_common::kafka_producer::KafkaContext;
use hook_common::metrics::get_current_timestamp_seconds;
#[derive(Error, Debug)]
@ -58,6 +58,7 @@ pub struct WebhookCleaner {
pg_pool: PgPool,
kafka_producer: FutureProducer<KafkaContext>,
app_metrics_topic: String,
hog_mode: bool,
}
#[derive(sqlx::FromRow, Debug)]
@ -155,6 +156,7 @@ impl WebhookCleaner {
database_url: &str,
kafka_producer: FutureProducer<KafkaContext>,
app_metrics_topic: String,
hog_mode: bool,
) -> Result<Self> {
let options = PgConnectOptions::from_str(database_url)
.map_err(|error| WebhookCleanerError::PoolCreationError { error })?
@ -167,6 +169,7 @@ impl WebhookCleaner {
pg_pool,
kafka_producer,
app_metrics_topic,
hog_mode,
})
}
@ -175,11 +178,13 @@ impl WebhookCleaner {
pg_pool: PgPool,
kafka_producer: FutureProducer<KafkaContext>,
app_metrics_topic: String,
hog_mode: bool,
) -> Result<Self> {
Ok(Self {
pg_pool,
kafka_producer,
app_metrics_topic,
hog_mode,
})
}
@ -394,7 +399,13 @@ impl WebhookCleaner {
let (completed_row_count, completed_agg_row_count) = {
let completed_row_count = self.get_row_count_for_status(&mut tx, "completed").await?;
let completed_agg_rows = self.get_completed_agg_rows(&mut tx).await?;
let completed_agg_rows = if self.hog_mode {
// Hog mode doesn't need to send metrics to Kafka (and can't aggregate by
// plugin anyway), so we can skip this.
vec![]
} else {
self.get_completed_agg_rows(&mut tx).await?
};
let agg_row_count = completed_agg_rows.len() as u64;
let completed_app_metrics: Vec<AppMetric> =
completed_agg_rows.into_iter().map(Into::into).collect();
@ -404,7 +415,13 @@ impl WebhookCleaner {
let (failed_row_count, failed_agg_row_count) = {
let failed_row_count = self.get_row_count_for_status(&mut tx, "failed").await?;
let failed_agg_rows = self.get_failed_agg_rows(&mut tx).await?;
let failed_agg_rows = if self.hog_mode {
// Hog mode doesn't need to send metrics to Kafka (and can't aggregate by
// plugin anyway), so we can skip this.
vec![]
} else {
self.get_failed_agg_rows(&mut tx).await?
};
let agg_row_count = failed_agg_rows.len() as u64;
let failed_app_metrics: Vec<AppMetric> =
failed_agg_rows.into_iter().map(Into::into).collect();
@ -413,7 +430,7 @@ impl WebhookCleaner {
};
let mut rows_deleted = 0;
if completed_agg_row_count + failed_agg_row_count != 0 {
if completed_row_count + failed_row_count != 0 {
rows_deleted = self.delete_observed_rows(&mut tx).await?;
if rows_deleted != completed_row_count + failed_row_count {
@ -493,18 +510,15 @@ impl Cleaner for WebhookCleaner {
#[cfg(test)]
mod tests {
use super::*;
use crate::config;
use crate::kafka_producer::{create_kafka_producer, KafkaContext};
use health::HealthRegistry;
use hook_common::kafka_messages::app_metrics::{
Error as WebhookError, ErrorDetails, ErrorType,
};
use hook_common::pgqueue::PgQueueJob;
use hook_common::pgqueue::{NewJob, PgQueue, PgTransactionBatch};
use hook_common::test::create_mock_kafka;
use hook_common::webhook::{HttpMethod, WebhookJobMetadata, WebhookJobParameters};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::mocking::MockCluster;
use rdkafka::producer::{DefaultProducerContext, FutureProducer};
use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
use rdkafka::{ClientConfig, Message};
use sqlx::{PgPool, Row};
@ -513,35 +527,6 @@ mod tests {
const APP_METRICS_TOPIC: &str = "app_metrics";
async fn create_mock_kafka() -> (
MockCluster<'static, DefaultProducerContext>,
FutureProducer<KafkaContext>,
) {
let registry = HealthRegistry::new("liveness");
let handle = registry
.register("one".to_string(), time::Duration::seconds(30))
.await;
let cluster = MockCluster::new(1).expect("failed to create mock brokers");
let config = config::KafkaConfig {
kafka_producer_linger_ms: 0,
kafka_producer_queue_mib: 50,
kafka_message_timeout_ms: 5000,
kafka_compression_codec: "none".to_string(),
kafka_hosts: cluster.bootstrap_servers(),
app_metrics_topic: APP_METRICS_TOPIC.to_string(),
plugin_log_entries_topic: "plugin_log_entries".to_string(),
kafka_tls: false,
};
(
cluster,
create_kafka_producer(&config, handle)
.await
.expect("failed to create mocked kafka producer"),
)
}
fn check_app_metric_vector_equality(v1: &[AppMetric], v2: &[AppMetric]) {
// Ignores `error_uuid`s.
assert_eq!(v1.len(), v2.len());
@ -569,9 +554,14 @@ mod tests {
.expect("failed to create mock consumer");
consumer.subscribe(&[APP_METRICS_TOPIC]).unwrap();
let webhook_cleaner =
WebhookCleaner::new_from_pool(db, mock_producer, APP_METRICS_TOPIC.to_owned())
.expect("unable to create webhook cleaner");
let hog_mode = false;
let webhook_cleaner = WebhookCleaner::new_from_pool(
db,
mock_producer,
APP_METRICS_TOPIC.to_owned(),
hog_mode,
)
.expect("unable to create webhook cleaner");
let cleanup_stats = webhook_cleaner
.cleanup_impl()
@ -762,9 +752,14 @@ mod tests {
.expect("failed to create mock consumer");
consumer.subscribe(&[APP_METRICS_TOPIC]).unwrap();
let webhook_cleaner =
WebhookCleaner::new_from_pool(db, mock_producer, APP_METRICS_TOPIC.to_owned())
.expect("unable to create webhook cleaner");
let hog_mode = false;
let webhook_cleaner = WebhookCleaner::new_from_pool(
db,
mock_producer,
APP_METRICS_TOPIC.to_owned(),
hog_mode,
)
.expect("unable to create webhook cleaner");
let cleanup_stats = webhook_cleaner
.cleanup_impl()
@ -782,9 +777,14 @@ mod tests {
#[sqlx::test(migrations = "../migrations", fixtures("webhook_cleanup"))]
async fn test_serializable_isolation(db: PgPool) {
let (_, mock_producer) = create_mock_kafka().await;
let webhook_cleaner =
WebhookCleaner::new_from_pool(db.clone(), mock_producer, APP_METRICS_TOPIC.to_owned())
.expect("unable to create webhook cleaner");
let hog_mode = false;
let webhook_cleaner = WebhookCleaner::new_from_pool(
db.clone(),
mock_producer,
APP_METRICS_TOPIC.to_owned(),
hog_mode,
)
.expect("unable to create webhook cleaner");
let queue = PgQueue::new_from_pool("webhooks", db.clone()).await;


@ -15,7 +15,9 @@ health = { path = "../common/health" }
hook-common = { path = "../hook-common" }
http = { workspace = true }
metrics = { workspace = true }
rdkafka = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
@ -23,3 +25,6 @@ tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { version = "2.2" }
[dev-dependencies]
httpmock = { workspace = true }


@ -3,9 +3,11 @@ use std::time;
use envconfig::Envconfig;
use hook_common::config::KafkaConfig;
#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(from = "BIND_HOST", default = "0.0.0.0")]
#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,
#[envconfig(from = "BIND_PORT", default = "3301")]
@ -40,6 +42,15 @@ pub struct Config {
#[envconfig(default = "false")]
pub allow_internal_ips: bool,
#[envconfig(default = "false")]
pub hog_mode: bool,
#[envconfig(default = "cdp_function_callbacks")]
pub cdp_function_callbacks_topic: String,
#[envconfig(nested = true)]
pub kafka: KafkaConfig,
}
impl Config {


@ -5,6 +5,7 @@ use envconfig::Envconfig;
use std::future::ready;
use health::HealthRegistry;
use hook_common::kafka_producer::create_kafka_producer;
use hook_common::{
metrics::serve, metrics::setup_metrics_routes, pgqueue::PgQueue, retry::RetryPolicy,
};
@ -44,6 +45,13 @@ async fn main() -> Result<(), WorkerError> {
.await
.expect("failed to initialize queue");
let kafka_liveness = liveness
.register("rdkafka".to_string(), time::Duration::seconds(30))
.await;
let kafka_producer = create_kafka_producer(&config.kafka, kafka_liveness)
.await
.expect("failed to create kafka producer");
let worker = WebhookWorker::new(
&config.worker_name,
&queue,
@ -53,6 +61,9 @@ async fn main() -> Result<(), WorkerError> {
config.max_concurrent_jobs,
retry_policy_builder.provide(),
config.allow_internal_ips,
kafka_producer,
config.cdp_function_callbacks_topic.to_owned(),
config.hog_mode,
worker_liveness,
);


@ -1,20 +1,27 @@
use std::collections;
use std::sync::Arc;
use std::time;
use std::{collections, iter};
use chrono::Utc;
use futures::channel::oneshot::Canceled;
use futures::future::join_all;
use health::HealthHandle;
use http::StatusCode;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use reqwest::{header, Client};
use serde_json::{json, Value};
use tokio::sync;
use tokio::time::{sleep, Duration};
use tracing::error;
use hook_common::kafka_producer::KafkaContext;
use hook_common::pgqueue::PgTransactionBatch;
use hook_common::{
pgqueue::{Job, PgQueue, PgQueueJob, PgTransactionJob, RetryError, RetryInvalidError},
retry::RetryPolicy,
webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters},
webhook::{HttpMethod, WebhookJobError, WebhookJobParameters},
};
use http::StatusCode;
use reqwest::{header, Client};
use tokio::sync;
use tracing::error;
use crate::dns::{NoPublicIPv4Error, PublicIPv4Resolver};
use crate::error::{
@ -22,11 +29,11 @@ use crate::error::{
};
use crate::util::first_n_bytes_of_response;
/// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`.
/// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `Value`.
trait WebhookJob: PgQueueJob + std::marker::Send {
fn parameters(&self) -> &WebhookJobParameters;
fn metadata(&self) -> &WebhookJobMetadata;
fn job(&self) -> &Job<WebhookJobParameters, WebhookJobMetadata>;
fn take_metadata(&mut self) -> Value;
fn job(&self) -> &Job<WebhookJobParameters, Value>;
fn attempt(&self) -> i32 {
self.job().attempt
@ -36,21 +43,22 @@ trait WebhookJob: PgQueueJob + std::marker::Send {
self.job().queue.to_owned()
}
#[allow(dead_code)]
fn target(&self) -> String {
self.job().target.to_owned()
}
}
impl WebhookJob for PgTransactionJob<'_, WebhookJobParameters, WebhookJobMetadata> {
impl WebhookJob for PgTransactionJob<'_, WebhookJobParameters, Value> {
fn parameters(&self) -> &WebhookJobParameters {
&self.job.parameters
}
fn metadata(&self) -> &WebhookJobMetadata {
&self.job.metadata
fn take_metadata(&mut self) -> Value {
self.job.metadata.take()
}
fn job(&self) -> &Job<WebhookJobParameters, WebhookJobMetadata> {
fn job(&self) -> &Job<WebhookJobParameters, Value> {
&self.job
}
}
@ -66,11 +74,17 @@ pub struct WebhookWorker<'p> {
/// The interval for polling the queue.
poll_interval: time::Duration,
/// The client used for HTTP requests.
client: reqwest::Client,
http_client: reqwest::Client,
/// Maximum number of concurrent jobs being processed.
max_concurrent_jobs: usize,
/// The retry policy used to calculate retry intervals when a job fails with a retryable error.
retry_policy: RetryPolicy,
/// Kafka producer used to send results when in Hog mode
kafka_producer: FutureProducer<KafkaContext>,
/// The topic to send results to when in Hog mode
cdp_function_callbacks_topic: &'static str,
/// Whether we are running in Hog mode or not
hog_mode: bool,
/// The liveness check handle, to call on a schedule to report healthy
liveness: HealthHandle,
}
@ -105,9 +119,12 @@ impl<'p> WebhookWorker<'p> {
max_concurrent_jobs: usize,
retry_policy: RetryPolicy,
allow_internal_ips: bool,
kafka_producer: FutureProducer<KafkaContext>,
cdp_function_callbacks_topic: String,
hog_mode: bool,
liveness: HealthHandle,
) -> Self {
let client = build_http_client(request_timeout, allow_internal_ips)
let http_client = build_http_client(request_timeout, allow_internal_ips)
.expect("failed to construct reqwest client for webhook worker");
Self {
@ -115,17 +132,18 @@ impl<'p> WebhookWorker<'p> {
queue,
dequeue_batch_size,
poll_interval,
client,
http_client,
max_concurrent_jobs,
retry_policy,
kafka_producer,
cdp_function_callbacks_topic: cdp_function_callbacks_topic.leak(),
hog_mode,
liveness,
}
}
/// Wait until at least one job becomes available in our queue in transactional mode.
async fn wait_for_jobs_tx<'a>(
&self,
) -> PgTransactionBatch<'a, WebhookJobParameters, WebhookJobMetadata> {
async fn wait_for_jobs_tx<'a>(&self) -> PgTransactionBatch<'a, WebhookJobParameters, Value> {
let mut interval = tokio::time::interval(self.poll_interval);
loop {
@ -163,7 +181,7 @@ impl<'p> WebhookWorker<'p> {
// `min(semaphore.available_permits(), dequeue_batch_size)`
// And then dequeue only up to that many jobs. We'd then need to hand back the
// difference in permits based on how many jobs were dequeued.
let mut batch = self.wait_for_jobs_tx().await;
let batch = self.wait_for_jobs_tx().await;
dequeue_batch_size_histogram.record(batch.jobs.len() as f64);
// Get enough permits for the jobs before spawning a task.
@ -173,41 +191,178 @@ impl<'p> WebhookWorker<'p> {
.await
.expect("semaphore has been closed");
let client = self.client.clone();
let http_client = self.http_client.clone();
let retry_policy = self.retry_policy.clone();
let kafka_producer = self.kafka_producer.clone();
let cdp_function_callbacks_topic = self.cdp_function_callbacks_topic;
let hog_mode = self.hog_mode;
tokio::spawn(async move {
let mut futures = Vec::new();
// Move `permits` into the closure so they will be dropped when the scope ends.
let _permits = permits;
// We have to `take` the Vec of jobs from the batch to avoid a borrow checker
// error below when we commit.
for job in std::mem::take(&mut batch.jobs) {
let client = client.clone();
let retry_policy = retry_policy.clone();
let future =
async move { process_webhook_job(client, job, &retry_policy).await };
futures.push(future);
}
let results = join_all(futures).await;
for result in results {
if let Err(e) = result {
error!("error processing webhook job: {}", e);
}
}
let _ = batch.commit().await.map_err(|e| {
error!("error committing transactional batch: {}", e);
});
drop(permits);
process_batch(
batch,
http_client,
retry_policy,
kafka_producer,
cdp_function_callbacks_topic,
hog_mode,
)
.await
});
}
}
}
async fn log_kafka_error_and_sleep(step: &str, error: Option<KafkaError>) {
match error {
Some(error) => error!("error sending hog message to kafka ({}): {}", step, error),
None => error!("error sending hog message to kafka ({})", step),
}
// Errors producing to Kafka *should* be exceedingly rare, but when they happen we don't want
// to enter a tight loop where we re-send the hook payload, fail to produce to Kafka, and
// repeat over and over again. We also don't want to commit the job as done and not produce
// something to Kafka, as the Hog task would then be lost.
//
// For this reason, we sleep before aborting the batch, in hopes that Kafka has recovered by the
// time we retry.
//
// In the future we may want to consider dequeueing completed jobs from PG itself rather than
// using a Kafka intermediary.
sleep(Duration::from_secs(30)).await;
}
async fn process_batch<'a>(
mut batch: PgTransactionBatch<'a, WebhookJobParameters, Value>,
http_client: Client,
retry_policy: RetryPolicy,
kafka_producer: FutureProducer<KafkaContext>,
cdp_function_callbacks_topic: &'static str,
hog_mode: bool,
) {
let mut futures = Vec::with_capacity(batch.jobs.len());
let mut metadata_vec = Vec::with_capacity(batch.jobs.len());
// We have to `take` the Vec of jobs from the batch to avoid a borrow checker
// error below when we commit.
for mut job in std::mem::take(&mut batch.jobs) {
let http_client = http_client.clone();
let retry_policy = retry_policy.clone();
metadata_vec.push(job.take_metadata());
let read_body = hog_mode;
let future =
async move { process_webhook_job(http_client, job, &retry_policy, read_body).await };
futures.push(future);
}
let results = join_all(futures).await;
let mut kafka_ack_futures = Vec::new();
for (result, mut metadata) in iter::zip(results, metadata_vec) {
match result {
Ok(result) => {
if hog_mode {
if let Some(payload) = create_hoghook_kafka_payload(result, &mut metadata).await
{
match kafka_producer.send_result(FutureRecord {
topic: cdp_function_callbacks_topic,
payload: Some(&payload),
partition: None,
key: None::<&str>,
timestamp: None,
headers: None,
}) {
Ok(future) => kafka_ack_futures.push(future),
Err((error, _)) => {
// Return early to avoid committing the batch.
return log_kafka_error_and_sleep("send", Some(error)).await;
}
};
}
}
}
Err(e) => {
error!("error processing webhook job: {}", e)
}
}
}
for result in join_all(kafka_ack_futures).await {
match result {
Ok(Ok(_)) => {}
Ok(Err((error, _))) => {
// Return early to avoid committing the batch.
return log_kafka_error_and_sleep("ack", Some(error)).await;
}
Err(Canceled) => {
// Cancelled due to timeout while retrying
// Return early to avoid committing the batch.
return log_kafka_error_and_sleep("timeout", None).await;
}
}
}
let _ = batch.commit().await.map_err(|e| {
error!("error committing transactional batch: {}", e);
});
}
async fn create_hoghook_kafka_payload(
result: WebhookResult,
metadata: &mut Value,
) -> Option<String> {
if let Value::Object(ref mut object) = metadata {
// Add the response or error in the `asyncFunctionResponse` field.
match result {
WebhookResult::Success(response) => {
let async_function_response = json!({
"timings": [{
"kind": "async_function",
"duration_ms": response.duration.as_millis().try_into().unwrap_or(u32::MAX)
}],
"response": {
"status": response.status_code,
"body": response.body
}
});
object.insert("asyncFunctionResponse".to_owned(), async_function_response);
}
WebhookResult::Failed(error) => {
let async_function_response = json!({
"error": error.to_string(),
});
object.insert("asyncFunctionResponse".to_owned(), async_function_response);
}
WebhookResult::WillRetry => {
// Nothing to do, and we don't want to produce anything
// to Kafka.
return None;
}
}
}
Some(serde_json::to_string(&metadata).expect("unable to serialize metadata"))
}
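For reference, the message produced to the cdp_function_callbacks topic is the job's original metadata with an asyncFunctionResponse field merged in. An illustrative sketch of the resulting shape for a successful fetch, with hypothetical values (compare the assertions in the Kafka payload test below):

use serde_json::json;

fn main() {
    // Original HogFunctionInvocationResult fields pass through untouched;
    // only `asyncFunctionResponse` is added by the worker.
    let payload = json!({
        "someOtherField": true,
        "asyncFunctionResponse": {
            "timings": [
                { "kind": "async_function", "duration_ms": 123 }
            ],
            "response": {
                "status": 200,
                "body": "{\"message\": \"hello, world\"}"
            }
        }
    });
    println!("{}", serde_json::to_string(&payload).unwrap());
}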
struct WebhookSuccess {
status_code: u16,
duration: Duration,
body: Option<String>,
}
enum WebhookResult {
Success(WebhookSuccess),
WillRetry,
Failed(String),
}
/// Process a webhook job by transitioning it to its appropriate state after its request is sent.
/// After we finish, the webhook job will be set as completed (if the request was successful), retryable (if the request
/// was unsuccessful but we can still attempt a retry), or failed (if the request was unsuccessful and no more retries
@ -223,10 +378,11 @@ impl<'p> WebhookWorker<'p> {
/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`.
/// * `retry_policy`: The retry policy used to set retry parameters if a job fails and has remaining attempts.
async fn process_webhook_job<W: WebhookJob>(
client: reqwest::Client,
http_client: reqwest::Client,
webhook_job: W,
retry_policy: &RetryPolicy,
) -> Result<(), WorkerError> {
read_body: bool,
) -> Result<WebhookResult, WorkerError> {
let parameters = webhook_job.parameters();
let labels = [("queue", webhook_job.queue())];
@ -235,7 +391,7 @@ async fn process_webhook_job<W: WebhookJob>(
let now = tokio::time::Instant::now();
let send_result = send_webhook(
client,
http_client,
&parameters.method,
&parameters.url,
&parameters.headers,
@ -243,10 +399,36 @@ async fn process_webhook_job<W: WebhookJob>(
)
.await;
let elapsed = now.elapsed().as_secs_f64();
match send_result {
Ok(_) => {
Ok(response) => {
// First, read the body if needed so that the read time is included in `duration`.
let status = response.status();
let body = if read_body {
match first_n_bytes_of_response(response, 1024 * 1024).await {
Ok(body) => Some(body), // Once told me...
Err(e) => {
webhook_job
.fail(WebhookJobError::new_parse(&e.to_string()))
.await
.map_err(|job_error| {
metrics::counter!("webhook_jobs_database_error", &labels)
.increment(1);
job_error
})?;
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
return Ok(WebhookResult::Failed(
"failed to read response body".to_owned(),
));
}
}
} else {
None
};
let duration = now.elapsed();
let created_at = webhook_job.job().created_at;
let retries = webhook_job.job().attempt - 1;
let labels_with_retries = [
@ -267,9 +449,13 @@ async fn process_webhook_job<W: WebhookJob>(
.record((insert_to_complete_duration.num_milliseconds() as f64) / 1_000_f64);
metrics::counter!("webhook_jobs_completed", &labels).increment(1);
metrics::histogram!("webhook_jobs_processing_duration_seconds", &labels)
.record(elapsed);
.record(duration.as_secs_f64());
Ok(())
Ok(WebhookResult::Success(WebhookSuccess {
status_code: status.as_u16(),
duration,
body,
}))
}
Err(WebhookError::Parse(WebhookParseError::ParseHeadersError(e))) => {
webhook_job
@ -282,7 +468,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
Ok(())
Ok(WebhookResult::Failed(e.to_string()))
}
Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => {
webhook_job
@ -295,7 +481,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
Ok(())
Ok(WebhookResult::Failed(e.to_string()))
}
Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => {
webhook_job
@ -308,7 +494,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
Ok(())
Ok(WebhookResult::Failed(e.to_string()))
}
Err(WebhookError::Request(request_error)) => {
let webhook_job_error = WebhookJobError::from(&request_error);
@ -329,7 +515,7 @@ async fn process_webhook_job<W: WebhookJob>(
Ok(_) => {
metrics::counter!("webhook_jobs_retried", &labels).increment(1);
Ok(())
Ok(WebhookResult::WillRetry)
}
Err(RetryError::RetryInvalidError(RetryInvalidError {
job: webhook_job,
@ -346,7 +532,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
Ok(())
Ok(WebhookResult::Failed(error.to_string()))
}
Err(RetryError::DatabaseError(job_error)) => {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
@ -354,7 +540,7 @@ async fn process_webhook_job<W: WebhookJob>(
}
}
}
WebhookRequestError::NonRetryableRetryableRequestError { .. } => {
WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
webhook_job
.fail(webhook_job_error)
.await
@ -365,7 +551,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::counter!("webhook_jobs_failed", &labels).increment(1);
Ok(())
Ok(WebhookResult::Failed(error.to_string()))
}
}
}
@ -496,6 +682,8 @@ mod tests {
// See: https://github.com/rust-lang/rust/issues/46379.
use health::HealthRegistry;
use hook_common::pgqueue::{DatabaseError, NewJob};
use hook_common::test::create_mock_kafka;
use hook_common::webhook::WebhookJobMetadata;
use sqlx::PgPool;
/// Use process id as a worker id for tests.
@ -512,7 +700,7 @@ mod tests {
queue: &PgQueue,
max_attempts: i32,
job_parameters: WebhookJobParameters,
job_metadata: WebhookJobMetadata,
job_metadata: Value,
) -> Result<(), DatabaseError> {
let job_target = job_parameters.url.to_owned();
let new_job = NewJob::new(max_attempts, job_metadata, job_parameters, &job_target);
@ -579,10 +767,12 @@ mod tests {
&queue,
1,
webhook_job_parameters.clone(),
webhook_job_metadata,
serde_json::to_value(webhook_job_metadata).unwrap(),
)
.await
.expect("failed to enqueue job");
let (_mock_cluster, mock_producer) = create_mock_kafka().await;
let hog_mode = false;
let worker = WebhookWorker::new(
&worker_id,
&queue,
@ -592,6 +782,9 @@ mod tests {
10,
RetryPolicy::default(),
false,
mock_producer,
"cdp_function_callbacks".to_string(),
hog_mode,
liveness,
);
@ -617,6 +810,124 @@ mod tests {
assert!(registry.get_status().healthy)
}
#[sqlx::test(migrations = "../migrations")]
async fn test_hoghook_sends_kafka_payload(db: PgPool) {
use httpmock::prelude::*;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};
let worker_id = worker_id();
let queue_name = "test_hoghook_sends_kafka_payload".to_string();
let queue = PgQueue::new_from_pool(&queue_name, db).await;
let topic = "cdp_function_callbacks";
let server = MockServer::start();
server.mock(|when, then| {
when.method(POST).path("/");
then.status(200)
.header("content-type", "application/json; charset=UTF-8")
.body(r#"{"message": "hello, world"}"#);
});
let mock_url = server.url("/");
let webhook_job_parameters = WebhookJobParameters {
body: "".to_owned(),
headers: collections::HashMap::new(),
method: HttpMethod::POST,
url: mock_url,
};
let webhook_job_metadata = json!({"someOtherField": true});
enqueue_job(
&queue,
1,
webhook_job_parameters.clone(),
serde_json::to_value(webhook_job_metadata).unwrap(),
)
.await
.expect("failed to enqueue job");
let registry = HealthRegistry::new("liveness");
let liveness = registry
.register("worker".to_string(), ::time::Duration::seconds(30))
.await;
let (mock_cluster, mock_producer) = create_mock_kafka().await;
let hog_mode = true;
let worker = WebhookWorker::new(
&worker_id,
&queue,
1,
time::Duration::from_millis(100),
time::Duration::from_millis(5000),
10,
RetryPolicy::default(),
false,
mock_producer,
topic.to_string(),
hog_mode,
liveness,
);
let batch = worker.wait_for_jobs_tx().await;
process_batch(
batch,
worker.http_client,
worker.retry_policy,
worker.kafka_producer,
worker.cdp_function_callbacks_topic,
hog_mode,
)
.await;
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", mock_cluster.bootstrap_servers())
.set("group.id", "mock")
.set("auto.offset.reset", "earliest")
.create()
.expect("failed to create mock consumer");
consumer.subscribe(&[topic]).unwrap();
let kafka_msg = consumer.recv().await.unwrap();
let kafka_payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap();
let received = serde_json::from_str::<Value>(&kafka_payload_str).unwrap();
// Verify data is passed through, and that response and timings are correct.
assert!(received.get("someOtherField").unwrap().as_bool().unwrap());
let async_function_response = received.get("asyncFunctionResponse").unwrap();
let received_response = async_function_response.get("response").unwrap();
assert_eq!(
json!({
"body": "{\"message\": \"hello, world\"}",
"status": 200
}),
*received_response
);
let first_timing = async_function_response
.get("timings")
.unwrap()
.as_array()
.unwrap()
.get(0)
.unwrap();
first_timing
.get("duration_ms")
.unwrap()
.as_number()
.unwrap();
assert_eq!(
"async_function",
first_timing.get("kind").unwrap().as_str().unwrap()
);
}
#[tokio::test]
async fn test_send_webhook() {
let method = HttpMethod::POST;