Python并发:线程和进程

multiprocessing

可以通过派生multiprocessing.Process来构造一个子进程:

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super(MyProcess, self).__init__(name=name)

    def run(self):
        // 进程实际的逻辑
        print 'Worker: %d. pid: %d' % (num, os.getpid())

再通过p.start()来启动子进程,然后通过p.join()方法来使得子进程运行结束后再执行父进程。

jobs = [MyProcess(str(i)) for i in range(5)]
for p in jobs:
    p.start()
for p in jobs:
    p.join()

multiprocessing.Pool

PoolProcess做了进一步封装,它默认按照CPU的数量创建子进程。

p = multiprocessing.Pool()
for i in range(5):
    p.apply_async(worker, args=(i,))
p.close()       # 关闭pool,执行close()将不能再调用apply_async()添加子进程
p.join()

【注】如果使用了Pool,那么不能直接用Queue.Queue()在父子进程之间传递消息,必须使用multiprocessing里的Queue。 【注】同时使用mamager.Queue()multiprocessing.Lock() 会有问题。

import multiprocessing

def write(q):
    for value in range(10):
        print 'Put %s to queue...' % value
        q.put(value)
    # 需要有标识队列结束
    q.put(None)

def read(q):
    while True:
        value = q.get(False)
        if value is None:
            break
        print 'Get %s from queue.' % value

manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
p = multiprocessing.Pool()
p.apply_async(write, args=(q, ))
p.apply_async(read, args=(q, ))
p.close()
p.join()

print '所有数据都写入并且读完'

threading

threading.Threadmultiprocessing.Process类似,可以派生一个新类:

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s')

class MyThread(threading.Thread):
    def __init__(self):
        super(MyThread, self).__init__(name='thread name')

    def run(self):  # 必须实现run()函数
        # do something
        logging.debug('print thread name')

使用的时候:

thr = MyThread()
thr.start()
thr.join()

获取当前thread名字:threading.current_thread.getName()

在使用threading的时候,和C++开发很大的差异是没法和signal混用。 只有main thread能够接收signal,其他线程是没法感知到signal的。 所以,如果需要接收signal,一定要保证main thread存活。

python thread(非daemon)通信建议的方法是:threading.EventEvent.wait() 会一直block,直到调用Event.set(),返回True

示例:

class MyThread(threading.Thread):
    def __init__(self):
        super(MyThread, self).__init__()
        self.event = threading.Event()

    def run(self):
        while not self.event.is_set():
            print "something"
            self.event.wait(10)

thr = MyThread()
thr.start()

def stop():
    thr.event.set()

signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

# 注意需保证main thread能接收到signal,主线程不要退出。

thr.join()

Queue.Queue

Python Queue既可以用在多线程也可以用在多进程中通信。

队列中常用的方法

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.get([block[, timeout]]) 获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 一般调用 get() 并处理完毕后,再调用一下task_done()
  • Queue.join() 阻塞,直到队列元素全处理完毕

一定要调用 Queue.task_done(),否则queue不空最终join不会退出,可以这样idiom:

def worker():
    while True:
        item = q.get()
        try:
            1/item
        except Exception as e:
            print e
        finally:
            q.task_done()

各种Queue

  • Queue.Queue
  • multiprocessing.Queue
  • multiprocessing.JoinableQueue 需要调用task_done,保证队列里的任务全部处理完(感觉比multiprocessing.Queue慢很多)
  • multiprocessing.manager.Queue

concurrent.features

Future(类似还有Promise和Delay)用于一些并发语言中做同步proxy,它是一个对象,实际是未知结果的proxy(因为是异步处理,此时结果还没计算出来)。

把任务分解到多个进程去执行,可以绕过GIL:

with concurrent.futures.ProcessPoolExecutor() as executor:
     result = executor.map(function, iterable)

如果瓶颈在IO的话,也可以使用ThreadPoollExcutor,此时也会有性能提升。

参考