跳到主要内容
版本:0.3.x

继承

绝大多数情况下,要获取某些特殊信息,需要继承同步器做二次开发。例如,有可能需要捕获一个正在运行的程序的进度。不过,在读取捕获结果的过程中、处理那些文字信息又非我所乐见。因此,需要重载缓存的parse_lines方法,例如:

derivation.py
import re
import time
import threading
import multiprocessing
from contextlib import redirect_stdout

try:
from typing import Sequence
except ImportError:
from collections.abc import Sequence

import syncstream


class CustomedBuffer(syncstream.LineProcBuffer):
'''定制的缓存,用来捕获程序的进度。'''

def __init__(self, maxlen: int = 10) -> None:
'''初始化。'''
super().__init__(maxlen=maxlen)
self.__r_prog = re.compile(pattern=r'\>\>\> Progress: ([\d]*?)%\.')
self.__info_lock = threading.Lock()
self.__progress = 0

@property
def progress(self) -> int:
'''获取当前进度。'''
with self.__info_lock:
return self.__progress

def parse_lines(self, lines: Sequence[str]) -> None:
'''处理行
每次将要向缓存写入新行时,调用此方法。
重载此方法用来获取具有特定模式的有用信息。
'''
remained_lines = list()
for line in lines:
# 捕获进度。
res = self.__r_prog.fullmatch(line)
if res is not None:
with self.__info_lock:
self.__progress = int(res.group(1))
continue
# 将不属于进度的行送入缓存。
remained_lines.append(line)
return super().parse_lines(remained_lines)


def worker_process(buffer: syncstream.LineProcMirror):
'''定义workder_process。'''
try:
with redirect_stdout(buffer):
for i in range(10):
time.sleep(0.5)
print('>>> Progress: {0:d}%.'.format(i * 10))
print('Message', 'item')
print('>>> Progress: 100%.')
except Exception as err:
buffer.send_error(err)
else:
buffer.send_eof()


if __name__ == '__main__':
pbuf = CustomedBuffer(10) # 初始化。
with multiprocessing.Pool(1) as pool:
pool.map_async(
worker_process,
tuple(pbuf.mirror for _ in range(1))
) # 只运行一个进程。
# 将等待方法移到背景。
thd = threading.Thread(target=pbuf.wait, daemon=True)
thd.start()
print('开始等待。')
while pbuf.progress < 100:
time.sleep(0.1)
print('进度达到100%。')
thd.join()

messages = pbuf.read() # 获取结果。
for mitem in messages:
print(mitem)

在此例中,重载了syncstream.LineProcBufferparse_lines方法。需要强调的是,仍然需要在背景里运行pbuf.wait方法,这是由于只有运行pbuf.waitpbuf.receive,buffer才会得到更新。此处将这一检查移到了守护线程中。接下来、不断检查进度,直到100%为止。所捕获的结果不包含任何形如>>> Progress ...的信息,这是由于此类信息已经缓存给了进度值的更新。