0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 00:56:44 +01:00

SERVER-23959 Conditionally create a Windows job object in smoke.py.

Changes smoke.py so that, like resmoke.py, it only creates a job object
if the Python process isn't already inside one. This allows the Evergreen
agent to create a job object for managing the processes spawned by a task,
ensuring that they are cleaned up reliably.
This commit is contained in:
Max Hirschhorn 2017-02-02 10:49:12 -05:00
parent 81901db84f
commit dd6acd3551
2 changed files with 116 additions and 20 deletions

View File

@ -238,6 +238,12 @@ def run_and_echo(command):
"""
proc = subprocess.Popen(command)
# We write the pid of the spawned process as the first line of buildlogger.py's stdout because
# smoke.py expects to use it to terminate processes individually if already running inside a job
# object.
sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid))
sys.stdout.flush()
def handle_sigterm(signum, frame):
try:
proc.send_signal(signum)
@ -415,6 +421,12 @@ def loop_and_callback(command, callback):
stderr=subprocess.STDOUT,
)
# We write the pid of the spawned process as the first line of buildlogger.py's stdout because
# smoke.py expects to use it to terminate processes individually if already running inside a job
# object.
sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid))
sys.stdout.flush()
def handle_sigterm(signum, frame):
try:
proc.send_signal(signum)

View File

@ -36,6 +36,7 @@
from datetime import datetime
from itertools import izip
import glob
import logging
from optparse import OptionParser
import os
import pprint
@ -75,6 +76,12 @@ except:
except:
json = None
# Get relative imports to work when the package is not installed on the PYTHONPATH.
if __name__ == "__main__" and __package__ is None:
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(os.path.realpath(__file__)))))
from buildscripts.resmokelib.core import pipe
# TODO clean this up so we don't need globals...
mongo_repo = os.getcwd() #'./'
@ -167,6 +174,10 @@ class mongod(NullMongod):
self.proc = None
self.auth = False
self.job_object = None
self._inner_proc_pid = None
self._stdout_pipe = None
def ensure_test_dirs(self):
utils.ensureDir(smoke_db_prefix + "/tmp/unittest/")
utils.ensureDir(smoke_db_prefix + "/data/")
@ -262,6 +273,28 @@ class mongod(NullMongod):
print "running " + " ".join(argv)
self.proc = self._start(buildlogger(argv, is_global=True))
# If the mongod process is spawned under buildlogger.py, then the first line of output
# should include the pid of the underlying mongod process. If smoke.py didn't create its own
# job object because it is already inside one, then the pid is used to attempt to terminate
# the underlying mongod process.
first_line = self.proc.stdout.readline()
match = re.search("^\[buildlogger.py\] pid: (?P<pid>[0-9]+)$", first_line.rstrip())
if match is not None:
self._inner_proc_pid = int(match.group("pid"))
else:
# The first line of output didn't include the pid of the underlying mongod process. We
# write the first line of output to smoke.py's stdout to ensure the message doesn't get
# lost since it's possible that buildlogger.py isn't being used.
sys.stdout.write(first_line)
logger = logging.Logger("", level=logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt="%(message)s"))
logger.addHandler(handler)
self._stdout_pipe = pipe.LoggerPipe(logger, logging.INFO, self.proc.stdout)
self._stdout_pipe.wait_until_started()
if not self.did_mongod_start(self.port):
raise Exception("Failed to start mongod")
@ -275,12 +308,15 @@ class mongod(NullMongod):
synced = synced and "syncedTo" in source and source["syncedTo"]
def _start(self, argv):
"""In most cases, just call subprocess.Popen(). On windows,
add the started process to a new Job Object, so that any
child processes of this process can be killed with a single
call to TerminateJobObject (see self.stop()).
"""In most cases, just call subprocess.Popen(). On Windows, this
method also assigns the started process to a job object if a new
one was created. This ensures that any child processes of this
process can be killed with a single call to TerminateJobObject
(see self.stop()).
"""
creation_flags = 0
if os.sys.platform == "win32":
# Create a job object with the "kill on job close"
# flag; this is inherited by child processes (ie
@ -288,28 +324,30 @@ class mongod(NullMongod):
# and lets us terminate the whole tree of processes
# rather than orphaning the mongod.
import win32job
import win32process
# Magic number needed to allow job reassignment in Windows 7
# see: MSDN - Process Creation Flags - ms684863
CREATE_BREAKAWAY_FROM_JOB = 0x01000000
# Don't create a job object if the current process is already inside one.
if not win32job.IsProcessInJob(win32process.GetCurrentProcess(), None):
self.job_object = win32job.CreateJobObject(None, '')
proc = Popen(argv, creationflags=CREATE_BREAKAWAY_FROM_JOB)
job_info = win32job.QueryInformationJobObject(
self.job_object, win32job.JobObjectExtendedLimitInformation)
job_info['BasicLimitInformation']['LimitFlags'] |= \
win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
win32job.SetInformationJobObject(
self.job_object,
win32job.JobObjectExtendedLimitInformation,
job_info)
self.job_object = win32job.CreateJobObject(None, '')
# Magic number needed to allow job reassignment in Windows 7
# see: MSDN - Process Creation Flags - ms684863
creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB
job_info = win32job.QueryInformationJobObject(
self.job_object, win32job.JobObjectExtendedLimitInformation)
job_info['BasicLimitInformation']['LimitFlags'] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
win32job.SetInformationJobObject(
self.job_object,
win32job.JobObjectExtendedLimitInformation,
job_info)
proc = Popen(argv, creationflags=creation_flags, stdout=PIPE, stderr=None, bufsize=0)
if self.job_object is not None:
win32job.AssignProcessToJobObject(self.job_object, proc._handle)
else:
proc = Popen(argv)
return proc
def stop(self):
@ -317,11 +355,53 @@ class mongod(NullMongod):
print >> sys.stderr, "probable bug: self.proc unset in stop()"
return
try:
if os.sys.platform == "win32":
if os.sys.platform == "win32" and self.job_object is not None:
# If smoke.py created its own job object, then we clean up the spawned processes by
# terminating it.
import win32job
win32job.TerminateJobObject(self.job_object, -1)
# Windows doesn't seem to kill the process immediately, so give it some time to die
time.sleep(5)
elif os.sys.platform == "win32":
# If smoke.py didn't create its own job object, then we attempt to clean up the
# spawned processes by terminating them individually.
import win32api
import win32con
import win32event
import win32process
import winerror
def win32_terminate(handle):
# Adapted from implementation of Popen.terminate() in subprocess.py of Python
# 2.7 because earlier versions do not catch exceptions.
try:
win32process.TerminateProcess(handle, -1)
except win32process.error as err:
# ERROR_ACCESS_DENIED (winerror=5) is received when the process has
# already died.
if err.winerror != winerror.ERROR_ACCESS_DENIED:
raise
return_code = win32process.GetExitCodeProcess(handle)
if return_code == win32con.STILL_ACTIVE:
raise
# Terminate the mongod process underlying buildlogger.py if one exists.
if self._inner_proc_pid is not None:
# The PROCESS_TERMINATE privilege is necessary to call TerminateProcess() and
# the SYNCHRONIZE privilege is necessary to call WaitForSingleObject(). See
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms684880(v=vs.85).aspx
# for more details.
required_access = win32con.PROCESS_TERMINATE | win32con.SYNCHRONIZE
inner_proc_handle = win32api.OpenProcess(required_access,
False,
self._inner_proc_pid)
try:
win32_terminate(inner_proc_handle)
win32event.WaitForSingleObject(inner_proc_handle, win32event.INFINITE)
finally:
win32api.CloseHandle(inner_proc_handle)
win32_terminate(self.proc._handle)
elif hasattr(self.proc, "terminate"):
# This method added in Python 2.6
self.proc.terminate()
@ -331,6 +411,10 @@ class mongod(NullMongod):
print >> sys.stderr, "error shutting down mongod"
print >> sys.stderr, e
self.proc.wait()
if self._stdout_pipe is not None:
self._stdout_pipe.wait_until_finished()
sys.stderr.flush()
sys.stdout.flush()