mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-22 04:59:34 +01:00
157 lines
4.6 KiB
Django/Jinja
157 lines
4.6 KiB
Django/Jinja
"""Script to configure a sharded cluster in Antithesis from the mongos container."""
|
|
import json
|
|
import subprocess
|
|
from time import sleep
|
|
import subprocess
|
|
import time
|
|
import datetime
|
|
|
|
"""Util functions to assist in setting up a sharded cluster topology in Antithesis."""
|
|
def mongo_process_running(host, port):
    """
    Probe the server at the given host & port by invoking the `mongo` command against it.

    :param host: Hostname handed to `--host`.
    :param port: Port handed to `--port`; converted to a string for the command line.
    :return: The CompletedProcess returned by subprocess.run.
    :raises subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    command = ['mongo', '--host', host, '--port', str(port)]
    # The eval payload is passed through unchanged, embedded double quotes included.
    command += ['--eval', '"db.stats()"']
    return subprocess.run(command, check=True)
|
|
|
|
|
|
def retry_until_success(func, kwargs=None, wait_time=datetime.timedelta(seconds=1),
                        timeout_period=datetime.timedelta(seconds=30)):
    """
    Call the function repeatedly until it succeeds or the timeout elapses.

    A call counts as a success when it returns without raising an exception.

    :param func: Function that we are attempting to run; its return value is ignored.
    :param kwargs: Dictionary of keyword arguments for the function (None means no arguments).
    :param wait_time: timedelta to sleep after a failed call before retrying.
    :param timeout_period: timedelta after which no further attempt is started.
    :raises TimeoutError: If the timeout period elapsed before a call succeeded.
    :return: None
    """
    kwargs = {} if kwargs is None else kwargs

    # total_seconds() honours the whole timedelta; the `.seconds` attribute drops the
    # days and microseconds components (500ms and 1 day would both read as 0).
    wait_seconds = max(0.0, wait_time.total_seconds())
    timeout_seconds = timeout_period.total_seconds()

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds

    while True:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"{func.__name__} called with {kwargs} timed out after "
                f"{timeout_seconds:g} second(s).")
        try:
            func(**kwargs)
        except Exception:  # pylint: disable=broad-except
            # Every ordinary failure is retryable; KeyboardInterrupt and SystemExit
            # are not subclasses of Exception and therefore still propagate.
            print(f"Retrying {func.__name__} called with {kwargs} after "
                  f"{wait_seconds:g} second(s).")
            time.sleep(wait_seconds)
        else:
            return
|
|
|
|
|
|
# Replica set configuration for the config servers (one member per templated node).
CONFIGSVR_CONFIG = {
    "_id": "config-rs",
    "configsvr": True,
    "members": [
        {%- for cfg_node in configsvr.nodes %}
        {%- set idx = loop.index0 %}
        {"_id": {{ idx }}, "host": "configsvr{{ idx }}:{{ CONFIG_PORT }}"},
        {%- endfor %}
    ],
    "protocolVersion": 1,
    "settings": {
        {%- for name, setting in get_replset_settings(configsvr).items() %}
        "{{ name }}": {{ setting }},
        {%- endfor %}
    }
}

# Block until every config server answers, then initiate the set from configsvr0.
{% for cfg_node in configsvr.nodes -%}
{% set idx = loop.index0 -%}
retry_until_success(mongo_process_running, {"host": "configsvr{{ idx }}", "port": {{ CONFIG_PORT }}})
{% endfor -%}
retry_until_success(
    subprocess.run, {
        "args": [
            "mongo",
            "--host",
            "configsvr0",
            "--port",
            "{{ CONFIG_PORT }}",
            "--eval",
            f"rs.initiate({json.dumps(CONFIGSVR_CONFIG)})",
        ],
        "check": True,
    })
|
|
|
|
{%- for shard in shards %}
{% set shard_idx = loop.index0 %}
# Replica set configuration for Shard{{ shard_idx }}
SHARD{{ shard_idx }}_CONFIG = {
    "_id": "Shard{{ shard_idx }}",
    "members": [
        {%- for member in shard.nodes %}
        {%- set host_idx = shard_idx*shard.num_nodes+loop.index0 %}
        {"_id": {{ loop.index0 }}, "host": "mongod{{ host_idx }}:{{ MONGOD_PORT }}"},
        {%- endfor %}
    ],
    "protocolVersion": 1,
    "settings": {
        {%- for name, setting in get_replset_settings(shard).items() %}
        "{{ name }}": {{ setting }},
        {%- endfor %}
    }
}

# Block until every member of Shard{{ shard_idx }} answers, then initiate from its first member.
{% for member in shard.nodes -%}
{%- set host_idx = shard_idx*shard.num_nodes+loop.index0 -%}
retry_until_success(mongo_process_running, {"host": "mongod{{ host_idx }}", "port": {{ MONGOD_PORT }}})
{% endfor -%}
retry_until_success(
    subprocess.run, {
        "args": [
            "mongo",
            "--host",
            "mongod{{ shard_idx*shard.num_nodes }}",
            "--port",
            "{{ MONGOD_PORT }}",
            "--eval",
            f"rs.initiate({json.dumps(SHARD{{ shard_idx }}_CONFIG)})",
        ],
        "check": True,
    })
{%- endfor %}
|
|
|
|
# Start mongos: the templated arguments come first, followed by the fixed options below.
retry_until_success(
    subprocess.run, {
        "args": [
            {% for mongos_arg in mongos_args -%}
            "{{ mongos_arg }}",
            {% endfor -%}
            "--setParameter",
            "fassertOnLockTimeoutForStepUpDown=0",
            "--logpath",
            "/var/log/mongodb/mongodb.log",
            "--bind_ip",
            "0.0.0.0",
            "--fork",
        ],
        "check": True,
    })
|
|
|
|
{%- for shard in shards %}
{% set shard_idx = loop.index0 %}
# Register Shard{{ shard_idx }} with the cluster through {{ mongos_name }}
retry_until_success(
    subprocess.run, {
        "args": [
            "mongo",
            "--host",
            "{{ mongos_name }}",
            "--port",
            "{{ MONGOS_PORT }}",
            "--eval",
            {%- set hosts = [] -%}
            {%- for member in shard.nodes -%}
            {{ hosts.append("mongod" + (shard_idx*shard.num_nodes+loop.index0)|string + ":" + MONGOD_PORT|string) or "" }}
            {%- endfor %}
            'sh.addShard("Shard{{ shard_idx }}/{{ hosts|join(',')}}")'
        ],
        "check": True,
    })
{%- endfor %}
|
|
|
|
# Report that every setup step above finished.
print("{{ mongos_name }} setup completed successfully.")

# Never return: sleep in 10 second slices so this script keeps running
# (presumably to keep the container alive -- confirm against the deployment).
while True:
    sleep(10)
|