Python中多线程的实现

Python在Py_Initialize中会创建一个默认的PyInterpreterState对象以及一个默认的 PyThreadState 对象。默认情况下不会初始化线程环境(也就是创建GIL),这个任务 是在第一次调用thread.start_new_thread是初始化的。

http://docs.python.org/c-api/init.html#thread-state-and-the-global-interpreter-lock

http://www.linuxjournal.com/article/3641

线程的创建

Modules/threadmodule.c
========================

static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    long ident;

    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    if (!PyCallable_Check(func)) {
        PyErr_SetString(PyExc_TypeError,
                        "first arg must be callable");
        return NULL;
    }
    if (!PyTuple_Check(args)) {
        PyErr_SetString(PyExc_TypeError,
                        "2nd arg must be a tuple");
        return NULL;
    }
    if (keyw != NULL && !PyDict_Check(keyw)) {
        PyErr_SetString(PyExc_TypeError,
                        "optional 3rd arg must be a dictionary");
        return NULL;
    }

    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    // 创建新线程对应的PyThreadState结构
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    // 初始化线程环境
    PyEval_InitThreads();
    // 如果是pthread库,这个函数基本等于pthread_create
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == -1) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyInt_FromLong(ident);
}

Python/pystate.c
==================

PyThreadState *_PyThreadState_Current = NULL;

PyThreadState *
PyThreadState_Swap(PyThreadState *newts)
{
    PyThreadState *oldts = _PyThreadState_Current;
    _PyThreadState_Current = newts;

    return oldts;
}

Python/ceval.c
===============

// 这个就是GIL了
static PyThread_type_lock interpreter_lock = 0;

void
PyEval_InitThreads(void)
{
    if (interpreter_lock)
        return;
    // 在这里创建GIL
    interpreter_lock = PyThread_allocate_lock();
    PyThread_acquire_lock(interpreter_lock, 1);
    main_thread = PyThread_get_thread_ident();
}

Modules/threadmodule.c
=========================

static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    res = PyEval_CallObjectWithKeywords(
        boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        if (PyErr_ExceptionMatches(PyExc_SystemExit))
            PyErr_Clear();
        else {
            PyObject *file;
            PySys_WriteStderr(
                "Unhandled exception in thread started by ");
            file = PySys_GetObject("stderr");
            if (file)
                PyFile_WriteObject(boot->func, file, 0);
            else
                PyObject_Print(boot->func, stderr, 0);
            PySys_WriteStderr("\n");
            PyErr_PrintEx(0);
        }
    }
    else
        Py_DECREF(res);
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

Python/ceval.c
================

void
PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    /* Check someone has called PyEval_InitThreads() to create the lock */
    assert(interpreter_lock);
    // 请求获取GIL
    PyThread_acquire_lock(interpreter_lock, 1);
    // Got it! 我们现在是当前线程了,设置全局的PyThreadState指针
    if (PyThreadState_Swap(tstate) != NULL)
        Py_FatalError(
            "PyEval_AcquireThread: non-NULL old thread state");
}

线程的调度

  1. 标准调度

    Python/ceval.c
    ===============
    
    volatile int _Py_Ticker = 100;
    
    PyObject *
    PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
    {
        ...
    
        PyThreadState *tstate = PyThreadState_GET();
    
        ...
    
        for (;;) {
            ...
    
            if (--_Py_Ticker < 0) {
                if (interpreter_lock) {
    
                    // 保存状态,释放GIL,从而触发操作系统进行线程的切换
                    if (PyThreadState_Swap(NULL) != tstate)
                        Py_FatalError("ceval: tstate mix-up");
                    PyThread_release_lock(interpreter_lock);
    
                    // 请求GIL,等待下次被操作系统调度
                    PyThread_acquire_lock(interpreter_lock, 1);
                    if (PyThreadState_Swap(tstate) != NULL)
                        Py_FatalError("ceval: orphan tstate");
    
                    ...
                }
            }
    
        fast_next_code:
    
            // dispatch opcode and execute
            ...
        }
    
        ...
    }
    
  2. 阻塞调度

    #define Py_BEGIN_ALLOW_THREADS { \
                            PyThreadState *_save; \
                            _save = PyEval_SaveThread();
    
    PyThreadState *
    PyEval_SaveThread(void)
    {
        PyThreadState *tstate = PyThreadState_Swap(NULL);
        if (tstate == NULL)
            Py_FatalError("PyEval_SaveThread: NULL tstate");
    
        if (interpreter_lock)
            PyThread_release_lock(interpreter_lock);
    
        return tstate;
    }
    
    #define Py_END_ALLOW_THREADS    PyEval_RestoreThread(_save); \
                     }
    

    在C extension中需要调用阻塞函数的时候时候会用到这两个宏,使用Py_BEGIN_ALLOW_THREADS 保存当前线程状态,主动释放GIL,从而让其它线程可以获得调度的机会,这个时候才是 实际意义上的多线程。阻塞函数返回后,需要操作Python对象时,在Py_END_ALLOW_THREADS 重新请求获取GIL。

    multiprocessing

PyGILState*函数

使用Python的thread模块创建出来的线程,Python会自动为其创建对应的PyThreadState对象, 管理GIL。这些thread中的代码可以直接调用Python/C API。

但是对于embed python或者python c extension中自行创建的线程,如果在这些线程需要调用 Python/C API,就必须创建自己的PyThreadState对象,并且在操作Python API之前获取GIL, 对于这类应用场景的支持,Python提供了PyGILState*函数。

PyGILState_STATE
PyGILState_Ensure(void)
{
    int current;
    PyThreadState *tcur;
    /* Note that we do not auto-init Python here - apart from
       potential races with 2 threads auto-initializing, pep-311
       spells out other issues.  Embedders are expected to have
       called Py_Initialize() and usually PyEval_InitThreads().
    */
    assert(autoInterpreterState); /* Py_Initialize() hasn't been called! */

    // 获取本线程的PyThreadState对象
    tcur = (PyThreadState *)PyThread_get_key_value(autoTLSkey);
    if (tcur == NULL) {
        /* Create a new thread state for this thread */
        tcur = PyThreadState_New(autoInterpreterState);
        if (tcur == NULL)
            Py_FatalError("Couldn't create thread-state for new thread");
        /* This is our thread state!  We'll need to delete it in the
           matching call to PyGILState_Release(). */
        tcur->gilstate_counter = 0;
        current = 0; /* new thread state is never current */
    }
    else
        // Python线程在获得GIL之后,首先会将全局的当前ThreadState对象换成
        // 本线程的PyThreadState对象,同样在释放GIL前,也会首先保存下这个
        // 全局的当前ThreadState对象。
        // 所以可以通过判断全局的当前线程ThreadState对象等于tcur判断本线程
        // 是否已经获得了GIL。
        current = PyThreadState_IsCurrent(tcur);
    if (current == 0)
        // 如果没有获取GIL,那么这里请求GIL。
        PyEval_RestoreThread(tcur);
    /* Update our counter in the thread-state - no need for locks:
       - tcur will remain valid as we hold the GIL.
       - the counter is safe as we are the only thread "allowed"
         to modify this value
    */
    ++tcur->gilstate_counter;
    return current ? PyGILState_LOCKED : PyGILState_UNLOCKED;
}

void
PyGILState_Release(PyGILState_STATE oldstate)
{
    PyThreadState *tcur = (PyThreadState *)PyThread_get_key_value(
                                                            autoTLSkey);
    if (tcur == NULL)
        Py_FatalError("auto-releasing thread-state, "
                      "but no thread-state for this thread");
    /* We must hold the GIL and have our thread state current */
    /* XXX - remove the check - the assert should be fine,
       but while this is very new (April 2003), the extra check
       by release-only users can't hurt.
    */
    if (! PyThreadState_IsCurrent(tcur))
        Py_FatalError("This thread state must be current when releasing");
    assert(PyThreadState_IsCurrent(tcur));
    --tcur->gilstate_counter;
    assert(tcur->gilstate_counter >= 0); /* illegal counter value */

    /* If we're going to destroy this thread-state, we must
     * clear it while the GIL is held, as destructors may run.
     */
    if (tcur->gilstate_counter == 0) {
        /* can't have been locked when we created it */
        assert(oldstate == PyGILState_UNLOCKED);
        PyThreadState_Clear(tcur);
        /* Delete the thread-state.  Note this releases the GIL too!
         * It's vital that the GIL be held here, to avoid shutdown
         * races; see bugs 225673 and 1061968 (that nasty bug has a
         * habit of coming back).
         */
        PyThreadState_DeleteCurrent();
    }
    /* Release the lock if necessary */
    else if (oldstate == PyGILState_UNLOCKED)
        PyEval_SaveThread();
}