mirror of
https://github.com/python/cpython.git
synced 2024-11-27 15:27:06 +01:00
8b209fd4f8
See 6b98b274b6
for an explanation of the problem and solution. Here I've applied the solution to channels.
1970 lines
49 KiB
C
1970 lines
49 KiB
C
/* interpreters module */
|
|
/* low-level access to interpreter primitives */
|
|
|
|
#ifndef Py_BUILD_CORE_BUILTIN
|
|
# define Py_BUILD_CORE_MODULE 1
|
|
#endif
|
|
|
|
#include "Python.h"
|
|
#include "pycore_crossinterp.h" // struct _xid
|
|
|
|
#define REGISTERS_HEAP_TYPES
|
|
#define HAS_UNBOUND_ITEMS
|
|
#include "_interpreters_common.h"
|
|
#undef HAS_UNBOUND_ITEMS
|
|
#undef REGISTERS_HEAP_TYPES
|
|
|
|
|
|
#define MODULE_NAME _interpqueues
|
|
#define MODULE_NAME_STR Py_STRINGIFY(MODULE_NAME)
|
|
#define MODINIT_FUNC_NAME RESOLVE_MODINIT_FUNC_NAME(MODULE_NAME)
|
|
|
|
|
|
#define GLOBAL_MALLOC(TYPE) \
|
|
PyMem_RawMalloc(sizeof(TYPE))
|
|
#define GLOBAL_FREE(VAR) \
|
|
PyMem_RawFree(VAR)
|
|
|
|
|
|
#define XID_IGNORE_EXC 1
|
|
#define XID_FREE 2
|
|
|
|
static int
|
|
_release_xid_data(_PyCrossInterpreterData *data, int flags)
|
|
{
|
|
int ignoreexc = flags & XID_IGNORE_EXC;
|
|
PyObject *exc;
|
|
if (ignoreexc) {
|
|
exc = PyErr_GetRaisedException();
|
|
}
|
|
int res;
|
|
if (flags & XID_FREE) {
|
|
res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
|
|
}
|
|
else {
|
|
res = _PyCrossInterpreterData_Release(data);
|
|
}
|
|
if (res < 0) {
|
|
/* The owning interpreter is already destroyed. */
|
|
if (ignoreexc) {
|
|
// XXX Emit a warning?
|
|
PyErr_Clear();
|
|
}
|
|
}
|
|
if (flags & XID_FREE) {
|
|
/* Either way, we free the data. */
|
|
}
|
|
if (ignoreexc) {
|
|
PyErr_SetRaisedException(exc);
|
|
}
|
|
return res;
|
|
}
|
|
|
|
/* Return the current interpreter state. */
static PyInterpreterState *
_get_current_interp(void)
{
    // No NULL check is needed here: PyInterpreterState_Get() aborts
    // if the lookup fails.
    PyInterpreterState *interp = PyInterpreterState_Get();
    return interp;
}
|
|
|
|
static PyObject *
|
|
_get_current_module(void)
|
|
{
|
|
PyObject *name = PyUnicode_FromString(MODULE_NAME_STR);
|
|
if (name == NULL) {
|
|
return NULL;
|
|
}
|
|
PyObject *mod = PyImport_GetModule(name);
|
|
Py_DECREF(name);
|
|
if (mod == NULL) {
|
|
return NULL;
|
|
}
|
|
assert(mod != Py_None);
|
|
return mod;
|
|
}
|
|
|
|
|
|
/* In/out argument block for idarg_int64_converter(). */
struct idarg_int64_converter_data {
    // input: name used in error messages ("ID" is used when NULL)
    const char *label;
    // output: the converted ID
    int64_t id;
};
|
|
|
|
static int
|
|
idarg_int64_converter(PyObject *arg, void *ptr)
|
|
{
|
|
int64_t id;
|
|
struct idarg_int64_converter_data *data = ptr;
|
|
|
|
const char *label = data->label;
|
|
if (label == NULL) {
|
|
label = "ID";
|
|
}
|
|
|
|
if (PyIndex_Check(arg)) {
|
|
int overflow = 0;
|
|
id = PyLong_AsLongLongAndOverflow(arg, &overflow);
|
|
if (id == -1 && PyErr_Occurred()) {
|
|
return 0;
|
|
}
|
|
else if (id == -1 && overflow == 1) {
|
|
PyErr_Format(PyExc_OverflowError,
|
|
"max %s is %lld, got %R", label, INT64_MAX, arg);
|
|
return 0;
|
|
}
|
|
else if (id < 0) {
|
|
PyErr_Format(PyExc_ValueError,
|
|
"%s must be a non-negative int, got %R", label, arg);
|
|
return 0;
|
|
}
|
|
}
|
|
else {
|
|
PyErr_Format(PyExc_TypeError,
|
|
"%s must be an int, got %.100s",
|
|
label, Py_TYPE(arg)->tp_name);
|
|
return 0;
|
|
}
|
|
data->id = id;
|
|
return 1;
|
|
}
|
|
|
|
|
|
static int
|
|
ensure_highlevel_module_loaded(void)
|
|
{
|
|
PyObject *highlevel = PyImport_ImportModule("interpreters.queues");
|
|
if (highlevel == NULL) {
|
|
PyErr_Clear();
|
|
highlevel = PyImport_ImportModule("test.support.interpreters.queues");
|
|
if (highlevel == NULL) {
|
|
return -1;
|
|
}
|
|
}
|
|
Py_DECREF(highlevel);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* module state *************************************************************/
|
|
|
|
typedef struct {
|
|
/* external types (added at runtime by interpreters module) */
|
|
PyTypeObject *queue_type;
|
|
|
|
/* QueueError (and its subclasses) */
|
|
PyObject *QueueError;
|
|
PyObject *QueueNotFoundError;
|
|
PyObject *QueueEmpty;
|
|
PyObject *QueueFull;
|
|
} module_state;
|
|
|
|
static inline module_state *
|
|
get_module_state(PyObject *mod)
|
|
{
|
|
assert(mod != NULL);
|
|
module_state *state = PyModule_GetState(mod);
|
|
assert(state != NULL);
|
|
return state;
|
|
}
|
|
|
|
static int
|
|
traverse_module_state(module_state *state, visitproc visit, void *arg)
|
|
{
|
|
/* external types */
|
|
Py_VISIT(state->queue_type);
|
|
|
|
/* QueueError */
|
|
Py_VISIT(state->QueueError);
|
|
Py_VISIT(state->QueueNotFoundError);
|
|
Py_VISIT(state->QueueEmpty);
|
|
Py_VISIT(state->QueueFull);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
clear_module_state(module_state *state)
|
|
{
|
|
/* external types */
|
|
if (state->queue_type != NULL) {
|
|
(void)clear_xid_class(state->queue_type);
|
|
}
|
|
Py_CLEAR(state->queue_type);
|
|
|
|
/* QueueError */
|
|
Py_CLEAR(state->QueueError);
|
|
Py_CLEAR(state->QueueNotFoundError);
|
|
Py_CLEAR(state->QueueEmpty);
|
|
Py_CLEAR(state->QueueFull);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* error codes **************************************************************/
|
|
|
|
#define ERR_EXCEPTION_RAISED (-1)
|
|
// multi-queue errors
|
|
#define ERR_QUEUES_ALLOC (-11)
|
|
#define ERR_QUEUE_ALLOC (-12)
|
|
#define ERR_NO_NEXT_QUEUE_ID (-13)
|
|
#define ERR_QUEUE_NOT_FOUND (-14)
|
|
// single-queue errors
|
|
#define ERR_QUEUE_EMPTY (-21)
|
|
#define ERR_QUEUE_FULL (-22)
|
|
#define ERR_QUEUE_NEVER_BOUND (-23)
|
|
|
|
static int ensure_external_exc_types(module_state *);
|
|
|
|
static int
|
|
resolve_module_errcode(module_state *state, int errcode, int64_t qid,
|
|
PyObject **p_exctype, PyObject **p_msgobj)
|
|
{
|
|
PyObject *exctype = NULL;
|
|
PyObject *msg = NULL;
|
|
switch (errcode) {
|
|
case ERR_NO_NEXT_QUEUE_ID:
|
|
exctype = state->QueueError;
|
|
msg = PyUnicode_FromString("ran out of queue IDs");
|
|
break;
|
|
case ERR_QUEUE_NOT_FOUND:
|
|
exctype = state->QueueNotFoundError;
|
|
msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid);
|
|
break;
|
|
case ERR_QUEUE_EMPTY:
|
|
if (ensure_external_exc_types(state) < 0) {
|
|
return -1;
|
|
}
|
|
exctype = state->QueueEmpty;
|
|
msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid);
|
|
break;
|
|
case ERR_QUEUE_FULL:
|
|
if (ensure_external_exc_types(state) < 0) {
|
|
return -1;
|
|
}
|
|
exctype = state->QueueFull;
|
|
msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid);
|
|
break;
|
|
case ERR_QUEUE_NEVER_BOUND:
|
|
exctype = state->QueueError;
|
|
msg = PyUnicode_FromFormat("queue %" PRId64 " never bound", qid);
|
|
break;
|
|
default:
|
|
PyErr_Format(PyExc_ValueError,
|
|
"unsupported error code %d", errcode);
|
|
return -1;
|
|
}
|
|
|
|
if (msg == NULL) {
|
|
assert(PyErr_Occurred());
|
|
return -1;
|
|
}
|
|
*p_exctype = exctype;
|
|
*p_msgobj = msg;
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* QueueError ***************************************************************/
|
|
|
|
static int
|
|
add_exctype(PyObject *mod, PyObject **p_state_field,
|
|
const char *qualname, const char *doc, PyObject *base)
|
|
{
|
|
#ifndef NDEBUG
|
|
const char *dot = strrchr(qualname, '.');
|
|
assert(dot != NULL);
|
|
const char *name = dot+1;
|
|
assert(*p_state_field == NULL);
|
|
assert(!PyObject_HasAttrStringWithError(mod, name));
|
|
#endif
|
|
PyObject *exctype = PyErr_NewExceptionWithDoc(qualname, doc, base, NULL);
|
|
if (exctype == NULL) {
|
|
return -1;
|
|
}
|
|
if (PyModule_AddType(mod, (PyTypeObject *)exctype) < 0) {
|
|
Py_DECREF(exctype);
|
|
return -1;
|
|
}
|
|
*p_state_field = exctype;
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
add_QueueError(PyObject *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
|
|
#define PREFIX "test.support.interpreters."
|
|
#define ADD_EXCTYPE(NAME, BASE, DOC) \
|
|
assert(state->NAME == NULL); \
|
|
if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) { \
|
|
return -1; \
|
|
}
|
|
ADD_EXCTYPE(QueueError, PyExc_RuntimeError,
|
|
"Indicates that a queue-related error happened.")
|
|
ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL)
|
|
// QueueEmpty and QueueFull are set by set_external_exc_types().
|
|
state->QueueEmpty = NULL;
|
|
state->QueueFull = NULL;
|
|
#undef ADD_EXCTYPE
|
|
#undef PREFIX
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
set_external_exc_types(module_state *state,
|
|
PyObject *emptyerror, PyObject *fullerror)
|
|
{
|
|
if (state->QueueEmpty != NULL) {
|
|
assert(state->QueueFull != NULL);
|
|
Py_CLEAR(state->QueueEmpty);
|
|
Py_CLEAR(state->QueueFull);
|
|
}
|
|
else {
|
|
assert(state->QueueFull == NULL);
|
|
}
|
|
assert(PyObject_IsSubclass(emptyerror, state->QueueError));
|
|
assert(PyObject_IsSubclass(fullerror, state->QueueError));
|
|
state->QueueEmpty = Py_NewRef(emptyerror);
|
|
state->QueueFull = Py_NewRef(fullerror);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
ensure_external_exc_types(module_state *state)
|
|
{
|
|
if (state->QueueEmpty != NULL) {
|
|
assert(state->QueueFull != NULL);
|
|
return 0;
|
|
}
|
|
assert(state->QueueFull == NULL);
|
|
|
|
// Force the module to be loaded, to register the type.
|
|
if (ensure_highlevel_module_loaded() < 0) {
|
|
return -1;
|
|
}
|
|
assert(state->QueueEmpty != NULL);
|
|
assert(state->QueueFull != NULL);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
handle_queue_error(int err, PyObject *mod, int64_t qid)
|
|
{
|
|
if (err == 0) {
|
|
assert(!PyErr_Occurred());
|
|
return 0;
|
|
}
|
|
assert(err < 0);
|
|
assert((err == -1) == (PyErr_Occurred() != NULL));
|
|
|
|
module_state *state;
|
|
switch (err) {
|
|
case ERR_QUEUE_ALLOC: _Py_FALLTHROUGH;
|
|
case ERR_QUEUES_ALLOC:
|
|
PyErr_NoMemory();
|
|
break;
|
|
case -1:
|
|
return -1;
|
|
default:
|
|
state = get_module_state(mod);
|
|
assert(state->QueueError != NULL);
|
|
PyObject *exctype = NULL;
|
|
PyObject *msg = NULL;
|
|
if (resolve_module_errcode(state, err, qid, &exctype, &msg) < 0) {
|
|
return -1;
|
|
}
|
|
PyObject *exc = PyObject_CallOneArg(exctype, msg);
|
|
Py_DECREF(msg);
|
|
if (exc == NULL) {
|
|
return -1;
|
|
}
|
|
PyErr_SetObject(exctype, exc);
|
|
Py_DECREF(exc);
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
|
|
/* the basic queue **********************************************************/
|
|
|
|
struct _queueitem;
|
|
|
|
typedef struct _queueitem {
|
|
/* The interpreter that added the item to the queue.
|
|
The actual bound interpid is found in item->data.
|
|
This is necessary because item->data might be NULL,
|
|
meaning the interpreter has been destroyed. */
|
|
int64_t interpid;
|
|
_PyCrossInterpreterData *data;
|
|
int fmt;
|
|
int unboundop;
|
|
struct _queueitem *next;
|
|
} _queueitem;
|
|
|
|
static void
|
|
_queueitem_init(_queueitem *item,
|
|
int64_t interpid, _PyCrossInterpreterData *data,
|
|
int fmt, int unboundop)
|
|
{
|
|
if (interpid < 0) {
|
|
interpid = _get_interpid(data);
|
|
}
|
|
else {
|
|
assert(data == NULL
|
|
|| _PyCrossInterpreterData_INTERPID(data) < 0
|
|
|| interpid == _PyCrossInterpreterData_INTERPID(data));
|
|
}
|
|
assert(check_unbound(unboundop));
|
|
*item = (_queueitem){
|
|
.interpid = interpid,
|
|
.data = data,
|
|
.fmt = fmt,
|
|
.unboundop = unboundop,
|
|
};
|
|
}
|
|
|
|
static void
|
|
_queueitem_clear_data(_queueitem *item)
|
|
{
|
|
if (item->data == NULL) {
|
|
return;
|
|
}
|
|
// It was allocated in queue_put().
|
|
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
|
|
item->data = NULL;
|
|
}
|
|
|
|
static void
|
|
_queueitem_clear(_queueitem *item)
|
|
{
|
|
item->next = NULL;
|
|
_queueitem_clear_data(item);
|
|
}
|
|
|
|
static _queueitem *
|
|
_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
|
|
int fmt, int unboundop)
|
|
{
|
|
_queueitem *item = GLOBAL_MALLOC(_queueitem);
|
|
if (item == NULL) {
|
|
PyErr_NoMemory();
|
|
return NULL;
|
|
}
|
|
_queueitem_init(item, interpid, data, fmt, unboundop);
|
|
return item;
|
|
}
|
|
|
|
static void
|
|
_queueitem_free(_queueitem *item)
|
|
{
|
|
_queueitem_clear(item);
|
|
GLOBAL_FREE(item);
|
|
}
|
|
|
|
static void
|
|
_queueitem_free_all(_queueitem *item)
|
|
{
|
|
while (item != NULL) {
|
|
_queueitem *last = item;
|
|
item = item->next;
|
|
_queueitem_free(last);
|
|
}
|
|
}
|
|
|
|
static void
|
|
_queueitem_popped(_queueitem *item,
|
|
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
|
|
{
|
|
*p_data = item->data;
|
|
*p_fmt = item->fmt;
|
|
*p_unboundop = item->unboundop;
|
|
// We clear them here, so they won't be released in _queueitem_clear().
|
|
item->data = NULL;
|
|
_queueitem_free(item);
|
|
}
|
|
|
|
static int
|
|
_queueitem_clear_interpreter(_queueitem *item)
|
|
{
|
|
assert(item->interpid >= 0);
|
|
if (item->data == NULL) {
|
|
// Its interpreter was already cleared (or it was never bound).
|
|
// For UNBOUND_REMOVE it should have been freed at that time.
|
|
assert(item->unboundop != UNBOUND_REMOVE);
|
|
return 0;
|
|
}
|
|
assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
|
|
|
|
switch (item->unboundop) {
|
|
case UNBOUND_REMOVE:
|
|
// The caller must free/clear it.
|
|
return 1;
|
|
case UNBOUND_ERROR:
|
|
case UNBOUND_REPLACE:
|
|
// We won't need the cross-interpreter data later
|
|
// so we completely throw it away.
|
|
_queueitem_clear_data(item);
|
|
return 0;
|
|
default:
|
|
Py_FatalError("not reachable");
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
/* the queue */
|
|
|
|
typedef struct _queue {
|
|
Py_ssize_t num_waiters; // protected by global lock
|
|
PyThread_type_lock mutex;
|
|
int alive;
|
|
struct _queueitems {
|
|
Py_ssize_t maxsize;
|
|
Py_ssize_t count;
|
|
_queueitem *first;
|
|
_queueitem *last;
|
|
} items;
|
|
struct {
|
|
int fmt;
|
|
int unboundop;
|
|
} defaults;
|
|
} _queue;
|
|
|
|
static int
|
|
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
|
|
{
|
|
assert(check_unbound(unboundop));
|
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
|
if (mutex == NULL) {
|
|
return ERR_QUEUE_ALLOC;
|
|
}
|
|
*queue = (_queue){
|
|
.mutex = mutex,
|
|
.alive = 1,
|
|
.items = {
|
|
.maxsize = maxsize,
|
|
},
|
|
.defaults = {
|
|
.fmt = fmt,
|
|
.unboundop = unboundop,
|
|
},
|
|
};
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_queue_clear(_queue *queue)
|
|
{
|
|
assert(!queue->alive);
|
|
assert(queue->num_waiters == 0);
|
|
_queueitem_free_all(queue->items.first);
|
|
assert(queue->mutex != NULL);
|
|
PyThread_free_lock(queue->mutex);
|
|
*queue = (_queue){0};
|
|
}
|
|
|
|
static void _queue_free(_queue *);
|
|
|
|
static void
|
|
_queue_kill_and_wait(_queue *queue)
|
|
{
|
|
// Mark it as dead.
|
|
PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
|
|
assert(queue->alive);
|
|
queue->alive = 0;
|
|
PyThread_release_lock(queue->mutex);
|
|
|
|
// Wait for all waiters to fail.
|
|
while (queue->num_waiters > 0) {
|
|
PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
|
|
PyThread_release_lock(queue->mutex);
|
|
};
|
|
}
|
|
|
|
static void
|
|
_queue_mark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
|
|
{
|
|
if (parent_mutex != NULL) {
|
|
PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
|
|
queue->num_waiters += 1;
|
|
PyThread_release_lock(parent_mutex);
|
|
}
|
|
else {
|
|
// The caller must be holding the parent lock already.
|
|
queue->num_waiters += 1;
|
|
}
|
|
}
|
|
|
|
static void
|
|
_queue_unmark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
|
|
{
|
|
if (parent_mutex != NULL) {
|
|
PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
|
|
queue->num_waiters -= 1;
|
|
PyThread_release_lock(parent_mutex);
|
|
}
|
|
else {
|
|
// The caller must be holding the parent lock already.
|
|
queue->num_waiters -= 1;
|
|
}
|
|
}
|
|
|
|
static int
|
|
_queue_lock(_queue *queue)
|
|
{
|
|
// The queue must be marked as a waiter already.
|
|
PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
|
|
if (!queue->alive) {
|
|
PyThread_release_lock(queue->mutex);
|
|
return ERR_QUEUE_NOT_FOUND;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_queue_unlock(_queue *queue)
|
|
{
|
|
PyThread_release_lock(queue->mutex);
|
|
}
|
|
|
|
static int
|
|
_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
|
|
int fmt, int unboundop)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
|
|
Py_ssize_t maxsize = queue->items.maxsize;
|
|
if (maxsize <= 0) {
|
|
maxsize = PY_SSIZE_T_MAX;
|
|
}
|
|
if (queue->items.count >= maxsize) {
|
|
_queue_unlock(queue);
|
|
return ERR_QUEUE_FULL;
|
|
}
|
|
|
|
_queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
|
|
if (item == NULL) {
|
|
_queue_unlock(queue);
|
|
return -1;
|
|
}
|
|
|
|
queue->items.count += 1;
|
|
if (queue->items.first == NULL) {
|
|
queue->items.first = item;
|
|
}
|
|
else {
|
|
queue->items.last->next = item;
|
|
}
|
|
queue->items.last = item;
|
|
|
|
_queue_unlock(queue);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_queue_next(_queue *queue,
|
|
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
|
|
assert(queue->items.count >= 0);
|
|
_queueitem *item = queue->items.first;
|
|
if (item == NULL) {
|
|
_queue_unlock(queue);
|
|
return ERR_QUEUE_EMPTY;
|
|
}
|
|
queue->items.first = item->next;
|
|
if (queue->items.last == item) {
|
|
queue->items.last = NULL;
|
|
}
|
|
queue->items.count -= 1;
|
|
|
|
_queueitem_popped(item, p_data, p_fmt, p_unboundop);
|
|
|
|
_queue_unlock(queue);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_queue_get_maxsize(_queue *queue, Py_ssize_t *p_maxsize)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
|
|
*p_maxsize = queue->items.maxsize;
|
|
|
|
_queue_unlock(queue);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_queue_is_full(_queue *queue, int *p_is_full)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
|
|
assert(queue->items.count <= queue->items.maxsize);
|
|
*p_is_full = queue->items.count == queue->items.maxsize;
|
|
|
|
_queue_unlock(queue);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_queue_get_count(_queue *queue, Py_ssize_t *p_count)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
|
|
*p_count = queue->items.count;
|
|
|
|
_queue_unlock(queue);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_queue_clear_interpreter(_queue *queue, int64_t interpid)
|
|
{
|
|
int err = _queue_lock(queue);
|
|
if (err == ERR_QUEUE_NOT_FOUND) {
|
|
// The queue is already destroyed, so there's nothing to clear.
|
|
assert(!PyErr_Occurred());
|
|
return;
|
|
}
|
|
assert(err == 0); // There should be no other errors.
|
|
|
|
_queueitem *prev = NULL;
|
|
_queueitem *next = queue->items.first;
|
|
while (next != NULL) {
|
|
_queueitem *item = next;
|
|
next = item->next;
|
|
int remove = (item->interpid == interpid)
|
|
? _queueitem_clear_interpreter(item)
|
|
: 0;
|
|
if (remove) {
|
|
_queueitem_free(item);
|
|
if (prev == NULL) {
|
|
queue->items.first = next;
|
|
}
|
|
else {
|
|
prev->next = next;
|
|
}
|
|
queue->items.count -= 1;
|
|
}
|
|
else {
|
|
prev = item;
|
|
}
|
|
}
|
|
|
|
_queue_unlock(queue);
|
|
}
|
|
|
|
|
|
/* external queue references ************************************************/
|
|
|
|
struct _queueref;
|
|
|
|
typedef struct _queueref {
|
|
struct _queueref *next;
|
|
int64_t qid;
|
|
Py_ssize_t refcount;
|
|
_queue *queue;
|
|
} _queueref;
|
|
|
|
/* Search the list starting at "first" for the ref with ID "qid".

   Returns the ref, or NULL if there is none.  If "pprev" is not NULL
   it receives the ref preceding the result (NULL when the result is
   the first ref; the last ref of the list when nothing was found). */
static _queueref *
_queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev)
{
    _queueref *before = NULL;
    _queueref *cur;
    for (cur = first; cur != NULL; cur = cur->next) {
        if (cur->qid == qid) {
            break;
        }
        before = cur;
    }
    if (pprev != NULL) {
        *pprev = before;
    }
    return cur;
}
|
|
|
|
static void
|
|
_queuerefs_clear(_queueref *head)
|
|
{
|
|
_queueref *next = head;
|
|
while (next != NULL) {
|
|
_queueref *ref = next;
|
|
next = ref->next;
|
|
|
|
#ifdef Py_DEBUG
|
|
int64_t qid = ref->qid;
|
|
fprintf(stderr, "queue %" PRId64 " still exists\n", qid);
|
|
#endif
|
|
_queue *queue = ref->queue;
|
|
GLOBAL_FREE(ref);
|
|
|
|
_queue_kill_and_wait(queue);
|
|
#ifdef Py_DEBUG
|
|
if (queue->items.count > 0) {
|
|
fprintf(stderr, "queue %" PRId64 " still holds %zd items\n",
|
|
qid, queue->items.count);
|
|
}
|
|
#endif
|
|
_queue_free(queue);
|
|
}
|
|
}
|
|
|
|
|
|
/* a collection of queues ***************************************************/
|
|
|
|
typedef struct _queues {
|
|
PyThread_type_lock mutex;
|
|
_queueref *head;
|
|
int64_t count;
|
|
int64_t next_id;
|
|
} _queues;
|
|
|
|
static void
|
|
_queues_init(_queues *queues, PyThread_type_lock mutex)
|
|
{
|
|
queues->mutex = mutex;
|
|
queues->head = NULL;
|
|
queues->count = 0;
|
|
queues->next_id = 1;
|
|
}
|
|
|
|
static void
|
|
_queues_fini(_queues *queues)
|
|
{
|
|
if (queues->count > 0) {
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
assert((queues->count == 0) != (queues->head != NULL));
|
|
_queueref *head = queues->head;
|
|
queues->head = NULL;
|
|
queues->count = 0;
|
|
PyThread_release_lock(queues->mutex);
|
|
_queuerefs_clear(head);
|
|
}
|
|
if (queues->mutex != NULL) {
|
|
PyThread_free_lock(queues->mutex);
|
|
queues->mutex = NULL;
|
|
}
|
|
}
|
|
|
|
static int64_t
|
|
_queues_next_id(_queues *queues) // needs lock
|
|
{
|
|
int64_t qid = queues->next_id;
|
|
if (qid < 0) {
|
|
/* overflow */
|
|
return ERR_NO_NEXT_QUEUE_ID;
|
|
}
|
|
queues->next_id += 1;
|
|
return qid;
|
|
}
|
|
|
|
static int
|
|
_queues_lookup(_queues *queues, int64_t qid, _queue **res)
|
|
{
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
_queueref *ref = _queuerefs_find(queues->head, qid, NULL);
|
|
if (ref == NULL) {
|
|
PyThread_release_lock(queues->mutex);
|
|
return ERR_QUEUE_NOT_FOUND;
|
|
}
|
|
assert(ref->queue != NULL);
|
|
_queue *queue = ref->queue;
|
|
_queue_mark_waiter(queue, NULL);
|
|
// The caller must unmark it.
|
|
|
|
PyThread_release_lock(queues->mutex);
|
|
|
|
*res = queue;
|
|
return 0;
|
|
}
|
|
|
|
static int64_t
|
|
_queues_add(_queues *queues, _queue *queue)
|
|
{
|
|
int64_t qid = -1;
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
// Create a new ref.
|
|
int64_t _qid = _queues_next_id(queues);
|
|
if (_qid < 0) {
|
|
goto done;
|
|
}
|
|
_queueref *ref = GLOBAL_MALLOC(_queueref);
|
|
if (ref == NULL) {
|
|
qid = ERR_QUEUE_ALLOC;
|
|
goto done;
|
|
}
|
|
*ref = (_queueref){
|
|
.qid = _qid,
|
|
.queue = queue,
|
|
};
|
|
|
|
// Add it to the list.
|
|
// We assume that the queue is a new one (not already in the list).
|
|
ref->next = queues->head;
|
|
queues->head = ref;
|
|
queues->count += 1;
|
|
|
|
qid = _qid;
|
|
done:
|
|
PyThread_release_lock(queues->mutex);
|
|
return qid;
|
|
}
|
|
|
|
static void
|
|
_queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev,
|
|
_queue **p_queue)
|
|
{
|
|
assert(ref->queue != NULL);
|
|
|
|
if (ref == queues->head) {
|
|
queues->head = ref->next;
|
|
}
|
|
else {
|
|
prev->next = ref->next;
|
|
}
|
|
ref->next = NULL;
|
|
queues->count -= 1;
|
|
|
|
*p_queue = ref->queue;
|
|
ref->queue = NULL;
|
|
GLOBAL_FREE(ref);
|
|
}
|
|
|
|
static int
|
|
_queues_remove(_queues *queues, int64_t qid, _queue **p_queue)
|
|
{
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
_queueref *prev = NULL;
|
|
_queueref *ref = _queuerefs_find(queues->head, qid, &prev);
|
|
if (ref == NULL) {
|
|
PyThread_release_lock(queues->mutex);
|
|
return ERR_QUEUE_NOT_FOUND;
|
|
}
|
|
|
|
_queues_remove_ref(queues, ref, prev, p_queue);
|
|
PyThread_release_lock(queues->mutex);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_queues_incref(_queues *queues, int64_t qid)
|
|
{
|
|
// XXX Track interpreter IDs?
|
|
int res = -1;
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
_queueref *ref = _queuerefs_find(queues->head, qid, NULL);
|
|
if (ref == NULL) {
|
|
assert(!PyErr_Occurred());
|
|
res = ERR_QUEUE_NOT_FOUND;
|
|
goto done;
|
|
}
|
|
ref->refcount += 1;
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(queues->mutex);
|
|
return res;
|
|
}
|
|
|
|
static int
|
|
_queues_decref(_queues *queues, int64_t qid)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
_queueref *prev = NULL;
|
|
_queueref *ref = _queuerefs_find(queues->head, qid, &prev);
|
|
if (ref == NULL) {
|
|
assert(!PyErr_Occurred());
|
|
res = ERR_QUEUE_NOT_FOUND;
|
|
goto finally;
|
|
}
|
|
if (ref->refcount == 0) {
|
|
res = ERR_QUEUE_NEVER_BOUND;
|
|
goto finally;
|
|
}
|
|
assert(ref->refcount > 0);
|
|
ref->refcount -= 1;
|
|
|
|
// Destroy if no longer used.
|
|
assert(ref->queue != NULL);
|
|
if (ref->refcount == 0) {
|
|
_queue *queue = NULL;
|
|
_queues_remove_ref(queues, ref, prev, &queue);
|
|
PyThread_release_lock(queues->mutex);
|
|
|
|
_queue_kill_and_wait(queue);
|
|
_queue_free(queue);
|
|
return 0;
|
|
}
|
|
|
|
res = 0;
|
|
finally:
|
|
PyThread_release_lock(queues->mutex);
|
|
return res;
|
|
}
|
|
|
|
/* One entry of the array returned by _queues_list_all(): a queue's ID
   together with the defaults stored on that queue. */
struct queue_id_and_info {
    int64_t id;
    int fmt;
    int unboundop;
};
|
|
|
|
static struct queue_id_and_info *
|
|
_queues_list_all(_queues *queues, int64_t *p_count)
|
|
{
|
|
struct queue_id_and_info *qids = NULL;
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
|
|
(Py_ssize_t)(queues->count));
|
|
if (ids == NULL) {
|
|
goto done;
|
|
}
|
|
_queueref *ref = queues->head;
|
|
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
|
|
ids[i].id = ref->qid;
|
|
assert(ref->queue != NULL);
|
|
ids[i].fmt = ref->queue->defaults.fmt;
|
|
ids[i].unboundop = ref->queue->defaults.unboundop;
|
|
}
|
|
*p_count = queues->count;
|
|
|
|
qids = ids;
|
|
done:
|
|
PyThread_release_lock(queues->mutex);
|
|
return qids;
|
|
}
|
|
|
|
static void
|
|
_queues_clear_interpreter(_queues *queues, int64_t interpid)
|
|
{
|
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
|
|
|
_queueref *ref = queues->head;
|
|
for (; ref != NULL; ref = ref->next) {
|
|
assert(ref->queue != NULL);
|
|
_queue_clear_interpreter(ref->queue, interpid);
|
|
}
|
|
|
|
PyThread_release_lock(queues->mutex);
|
|
}
|
|
|
|
|
|
/* "high"-level queue-related functions *************************************/
|
|
|
|
static void
|
|
_queue_free(_queue *queue)
|
|
{
|
|
_queue_clear(queue);
|
|
GLOBAL_FREE(queue);
|
|
}
|
|
|
|
// Create a new queue.
|
|
static int64_t
|
|
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
|
|
{
|
|
_queue *queue = GLOBAL_MALLOC(_queue);
|
|
if (queue == NULL) {
|
|
return ERR_QUEUE_ALLOC;
|
|
}
|
|
int err = _queue_init(queue, maxsize, fmt, unboundop);
|
|
if (err < 0) {
|
|
GLOBAL_FREE(queue);
|
|
return (int64_t)err;
|
|
}
|
|
int64_t qid = _queues_add(queues, queue);
|
|
if (qid < 0) {
|
|
_queue_clear(queue);
|
|
GLOBAL_FREE(queue);
|
|
}
|
|
return qid;
|
|
}
|
|
|
|
// Completely destroy the queue.
|
|
static int
|
|
queue_destroy(_queues *queues, int64_t qid)
|
|
{
|
|
_queue *queue = NULL;
|
|
int err = _queues_remove(queues, qid, &queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
_queue_kill_and_wait(queue);
|
|
_queue_free(queue);
|
|
return 0;
|
|
}
|
|
|
|
// Push an object onto the queue.
|
|
static int
|
|
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
|
|
{
|
|
// Look up the queue.
|
|
_queue *queue = NULL;
|
|
int err = _queues_lookup(queues, qid, &queue);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
assert(queue != NULL);
|
|
|
|
// Convert the object to cross-interpreter data.
|
|
_PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
|
|
if (data == NULL) {
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
return -1;
|
|
}
|
|
if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
GLOBAL_FREE(data);
|
|
return -1;
|
|
}
|
|
assert(_PyCrossInterpreterData_INTERPID(data) == \
|
|
PyInterpreterState_GetID(PyInterpreterState_Get()));
|
|
|
|
// Add the data to the queue.
|
|
int64_t interpid = -1; // _queueitem_init() will set it.
|
|
int res = _queue_add(queue, interpid, data, fmt, unboundop);
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
if (res != 0) {
|
|
// We may chain an exception here:
|
|
(void)_release_xid_data(data, 0);
|
|
GLOBAL_FREE(data);
|
|
return res;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Pop the next object off the queue. Fail if empty.
|
|
// XXX Support a "wait" mutex?
|
|
static int
|
|
queue_get(_queues *queues, int64_t qid,
|
|
PyObject **res, int *p_fmt, int *p_unboundop)
|
|
{
|
|
int err;
|
|
*res = NULL;
|
|
|
|
// Look up the queue.
|
|
_queue *queue = NULL;
|
|
err = _queues_lookup(queues, qid, &queue);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
// Past this point we are responsible for releasing the mutex.
|
|
assert(queue != NULL);
|
|
|
|
// Pop off the next item from the queue.
|
|
_PyCrossInterpreterData *data = NULL;
|
|
err = _queue_next(queue, &data, p_fmt, p_unboundop);
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
else if (data == NULL) {
|
|
assert(!PyErr_Occurred());
|
|
return 0;
|
|
}
|
|
|
|
// Convert the data back to an object.
|
|
PyObject *obj = _PyCrossInterpreterData_NewObject(data);
|
|
if (obj == NULL) {
|
|
assert(PyErr_Occurred());
|
|
// It was allocated in queue_put(), so we free it.
|
|
(void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
|
|
return -1;
|
|
}
|
|
// It was allocated in queue_put(), so we free it.
|
|
int release_res = _release_xid_data(data, XID_FREE);
|
|
if (release_res < 0) {
|
|
// The source interpreter has been destroyed already.
|
|
assert(PyErr_Occurred());
|
|
Py_DECREF(obj);
|
|
return -1;
|
|
}
|
|
|
|
*res = obj;
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
|
|
{
|
|
_queue *queue = NULL;
|
|
int err = _queues_lookup(queues, qid, &queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
err = _queue_get_maxsize(queue, p_maxsize);
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
return err;
|
|
}
|
|
|
|
static int
|
|
queue_is_full(_queues *queues, int64_t qid, int *p_is_full)
|
|
{
|
|
_queue *queue = NULL;
|
|
int err = _queues_lookup(queues, qid, &queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
err = _queue_is_full(queue, p_is_full);
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
return err;
|
|
}
|
|
|
|
static int
|
|
queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count)
|
|
{
|
|
_queue *queue = NULL;
|
|
int err = _queues_lookup(queues, qid, &queue);
|
|
if (err < 0) {
|
|
return err;
|
|
}
|
|
err = _queue_get_count(queue, p_count);
|
|
_queue_unmark_waiter(queue, queues->mutex);
|
|
return err;
|
|
}
|
|
|
|
|
|
/* external Queue objects ***************************************************/
|
|
|
|
static int _queueobj_shared(PyThreadState *,
|
|
PyObject *, _PyCrossInterpreterData *);
|
|
|
|
static int
|
|
set_external_queue_type(module_state *state, PyTypeObject *queue_type)
|
|
{
|
|
// Clear the old value if the .py module was reloaded.
|
|
if (state->queue_type != NULL) {
|
|
(void)clear_xid_class(state->queue_type);
|
|
Py_CLEAR(state->queue_type);
|
|
}
|
|
|
|
// Add and register the new type.
|
|
if (ensure_xid_class(queue_type, _queueobj_shared) < 0) {
|
|
return -1;
|
|
}
|
|
state->queue_type = (PyTypeObject *)Py_NewRef(queue_type);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static PyTypeObject *
|
|
get_external_queue_type(PyObject *module)
|
|
{
|
|
module_state *state = get_module_state(module);
|
|
|
|
PyTypeObject *cls = state->queue_type;
|
|
if (cls == NULL) {
|
|
// Force the module to be loaded, to register the type.
|
|
if (ensure_highlevel_module_loaded() < 0) {
|
|
return NULL;
|
|
}
|
|
cls = state->queue_type;
|
|
assert(cls != NULL);
|
|
}
|
|
return cls;
|
|
}
|
|
|
|
|
|
// XXX Use a new __xid__ protocol instead?
|
|
|
|
/* The raw cross-interpreter payload for a queue object: its ID. */
struct _queueid_xid {
    int64_t qid;
};
|
|
|
|
static _queues * _get_global_queues(void);
|
|
|
|
static void *
|
|
_queueid_xid_new(int64_t qid)
|
|
{
|
|
_queues *queues = _get_global_queues();
|
|
if (_queues_incref(queues, qid) < 0) {
|
|
return NULL;
|
|
}
|
|
|
|
struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid));
|
|
if (data == NULL) {
|
|
_queues_incref(queues, qid);
|
|
return NULL;
|
|
}
|
|
data->qid = qid;
|
|
return (void *)data;
|
|
}
|
|
|
|
static void
|
|
_queueid_xid_free(void *data)
|
|
{
|
|
int64_t qid = ((struct _queueid_xid *)data)->qid;
|
|
PyMem_RawFree(data);
|
|
_queues *queues = _get_global_queues();
|
|
int res = _queues_decref(queues, qid);
|
|
if (res == ERR_QUEUE_NOT_FOUND) {
|
|
// Already destroyed.
|
|
// XXX Warn?
|
|
}
|
|
else {
|
|
assert(res == 0);
|
|
}
|
|
}
|
|
|
|
static PyObject *
|
|
_queueobj_from_xid(_PyCrossInterpreterData *data)
|
|
{
|
|
int64_t qid = *(int64_t *)_PyCrossInterpreterData_DATA(data);
|
|
PyObject *qidobj = PyLong_FromLongLong(qid);
|
|
if (qidobj == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
PyObject *mod = _get_current_module();
|
|
if (mod == NULL) {
|
|
// XXX import it?
|
|
PyErr_SetString(PyExc_RuntimeError,
|
|
MODULE_NAME_STR " module not imported yet");
|
|
return NULL;
|
|
}
|
|
|
|
PyTypeObject *cls = get_external_queue_type(mod);
|
|
Py_DECREF(mod);
|
|
if (cls == NULL) {
|
|
Py_DECREF(qidobj);
|
|
return NULL;
|
|
}
|
|
PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)qidobj);
|
|
Py_DECREF(qidobj);
|
|
return obj;
|
|
}
|
|
|
|
static int
|
|
_queueobj_shared(PyThreadState *tstate, PyObject *queueobj,
|
|
_PyCrossInterpreterData *data)
|
|
{
|
|
PyObject *qidobj = PyObject_GetAttrString(queueobj, "_id");
|
|
if (qidobj == NULL) {
|
|
return -1;
|
|
}
|
|
struct idarg_int64_converter_data converted = {
|
|
.label = "queue ID",
|
|
};
|
|
int res = idarg_int64_converter(qidobj, &converted);
|
|
Py_CLEAR(qidobj);
|
|
if (!res) {
|
|
assert(PyErr_Occurred());
|
|
return -1;
|
|
}
|
|
|
|
void *raw = _queueid_xid_new(converted.id);
|
|
if (raw == NULL) {
|
|
return -1;
|
|
}
|
|
_PyCrossInterpreterData_Init(data, tstate->interp, raw, NULL,
|
|
_queueobj_from_xid);
|
|
_PyCrossInterpreterData_SET_FREE(data, _queueid_xid_free);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* module level code ********************************************************/
|
|
|
|
/* globals is the process-global state for the module. It holds all
|
|
the data that we need to share between interpreters, so it cannot
|
|
hold PyObject values. */
|
|
static struct globals {
|
|
int module_count;
|
|
_queues queues;
|
|
} _globals = {0};
|
|
|
|
static int
|
|
_globals_init(void)
|
|
{
|
|
// XXX This isn't thread-safe.
|
|
_globals.module_count++;
|
|
if (_globals.module_count > 1) {
|
|
// Already initialized.
|
|
return 0;
|
|
}
|
|
|
|
assert(_globals.queues.mutex == NULL);
|
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
|
if (mutex == NULL) {
|
|
return ERR_QUEUES_ALLOC;
|
|
}
|
|
_queues_init(&_globals.queues, mutex);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_globals_fini(void)
|
|
{
|
|
// XXX This isn't thread-safe.
|
|
_globals.module_count--;
|
|
if (_globals.module_count > 0) {
|
|
return;
|
|
}
|
|
|
|
_queues_fini(&_globals.queues);
|
|
}
|
|
|
|
static _queues *
|
|
_get_global_queues(void)
|
|
{
|
|
return &_globals.queues;
|
|
}
|
|
|
|
|
|
static void
|
|
clear_interpreter(void *data)
|
|
{
|
|
if (_globals.module_count == 0) {
|
|
return;
|
|
}
|
|
PyInterpreterState *interp = (PyInterpreterState *)data;
|
|
assert(interp == _get_current_interp());
|
|
int64_t interpid = PyInterpreterState_GetID(interp);
|
|
_queues_clear_interpreter(&_globals.queues, interpid);
|
|
}
|
|
|
|
|
|
typedef struct idarg_int64_converter_data qidarg_converter_data;
|
|
|
|
static int
|
|
qidarg_converter(PyObject *arg, void *ptr)
|
|
{
|
|
qidarg_converter_data *data = ptr;
|
|
if (data->label == NULL) {
|
|
data->label = "queue ID";
|
|
}
|
|
return idarg_int64_converter(arg, ptr);
|
|
}
|
|
|
|
|
|
static PyObject *
|
|
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
|
|
Py_ssize_t maxsize;
|
|
int fmt;
|
|
int unboundop;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
|
|
&maxsize, &fmt, &unboundop))
|
|
{
|
|
return NULL;
|
|
}
|
|
if (!check_unbound(unboundop)) {
|
|
PyErr_Format(PyExc_ValueError,
|
|
"unsupported unboundop %d", unboundop);
|
|
return NULL;
|
|
}
|
|
|
|
int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
|
|
if (qid < 0) {
|
|
(void)handle_queue_error((int)qid, self, qid);
|
|
return NULL;
|
|
}
|
|
|
|
PyObject *qidobj = PyLong_FromLongLong(qid);
|
|
if (qidobj == NULL) {
|
|
PyObject *exc = PyErr_GetRaisedException();
|
|
int err = queue_destroy(&_globals.queues, qid);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
// XXX issue a warning?
|
|
PyErr_Clear();
|
|
}
|
|
PyErr_SetRaisedException(exc);
|
|
return NULL;
|
|
}
|
|
|
|
return qidobj;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_create_doc,
|
|
"create(maxsize, fmt, unboundop) -> qid\n\
|
|
\n\
|
|
Create a new cross-interpreter queue and return its unique generated ID.\n\
|
|
It is a new reference as though bind() had been called on the queue.\n\
|
|
\n\
|
|
The caller is responsible for calling destroy() for the new queue\n\
|
|
before the runtime is finalized.");
|
|
|
|
static PyObject *
|
|
queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:destroy", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
int err = queue_destroy(&_globals.queues, qid);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_destroy_doc,
|
|
"destroy(qid)\n\
|
|
\n\
|
|
Clear and destroy the queue. Afterward attempts to use the queue\n\
|
|
will behave as though it never existed.");
|
|
|
|
static PyObject *
|
|
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
|
|
{
|
|
int64_t count = 0;
|
|
struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
|
|
if (qids == NULL) {
|
|
if (!PyErr_Occurred() && count == 0) {
|
|
return PyList_New(0);
|
|
}
|
|
return NULL;
|
|
}
|
|
PyObject *ids = PyList_New((Py_ssize_t)count);
|
|
if (ids == NULL) {
|
|
goto finally;
|
|
}
|
|
struct queue_id_and_info *cur = qids;
|
|
for (int64_t i=0; i < count; cur++, i++) {
|
|
PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
|
|
cur->unboundop);
|
|
if (item == NULL) {
|
|
Py_SETREF(ids, NULL);
|
|
break;
|
|
}
|
|
PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
|
|
}
|
|
|
|
finally:
|
|
PyMem_Free(qids);
|
|
return ids;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_list_all_doc,
|
|
"list_all() -> [(qid, fmt)]\n\
|
|
\n\
|
|
Return the list of IDs for all queues.\n\
|
|
Each corresponding default format is also included.");
|
|
|
|
static PyObject *
|
|
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
|
|
qidarg_converter_data qidarg;
|
|
PyObject *obj;
|
|
int fmt;
|
|
int unboundop;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
|
|
qidarg_converter, &qidarg, &obj, &fmt,
|
|
&unboundop))
|
|
{
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
if (!check_unbound(unboundop)) {
|
|
PyErr_Format(PyExc_ValueError,
|
|
"unsupported unboundop %d", unboundop);
|
|
return NULL;
|
|
}
|
|
|
|
/* Queue up the object. */
|
|
int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
|
|
// This is the only place that raises QueueFull.
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_put_doc,
|
|
"put(qid, obj, fmt)\n\
|
|
\n\
|
|
Add the object's data to the queue.");
|
|
|
|
static PyObject *
|
|
queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
PyObject *obj = NULL;
|
|
int fmt = 0;
|
|
int unboundop = 0;
|
|
int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
|
|
// This is the only place that raises QueueEmpty.
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
|
|
if (obj == NULL) {
|
|
return Py_BuildValue("Oii", Py_None, fmt, unboundop);
|
|
}
|
|
PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
|
|
Py_DECREF(obj);
|
|
return res;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_get_doc,
|
|
"get(qid) -> (obj, fmt)\n\
|
|
\n\
|
|
Return a new object from the data at the front of the queue.\n\
|
|
The object's format is also returned.\n\
|
|
\n\
|
|
If there is nothing to receive then raise QueueEmpty.");
|
|
|
|
static PyObject *
|
|
queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:bind", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
// XXX Check module state if bound already.
|
|
|
|
int err = _queues_incref(&_globals.queues, qid);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
|
|
// XXX Update module state.
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_bind_doc,
|
|
"bind(qid)\n\
|
|
\n\
|
|
Take a reference to the identified queue.\n\
|
|
The queue is not destroyed until there are no references left.");
|
|
|
|
static PyObject *
|
|
queuesmod_release(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
// Note that only the current interpreter is affected.
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:release", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
// XXX Check module state if bound already.
|
|
// XXX Update module state.
|
|
|
|
int err = _queues_decref(&_globals.queues, qid);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_release_doc,
|
|
"release(qid)\n\
|
|
\n\
|
|
Release a reference to the queue.\n\
|
|
The queue is destroyed once there are no references left.");
|
|
|
|
static PyObject *
|
|
queuesmod_get_maxsize(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:get_maxsize", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
Py_ssize_t maxsize = -1;
|
|
int err = queue_get_maxsize(&_globals.queues, qid, &maxsize);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
return PyLong_FromLongLong(maxsize);
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_get_maxsize_doc,
|
|
"get_maxsize(qid)\n\
|
|
\n\
|
|
Return the maximum number of items in the queue.");
|
|
|
|
static PyObject *
|
|
queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:get_queue_defaults", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
_queue *queue = NULL;
|
|
int err = _queues_lookup(&_globals.queues, qid, &queue);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
int fmt = queue->defaults.fmt;
|
|
int unboundop = queue->defaults.unboundop;
|
|
_queue_unmark_waiter(queue, _globals.queues.mutex);
|
|
|
|
PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
|
|
return defaults;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
|
|
"get_queue_defaults(qid)\n\
|
|
\n\
|
|
Return the queue's default values, set when it was created.");
|
|
|
|
static PyObject *
|
|
queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:is_full", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
int is_full = 0;
|
|
int err = queue_is_full(&_globals.queues, qid, &is_full);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
if (is_full) {
|
|
Py_RETURN_TRUE;
|
|
}
|
|
Py_RETURN_FALSE;
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_is_full_doc,
|
|
"is_full(qid)\n\
|
|
\n\
|
|
Return true if the queue has a maxsize and has reached it.");
|
|
|
|
static PyObject *
|
|
queuesmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"qid", NULL};
|
|
qidarg_converter_data qidarg;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:get_count", kwlist,
|
|
qidarg_converter, &qidarg)) {
|
|
return NULL;
|
|
}
|
|
int64_t qid = qidarg.id;
|
|
|
|
Py_ssize_t count = -1;
|
|
int err = queue_get_count(&_globals.queues, qid, &count);
|
|
if (handle_queue_error(err, self, qid)) {
|
|
return NULL;
|
|
}
|
|
assert(count >= 0);
|
|
return PyLong_FromSsize_t(count);
|
|
}
|
|
|
|
PyDoc_STRVAR(queuesmod_get_count_doc,
|
|
"get_count(qid)\n\
|
|
\n\
|
|
Return the number of items in the queue.");
|
|
|
|
/* _register_heap_types(queuetype, emptyerror, fullerror): store the
   externally defined queue class and exception classes in the module
   state.

   Raises TypeError if queuetype is not a type or if either error
   argument is not an exception class.  Returns None on success. */
static PyObject *
queuesmod__register_heap_types(PyObject *self, PyObject *args, PyObject *kwds)
{
    static char *kwlist[] = {"queuetype", "emptyerror", "fullerror", NULL};
    PyObject *queuetype;
    PyObject *emptyerror;
    PyObject *fullerror;
    int parsed = PyArg_ParseTupleAndKeywords(args, kwds,
                                             "OOO:_register_heap_types", kwlist,
                                             &queuetype, &emptyerror, &fullerror);
    if (!parsed) {
        return NULL;
    }

    // Validate the arguments in order; only the first problem is reported.
    const char *complaint = NULL;
    if (!PyType_Check(queuetype)) {
        complaint = "expected a type for 'queuetype'";
    }
    else if (!PyExceptionClass_Check(emptyerror)) {
        complaint = "expected an exception type for 'emptyerror'";
    }
    else if (!PyExceptionClass_Check(fullerror)) {
        complaint = "expected an exception type for 'fullerror'";
    }
    if (complaint != NULL) {
        PyErr_SetString(PyExc_TypeError, complaint);
        return NULL;
    }

    module_state *state = get_module_state(self);

    // The queue type is installed first; the exception types are only
    // touched if that succeeded.
    if (set_external_queue_type(state, (PyTypeObject *)queuetype) < 0
        || set_external_exc_types(state, emptyerror, fullerror) < 0)
    {
        return NULL;
    }

    Py_RETURN_NONE;
}
|
|
|
|
static PyMethodDef module_functions[] = {
|
|
{"create", _PyCFunction_CAST(queuesmod_create),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_create_doc},
|
|
{"destroy", _PyCFunction_CAST(queuesmod_destroy),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc},
|
|
{"list_all", queuesmod_list_all,
|
|
METH_NOARGS, queuesmod_list_all_doc},
|
|
{"put", _PyCFunction_CAST(queuesmod_put),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc},
|
|
{"get", _PyCFunction_CAST(queuesmod_get),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc},
|
|
{"bind", _PyCFunction_CAST(queuesmod_bind),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc},
|
|
{"release", _PyCFunction_CAST(queuesmod_release),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
|
|
{"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
|
|
{"get_queue_defaults", _PyCFunction_CAST(queuesmod_get_queue_defaults),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_get_queue_defaults_doc},
|
|
{"is_full", _PyCFunction_CAST(queuesmod_is_full),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
|
|
{"get_count", _PyCFunction_CAST(queuesmod_get_count),
|
|
METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc},
|
|
{"_register_heap_types", _PyCFunction_CAST(queuesmod__register_heap_types),
|
|
METH_VARARGS | METH_KEYWORDS, NULL},
|
|
|
|
{NULL, NULL} /* sentinel */
|
|
};
|
|
|
|
|
|
/* initialization function */
|
|
|
|
PyDoc_STRVAR(module_doc,
|
|
"This module provides primitive operations to manage Python interpreters.\n\
|
|
The 'interpreters' module provides a more convenient interface.");
|
|
|
|
static int
|
|
module_exec(PyObject *mod)
|
|
{
|
|
if (_globals_init() != 0) {
|
|
return -1;
|
|
}
|
|
|
|
/* Add exception types */
|
|
if (add_QueueError(mod) < 0) {
|
|
goto error;
|
|
}
|
|
|
|
/* Make sure queues drop objects owned by this interpreter. */
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
|
|
|
|
return 0;
|
|
|
|
error:
|
|
_globals_fini();
|
|
return -1;
|
|
}
|
|
|
|
static struct PyModuleDef_Slot module_slots[] = {
|
|
{Py_mod_exec, module_exec},
|
|
{Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
|
|
{Py_mod_gil, Py_MOD_GIL_NOT_USED},
|
|
{0, NULL},
|
|
};
|
|
|
|
static int
|
|
module_traverse(PyObject *mod, visitproc visit, void *arg)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
traverse_module_state(state, visit, arg);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
module_clear(PyObject *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
|
|
// Now we clear the module state.
|
|
clear_module_state(state);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
module_free(void *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
|
|
// Now we clear the module state.
|
|
clear_module_state(state);
|
|
|
|
_globals_fini();
|
|
}
|
|
|
|
static struct PyModuleDef moduledef = {
|
|
.m_base = PyModuleDef_HEAD_INIT,
|
|
.m_name = MODULE_NAME_STR,
|
|
.m_doc = module_doc,
|
|
.m_size = sizeof(module_state),
|
|
.m_methods = module_functions,
|
|
.m_slots = module_slots,
|
|
.m_traverse = module_traverse,
|
|
.m_clear = module_clear,
|
|
.m_free = (freefunc)module_free,
|
|
};
|
|
|
|
PyMODINIT_FUNC
|
|
MODINIT_FUNC_NAME(void)
|
|
{
|
|
return PyModuleDef_Init(&moduledef);
|
|
}
|