继承
绝大多数情况下,要获取某些特殊信息,需要继承同步器做二次开发。例如,有可能需要捕获一个正在运行的程序的进度。不过,在读取捕获结果的过程中、处理那些文字信息又非我所乐见。因此,需要重载缓存的parse_lines方法,例如:
derivation.py
import re
import time
import threading
import multiprocessing
from contextlib import redirect_stdout
try:
    from typing import Sequence
except ImportError:
    from collections.abc import Sequence
import syncstream
class CustomedBuffer(syncstream.LineProcBuffer):
    '''定制的缓存,用来捕获程序的进度。'''
    def __init__(self, maxlen: int = 10) -> None:
        '''初始化。'''
        super().__init__(maxlen=maxlen)
        self.__r_prog = re.compile(pattern=r'\>\>\> Progress: ([\d]*?)%\.')
        self.__info_lock = threading.Lock()
        self.__progress = 0
    
    @property
    def progress(self) -> int:
        '''获取当前进度。'''
        with self.__info_lock:
            return self.__progress
    def parse_lines(self, lines: Sequence[str]) -> None:
        '''处理行
        每次将要向缓存写入新行时,调用此方法。
        重载此方法用来获取具有特定模式的有用信息。
        '''
        remained_lines = list()
        for line in lines:
            # 捕获进度。
            res = self.__r_prog.fullmatch(line)
            if res is not None:
                with self.__info_lock:
                    self.__progress = int(res.group(1))
                continue
            # 将不属于进度的行送入缓存。
            remained_lines.append(line)
        return super().parse_lines(remained_lines)
def worker_process(buffer: syncstream.LineProcMirror):
    '''定义workder_process。'''
    try:
        with redirect_stdout(buffer):
            for i in range(10):
                time.sleep(0.5)
                print('>>> Progress: {0:d}%.'.format(i * 10))
            print('Message', 'item')
            print('>>> Progress: 100%.')
    except Exception as err:
        buffer.send_error(err)
    else:
        buffer.send_eof()
if __name__ == '__main__':
    pbuf = CustomedBuffer(10)  # 初始化。
    with multiprocessing.Pool(1) as pool:
        pool.map_async(
            worker_process,
            tuple(pbuf.mirror for _ in range(1))
        )  # 只运行一个进程。
        # 将等待方法移到背景。
        thd = threading.Thread(target=pbuf.wait, daemon=True)
        thd.start()
        print('开始等待。')
        while pbuf.progress < 100:
            time.sleep(0.1)
        print('进度达到100%。')
        thd.join()
    messages = pbuf.read()  # 获取结果。
    for mitem in messages:
        print(mitem)
在此例中,重载了syncstream.LineProcBuffer的parse_lines方法。需要强调的是,仍然需要在背景里运行pbuf.wait方法,这是由于只有运行pbuf.wait或pbuf.receive,buffer才会得到更新。此处将这一检查移到了守护线程中。接下来、不断  检查进度,直到100%为止。所捕获的结果不包含任何形如>>> Progress ...的信息,这是由于此类信息已经缓存给了进度值的更新。