Derivation
In most cases, the synchronizer needs to be derived in order to catch some special messages. For example, we may need to track the progress of a running program, but we do not want to parse the text-based messages when reading the results. In this case, we could override the `parse_lines` method of the buffer. For example:
import re
import time
import threading
import multiprocessing
from contextlib import redirect_stdout
try:
from typing import Sequence
except ImportError:
from collections.abc import Sequence
import syncstream
class CustomedBuffer(syncstream.LineProcBuffer):
    '''Customized buffer, used for catching the progress of a program.

    Lines that fully match ``>>> Progress: <digits>%.`` are consumed to update
    the `progress` property. Every other line is forwarded to the parent
    implementation of `parse_lines`.
    '''

    def __init__(self, maxlen: int = 10) -> None:
        '''Initialization.

        Arguments:
            maxlen: forwarded to the parent initializer as `maxlen`.
        '''
        super().__init__(maxlen=maxlen)
        # Require at least one digit. A pattern that also accepts zero digits
        # would let ">>> Progress: %." through and make int('') raise
        # ValueError in parse_lines.
        self.__r_prog = re.compile(pattern=r'\>\>\> Progress: (\d+)%\.')
        # Guards __progress, which is written by parse_lines and read by the
        # progress property.
        self.__info_lock = threading.Lock()
        self.__progress = 0

    @property
    def progress(self) -> int:
        '''Get the current progress.'''
        with self.__info_lock:
            return self.__progress

    def parse_lines(self, lines: Sequence[str]) -> None:
        '''Parse the lines.

        This method would be triggered each time a new line is written to the
        buffer. The overriding method catches the useful information from
        specific patterns.

        Arguments:
            lines: the new lines. Progress reports are removed from them and
                the remaining lines are passed to the parent method.
        '''
        remained_lines = []
        for line in lines:
            res = self.__r_prog.fullmatch(line)
            if res is None:
                # Not a progress report: send the line to the storage.
                remained_lines.append(line)
                continue
            # Catch the progress. The line itself is dropped.
            with self.__info_lock:
                self.__progress = int(res.group(1))
        return super().parse_lines(remained_lines)
def worker_process(buffer: syncstream.LineProcMirror):
    '''Define the worker process.

    Redirects stdout to `buffer`, then prints ten progress reports (0% to 90%),
    each followed by a message, and finally a 100% report.

    Arguments:
        buffer: the mirror that receives everything printed here. It is
            notified with `send_error` on failure and `send_eof` on success.
    '''
    try:
        with redirect_stdout(buffer):
            for percent in range(0, 100, 10):
                time.sleep(0.5)
                print('>>> Progress: {0:d}%.'.format(percent))
                print('Message', 'item')
            print('>>> Progress: 100%.')
    except Exception as err:
        buffer.send_error(err)
        return
    # Reached only when no exception was raised above.
    buffer.send_eof()
if __name__ == '__main__':
    pbuf = CustomedBuffer(10)  # Initialization (maxlen=10).
    with multiprocessing.Pool(1) as pool:
        pool.map_async(worker_process, (pbuf.mirror,))  # Run only one process.

        # Move the wait method to the background, so that this thread stays
        # free to poll the progress.
        thd = threading.Thread(target=pbuf.wait, daemon=True)
        thd.start()

        print('Start to wait.')
        # The buffer is only updated while pbuf.wait is running. Once the
        # waiting thread has ended the progress cannot change any more, so
        # stop polling instead of spinning forever when the worker failed
        # before reporting 100%.
        while pbuf.progress < 100 and thd.is_alive():
            time.sleep(0.1)
        if pbuf.progress >= 100:
            print('Progress is 100%.')
        else:
            print('Worker stopped at {0:d}%.'.format(pbuf.progress))
        thd.join()

    messages = pbuf.read()  # Get results.
    for mitem in messages:
        print(mitem)
In this example, we override the `parse_lines` method of `syncstream.LineProcBuffer`. It is still important to run `pbuf.wait` in the background, because the buffer would not be updated unless `pbuf.wait` or `pbuf.receive` is called. We move this waiting to a daemon thread. Then we check the progress repeatedly until we get 100%. The results do not contain any messages like `>>> Progress ...`, because such messages have already been caught for updating the progress value.