mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-41096 Fix file-based protocol for permitting stepdowns.
Changes the file-based protocol for controlling when stepdowns are permitted to be a one-shot mechanism usable only once during a test. That is to say, the indication for whether the stepdown thread isn't currently and will no longer continue to run stepdowns during the test persists until after the test finishes.
This commit is contained in:
parent
8503bfafad
commit
eea65efbdd
@ -253,14 +253,12 @@ executor:
|
||||
runningWithShardStepdowns: true
|
||||
traceExceptions: false
|
||||
useStepdownPermittedFile: true
|
||||
useSteppingDownFile: true
|
||||
usingReplicaSetShards: true
|
||||
hooks:
|
||||
- class: ContinuousStepdown
|
||||
config_stepdown: true
|
||||
shard_stepdown: true
|
||||
use_stepdown_permitted_file: true
|
||||
use_stepping_down_file: true
|
||||
wait_for_mongos_retarget: true
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
|
@ -197,14 +197,12 @@ executor:
|
||||
runningWithAutoSplit: false
|
||||
runningWithBalancer: false
|
||||
useStepdownPermittedFile: true
|
||||
useSteppingDownFile: true
|
||||
usingReplicaSetShards: true
|
||||
hooks:
|
||||
- class: ContinuousStepdown
|
||||
config_stepdown: true
|
||||
shard_stepdown: true
|
||||
use_stepdown_permitted_file: true
|
||||
use_stepping_down_file: true
|
||||
wait_for_mongos_retarget: true
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
|
@ -197,14 +197,12 @@ executor:
|
||||
runningWithConfigStepdowns: true
|
||||
runningWithShardStepdowns: true
|
||||
useStepdownPermittedFile: true
|
||||
useSteppingDownFile: true
|
||||
usingReplicaSetShards: true
|
||||
hooks:
|
||||
- class: ContinuousStepdown
|
||||
config_stepdown: true
|
||||
shard_stepdown: true
|
||||
use_stepdown_permitted_file: true
|
||||
use_stepping_down_file: true
|
||||
wait_for_mongos_retarget: true
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
|
@ -25,8 +25,8 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
|
||||
def __init__( # pylint: disable=too-many-arguments
|
||||
self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True,
|
||||
stepdown_interval_ms=8000, terminate=False, kill=False,
|
||||
use_stepdown_permitted_file=False, use_stepping_down_file=False,
|
||||
wait_for_mongos_retarget=False, stepdown_via_heartbeats=True):
|
||||
use_stepdown_permitted_file=False, wait_for_mongos_retarget=False,
|
||||
stepdown_via_heartbeats=True):
|
||||
"""Initialize the ContinuousStepdown.
|
||||
|
||||
Args:
|
||||
@ -38,7 +38,6 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
|
||||
terminate: shut down the node cleanly as a means of stepping it down.
|
||||
kill: With a 50% probability, kill the node instead of shutting it down cleanly.
|
||||
use_stepdown_permitted_file: use a file to control if stepdown thread should do a stepdown.
|
||||
use_stepping_down_file: use a file to denote when stepdown is active.
|
||||
wait_for_mongos_retarget: whether to run validate on all mongoses for each collection
|
||||
in each database, after pausing the stepdown thread.
|
||||
stepdown_via_heartbeats: step up secondaries instead of stepping down primary.
|
||||
@ -69,26 +68,21 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
|
||||
dbpath_prefix = fixture.get_dbpath_prefix()
|
||||
|
||||
if use_stepdown_permitted_file:
|
||||
self._stepdown_permitted_file = os.path.join(
|
||||
dbpath_prefix, "concurrency_sharded_stepdown_stepdown_permitted")
|
||||
stepdown_files = StepdownFiles._make(
|
||||
[os.path.join(dbpath_prefix, field) for field in StepdownFiles._fields])
|
||||
self.__lifecycle = FileBasedStepdownLifecycle(stepdown_files)
|
||||
else:
|
||||
self._stepdown_permitted_file = None
|
||||
if use_stepping_down_file:
|
||||
self._stepping_down_file = os.path.join(dbpath_prefix,
|
||||
"concurrency_sharded_stepdown_stepping_down")
|
||||
else:
|
||||
self._stepping_down_file = None
|
||||
self.__lifecycle = FlagBasedStepdownLifecycle()
|
||||
|
||||
def before_suite(self, test_report):
|
||||
"""Before suite."""
|
||||
if not self._rs_fixtures:
|
||||
self._add_fixture(self._fixture)
|
||||
utils.remove_if_exists(self._stepdown_permitted_file)
|
||||
utils.remove_if_exists(self._stepping_down_file)
|
||||
|
||||
self._stepdown_thread = _StepdownThread(
|
||||
self.logger, self._mongos_fixtures, self._rs_fixtures, self._stepdown_interval_secs,
|
||||
self._terminate, self._kill, self._stepdown_permitted_file, self._stepping_down_file,
|
||||
self._wait_for_mongos_retarget, self._stepdown_via_heartbeats)
|
||||
self._terminate, self._kill, self.__lifecycle, self._wait_for_mongos_retarget,
|
||||
self._stepdown_via_heartbeats)
|
||||
self.logger.info("Starting the stepdown thread.")
|
||||
self._stepdown_thread.start()
|
||||
|
||||
@ -102,10 +96,7 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
|
||||
"""Before test."""
|
||||
self._check_thread()
|
||||
self.logger.info("Resuming the stepdown thread.")
|
||||
# Once the stepdown thread has started any files it creates must be deleted within the
|
||||
# thread, since the Windows file handle is still open.
|
||||
self._stepdown_thread.pause()
|
||||
self._stepdown_thread.clean_stepdown_files()
|
||||
self._stepdown_thread.resume()
|
||||
|
||||
def after_test(self, test, test_report):
|
||||
@ -139,11 +130,220 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
|
||||
self._mongos_fixtures.append(mongos_fixture)
|
||||
|
||||
|
||||
class FlagBasedStepdownLifecycle(object):
|
||||
"""Class for managing the various states of the stepdown thread.
|
||||
|
||||
The job thread alternates between calling mark_test_started() and mark_test_finished(). The
|
||||
stepdown thread is allowed to perform stepdowns at any point between these two calls. Note that
|
||||
the job thread synchronizes with the stepdown thread outside the context of this object to know
|
||||
it isn't in the process of running a stepdown.
|
||||
"""
|
||||
|
||||
_TEST_STARTED_STATE = "start"
|
||||
_TEST_FINISHED_STATE = "finished"
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the FlagBasedStepdownLifecycle instance."""
|
||||
self.__lock = threading.Lock()
|
||||
self.__cond = threading.Condition(self.__lock)
|
||||
|
||||
self.__test_state = self._TEST_FINISHED_STATE
|
||||
self.__should_stop = False
|
||||
|
||||
def mark_test_started(self):
|
||||
"""Signal to the stepdown thread that a new test has started.
|
||||
|
||||
This function should be called during before_test(). Calling it causes the
|
||||
wait_for_stepdown_permitted() function to no longer block and to instead return true.
|
||||
"""
|
||||
with self.__lock:
|
||||
self.__test_state = self._TEST_STARTED_STATE
|
||||
self.__cond.notify_all()
|
||||
|
||||
def mark_test_finished(self):
|
||||
"""Signal to the stepdown thread that the current test has finished.
|
||||
|
||||
This function should be called during after_test(). Calling it causes the
|
||||
wait_for_stepdown_permitted() function to block until mark_test_started() is called again.
|
||||
"""
|
||||
with self.__lock:
|
||||
self.__test_state = self._TEST_FINISHED_STATE
|
||||
self.__cond.notify_all()
|
||||
|
||||
def stop(self):
|
||||
"""Signal to the stepdown thread that it should exit.
|
||||
|
||||
This function should be called during after_suite(). Calling it causes the
|
||||
wait_for_stepdown_permitted() function to no longer block and to instead return false.
|
||||
"""
|
||||
with self.__lock:
|
||||
self.__should_stop = True
|
||||
self.__cond.notify_all()
|
||||
|
||||
def wait_for_stepdown_permitted(self):
|
||||
"""Block until stepdowns are permitted, or until stop() is called.
|
||||
|
||||
:return: true if stepdowns are permitted, and false if steps are not permitted.
|
||||
"""
|
||||
with self.__lock:
|
||||
while not self.__should_stop:
|
||||
if self.__test_state == self._TEST_STARTED_STATE:
|
||||
return True
|
||||
|
||||
self.__cond.wait()
|
||||
|
||||
return False
|
||||
|
||||
def wait_for_stepdown_interval(self, timeout):
|
||||
"""Block for 'timeout' seconds, or until stop() is called."""
|
||||
with self.__lock:
|
||||
self.__cond.wait(timeout)
|
||||
|
||||
def poll_for_idle_request(self): # noqa: D205,D400
|
||||
"""Return true if the stepdown thread should continue running stepdowns, or false if it
|
||||
should temporarily stop running stepdowns.
|
||||
"""
|
||||
with self.__lock:
|
||||
return self.__test_state == self._TEST_FINISHED_STATE
|
||||
|
||||
def send_idle_acknowledgement(self):
|
||||
"""No-op.
|
||||
|
||||
This method exists so this class has the same interface as FileBasedStepdownLifecycle.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
StepdownFiles = collections.namedtuple("StepdownFiles", ["permitted", "idle_request", "idle_ack"])
|
||||
|
||||
|
||||
class FileBasedStepdownLifecycle(object):
|
||||
"""Class for managing the various states of the stepdown thread using files.
|
||||
|
||||
Unlike in the FlagBasedStepdownLifecycle class, the job thread alternating between calls to
|
||||
mark_test_started() and mark_test_finished() doesn't automatically grant permission for the
|
||||
stepdown thread to perform stepdowns. Instead, the test will part-way through allow stepdowns to
|
||||
be performed and then will part-way through disallow stepdowns from continuing to be performed.
|
||||
|
||||
See jstests/concurrency/fsm_libs/resmoke_runner.js for the other half of the file-base protocol.
|
||||
|
||||
Python inside of resmoke.py JavaScript inside of the mongo shell
|
||||
--------------------------- ------------------------------------
|
||||
|
||||
FSM workload starts.
|
||||
Call $config.setup() function.
|
||||
Create "permitted" file.
|
||||
|
||||
Wait for "permitted" file to be created.
|
||||
|
||||
Stepdown runs.
|
||||
Check if "idle_request" file exists.
|
||||
|
||||
Stepdown runs.
|
||||
Check if "idle_request" file exists.
|
||||
|
||||
FSM workload threads all finish.
|
||||
Create "idle_request" file.
|
||||
|
||||
Stepdown runs.
|
||||
Check if "idle_request" file exists.
|
||||
Create "idle_ack" file.
|
||||
(No more stepdowns run.)
|
||||
|
||||
Wait for "idle_ack" file.
|
||||
Call $config.teardown() function.
|
||||
FSM workload finishes.
|
||||
|
||||
Note that the job thread still synchronizes with the stepdown thread outside the context of this
|
||||
object to know it isn't in the process of running a stepdown.
|
||||
"""
|
||||
|
||||
def __init__(self, stepdown_files):
|
||||
"""Initialize the FileBasedStepdownLifecycle instance."""
|
||||
self.__stepdown_files = stepdown_files
|
||||
|
||||
self.__lock = threading.Lock()
|
||||
self.__cond = threading.Condition(self.__lock)
|
||||
|
||||
self.__should_stop = False
|
||||
|
||||
def mark_test_started(self):
|
||||
"""Signal to the stepdown thread that a new test has started.
|
||||
|
||||
This function should be called during before_test(). Calling it does nothing because
|
||||
permission for running stepdowns is given by the test itself writing the "permitted" file.
|
||||
"""
|
||||
pass
|
||||
|
||||
def mark_test_finished(self):
|
||||
"""Signal to the stepdown thread that the current test has finished.
|
||||
|
||||
This function should be called during after_test(). Calling it causes the
|
||||
wait_for_stepdown_permitted() function to block until the next test gives permission for
|
||||
running stepdowns.
|
||||
"""
|
||||
# It is possible something went wrong during the test's execution and prevented the
|
||||
# "permitted" and "idle_request" files from being created. We therefore don't consider it an
|
||||
# error if they don't exist after the test has finished.
|
||||
utils.remove_if_exists(self.__stepdown_files.permitted)
|
||||
utils.remove_if_exists(self.__stepdown_files.idle_request)
|
||||
utils.remove_if_exists(self.__stepdown_files.idle_ack)
|
||||
|
||||
def stop(self):
|
||||
"""Signal to the stepdown thread that it should exit.
|
||||
|
||||
This function should be called during after_suite(). Calling it causes the
|
||||
wait_for_stepdown_permitted() function to no longer block and to instead return false.
|
||||
"""
|
||||
with self.__lock:
|
||||
self.__should_stop = True
|
||||
self.__cond.notify_all()
|
||||
|
||||
def wait_for_stepdown_permitted(self):
|
||||
"""Block until stepdowns are permitted, or until stop() is called.
|
||||
|
||||
:return: true if stepdowns are permitted, and false if steps are not permitted.
|
||||
"""
|
||||
with self.__lock:
|
||||
while not self.__should_stop:
|
||||
if os.path.isfile(self.__stepdown_files.permitted):
|
||||
return True
|
||||
|
||||
# Wait a little bit before checking for the "permitted" file again.
|
||||
self.__cond.wait(0.1)
|
||||
|
||||
return False
|
||||
|
||||
def wait_for_stepdown_interval(self, timeout):
|
||||
"""Block for 'timeout' seconds, or until stop() is called."""
|
||||
with self.__lock:
|
||||
self.__cond.wait(timeout)
|
||||
|
||||
def poll_for_idle_request(self): # noqa: D205,D400
|
||||
"""Return true if the stepdown thread should continue running stepdowns, or false if it
|
||||
should temporarily stop running stepdowns.
|
||||
"""
|
||||
if os.path.isfile(self.__stepdown_files.idle_request):
|
||||
with self.__lock:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def send_idle_acknowledgement(self):
|
||||
"""Signal to the running test that stepdown thread won't continue to run stepdowns."""
|
||||
|
||||
with open(self.__stepdown_files.idle_ack, "w"):
|
||||
pass
|
||||
|
||||
# We remove the "permitted" file to revoke permission for the stepdown thread to continue
|
||||
# performing stepdowns.
|
||||
os.remove(self.__stepdown_files.permitted)
|
||||
|
||||
|
||||
class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-attributes
|
||||
def __init__( # pylint: disable=too-many-arguments
|
||||
self, logger, mongos_fixtures, rs_fixtures, stepdown_interval_secs, terminate, kill,
|
||||
stepdown_permitted_file, stepping_down_file, wait_for_mongos_retarget,
|
||||
stepdown_via_heartbeats):
|
||||
stepdown_lifecycle, wait_for_mongos_retarget, stepdown_via_heartbeats):
|
||||
"""Initialize _StepdownThread."""
|
||||
threading.Thread.__init__(self, name="StepdownThread")
|
||||
self.daemon = True
|
||||
@ -157,17 +357,13 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
self._stepdown_duration_secs = 24 * 60 * 60 # 24 hours
|
||||
self._terminate = terminate
|
||||
self._kill = kill
|
||||
self._stepdown_permitted_file = stepdown_permitted_file
|
||||
self._stepping_down_file = stepping_down_file
|
||||
self.__lifecycle = stepdown_lifecycle
|
||||
self._should_wait_for_mongos_retarget = wait_for_mongos_retarget
|
||||
self._stepdown_via_heartbeats = stepdown_via_heartbeats
|
||||
|
||||
self._last_exec = time.time()
|
||||
# Event set when the thread has been stopped using the 'stop()' method.
|
||||
self._is_stopped_evt = threading.Event()
|
||||
# Event set when the thread is not paused.
|
||||
self._is_resumed_evt = threading.Event()
|
||||
self._is_resumed_evt.set()
|
||||
# Event set when the thread is not performing stepdowns.
|
||||
self._is_idle_evt = threading.Event()
|
||||
self._is_idle_evt.set()
|
||||
@ -182,9 +378,14 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
|
||||
try:
|
||||
while True:
|
||||
if self.__is_stopped():
|
||||
self._is_idle_evt.set()
|
||||
|
||||
permitted = self.__lifecycle.wait_for_stepdown_permitted()
|
||||
if not permitted:
|
||||
break
|
||||
self._wait_for_permission_or_resume()
|
||||
|
||||
self._is_idle_evt.clear()
|
||||
|
||||
now = time.time()
|
||||
if now - self._last_exec > self._stepdown_interval_secs:
|
||||
self.logger.info("Starting stepdown of all primaries")
|
||||
@ -194,13 +395,17 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
self._last_exec = time.time()
|
||||
self.logger.info("Completed stepdown of all primaries in %0d ms",
|
||||
(self._last_exec - now) * 1000)
|
||||
now = time.time()
|
||||
if self._is_permitted():
|
||||
# The 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment
|
||||
# the last stepdown command was sent.
|
||||
wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
|
||||
self._wait(wait_secs)
|
||||
|
||||
found_idle_request = self.__lifecycle.poll_for_idle_request()
|
||||
if found_idle_request:
|
||||
self.__lifecycle.send_idle_acknowledgement()
|
||||
continue
|
||||
|
||||
# The 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment
|
||||
# the last stepdown command was sent.
|
||||
now = time.time()
|
||||
wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
|
||||
self.__lifecycle.wait_for_stepdown_interval(wait_secs)
|
||||
except Exception: # pylint: disable=W0703
|
||||
# Proactively log the exception when it happens so it will be
|
||||
# flushed immediately.
|
||||
@ -208,17 +413,16 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
|
||||
def stop(self):
|
||||
"""Stop the thread."""
|
||||
self.__lifecycle.stop()
|
||||
self._is_stopped_evt.set()
|
||||
# Unpause to allow the thread to finish.
|
||||
self.resume()
|
||||
self.join()
|
||||
|
||||
def __is_stopped(self):
|
||||
return self._is_stopped_evt.is_set()
|
||||
|
||||
def pause(self):
|
||||
"""Pause the thread."""
|
||||
self._is_resumed_evt.clear()
|
||||
self.__lifecycle.mark_test_finished()
|
||||
|
||||
# Wait until we are no longer executing stepdowns.
|
||||
self._is_idle_evt.wait()
|
||||
# Wait until we all the replica sets have primaries.
|
||||
@ -228,21 +432,12 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
|
||||
def resume(self):
|
||||
"""Resume the thread."""
|
||||
self._is_resumed_evt.set()
|
||||
self.__lifecycle.mark_test_started()
|
||||
|
||||
self.logger.info(
|
||||
"Current statistics about which nodes have been successfully stepped up: %s",
|
||||
self._step_up_stats)
|
||||
|
||||
def _wait_for_permission_or_resume(self):
|
||||
# Wait until stop, _stepdown_permitted_file or resume.
|
||||
if self._stepdown_permitted_file:
|
||||
while not os.path.isfile(self._stepdown_permitted_file) and not self.__is_stopped():
|
||||
# Set a short sleep during busy wait time for self._stepdown_permitted_file.
|
||||
self._wait(0.1)
|
||||
else:
|
||||
self._is_resumed_evt.wait()
|
||||
|
||||
def _wait(self, timeout):
|
||||
# Wait until stop or timeout.
|
||||
self._is_stopped_evt.wait(timeout)
|
||||
@ -252,15 +447,8 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
fixture.get_primary()
|
||||
|
||||
def _step_down_all(self):
|
||||
self._is_idle_evt.clear()
|
||||
self._stepdown_starting()
|
||||
try:
|
||||
if self._is_permitted():
|
||||
for rs_fixture in self._rs_fixtures:
|
||||
self._step_down(rs_fixture)
|
||||
finally:
|
||||
self._stepdown_completed()
|
||||
self._is_idle_evt.set()
|
||||
for rs_fixture in self._rs_fixtures:
|
||||
self._step_down(rs_fixture)
|
||||
|
||||
# pylint: disable=R0912,R0915
|
||||
def _step_down(self, rs_fixture):
|
||||
@ -460,30 +648,3 @@ class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-at
|
||||
retarget_time = time.time() - start_time
|
||||
self.logger.info("Finished waiting for mongos: %s to retarget db: %s, in %d ms",
|
||||
mongos_conn_str, db, retarget_time * 1000)
|
||||
|
||||
def _is_permitted(self):
|
||||
"""Permit a stepdown if the permitted file is not specified or it exists.
|
||||
|
||||
The self._permitted_file is created by an external framework, i.e., JS tests.
|
||||
"""
|
||||
if self._stepdown_permitted_file:
|
||||
return os.path.isfile(self._stepdown_permitted_file)
|
||||
return self._is_resumed_evt.is_set()
|
||||
|
||||
def _stepdown_starting(self):
|
||||
"""Create self._stepping_down_file, if specified."""
|
||||
if self._stepping_down_file:
|
||||
if os.path.isfile(self._stepping_down_file):
|
||||
raise RuntimeError("Stepping down file {} already exists".format(
|
||||
self._stepping_down_file))
|
||||
with open(self._stepping_down_file, "w") as fh:
|
||||
fh.write("")
|
||||
|
||||
def _stepdown_completed(self):
|
||||
"""Delete self._stepping_down_file, if specified."""
|
||||
utils.remove_if_exists(self._stepping_down_file)
|
||||
|
||||
def clean_stepdown_files(self):
|
||||
"""Remove the stepdown files."""
|
||||
utils.remove_if_exists(self._stepdown_permitted_file)
|
||||
utils.remove_if_exists(self._stepping_down_file)
|
||||
|
@ -48,11 +48,10 @@ def is_windows():
|
||||
|
||||
def remove_if_exists(path):
|
||||
"""Remove path if it exists."""
|
||||
if path is not None and os.path.exists(path):
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def is_string_list(lst):
|
||||
|
140
buildscripts/tests/resmokelib/testing/hooks/test_stepdown.py
Normal file
140
buildscripts/tests/resmokelib/testing/hooks/test_stepdown.py
Normal file
@ -0,0 +1,140 @@
|
||||
"""Unit tests for buildscripts/resmokelib/testing/hooks/stepdown.py."""
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from buildscripts.resmokelib.testing.hooks import stepdown as _stepdown
|
||||
|
||||
# pylint: disable=missing-docstring
|
||||
|
||||
|
||||
def _get_threading_lock(test_case, MockCondition): # pylint: disable=invalid-name
|
||||
# There doesn't seem to be a better way to get the arguments that were passed in to the
|
||||
# constructor. We need to call release() on the threading.Lock in order for other methods on the
|
||||
# lifecycle object to be able to acquire() it.
|
||||
test_case.assertEqual(1, len(MockCondition.call_args_list))
|
||||
lock = MockCondition.call_args[0][0]
|
||||
return lock
|
||||
|
||||
|
||||
class TestFlagBasedStepdownLifecycle(unittest.TestCase):
|
||||
def test_becomes_idle_after_test_finishes(self):
|
||||
lifecycle = _stepdown.FlagBasedStepdownLifecycle()
|
||||
lifecycle.mark_test_started()
|
||||
self.assertFalse(lifecycle.poll_for_idle_request())
|
||||
lifecycle.mark_test_finished()
|
||||
self.assertTrue(lifecycle.poll_for_idle_request())
|
||||
|
||||
def test_stepdown_permitted_after_test_starts(self):
|
||||
lifecycle = _stepdown.FlagBasedStepdownLifecycle()
|
||||
lifecycle.mark_test_started()
|
||||
self.assertTrue(lifecycle.wait_for_stepdown_permitted())
|
||||
|
||||
@mock.patch("threading.Condition")
|
||||
def test_waiting_for_stepdown_permitted_is_interruptible(self, MockCondition): # pylint: disable=invalid-name
|
||||
lifecycle = _stepdown.FlagBasedStepdownLifecycle()
|
||||
lifecycle.mark_test_started()
|
||||
lifecycle.mark_test_finished()
|
||||
|
||||
def call_stop_while_waiting():
|
||||
lock = _get_threading_lock(self, MockCondition)
|
||||
lock.release()
|
||||
lifecycle.stop()
|
||||
lock.acquire()
|
||||
|
||||
cond = MockCondition.return_value
|
||||
cond.wait.side_effect = call_stop_while_waiting
|
||||
|
||||
self.assertFalse(lifecycle.wait_for_stepdown_permitted())
|
||||
self.assertTrue(cond.wait.called)
|
||||
|
||||
|
||||
class TestFileBasedStepdownLifecycle(unittest.TestCase):
|
||||
|
||||
STEPDOWN_FILES = _stepdown.StepdownFiles._make(_stepdown.StepdownFiles._fields)
|
||||
|
||||
def test_still_idle_after_test_starts(self):
|
||||
lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
|
||||
lifecycle.mark_test_started()
|
||||
self.assertFalse(lifecycle.poll_for_idle_request())
|
||||
|
||||
@mock.patch("os.remove")
|
||||
def test_files_cleaned_up_after_test_finishes(self, mock_os_remove):
|
||||
lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
|
||||
lifecycle.mark_test_started()
|
||||
|
||||
lifecycle.mark_test_finished()
|
||||
mock_os_remove.assert_any_call("permitted")
|
||||
mock_os_remove.assert_any_call("idle_request")
|
||||
mock_os_remove.assert_any_call("idle_ack")
|
||||
|
||||
mock_os_remove.reset_mock()
|
||||
mock_os_remove.side_effect = OSError("Pretend that the file wasn't found")
|
||||
|
||||
lifecycle.mark_test_finished()
|
||||
mock_os_remove.assert_any_call("permitted")
|
||||
mock_os_remove.assert_any_call("idle_request")
|
||||
mock_os_remove.assert_any_call("idle_ack")
|
||||
|
||||
@mock.patch("os.path")
|
||||
def test_stepdown_permitted_if_permitted_file_exists(self, mock_os_path):
|
||||
lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
|
||||
lifecycle.mark_test_started()
|
||||
|
||||
def mock_does_permitted_file_exists(filename):
|
||||
if filename == "permitted":
|
||||
return permitted_file_exists
|
||||
|
||||
return os.path.isfile(filename)
|
||||
|
||||
mock_os_path.isfile = mock_does_permitted_file_exists
|
||||
|
||||
permitted_file_exists = True
|
||||
self.assertTrue(lifecycle.wait_for_stepdown_permitted())
|
||||
|
||||
@mock.patch("threading.Condition")
|
||||
@mock.patch("os.path")
|
||||
def test_stepdown_waits_until_permitted_file_exists(self, mock_os_path, MockCondition): # pylint: disable=invalid-name
|
||||
lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
|
||||
lifecycle.mark_test_started()
|
||||
|
||||
def mock_does_permitted_file_exists(filename): # pylint: disable=inconsistent-return-statements
|
||||
if filename == "permitted":
|
||||
return permitted_file_exists
|
||||
|
||||
self.fail("Mock called with unexpected filename: %s" % (filename, ))
|
||||
|
||||
mock_os_path.isfile = mock_does_permitted_file_exists
|
||||
|
||||
def create_permitted_file_while_waiting(_timeout):
|
||||
nonlocal permitted_file_exists
|
||||
permitted_file_exists = True
|
||||
|
||||
cond = MockCondition.return_value
|
||||
cond.wait.side_effect = create_permitted_file_while_waiting
|
||||
|
||||
permitted_file_exists = False
|
||||
self.assertTrue(lifecycle.wait_for_stepdown_permitted())
|
||||
self.assertTrue(cond.wait.called)
|
||||
|
||||
@mock.patch("threading.Condition")
|
||||
@mock.patch("os.path")
|
||||
def test_waiting_for_stepdown_permitted_is_interruptible(self, mock_os_path, MockCondition): # pylint: disable=invalid-name
|
||||
lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
|
||||
lifecycle.mark_test_started()
|
||||
|
||||
mock_os_path.isfile.return_value = False
|
||||
|
||||
def call_stop_while_waiting(_timeout):
|
||||
lock = _get_threading_lock(self, MockCondition)
|
||||
lock.release()
|
||||
lifecycle.stop()
|
||||
lock.acquire()
|
||||
|
||||
cond = MockCondition.return_value
|
||||
cond.wait.side_effect = call_stop_while_waiting
|
||||
|
||||
self.assertFalse(lifecycle.wait_for_stepdown_permitted())
|
||||
self.assertTrue(cond.wait.called)
|
@ -100,8 +100,7 @@
|
||||
|
||||
// Await replication after running the $config.setup() function when stepdowns are
|
||||
// permitted to ensure its effects aren't rolled back.
|
||||
if (cluster.isReplication() &&
|
||||
typeof executionOptions.stepdownPermittedFile === 'string') {
|
||||
if (cluster.isReplication() && executionOptions.stepdownFiles !== undefined) {
|
||||
cluster.awaitReplication();
|
||||
}
|
||||
|
||||
@ -127,8 +126,8 @@
|
||||
// Indicate that the stepdown thread can run. It is unnecessary for the stepdown thread
|
||||
// to indicate that it is going to start running because it will eventually after the
|
||||
// worker threads have started.
|
||||
if (typeof executionOptions.stepdownPermittedFile === 'string') {
|
||||
writeFile(executionOptions.stepdownPermittedFile, '');
|
||||
if (executionOptions.stepdownFiles !== undefined) {
|
||||
writeFile(executionOptions.stepdownFiles.permitted, '');
|
||||
}
|
||||
|
||||
// Since the worker threads may be running with causal consistency enabled, we set the
|
||||
@ -175,14 +174,14 @@
|
||||
// signal that it has stopped.
|
||||
//
|
||||
// Signal to the stepdown thread to stop stepping down the cluster.
|
||||
if (typeof executionOptions.stepdownPermittedFile === 'string' &&
|
||||
typeof executionOptions.steppingDownFile === 'string') {
|
||||
removeFile(executionOptions.stepdownPermittedFile);
|
||||
// Wait for the steppingDownFile to be removed by the stepdown thread.
|
||||
if (executionOptions.stepdownFiles !== undefined) {
|
||||
writeFile(executionOptions.stepdownFiles.idleRequest, '');
|
||||
|
||||
// Wait for the acknowledgement file to be created by the stepdown thread.
|
||||
assert.soonNoExcept(function() {
|
||||
if (ls().indexOf(executionOptions.steppingDownFile) === -1) {
|
||||
return true;
|
||||
}
|
||||
// The cat() function will throw an exception if the file isn't found.
|
||||
cat(executionOptions.stepdownFiles.idleAck);
|
||||
return true;
|
||||
}, "stepdown still in progress");
|
||||
}
|
||||
}
|
||||
@ -271,13 +270,11 @@
|
||||
// The stepdown file names need to match the same construction as found in
|
||||
// buildscripts/resmokelib/testing/hooks/stepdown.py.
|
||||
if (TestData.useStepdownPermittedFile) {
|
||||
executionOptions.stepdownPermittedFile =
|
||||
resmokeDbPathPrefix + '/concurrency_sharded_stepdown_stepdown_permitted';
|
||||
}
|
||||
|
||||
if (TestData.useSteppingDownFile) {
|
||||
executionOptions.steppingDownFile =
|
||||
resmokeDbPathPrefix + '/concurrency_sharded_stepdown_stepping_down';
|
||||
executionOptions.stepdownFiles = {
|
||||
permitted: resmokeDbPathPrefix + '/permitted',
|
||||
idleRequest: resmokeDbPathPrefix + '/idle_request',
|
||||
idleAck: resmokeDbPathPrefix + '/idle_ack',
|
||||
};
|
||||
}
|
||||
|
||||
if (Object.keys(sessionOptions).length > 0 || TestData.runningWithSessions) {
|
||||
|
@ -45,8 +45,7 @@ var runner = (function() {
|
||||
'dbNamePrefix',
|
||||
'iterationMultiplier',
|
||||
'sessionOptions',
|
||||
'stepdownPermittedFile',
|
||||
'steppingDownFile',
|
||||
'stepdownFiles',
|
||||
'threadMultiplier'
|
||||
];
|
||||
|
||||
@ -101,16 +100,18 @@ var runner = (function() {
|
||||
1,
|
||||
'expected iterationMultiplier to be greater than or equal to 1');
|
||||
|
||||
if (typeof options.stepdownPermittedFile !== 'undefined') {
|
||||
if (typeof options.stepdownFiles !== 'undefined') {
|
||||
assert.eq('string',
|
||||
typeof options.stepdownPermittedFile,
|
||||
'expected stepdownPermittedFile to be a string');
|
||||
}
|
||||
typeof options.stepdownFiles.permitted,
|
||||
'expected stepdownFiles.permitted to be a string');
|
||||
|
||||
if (typeof options.steppingDownFile !== 'undefined') {
|
||||
assert.eq('string',
|
||||
typeof options.steppingDownFile,
|
||||
'expected steppingDownFile to be a string');
|
||||
typeof options.stepdownFiles.idleRequest,
|
||||
'expected stepdownFiles.idleRequest to be a string');
|
||||
|
||||
assert.eq('string',
|
||||
typeof options.stepdownFiles.idleAck,
|
||||
'expected stepdownFiles.idleAck to be a string');
|
||||
}
|
||||
|
||||
options.threadMultiplier = options.threadMultiplier || 1;
|
||||
|
Loading…
Reference in New Issue
Block a user