继承
绝大多数情况下,要获取某些特殊信息,需要继承同步器做二次开发。例如,有可能需要捕获一个正在运行的程序的进度。不过,在读取捕获结果的过程中、处理那些文字信息又非我所乐见。因此,需要重载缓存的parse_lines
方法,例如:
derivation.py
import re
import time
import threading
import multiprocessing
from typing import Union
try:
from typing import Sequence
except ImportError:
from collections.abc import Sequence
import syncstream
class CustomedBuffer(syncstream.LineProcBuffer):
"""定制的缓存,用来捕获程序的进度。"""
def __init__(self, maxlen: int = 10) -> None:
"""初始化。"""
super().__init__(maxlen=maxlen)
self.__r_prog = re.compile(pattern=r"\>\>\> Progress: ([\d]*?)%\.")
self.__info_lock = threading.Lock()
self.__progress = 0
@property
def progress(self) -> int:
"""获取当前进度。"""
with self.__info_lock:
return self.__progress
def parse_lines(
self, lines: Sequence[Union[str, syncstream.GroupedMessage]]
) -> None:
"""处理行
每次将要向缓存写入新行时,调用此方法。
重载此方法用来获取具有特定模式的有用信息。
"""
remained_lines = list()
for line in lines:
if not isinstance(line, str):
remained_lines.append(line)
continue
# 捕获进度。
res = self.__r_prog.fullmatch(line)
if res is not None:
with self.__info_lock:
self.__progress = int(res.group(1))
continue
# 将不属于进度的行送入缓存。
remained_lines.append(line)
return super().parse_lines(remained_lines)
def worker_process(buffer: syncstream.LineProcMirror):
"""定义workder_process。"""
with buffer:
for i in range(10):
time.sleep(0.5)
print(">>> Progress: {0:d}%.".format(i * 10))
time.sleep(0.5)
print("Message", "item")
print(">>> Progress: 100%.")
if __name__ == "__main__":
pbuf = CustomedBuffer(10) # 初始化。
with multiprocessing.Pool(1) as pool:
pool.map_async(
worker_process, tuple(pbuf.mirror for _ in range(1))
) # 只运行一个进程。
# 将等待方法移到背景。
thd = threading.Thread(target=pbuf.wait, daemon=True)
thd.start()
print("开始等待。")
prev_progress = 10
while pbuf.progress < 100:
if pbuf.progress >= prev_progress:
print("进度达到{0}%。".format(prev_progress))
prev_progress += 10
time.sleep(0.1)
print("进度达到100%。")
thd.join()
messages = pbuf.read() # 获取结果。
for mitem in messages:
print(mitem)
在此例中,重载了syncstream.LineProcBuffer
的parse_lines
方法。需要强调的是,仍然需要在背景里运行pbuf.wait
方法,这是由于只有运行pbuf.wait
或pbuf.receive
,buffer才会得到更新。此处将这一检查移到了守护线程中。接下来、不断检查进度,直到100%为止。所捕获的结果不包含任何形如>>> Progress ...
的信息,这是由于此类信息已经缓存给了进度值的更新。