diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 1b9e7481f2e..402ceffd1bb 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -12,9 +12,11 @@ from _interpqueues import ( ) __all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', 'create', 'list_all', 'Queue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', + 'ItemInterpreterDestroyed', ] @@ -32,26 +34,90 @@ class QueueFull(QueueError, queue.Full): """ +class ItemInterpreterDestroyed(QueueError): + """Raised from get() and get_nowait().""" + + _SHARED_ONLY = 0 _PICKLED = 1 -def create(maxsize=0, *, syncobj=False): + +class UnboundItem: + """Represents a Queue item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the queue + is destroyed. + """ + + __slots__ = () + + def __new__(cls): + return UNBOUND + + def __repr__(self): + return f'interpreters.queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} + +def _serialize_unbound(unbound): + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def _resolve_unbound(flag): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: + raise ItemInterpreterDestroyed("item's original interpreter destroyed") + elif op is UNBOUND: + return UNBOUND + else: + raise NotImplementedError(repr(op)) + + +def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. "syncobj" sets the default for Queue.put() and Queue.put_nowait(). + + "unbounditems" likewise sets the default. See Queue.put() for + supported values. The default value is UNBOUND, which replaces + the unbound item. """ fmt = _SHARED_ONLY if syncobj else _PICKLED - qid = _queues.create(maxsize, fmt) - return Queue(qid, _fmt=fmt) + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + qid = _queues.create(maxsize, fmt, unboundop) + return Queue(qid, _fmt=fmt, _unbound=unbound) def list_all(): """Return a list of all open queues.""" - return [Queue(qid, _fmt=fmt) - for qid, fmt in _queues.list_all()] + return [Queue(qid, _fmt=fmt, _unbound=(unboundop,)) + for qid, fmt, unboundop in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() @@ -59,20 +125,28 @@ _known_queues = weakref.WeakValueDictionary() class Queue: """A cross-interpreter queue.""" - def __new__(cls, id, /, *, _fmt=None): + def __new__(cls, id, /, *, _fmt=None, _unbound=None): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') if _fmt is None: - _fmt, = _queues.get_queue_defaults(id) + if _unbound is None: + _fmt, op = _queues.get_queue_defaults(id) + _unbound = (op,) + else: + _fmt, _ = _queues.get_queue_defaults(id) + elif _unbound is None: + _, op = _queues.get_queue_defaults(id) + _unbound = (op,) try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id self._fmt = _fmt + self._unbound = _unbound _known_queues[id] = self _queues.bind(id) return self @@ -124,6 +198,7 @@ class Queue: def put(self, obj, timeout=None, *, syncobj=None, + unbound=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. @@ -131,7 +206,7 @@ class Queue: This blocks while the queue is full. If "syncobj" is None (the default) then it uses the - queue's default, set with create_queue().. + queue's default, set with create_queue(). If "syncobj" is false then all objects are supported, at the expense of worse performance. @@ -152,11 +227,37 @@ class Queue: actually is. That's a slightly different and stronger promise than just (initial) equality, which is all "syncobj=False" can promise. + + "unbound" controls the behavior of Queue.get() for the given + object if the current interpreter (calling put()) is later + destroyed. + + If "unbound" is None (the default) then it uses the + queue's default, set with create_queue(), + which is usually UNBOUND. + + If "unbound" is UNBOUND_ERROR then get() will raise an + ItemInterpreterDestroyed exception if the original interpreter + has been destroyed. This does not otherwise affect the queue; + the next call to put() will work like normal, returning the next + item in the queue. + + If "unbound" is UNBOUND_REMOVE then the item will be removed + from the queue as soon as the original interpreter is destroyed. + Be aware that this will introduce an imbalance between put() + and get() calls. + + If "unbound" is UNBOUND then it is returned by get() in place + of the unbound item. """ if syncobj is None: fmt = self._fmt else: fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -166,7 +267,7 @@ class Queue: obj = pickle.dumps(obj) while True: try: - _queues.put(self._id, obj, fmt) + _queues.put(self._id, obj, fmt, unboundop) except QueueFull as exc: if timeout is not None and time.time() >= end: raise # re-raise @@ -174,14 +275,18 @@ class Queue: else: break - def put_nowait(self, obj, *, syncobj=None): + def put_nowait(self, obj, *, syncobj=None, unbound=None): if syncobj is None: fmt = self._fmt else: fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) if fmt is _PICKLED: obj = pickle.dumps(obj) - _queues.put(self._id, obj, fmt) + _queues.put(self._id, obj, fmt, unboundop) def get(self, timeout=None, *, _delay=10 / 1000, # 10 milliseconds @@ -189,6 +294,10 @@ class Queue: """Return the next object from the queue. This blocks while the queue is empty. + + If the next item's original interpreter has been destroyed + then the "next object" is determined by the value of the + "unbound" argument to put(). """ if timeout is not None: timeout = int(timeout) @@ -197,13 +306,16 @@ class Queue: end = time.time() + timeout while True: try: - obj, fmt = _queues.get(self._id) + obj, fmt, unboundop = _queues.get(self._id) except QueueEmpty as exc: if timeout is not None and time.time() >= end: raise # re-raise time.sleep(_delay) else: break + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) if fmt == _PICKLED: obj = pickle.loads(obj) else: @@ -217,9 +329,12 @@ class Queue: is the same as get(). """ try: - obj, fmt = _queues.get(self._id) + obj, fmt, unboundop = _queues.get(self._id) except QueueEmpty as exc: raise # re-raise + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) if fmt == _PICKLED: obj = pickle.loads(obj) else: diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 9ee7647ed79..30d58a5b291 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -12,13 +12,16 @@ from test.support.interpreters import queues from .utils import _run_output, TestBase as _TestBase +REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND] + + def get_num_queues(): return len(_queues.list_all()) class TestBase(_TestBase): def tearDown(self): - for qid, _ in _queues.list_all(): + for qid, _, _ in _queues.list_all(): try: _queues.destroy(qid) except Exception: @@ -39,7 +42,7 @@ class LowLevelTests(TestBase): importlib.reload(queues) def test_create_destroy(self): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, REPLACE) _queues.destroy(qid) self.assertEqual(get_num_queues(), 0) with self.assertRaises(queues.QueueNotFoundError): @@ -53,7 +56,7 @@ class LowLevelTests(TestBase): '-c', dedent(f""" import {_queues.__name__} as _queues - _queues.create(2, 0) + _queues.create(2, 0, {REPLACE}) """), ) self.assertEqual(stdout, '') @@ -64,13 +67,13 @@ class LowLevelTests(TestBase): def test_bind_release(self): with self.subTest('typical'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.release(qid) self.assertEqual(get_num_queues(), 0) with self.subTest('bind too much'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -78,7 +81,7 @@ class LowLevelTests(TestBase): self.assertEqual(get_num_queues(), 0) with self.subTest('nested'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -86,7 +89,7 @@ class LowLevelTests(TestBase): self.assertEqual(get_num_queues(), 0) with self.subTest('release without binding'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, REPLACE) with self.assertRaises(queues.QueueError): _queues.release(qid) @@ -426,26 +429,206 @@ class TestQueueOps(TestBase): self.assertNotEqual(id(obj2), int(out)) def test_put_cleared_with_subinterpreter(self): - interp = interpreters.create() - queue = queues.create() + def common(queue, unbound=None, presize=0): + if not unbound: + extraargs = '' + elif unbound is queues.UNBOUND: + extraargs = ', unbound=queues.UNBOUND' + elif unbound is queues.UNBOUND_ERROR: + extraargs = ', unbound=queues.UNBOUND_ERROR' + elif unbound is queues.UNBOUND_REMOVE: + extraargs = ', unbound=queues.UNBOUND_REMOVE' + else: + raise NotImplementedError(repr(unbound)) + interp = interpreters.create() - out = _run_output( - interp, - dedent(f""" + _run_output(interp, dedent(f""" from test.support.interpreters import queues queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1, syncobj=True) - queue.put(obj2, syncobj=True) + queue.put(obj1, syncobj=True{extraargs}) + queue.put(obj2, syncobj=True{extraargs}) """)) - self.assertEqual(queue.qsize(), 2) + self.assertEqual(queue.qsize(), presize + 2) - obj1 = queue.get() - self.assertEqual(obj1, b'spam') - self.assertEqual(queue.qsize(), 1) + if presize == 0: + obj1 = queue.get() + self.assertEqual(obj1, b'spam') + self.assertEqual(queue.qsize(), presize + 1) + + return interp + + with self.subTest('default'): # UNBOUND + queue = queues.create() + interp = common(queue) + del interp + obj1 = queue.get() + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND'): + queue = queues.create() + interp = common(queue, queues.UNBOUND) + del interp + obj1 = queue.get() + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND_ERROR'): + queue = queues.create() + interp = common(queue, queues.UNBOUND_ERROR) + + del interp + self.assertEqual(queue.qsize(), 1) + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND_REMOVE'): + queue = queues.create() + + interp = common(queue, queues.UNBOUND_REMOVE) + del interp + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + queue.put(b'ham', unbound=queues.UNBOUND_REMOVE) + self.assertEqual(queue.qsize(), 1) + interp = common(queue, queues.UNBOUND_REMOVE, 1) + self.assertEqual(queue.qsize(), 3) + queue.put(42, unbound=queues.UNBOUND_REMOVE) + self.assertEqual(queue.qsize(), 4) + del interp + self.assertEqual(queue.qsize(), 2) + obj1 = queue.get() + obj2 = queue.get() + self.assertEqual(obj1, b'ham') + self.assertEqual(obj2, 42) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + def test_put_cleared_with_subinterpreter_mixed(self): + queue = queues.create() + interp = interpreters.create() + _run_output(interp, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + queue.put(1, syncobj=True, unbound=queues.UNBOUND) + queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR) + queue.put(3, syncobj=True) + queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(5, syncobj=True, unbound=queues.UNBOUND) + """)) + self.assertEqual(queue.qsize(), 5) del interp + self.assertEqual(queue.qsize(), 4) + + obj1 = queue.get() + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 3) + + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + self.assertEqual(queue.qsize(), 2) + + obj2 = queue.get() + self.assertIs(obj2, queues.UNBOUND) + self.assertEqual(queue.qsize(), 1) + + obj3 = queue.get() + self.assertIs(obj3, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + + def test_put_cleared_with_subinterpreter_multiple(self): + queue = queues.create() + interp1 = interpreters.create() + interp2 = interpreters.create() + + queue.put(1, syncobj=True) + _run_output(interp1, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + obj1 = queue.get() + queue.put(2, syncobj=True, unbound=queues.UNBOUND) + queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE) + """)) + _run_output(interp2, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + obj2 = queue.get() + obj1 = queue.get() + """)) + self.assertEqual(queue.qsize(), 0) + queue.put(3) + _run_output(interp1, dedent(""" + queue.put(4, syncobj=True, unbound=queues.UNBOUND) + # interp closed here + queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(6, syncobj=True, unbound=queues.UNBOUND) + """)) + _run_output(interp2, dedent(""" + queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR) + # interp closed here + queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR) + queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(8, syncobj=True, unbound=queues.UNBOUND) + """)) + _run_output(interp1, dedent(""" + queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(10, syncobj=True, unbound=queues.UNBOUND) + """)) + self.assertEqual(queue.qsize(), 10) + + obj3 = queue.get() + self.assertEqual(obj3, 3) + self.assertEqual(queue.qsize(), 9) + + obj4 = queue.get() + self.assertEqual(obj4, 4) + self.assertEqual(queue.qsize(), 8) + + del interp1 + self.assertEqual(queue.qsize(), 6) + + # obj5 was removed + + obj6 = queue.get() + self.assertIs(obj6, queues.UNBOUND) + self.assertEqual(queue.qsize(), 5) + + obj7 = queue.get() + self.assertEqual(obj7, 7) + self.assertEqual(queue.qsize(), 4) + + del interp2 + self.assertEqual(queue.qsize(), 3) + + # obj1 + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + self.assertEqual(queue.qsize(), 2) + + # obj2 was removed + + obj8 = queue.get() + self.assertIs(obj8, queues.UNBOUND) + self.assertEqual(queue.qsize(), 1) + + # obj9 was removed + + obj10 = queue.get() + self.assertIs(obj10, queues.UNBOUND) self.assertEqual(queue.qsize(), 0) def test_put_get_different_threads(self): diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index c99111b861c..8e827891987 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags) return res; } +static inline int64_t +_get_interpid(_PyCrossInterpreterData *data) +{ + int64_t interpid; + if (data != NULL) { + interpid = _PyCrossInterpreterData_INTERPID(data); + assert(!PyErr_Occurred()); + } + else { + interpid = PyInterpreterState_GetID(PyInterpreterState_Get()); + } + return interpid; +} static PyInterpreterState * _get_current_interp(void) @@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } +/* unbound items ************************************************************/ + +#define UNBOUND_REMOVE 1 +#define UNBOUND_ERROR 2 +#define UNBOUND_REPLACE 3 + +// It would also be possible to add UNBOUND_REPLACE where the replacement +// value is user-provided. There would be some limitations there, though. +// Another possibility would be something like UNBOUND_COPY, where the +// object is released but the underlying data is copied (with the "raw" +// allocator) and used when the item is popped off the queue. + +static int +check_unbound(int unboundop) +{ + switch (unboundop) { + case UNBOUND_REMOVE: + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + return 1; + default: + return 0; + } +} + + /* the basic queue **********************************************************/ struct _queueitem; typedef struct _queueitem { + /* The interpreter that added the item to the queue. + The actual bound interpid is found in item->data. + This is necessary because item->data might be NULL, + meaning the interpreter has been destroyed. */ + int64_t interpid; _PyCrossInterpreterData *data; int fmt; + int unboundop; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, int fmt) + int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { + if (interpid < 0) { + interpid = _get_interpid(data); + } + else { + assert(data == NULL + || _PyCrossInterpreterData_INTERPID(data) < 0 + || interpid == _PyCrossInterpreterData_INTERPID(data)); + } + assert(check_unbound(unboundop)); *item = (_queueitem){ + .interpid = interpid, .data = data, .fmt = fmt, + .unboundop = unboundop, }; } +static void +_queueitem_clear_data(_queueitem *item) +{ + if (item->data == NULL) { + return; + } + // It was allocated in queue_put(). + (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); + item->data = NULL; +} + static void _queueitem_clear(_queueitem *item) { item->next = NULL; - - if (item->data != NULL) { - // It was allocated in queue_put(). - (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); - item->data = NULL; - } + _queueitem_clear_data(item); } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, int fmt) +_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data, fmt); + _queueitem_init(item, interpid, data, fmt, unboundop); return item; } @@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { *p_data = item->data; *p_fmt = item->fmt; + *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); } +static int +_queueitem_clear_interpreter(_queueitem *item) +{ + assert(item->interpid >= 0); + if (item->data == NULL) { + // Its interpreter was already cleared (or it was never bound). + // For UNBOUND_REMOVE it should have been freed at that time. + assert(item->unboundop != UNBOUND_REMOVE); + return 0; + } + assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid); + + switch (item->unboundop) { + case UNBOUND_REMOVE: + // The caller must free/clear it. + return 1; + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + // We won't need the cross-interpreter data later + // so we completely throw it away. + _queueitem_clear_data(item); + return 0; + default: + Py_FatalError("not reachable"); + return -1; + } +} + /* the queue */ @@ -474,12 +567,16 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; - int fmt; + struct { + int fmt; + int unboundop; + } defaults; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) { + assert(check_unbound(unboundop)); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUE_ALLOC; @@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) .items = { .maxsize = maxsize, }, - .fmt = fmt, + .defaults = { + .fmt = fmt, + .unboundop = unboundop, + }, }; return 0; } @@ -571,7 +671,8 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) +_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data, fmt); + _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; @@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) static int _queue_next(_queue *queue, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -627,7 +728,7 @@ _queue_next(_queue *queue, } queue->items.count -= 1; - _queueitem_popped(item, p_data, p_fmt); + _queueitem_popped(item, p_data, p_fmt, p_unboundop); _queue_unlock(queue); return 0; @@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid) while (next != NULL) { _queueitem *item = next; next = item->next; - if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) { + int remove = (item->interpid == interpid) + ? _queueitem_clear_interpreter(item) + : 0; + if (remove) { + _queueitem_free(item); if (prev == NULL) { - queue->items.first = item->next; + queue->items.first = next; } else { - prev->next = item->next; + prev->next = next; } - _queueitem_free(item); queue->items.count -= 1; } else { @@ -966,18 +1070,19 @@ finally: return res; } -struct queue_id_and_fmt { +struct queue_id_and_info { int64_t id; int fmt; + int unboundop; }; -static struct queue_id_and_fmt * -_queues_list_all(_queues *queues, int64_t *count) +static struct queue_id_and_info * +_queues_list_all(_queues *queues, int64_t *p_count) { - struct queue_id_and_fmt *qids = NULL; + struct queue_id_and_info *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt, - (Py_ssize_t)(queues->count)); + struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info, + (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } @@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count) for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i].id = ref->qid; assert(ref->queue != NULL); - ids[i].fmt = ref->queue->fmt; + ids[i].fmt = ref->queue->defaults.fmt; + ids[i].unboundop = ref->queue->defaults.unboundop; } - *count = queues->count; + *p_count = queues->count; qids = ids; done: @@ -1021,13 +1127,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize, int fmt) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize, fmt); + int err = _queue_init(queue, maxsize, fmt, unboundop); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) { // Look up the queue. _queue *queue = NULL; @@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) GLOBAL_FREE(data); return -1; } + assert(_PyCrossInterpreterData_INTERPID(data) == \ + PyInterpreterState_GetID(PyInterpreterState_Get())); // Add the data to the queue. - int res = _queue_add(queue, data, fmt); + int64_t interpid = -1; // _queueitem_init() will set it. + int res = _queue_add(queue, interpid, data, fmt, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) +queue_get(_queues *queues, int64_t qid, + PyObject **res, int *p_fmt, int *p_unboundop) { int err; *res = NULL; @@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data, p_fmt); + err = _queue_next(queue, &data, p_fmt, p_unboundop); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", "fmt", NULL}; + static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; Py_ssize_t maxsize; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist, - &maxsize, &fmt)) { + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist, + &maxsize, &fmt, &unboundop)) + { + return NULL; + } + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize, fmt); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_create_doc, -"create(maxsize, fmt) -> qid\n\ +"create(maxsize, fmt, unboundop) -> qid\n\ \n\ Create a new cross-interpreter queue and return its unique generated ID.\n\ It is a new reference as though bind() had been called on the queue.\n\ @@ -1463,9 +1580,9 @@ static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count); + struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { - if (count == 0) { + if (!PyErr_Occurred() && count == 0) { return PyList_New(0); } return NULL; @@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) if (ids == NULL) { goto finally; } - struct queue_id_and_fmt *cur = qids; + struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt); + PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt, + cur->unboundop); if (item == NULL) { Py_SETREF(ids, NULL); break; @@ -1498,18 +1616,26 @@ Each corresponding default format is also included."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", "fmt", NULL}; + static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL}; qidarg_converter_data qidarg; PyObject *obj; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist, - qidarg_converter, &qidarg, &obj, &fmt)) { + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist, + qidarg_converter, &qidarg, &obj, &fmt, + &unboundop)) + { return NULL; } int64_t qid = qidarg.id; + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); + return NULL; + } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt); + int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; @@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) PyObject *obj = NULL; int fmt = 0; - int err = queue_get(&_globals.queues, qid, &obj, &fmt); + int unboundop = 0; + int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } - PyObject *res = Py_BuildValue("Oi", obj, fmt); + if (obj == NULL) { + return Py_BuildValue("Oii", Py_None, fmt, unboundop); + } + PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); Py_DECREF(obj); return res; } @@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) if (handle_queue_error(err, self, qid)) { return NULL; } - int fmt = queue->fmt; + int fmt = queue->defaults.fmt; + int unboundop = queue->defaults.unboundop; _queue_unmark_waiter(queue, _globals.queues.mutex); - PyObject *fmt_obj = PyLong_FromLong(fmt); - if (fmt_obj == NULL) { - return NULL; - } - // For now queues only have one default. - PyObject *res = PyTuple_Pack(1, fmt_obj); - Py_DECREF(fmt_obj); - return res; + PyObject *defaults = Py_BuildValue("ii", fmt, unboundop); + return defaults; } PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,