mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-24 09:14:46 +01:00
133 lines
5.2 KiB
Python
133 lines
5.2 KiB
Python
|
"""
|
||
|
Helper methods for creating the kafka-python KafkaProducer and KafkaConsumer objects.
|
||
|
https://github.com/heroku/kafka-helper
|
||
|
"""
|
||
|
|
||
|
import base64
|
||
|
import json
|
||
|
import os
|
||
|
import ssl
|
||
|
from tempfile import NamedTemporaryFile
|
||
|
|
||
|
try:
|
||
|
from urllib.parse import urlparse
|
||
|
except ImportError:
|
||
|
from urlparse import urlparse # type: ignore
|
||
|
|
||
|
from base64 import standard_b64encode
|
||
|
|
||
|
from cryptography.hazmat.backends import default_backend
|
||
|
from cryptography.hazmat.primitives import serialization
|
||
|
from kafka import KafkaConsumer, KafkaProducer
|
||
|
|
||
|
|
||
|
def get_kafka_ssl_context():
|
||
|
"""
|
||
|
Returns an SSL context based on the certificate information in the Kafka config vars.
|
||
|
"""
|
||
|
# NOTE: We assume that Kafka environment variables are present. If using
|
||
|
# Apache Kafka on Heroku, they will be available in your app configuration.
|
||
|
#
|
||
|
# 1. Write the PEM certificates necessary for connecting to the Kafka brokers to physical
|
||
|
# files. The broker connection SSL certs are passed in environment/config variables and
|
||
|
# the python and ssl libraries require them in physical files. The public keys are written
|
||
|
# to short lived NamedTemporaryFile files; the client key is encrypted before writing to
|
||
|
# the short lived NamedTemporaryFile
|
||
|
#
|
||
|
# 2. Create and return an SSLContext for connecting to the Kafka brokers referencing the
|
||
|
# PEM certificates written above
|
||
|
#
|
||
|
|
||
|
# stash the kafka certs in named temporary files for loading into SSLContext. Initialize the
|
||
|
# SSLContext inside the with so when it goes out of scope the files are removed which has them
|
||
|
# existing for the shortest amount of time. As extra caution password
|
||
|
# protect/encrypt the client key
|
||
|
with NamedTemporaryFile(suffix=".crt") as cert_file, NamedTemporaryFile(
|
||
|
suffix=".key"
|
||
|
) as key_file, NamedTemporaryFile(suffix=".crt") as trust_file:
|
||
|
cert_file.write(base64.b64decode(os.environ["KAFKA_CLIENT_CERT_B64"].encode("utf-8")))
|
||
|
cert_file.flush()
|
||
|
|
||
|
# setup cryptography to password encrypt/protect the client key so it's not in the clear on
|
||
|
# the filesystem. Use the generated password in the call to load_cert_chain
|
||
|
passwd = standard_b64encode(os.urandom(33))
|
||
|
private_key = serialization.load_pem_private_key(
|
||
|
base64.b64decode(os.environ["KAFKA_CLIENT_CERT_KEY_B64"].encode("utf-8")),
|
||
|
password=None,
|
||
|
backend=default_backend(),
|
||
|
)
|
||
|
pem = private_key.private_bytes(
|
||
|
encoding=serialization.Encoding.PEM,
|
||
|
format=serialization.PrivateFormat.PKCS8,
|
||
|
encryption_algorithm=serialization.BestAvailableEncryption(passwd),
|
||
|
)
|
||
|
key_file.write(pem)
|
||
|
key_file.flush()
|
||
|
|
||
|
trust_file.write(base64.b64decode(os.environ["KAFKA_TRUSTED_CERT_B64"].encode("utf-8")))
|
||
|
trust_file.flush()
|
||
|
|
||
|
# create an SSLContext for passing into the kafka provider using the create_default_context
|
||
|
# function which creates an SSLContext with protocol set to PROTOCOL_SSLv23, OP_NO_SSLv2,
|
||
|
# and OP_NO_SSLv3 when purpose=SERVER_AUTH.
|
||
|
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
|
||
|
ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)
|
||
|
|
||
|
# Intentionally disabling hostname checking. The Kafka cluster runs in the cloud and Apache
|
||
|
# Kafka on Heroku doesn't currently provide stable hostnames. We're pinned to a specific certificate
|
||
|
# for this connection even though the certificate doesn't include host information. We rely
|
||
|
# on the ca trust_cert for this purpose.
|
||
|
ssl_context.check_hostname = False
|
||
|
|
||
|
return ssl_context
|
||
|
|
||
|
|
||
|
def get_kafka_brokers():
|
||
|
"""
|
||
|
Parses the KAKFA_URL and returns a list of hostname:port pairs in the format
|
||
|
that kafka-python expects.
|
||
|
"""
|
||
|
# NOTE: The Kafka environment variables need to be present. If using
|
||
|
# Apache Kafka on Heroku, they will be available in your app configuration.
|
||
|
if not os.environ.get("KAFKA_URL"):
|
||
|
raise RuntimeError("The KAFKA_URL config variable is not set.")
|
||
|
|
||
|
return [
|
||
|
"{}:{}".format(parsedUrl.hostname, parsedUrl.port)
|
||
|
for parsedUrl in [urlparse(url) for url in os.environ.get("KAFKA_URL", "").split(",")]
|
||
|
]
|
||
|
|
||
|
|
||
|
def get_kafka_producer(acks="all", value_serializer=lambda v: json.dumps(v).encode("utf-8")):
|
||
|
"""
|
||
|
Return a KafkaProducer that uses the SSLContext created with create_ssl_context.
|
||
|
"""
|
||
|
|
||
|
producer = KafkaProducer(
|
||
|
bootstrap_servers=get_kafka_brokers(),
|
||
|
security_protocol="SSL",
|
||
|
ssl_context=get_kafka_ssl_context(),
|
||
|
value_serializer=value_serializer,
|
||
|
acks=acks,
|
||
|
)
|
||
|
|
||
|
return producer
|
||
|
|
||
|
|
||
|
def get_kafka_consumer(topic=None, value_deserializer=lambda v: json.loads(v.decode("utf-8"))):
|
||
|
"""
|
||
|
Return a KafkaConsumer that uses the SSLContext created with create_ssl_context.
|
||
|
"""
|
||
|
|
||
|
# Create the KafkaConsumer connected to the specified brokers. Use the
|
||
|
# SSLContext that is created with create_ssl_context.
|
||
|
consumer = KafkaConsumer(
|
||
|
topic,
|
||
|
bootstrap_servers=get_kafka_brokers(),
|
||
|
security_protocol="SSL",
|
||
|
ssl_context=get_kafka_ssl_context(),
|
||
|
value_deserializer=value_deserializer,
|
||
|
)
|
||
|
|
||
|
return consumer
|