gunicorn源码分析:Master-Worker模式

gunicorn源码分析:Master-Worker模式

最近阅读了gunicron的源码。 gunicron的工作模式采用Master-Worker, 提前fork出配置文件中配置数量的worker进程(prefork),这些worker进程同事监听这master进程的一个socket. worker进程响应用户请求。master进程通过各种信号量管理者worker进程。整个实现非常的巧妙,很多细节值得学习。

我把代码回退到较早的一个版本的提交,主要想看清楚整个处理架构。
commit_id: ec301fd43d26207ff0dd9e06925882bc017dc866

文件结构

.
├── LICENSE
├── Manifest.in
├── README.rst
├── bin
│   ├── gunicorn  #程序入口
│   └── gunicorn_django # django_app
├── examples
│   └── test.py # test app符合wsgi协议
├── gunicorn # 核心目录
│   ├── __init__.py 
│   ├── arbiter.py  # master进程启动文件
│   ├── http
│   │   ├── __init__.py
│   │   ├── iostream.py
│   │   ├── request.py # 处理http请求
│   │   ├── response.py # 处理http请求
│   ├── util.py
│   ├── worker.py  # woker进程核心文件
└── setup.py

代码启动

1
2
arbiter = gunicorn.Arbiter((opts.host, opts.port), opts.workers, app)
arbiter.run()

构造Arbiter类,传入监听端口,worker数量,调用run方法启动。项目启动后能看到3个进程(设置worker数量为2)

1
2
3
501 75763 72171   0 Fri11PM ??         0:00.58 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/wangjinlong/Documents/jinlong/code/github/gunicorn/bin/gunicorn --workers=2 examples/test:app
501 75764 75763 0 Fri11PM ?? 0:00.11 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/wangjinlong/Documents/jinlong/code/github/gunicorn/bin/gunicorn --workers=2 examples/test:app
501 75765 75763 0 Fri11PM ?? 0:00.11 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/wangjinlong/Documents/jinlong/code/github/gunicorn/bin/gunicorn --workers=2 examples/test:app

可以看到两个75763fork出两个子进程7576475765. 75763是master进程,
子进程为worker进程。

master类代码

类图

arbiter类图如下

image

构造函数

master类的构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Arbiter(object):

LISTENER = None # 后面赋值为监听的地址和端口构造的socket对象(127.0.0.1:8000)
WORKERS = {}
PIPE = [] # 父子进程通过管道通讯?没看出来具体作用。知识点见: https://www.cnblogs.com/kunhu/p/3608109.html

# I love dyanmic languages
SIG_QUEUE = [] # 信号量队列
SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"CHLD HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
)
SIG_NAMES = dict(
(getattr(signal, name), name[3:].lower()) for name in dir(signal)
if name[:3] == "SIG" and name[3] != "_"
)

def __init__(self, address, num_workers, modname):
self.address = address # 地址 127.0.0.1:8080
self.num_workers = num_workers # woker数量,不用说了一版为cpu核数
self.modname = modname # app名字,app为符合wsgi协议的python web代码
self.timeout = 30 # 超时时间
self.reexec_pid = 0 # 子进程ID
self.pid = os.getpid() # master 进程ID
self.init_signals() # 初始化信号量对应操作
self.listen(self.address) # 构造socket对象,并把LISTENER赋值为这个对象
log.info("Booted Arbiter: %s" % os.getpid())

self.init_signals() 初始化了所有的信号量以及对应操作, master靠信号量来管理worker进程,例如:增加一个worker进程,杀死一个woker进程等等,具体代码如下:

1
2
3
4
5
6
7
def init_signals(self):
if self.PIPE:
map(lambda p: p.close(), self.PIPE)
self.PIPE = pair = os.pipe()
map(self.set_non_blocking, pair)
map(lambda p: fcntl.fcntl(p, fcntl.F_SETFD, fcntl.FD_CLOEXEC), pair)
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)

self.PIPE 没搞清楚到底作用是什么?调试代码发现可能是为了在master进程中,有队PIPE写和读的操作。

self.listen 方法主要是构造了socket对象

重点看Arbiter类的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def run(self):
self.manage_workers()
while True:
try:
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
if sig is None:
self.sleep()
continue

if sig not in self.SIG_NAMES:
log.info("Ignoring unknown signal: %s" % sig)
continue

signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
log.error("Unhandled signal: %s" % signame)
continue
log.info("Handling signal: %s" % signame)
handler()

self.murder_workers()
self.reap_workers()
self.manage_workers()
except StopIteration:
break
except KeyboardInterrupt:
self.stop(False)
sys.exit(-1)
except Exception, e:
log.exception("Unhandled exception in main loop.")
self.stop(False)
sys.exit(-1)

log.info("Master is shutting down.")
self.stop()

self.manage_workers方法:

1
2
3
4
5
6
7
def manage_workers(self):
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()

for pid, w in self.WORKERS.items():
if w.id >= self.num_workers:
self.kill_worker(pid, signal.SIGQUIT)

self.spawn_workers方法就可以看到fork worker的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def spawn_workers(self):
workers = set(w.id for w in self.WORKERS.values())
for i in range(self.num_workers):
if i in workers:
continue

worker = Worker(i, self.pid, self.LISTENER, self.modname)
pid = os.fork()
if pid != 0:
self.WORKERS[pid] = worker
continue

# Process Child
worker_pid = os.getpid()
try:
log.info("Worker %s booting" % worker_pid)
worker.run()
sys.exit(0)
except SystemExit:
raise
except:
log.exception("Exception in worker process.")
sys.exit(-1)
finally:
worker.tmp.close()
log.info("Worker %s exiting." % worker_pid)