[Python][Django][asyncio]关于多线程、协程以及异步在Django中的应用

在开发环境中,Django默认用runserver方式启动,请求默认是支持多线程的,可以通过加上

--nothreading

参数,禁止多线程运行;
但是在生产环境中,都会用uWSGI作为前置启动方式,它是默认采用多进程方式工作,每个进程是单线程运行,可以通过加上–threads设置单个进程运行的线程数;
Important!!!
重要的事情说三遍,单进程单线程的工作模式会导致工程中使用threading启动新的线程并不会一直工作,只在网络有请求到来时,主线程运行的那段时间内,新线程才会同步运行!!!
重要的事情说三遍,单进程单线程的工作模式会导致工程中使用threading启动新的线程并不会一直工作,只在网络有请求到来时,主线程运行的那段时间内,新线程才会同步运行!!!
重要的事情说三遍,单进程单线程的工作模式会导致工程中使用threading启动新的线程并不会一直工作,只在网络有请求到来时,主线程运行的那段时间内,新线程才会同步运行!!!
Gunicorn也是一种高性能的Python WSGI HTTP Server,默认工作方式是sync,也是多进程,每个进程默认单线程的工作方式,可以在配置文件中加入threads参数设置线程的数量(假定在gthread模式下生效!也就是会用gthread替代sync模式);
因为Python的全局解释器锁(GIL)的缘故,采用多线程进行上下文切换效率很低,不如异步框架(https://blog.csdn.net/yiliumu/article/details/77983393)。
下面针对gevent模式来说明:
1、gevent是基于协程(coroutine -based)的Python网络库
2、Event Loop是其核心概念
3、代码执行的同步、异步机制研究(http://sdiehl.github.io/gevent-tutorial/
4、使用siege进行压力测试,sync在每个请求执行时间很短的情况下,是优于gevent模式,因为串行一次比切换一次loop消耗开销更小(0.01s);但是在单个业务超过3s以上的时候,串行阻塞时间会叠加、直到前一个业务返回,导致平均响应时间线性上升,如果client并发超过server的最大并发数量,所有请求将会严重等待。。。而基于协程的并行模式,只要不超过服务器的最大连接数,每个业务都只消耗该任务的网络I/O的等待时间,对于网络异步IO密集的任务很适用。。。例如:APNs推送。。。
5、Gunicorn下threading会正常工作,并不会停止运行!!!!

总结一下:多线程是线程调度,每个线程运行网络请求需要while循环等待网络数据,多个请求是竞争状态;协程是在Socket进入等待的时候,运行状态会挂起,进入下一个消息循环运行别的代码、直到收到信号数据准备好了,再切回来运行,尽量让工作都不闲着。。。因为是单线程,所以在本线程的cpu时间内运行的飞起。。。直到server的连接数满上限,不能接受多余的任务为止。。。

附上一张Gunicorn-Gevent初始化运行的调试堆栈和代码:

 
class GeventWorker(AsyncWorker):
 
    server_class = None
    wsgi_handler = None
 
    def patch(self):
        from gevent import monkey
        monkey.noisy = False
 
        # if the new version is used make sure to patch subprocess
        if gevent.version_info[0] == 0:
            monkey.patch_all()
        else:
            monkey.patch_all(subprocess=True)
 
        # monkey patch sendfile to make it none blocking
        patch_sendfile()
 
        # patch sockets
        sockets = []
        for s in self.sockets:
            if sys.version_info[0] == 3:
                sockets.append(socket(s.FAMILY, _socket.SOCK_STREAM,
                    fileno=s.sock.fileno()))
            else:
                sockets.append(socket(s.FAMILY, _socket.SOCK_STREAM,
                    _sock=s))
        self.sockets = sockets
 
    def notify(self):
        super(GeventWorker, self).notify()
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            sys.exit(0)
 
    def timeout_ctx(self):
        return gevent.Timeout(self.cfg.keepalive, False)
 
    def run(self):
        servers = []
        ssl_args = {}
 
        if self.cfg.is_ssl:
            ssl_args = dict(server_side=True, **self.cfg.ssl_options)
 
        for s in self.sockets:
            s.setblocking(1)
            pool = Pool(self.worker_connections)
            if self.server_class is not None:
                environ = base_environ(self.cfg)
                environ.update({
                    "wsgi.multithread": True,
                    "SERVER_SOFTWARE": VERSION,
                })
                server = self.server_class(
                    s, application=self.wsgi, spawn=pool, log=self.log,
                    handler_class=self.wsgi_handler, environ=environ,
                    **ssl_args)
            else:
                hfun = partial(self.handle, s)
                server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
 
            server.start()
            servers.append(server)
 
        while self.alive:
            self.notify()
            gevent.sleep(1.0)
 
        try:
            # Stop accepting requests
            for server in servers:
                if hasattr(server, 'close'):  # gevent 1.0
                    server.close()
                if hasattr(server, 'kill'):  # gevent < 1.0
                    server.kill()
 
            # Handle current requests until graceful_timeout
            ts = time.time()
            while time.time() - ts <= self.cfg.graceful_timeout:
                accepting = 0
                for server in servers:
                    if server.pool.free_count() != server.pool.size:
                        accepting += 1
 
                # if no server is accepting a connection, we can exit
                if not accepting:
                    return
 
                self.notify()
                gevent.sleep(1.0)
 
            # Force kill all active the handlers
            self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
            [server.stop(timeout=1) for server in servers]
        except:
            pass
 
    def handle_request(self, *args):
        try:
            super(GeventWorker, self).handle_request(*args)
        except gevent.GreenletExit:
            pass
        except SystemExit:
            pass
 
    def handle_quit(self, sig, frame):
        # Move this out of the signal handler so we can use
        # blocking calls. See #1126
        gevent.spawn(super(GeventWorker, self).handle_quit, sig, frame)
 
    if gevent.version_info[0] == 0:
 
        def init_process(self):
            # monkey patch here
            self.patch()
 
            # reinit the hub
            import gevent.core
            gevent.core.reinit()
 
            #gevent 0.13 and older doesn't reinitialize dns for us after forking
            #here's the workaround
            gevent.core.dns_shutdown(fail_requests=1)
            gevent.core.dns_init()
            super(GeventWorker, self).init_process()
 
    else:
 
        def init_process(self):
            # monkey patch here
            self.patch()
 
            # reinit the hub
            from gevent import hub
            hub.reinit()
 
            # then initialize the process
            super(GeventWorker, self).init_process()

从运行过程可以看到,Gunicorn启动Django的时候,默认已经打上了全部的monkey补丁。。。同时也更改了wsgi.multithread=True。。。