Derivation
In many cases, the synchronizer needs to be subclassed to catch special messages. For example, we may want to track the progress of a running program, but we do not want to parse the text-based messages ourselves when reading the results. In this case, we can override the `parse_lines` method of the buffer. For example:
import re
import time
import threading
import multiprocessing
from typing import Union
try:
from typing import Sequence
except ImportError:
from collections.abc import Sequence
import syncstream
class CustomedBuffer(syncstream.LineProcBuffer):
    """Customized buffer, used for catching the progress of a program.

    Every plain-text line that fully matches ``>>> Progress: <N>%.`` is
    consumed to update :attr:`progress`. All other items (including
    non-string ``GroupedMessage`` objects) are forwarded unchanged to the
    parent ``syncstream.LineProcBuffer.parse_lines``.
    """

    def __init__(self, maxlen: int = 10) -> None:
        """Initialization.

        Arguments:
            maxlen: forwarded as ``maxlen`` to ``syncstream.LineProcBuffer``
                (presumably the maximum number of stored items; see the
                syncstream documentation).
        """
        super().__init__(maxlen=maxlen)
        # Require at least one digit. The former pattern ``([\d]*?)`` also
        # matched ">>> Progress: %.", after which ``int("")`` raised ValueError.
        self.__r_prog = re.compile(pattern=r"\>\>\> Progress: (\d+)%\.")
        # Protects ``__progress``: in the example, ``wait`` (which triggers
        # parsing) runs in a background thread while the main thread reads
        # ``progress``.
        self.__info_lock = threading.Lock()
        self.__progress = 0

    @property
    def progress(self) -> int:
        """Get the most recently reported progress, in percent (starts at 0)."""
        with self.__info_lock:
            return self.__progress

    def parse_lines(
        self, lines: Sequence[Union[str, syncstream.GroupedMessage]]
    ) -> None:
        """Parse the lines.

        This method would be triggered each time a new line is written to the
        buffer. Progress lines are consumed to update ``progress``; every
        other item is passed on to the parent implementation for storage.

        Arguments:
            lines: the newly received items; plain strings or
                ``syncstream.GroupedMessage`` objects.
        """
        remained_lines = list()
        for line in lines:
            if not isinstance(line, str):
                # Grouped (non-text) messages are never progress reports.
                remained_lines.append(line)
                continue
            # Catch the progress.
            res = self.__r_prog.fullmatch(line)
            if res is not None:
                with self.__info_lock:
                    self.__progress = int(res.group(1))
                continue
            # Send remained lines to the storage.
            remained_lines.append(line)
        return super().parse_lines(remained_lines)
def worker_process(buffer: syncstream.LineProcMirror):
    """Simulate a slow job that reports its progress while it runs.

    Everything is printed inside ``with buffer:``; the mirror is expected to
    forward that output to the parent buffer. The job emits the progress
    lines 0%, 10%, ..., 90%, each followed by an ordinary ``Message item``
    line, and finishes with a ``100%`` progress line.
    """
    with buffer:
        for percent in range(0, 100, 10):
            time.sleep(0.5)
            print(">>> Progress: {0:d}%.".format(percent))
            time.sleep(0.5)
            print("Message", "item")
        print(">>> Progress: 100%.")
if __name__ == "__main__":
    pbuf = CustomedBuffer(10)  # Initialization.
    with multiprocessing.Pool(1) as pool:
        # Run only one process. Keep the AsyncResult: discarding it would
        # silently lose any exception raised inside the worker.
        task = pool.map_async(
            worker_process, tuple(pbuf.mirror for _ in range(1))
        )
        # Move the wait method to the background, because the buffer is only
        # updated while ``wait`` (or ``receive``) is running.
        thd = threading.Thread(target=pbuf.wait, daemon=True)
        thd.start()
        print("Start to wait.")
        prev_progress = 10
        while pbuf.progress < 100:
            # Report every 10% threshold crossed since the last poll, even if
            # the progress advanced by more than 10% in the meantime.
            while prev_progress < 100 and pbuf.progress >= prev_progress:
                print("Progress >= {0}%.".format(prev_progress))
                prev_progress += 10
            # If the worker failed, the progress can never reach 100%; stop
            # polling instead of looping forever.
            if task.ready() and not task.successful():
                break
            time.sleep(0.1)
        task.get()  # Re-raise any exception raised by the worker.
        print("Progress is 100%.")
        thd.join()
    messages = pbuf.read()  # Get results.
    for mitem in messages:
        print(mitem)
In this example, we override the `parse_lines` method of `syncstream.LineProcBuffer`. It is still important to run `pbuf.wait` in the background, because the buffer is not updated unless `pbuf.wait` or `pbuf.receive` is called. We therefore run `pbuf.wait` in a daemon thread and then poll the progress repeatedly until it reaches 100%. The results do not contain any messages like `>>> Progress ...`, because such messages have already been caught to update the progress value.