Python 多进程

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import time
import os

单进程

def music(name):
    for i in range(3):
        print('Process %s 正在播放歌曲: %s.' % (os.getpid(), name))
        time.sleep(2)

def movie(name):
    for i in range(3):
        print('Process %s 正在播放视频: %s.' % (os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    music(u'张信哲 - 信仰')
    movie(u'天下无贼')

1.Process类的参数:
Process(self,group=None,target=None,name=None,args=(),kwargs={})
group :group参数并不使用;
target:表示调用对象,即该进程要执行的任务,或者说是传入一个方法;
name : 进程的别名;
args : srgs是一个元组,它里面是调用target方法需要传入的参数; args=(u'张信哲 - 信仰',)
kwargs:kwargs是一个字典,它里面是调用target方法需要传入的关键字参数; kwargs={"name": u'张信哲 - 信仰'}

2.Process类的方法:
start() : 启动进程,并调用方法run();
run() : 进程启动后运行target调用函数的方法;
join(timeout) :让主线程等待子进程结束后再继续进行,通常用于进程间的同步。timeout是可选的超时时间,即超过该时间主线程将不再继续等待。
terminate() :强制终止进程,但不会进行任何清理操作;
is_alive() :判断进程是否"存活",存活返回True,否则返回False;

启动2个进程,主进程会等待所有的子进程执行结束再结束。

def music(name):
    for i in range(3):
        print('Process %s 正在播放歌曲: %s.' % (os.getpid(), name))
        time.sleep(2)

def movie(name):
    for i in range(3):
        print('Process %s 正在播放视频: %s.' % (os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    music_process = multiprocessing.Process(target=music, args=(u'张信哲 - 信仰',))
    movie_process = multiprocessing.Process(target=movie, args=(u'天下无贼',))

    music_process.start()
    movie_process.start()
    time.sleep(1)
    print('主进程执行完毕!')

启动2个进程,设置子进程守护,当主进程结束时,子进程也不再继续执行,直接结束。

def music(name):
    for i in range(3):
        print('Process %s 正在播放歌曲: %s.' % (os.getpid(), name))
        time.sleep(2)

def movie(name):
    for i in range(3):
        print('Process %s 正在播放视频: %s.' % (os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    music_process = multiprocessing.Process(target=music, args=(u'张信哲 - 信仰',))
    movie_process = multiprocessing.Process(target=movie, args=(u'天下无贼',))

    # 子进程就会守护主进程,主进程结束,子进程也会自动销毁
    music_process.daemon = True
    movie_process.daemon = True

    music_process.start()
    movie_process.start()

    time.sleep(1)
    print('主进程执行完毕!')

进程池函数Pool()可以用来批量创建子进程,也可以用来管理进程。进程池常用方法:
1.apply(func,args=(),kwargs={}):将func函数提交给进程池处理。
2.apply_async(func,args=(),kwargs={},callback=None,error_callback=None):apply()方法的异步版本,该方法不会被阻塞。其中callback指定func函数完成后的回调函数,error_callback指定func函数出错后的回调函数。这两个参数可以省略。
3.map(func,iterable) :类似于Python内置的map函数。
4.map_async(func,iterable,callback=None,error_callback=None):map()的异步版本,该方法不会被阻塞。
5.close() :关闭进程池。不允许进程池继续接受新的任务,执行close()方法后,进程池会在当前进程池中所有任务执行完后关闭自己。
6.terminate() :立即终止进程池。
7.join() :等待进程池中所有任务完成。此方法只能在close()或terminate()之后调用。
说明:
1.回调函数:只有异步函数才会有回调函数,即每当进程池的的某个进程的任务处理完成了,它就将处理结果交给回调函数,由回调函数作进一步处理后告知主进程,再由主进程调用其它函数处理回调函数返回的结果。
如果是在主进程等到进程池中的所有任务都执行完毕了再统一处理结果,则不需要回调函数。
2.当创建一个进程池时,可以设定它的最大进程容纳数。之后每当有新的请求提交到Pool中时,如果池还没满,那就创建一个进程来执行该请求,如果进程池已满,那么该请求将会等待,直到池中有进程结束。如果没有设置Pool的大小,那么默认是CPU的核数。

进程池

def calculate(num):
    return num ** 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    # apply方法创建进程,同步,一般不使用
    pool.apply(func=calculate, args=(1,))
    # apply方法创建进程,获取返回值,同步,一般不使用
    result = pool.apply(func=calculate, args=(2,))
    print(result)

    # apply_async方法创建进程
    pool.apply_async(func=calculate, args=(3,))
    # apply_async方法创建进程,使用get()获取返回值
    result = pool.apply_async(func=calculate, args=(4,))
    print(result.get())

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

进程池,for循环使用进程池,无返回值

def calculate(num):
    time.sleep(1)
    print(num ** 2)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    for i in range(1,5):
        pool.apply_async(func=calculate, args=(i,))

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

进程池,for循环使用进程池,有返回值,会被阻塞

def calculate(num):
    time.sleep(1)
    return num ** 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    for i in range(1,5):
        result = pool.apply_async(func=calculate, args=(i,))
        print(result.get())

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

进程池map,传入一个可迭代对象

def play(name):
    for i in range(3):
        print('Process %s 正在播放: %s.' % (os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    pool.map(func=play, iterable=(u'张信哲 - 信仰', u'天下无贼', u'春天', u'夏天', u'秋天', u'冬天',))

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

进程池map,传入一些参数,以及一个可迭代对象,使用 functools.partial

functools.partial 基于一个函数创建一个新的可调用对象,把原函数的某些参数固定
from functools import partial
def play(arg_a, arg_b, name):
    for i in range(3):
        print('%s %s Process %s 正在播放: %s.' % (arg_a, arg_b, os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    arg_a = 'Hello'
    arg_b = 'World'

    # 可迭代对象中的元素为最后一个参数
    pool.map(func=partial(play, arg_a, arg_b), iterable=(u'张信哲 - 信仰', u'天下无贼', u'春天', u'夏天', u'秋天', u'冬天',))

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

上面的方法都不能拿来传第数据库游标等类型的数据 TypeError: can't pickle sqlite3.Cursor objects
pool.map(func=partial(proxy_check, cur, con), iterable=result)
pool.starmap(func=proxy_check, iterable=zip(repeat(cur), repeat(con), (u'张信哲 - 信仰', u'天下无贼', u'春天', u'夏天', u'秋天', u'冬天',)))

进程池map,传入一些参数,以及一个可迭代对象,使用 multiprocessing.starmap

itertools.repeat(val, num) 基于变量创建一个可调用对象,数据將被迭代 num 次,若不指定數字將重复无数次。
from itertools import repeat
def play(arg_a, arg_b, name):
    for i in range(3):
        print('%s %s Process %s 正在播放: %s.' % (arg_a, arg_b, os.getpid(), name))
        time.sleep(2)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    arg_a = 'Hello'
    arg_b = 'World'
    # 参数位置随意参数
    pool.starmap(func=play, iterable=zip(repeat(arg_a), repeat(arg_b), (u'张信哲 - 信仰', u'天下无贼', u'春天', u'夏天', u'秋天', u'冬天',)))

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')

进程池map,传入一个可迭代对象,获取结果

def calculate(num):
    return num ** 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)  # 创建一个5个进程的进程池

    result = pool.map(func=calculate, iterable=(1, 2, 3, 4, 5, 6))  # map()会等到所有进程都执行完毕后,再把结果以列表的形式返回。
    print(result)

    pool.close()
    pool.join()

    time.sleep(1)
    print('主进程执行完毕!')