18.2 线程和进程

18.2.1 什么是进程?

18.2.1 什么是进程?

计算机程序只不过是磁盘中可执行的,二进制的数据。它们只有在被读取到内存中,被操作系统调用的时候才开始它们的生命周期。进程(重量级进程)是程序的一 次执行,每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操作系统管理在其上运行的所有进程,并为这些进程公平的分配时间。 进程也可以通过fork和spawn操作来完成其它的任务。不过各个进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(IPC),而不能直接共享 信息。

18.2.2 什么是线程

线程(轻量级进程)跟进程有些相似,不同的是:所有的线程运行在同一个进程中,共享相同的运行环境。它们可以想象成是在主进程或“主线程”中并行运行的“迷你进程”。

线程有开始,顺序执行和结束三部分。它有一个自己的指令指针,记录自己运行到什么地方。线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其 它的线程运行,这叫做让步。一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便的共享数据以及相互通讯。线程一般都是并发执 行的,正式由于这种并行和数据共享的机制使得多个任务的合作变成可能。实际上,在单CPU的系统中,真正的并发是不可能的,每个线程会被安排成每次只运行 一会,然后就把CPU让出来,让其它的线程去运行。在进程的整个运行过程中,每个线程都只做自己的事,在需要的时候跟其它的线程共享运行的结果。

当然,这样的共享并不是完全没有危险的。如果多个线程共同访问同一片数据,则由于数据访问的顺序不一样,有可能导致数据结果的不一致问题。

另一个需要注意的地方是:由于有的函数会在完成之前阻塞住,在没有特别为多线程做修改的情况下,这种“贪婪”的函数会让CPU的市价分配有所倾斜。导致各个线程分配到的运行时间可能不尽相同,不尽公平。

18.3.5 Python的threading模块

核心提示:避免使用thread模块。更高级别的 threading 模块更为先进,对线程的支持更为完善,而且使用 thread 模块里的属性有可能会与 threading 出现冲突。其次,低级别的 thread 模块的同步原语很少(实际上只有一个),而 threading 模块则有很多。

thread 模块函数
start_new_thread(function,
args, kwargs=None)                 产生一个新的线程,在新线程中用指定的参数和可选的
                                                kwargs来调用这个函数。
allocate_lock()                         分配一个LockType 类型的锁对象
exit()                                         让线程退出

LockType类型锁对象方法
acquire(wait=None)                     尝试获取锁对象
locked()                                     如果获取了锁对象返回True,否则返回False
release()                                     释放锁

例子1:简单的多线程

 1 from time import sleep, ctime
 2 import thread
 3 def loop0():
 4     print "start loop 0 at:", ctime()
 5     sleep(4)
 6     print "loop 0 done at:", ctime()
 7 def loop1():
 8     print "start loop 1 at:", ctime()
 9     sleep(2)
10     print "loop 1 done at:", ctime()
11 def main():
12     print "starting at:", ctime()
13     thread.start_new_thread(loop0, ())
14     thread.start_new_thread(loop1, ())
15     sleep(6)
16     print "all DONE at:", ctime()
17  
18 if __name__ == "__main__":
19     main()

mtsleep1.py

其中sleep(6)是为了让线程中的sleep(2)和sleep(4)能够完整运行(而不是随着主线程结束直接终止)。
我们可以通过使用锁来保证主线程不会提前结束。

 1 #!/usr/bin/env python
 2 
 3 import thread
 4 from time import sleep, ctime
 5 
 6 loops = [4,2]
 7 
 8 def loop(nloop, nsec, lock):
 9     print 'start loop', nloop, 'at: ', ctime()  #运行时这条语句有错误输出
10     sleep(nsec)
11     print 'loop', nloop, 'done at: ', ctime()
12     lock.release()
13 
14 def main():
15     print 'starting at:', ctime()
16     locks = []
17     nloops = range(len(loops))
18 
19     for i in nloops:
20         lock = thread.allocate_lock()
21         lock.acquire()                 #尝试获得锁(将锁锁上)
22         locks.append(lock)
23     for i in nloops:
24         thread.start_new_thread(loop, (i, loops[i], locks[i]))
25 
26     for i in nloops:
27         while locks[i].locked():
28             pass
29     print 'all DONE at:', ctime()
30 main()

mtsleep2.py

18.5 threading 模块

接下来,我们要介绍的是更高级别的threading模块,它不仅提供了Thread类,还提供了各种好用的同步机制。

threading 模块对象                                         描述
Thread                                             表示一个线程的执行的对象
Lock                                                 锁原语对象(跟thread 模块里的锁对象相同)
RLock                                         可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)。
Condition                                     条件变量对象能让一个线程停下来,等待其它线程满足了某个“条件”。
                                                    如,状态的改变或值的改变。
Event                                     通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,
                                            所有的线程都会被激活。
Semaphore                                 为等待锁的线程提供一个类似“等候室”的结构
BoundedSemaphore                             与Semaphore 类似,只是它不允许超过初始值
Timer                                             与Thread 相似,只是,它要等待一段时间后才开始运行。

核心提示:守护线程
另一个避免使用thread 模块的原因是,它不支持守护线程。当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。有时,我们并不期望这种行为,这时,就引入了守护线程的概念
threading
模块支持守护线程,它们是这样工作的:守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求,它就在那等着。如果你设定一个线程为守护线程,就
表示你在说这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。就像你在第16
章网络编程看到的,服务器线程运行在一个无限循环中,一般不会退出。

如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的daemon
属性。即,在线程开始(调用thread.start())之前,调用setDaemon()函数设定线程的daemon
标志(thread.setDaemon(True))就表示这个线程“不重要”
如果你想要等待子线程完成再退出, 那就什么都不用做,
或者显式地调用thread.setDaemon(False)以保证其daemon
标志为False。你可以调用thread.isDaemon()函数来判断其daemon 标志的值。新的子线程会继承其父线程的daemon
标志。整个Python 会在所有的非守护线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。

18.5.1Thread类

函数                                                 描述
start()                                             开始线程的执行
run()                                             定义线程的功能的函数(一般会被子类重写)
join(timeout=None)                         程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout 秒
getName()                                     返回线程的名字
setName(name)                               设置线程的名字
isAlive()                                         布尔标志,表示这个线程是否还在运行中
isDaemon()                                     返回线程的daemon 标志
setDaemon(daemonic)                把线程的daemon 标志设为daemonic(一定要在调用start()函数前调用)

有三种方法用Thread类创建线程。

1. 创建一个Thread的实例,传给它一个函数

2. 创建一个Thread的实例,传给它一个可调用的类对象

3. 从Thread派生出一个子类, 创建一个这个子类的实例

下面分别给出例子

1.创建一个Thread的实例,传给它一个函数

 1 #!/usr/bin/env python
 2 
 3 import threading
 4 from time import sleep, ctime
 5 
 6 loops = [4,2]
 7 
 8 def loop(nloop, nsec):
 9     print 'start loop', nloop, 'at:', ctime()
10     sleep(nsec)
11     print 'loop', nloop, 'done at:', ctime()
12 
13 def main():
14     print 'starting at:', ctime()
15     threads = []
16     nloops = range(len(loops))
17 
18     for i in nloops:
19         t = threading.Thread(target = loop, args = (i, loops[i]))
20         threads.append(t)
21 
22     for i in nloops:
23         threads[i].start()
24 
25     for i in nloops:
26         threads[i].join()
27 
28     print 'all DONE at:', ctime()
29 
30 if __name__ == '__main__':
31     main()

mtsleep3.py

所有线程都创建了之后,再一起调用start()函数启动,而不是创建一个启动一个,而且,不用再管理一堆锁(分配锁,获得锁,释放锁,检查锁的状态),只要简单地对每个线程调用join()函数就可以了。

join()会等到线程结束,或者在给力timeout参数的情况下等到超时,使用join()会比使用一个等待锁释放的无限循环更清晰(这种锁也被称为自旋锁)。

join()的另一个比较重要的方面是它可以完全不被调用。事实上一旦线程启动以后就会一直运行,直到线程的函数结束。如果你的主线程除了等线程结束以外还有其他的事情要做,那就不用调用join(),只有当你要等待线程结束的时候才调用join()。

2.创建一个Thread的实例,传给它一个可调用的类对象。

 1 #!/usr/bin/env python
 2 
 3 import threading
 4 from time import sleep, ctime
 5 
 6 loops = [4,2]
 7 
 8 class ThreadFunc(object):
 9 
10     def __init__(self, func, args, name=''):
11         self.name = name
12         self.func = func
13         self.args = args
14 
15     def __call__(self):
16         self.res = self.func(*self.args)
17 
18 def loop(nloop, nsec):
19         print 'start loop', nloop, 'at:', ctime()
20         sleep(nsec)
21         print 'loop', nloop, 'done at:', ctime()
22 
23 def main():
24     print 'starting at:', ctime()
25     threads = []
26     nloops = range(len(loops))
27 
28     for i in nloops:
29         t = threading.Thread(target = ThreadFunc(loop, (i,loops[i]),
30                                                  loop.__name__))
31         threads.append(t)
32 
33     for i in nloops:
34         threads[i].start()
35 
36     for i in nloops:
37         threads[i].join()
38 
39     print 'all DONE at:', ctime()
40 
41 if __name__ == '__main__':
42     main()
43         

mtsleep4.py

3.从Thread类中派生出一个子类,创建一个这个子类的实例

mtsleep5和mtsleep4的最大区别在于1.MyThread子类的构造器一定要先调用其基类的构造器 2.之前的特殊函数__call__()在子类中,名字要改为run()。

 1 #!/usr/bin/env python
 2 
 3 import threading
 4 from time import sleep, ctime
 5 
 6 loops = (4, 2)
 7 
 8 class MyThread(threading.Thread):
 9     def __init__(self, func, args, name=''):
10             threading.Thread.__init__(self)
11             self.name = name
12             self.func = func
13             self.args = args
14 
15     def run(self):
16         self.func(*self.args) #apply(self.func, self.args)
17 
18 def loop(nloop, nsec):
19     print 'start loop', nloop, 'at:', ctime()
20     sleep(nsec)
21     print 'loop', nloop, 'done at:', ctime()
22 
23 def main():
24     print 'starting at:', ctime()
25     threads = []
26     nloops = range(len(loops))
27 
28     for i in nloops:
29         t = MyThread(loop, (i, loops[i]), loop.__name__)
30         threads.append(t)
31 
32     for i in nloops:
33         threads[i].start()
34 
35     for i in nloops:
36         threads[i].join()
37 
38     print 'all DONE at:', ctime()
39 
40 if __name__ == '__main__':
41     main()

mtsleep5.py

为了让mtsleep5中的Thread的子类更为通用我们把子类单独放在一个模块中,并加上一个getResult()函数用以返回函数的运行结果

 1 #!/usr/bin/eny python
 2 
 3 import threading
 4 from time import ctime
 5 
 6 class MyThread(threading.Thread):
 7     def __init__(self, func, args, name=''):
 8         threading.Thread.__init__(self)
 9         self.name = name
10         self.func = func
11         self.args = args
12 
13     def getResult(self):
14         return self.res
15     
16     def run(self):
17         print 'starting', self.name, 'at:'. ctime()
18         self.res = self.func(*self.args)
19         print self.name, 'finished at:', ctime()

myThread.py

接下来给出一个脚本比较递归求斐波那契、阶乘和累加和函数的运行。脚本先在单线程中运行再在多线程中运行以说明多线程的好处。

 1 #!/usr/bin/eny python
 2 
 3 from myThread import MyThread
 4 from time import ctime, sleep
 5 
 6 def fib(x):
 7     sleep(0.005)
 8     if x < 2: return 1
 9     return (fib(x-2) + fib(x-1))
10 
11 def fac(x):
12     sleep(0.1)
13     if x < 2:return 1
14     return (x * fac(x-1))
15 
16 def sum(x):
17     sleep(0.1)
18     if x < 2:return 1
19     return (x + sum(x - 1))
20 
21 funcs = [fib, fac, sum]
22 n = 12
23 
24 def main():
25     nfuncs = range(len(funcs))
26 
27     print '*** SINGLE THREAD'
28     for i in nfuncs:
29         print 'starting', funcs[i].__name__, 'at:', ctime()
30         print funcs[i](n)
31         print funcs[i].__name__, 'finished at:', ctime()
32 
33     print '
*** MUTIPLE THREADS'
34     threads = []
35     for i in nfuncs:
36         t = MyThread(funcs[i],(n,), funcs[i].__name__)
37         threads.append(t)
38 
39     for i in nfuncs:
40         threads[i].start()
41 
42     for i in nfuncs:
43         threads[i].join()
44         print threads[i].getResult()
45 
46     print 'all DONE'
47 
48 if __name__ == '__main__':
49     main()

mtfacfib.py

18.5.3threading模块中的其他函数

函数 描述
activeCount() 当前活动的线程对象的数量
crrentThread() 返回当前线程对象
enumerate() 返回当前活动线程的列表
settrace(func) 为所有线程设置一个跟踪函数
setprofile(func) 为所有线程设置一个profile函数

18.5.4 生产者-消费者问题和Queue模块

生产者-消费者问题,就是生产者把生产的货物放进队列一类的数据结构中供消费者使用,其中生产货物和消费货物的时间都是不固定的。

Queue模块可以用来解决生产者-消费者问题,让各个线程之间通信,所用到的属性如下:

函数 描述
Queue模块函数
queue(size) 创建一个大小为size的Queue对象
Queue对象函数
qsize() 返回队列的大小(由于在返回的时候,队列可能会被其他线程修改,所以这个值是近似值)
empty()

如果队列为空返回True, 否则返回False

full() 如果队列已满返回True,否则返回False
put(item, block=0) 把item放到队列中,如果给了block,函数会一直阻塞到队列中有空间为止
get(block=0) 从队列中取一个对象,如果给了block,函数会一直阻塞直到队列中有对象为止。

 1 #!/usr/bin/env python
 2 
 3 from random import randint
 4 from time import sleep
 5 from Queue import Queue
 6 from myThread import MyThread
 7 
 8 def writeQ(queue):
 9     print 'producing object for Q...'
10     queue.put('xxx', 1)
11     print "size now", queue.qsize()
12 
13 def readQ(queue):
14     val = queue.get(1)
15     print 'consumed object from Q... size now', queue.qsize()
16 
17 def writer(queue, loops):
18     for i in range(loops):
19         writeQ(queue)
20         sleep(randint(1, 3))
21 
22 def reader(queue, loops):
23     for i in range(loops):
24         readQ(queue)
25         sleep(randint(1, 3))
26 
27 funcs = [writer, reader]
28 nfuncs = range(len(funcs))
29 
30 def main():
31     nloops = randint(2, 5)
32     q = Queue(32)
33 
34     threads = []
35     for i in nfuncs:
36         t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
37         threads.append(t)
38 
39     for i in nfuncs:
40         threads[i].start()
41 
42     for i in nfuncs:
43         threads[i].join()
44 
45     print 'all DONE'
46 
47 if __name__ == '__main__':
48     main()

procons.py

18.6相关模块

模块 描述
thread 基本的、低级别的线程模块
threading 高级别的线程和同步对象
Queue 供多线程使用的同步队列
mutex 互斥对象
SocketServer 具有线程控制的TCP和UDP管理器