同步于进程之间
在不同的子进程之间同步信息并不轻松,这是由于不同进程的print函数会相互干扰。此包就是为了解决这个问题而设计的。若用户的需求是、在同一程序的不同子进程之间捕获标准输出,则建议使用LineProcBuffer。
下例捕获了来自四个不同进程的信息:
sync-proc.py
import multiprocessing
from contextlib import redirect_stdout
import syncstream
def worker_process(buffer: syncstream.LineProcMirror):
    '''定义workder_process。'''
    try:
        with redirect_stdout(buffer):
            print('Message', 'item')
    except Exception as err:
        buffer.send_error(err)
    else:
        buffer.send_eof()
if __name__ == '__main__':
    pbuf = syncstream.LineProcBuffer(10)  # 初始化。
    with multiprocessing.Pool(4) as pool:
        pool.map_async(
            worker_process,
            tuple(pbuf.mirror for _ in range(4))
        )  # 运行四个进程。
        pbuf.wait()  # 等待终结信号。
    messages = pbuf.read()  # 获得结果。
    for mitem in messages:
        print(mitem)
在基于进程的同步里,不同进程之间共享一个缓存(LineProcBuffer)。可以令捕获信息的最大行数设置为10:
pbuf = syncstream.LineProcBuffer(10)
不同于用于多线程场景的LineBuffer,LineProcBuffer自身并不直接在进程之间共享。取而代之的是,使用以下属性来获取缓存的“拷贝”。
mirror = pbuf.mirror
值mirror是syncstream.LineProcMirror的实例。值mirror可以视为缓存的拷贝。不同于缓存本身,它只用于维护要写入缓存的当前行。当进程运行的时候,缓存本身、和它的多个镜像(mirrors)之间、属于收发信息的关系。对于不同mirror,和缓存之间的通信是进程安全的。
该方法
pbuf.wait()
用来等待所有mirror的关闭信号。调用buffer.send_error或buffer.send_eof将触发关闭信号。在上例中,pbuf需要等待来自不同子进程的四个关闭信号。
危险
任何情况下,都不应拷贝mirror,这是由于每次获取属性值pbuf.mirror时,pbuf所维护的子进程计数将会增加1。因此,将tuple(pbuf.mirror for _ in range(4))替换成(pbuf.mirror, ) * 4是不允许的。
信息
方法buffer.send_error是用来捕获异常信息的。若触发了异常,在pbuf中将会写入一个特殊的实例GroupedMessage。该实例可以直接打印,且内容包含了所抛出异常的回溯信息(traceback)。