Skip to main content
Version: 0.3.x

Derivation

In most cases, the synchornizer need to be derived for catching some special messages. For example, we may need to catch the progress of a running program. But we do not want to parse the text-based messages when reading the results. In this case, we could override the parse_lines method of the buffer. For example:

derivation.py
import re
import time
import threading
import multiprocessing
from contextlib import redirect_stdout

try:
from typing import Sequence
except ImportError:
from collections.abc import Sequence

import syncstream


class CustomedBuffer(syncstream.LineProcBuffer):
'''Customized buffer, used for catching the progress of a program.'''

def __init__(self, maxlen: int = 10) -> None:
'''Initialization.'''
super().__init__(maxlen=maxlen)
self.__r_prog = re.compile(pattern=r'\>\>\> Progress: ([\d]*?)%\.')
self.__info_lock = threading.Lock()
self.__progress = 0

@property
def progress(self) -> int:
'''Get the current progress.'''
with self.__info_lock:
return self.__progress

def parse_lines(self, lines: Sequence[str]) -> None:
'''Parse the lines
This method would be triggered each time a new line is written to the buffer.
The overriding method would catch the useful information from specific patterns.
'''
remained_lines = list()
for line in lines:
# Catch the progress.
res = self.__r_prog.fullmatch(line)
if res is not None:
with self.__info_lock:
self.__progress = int(res.group(1))
continue
# Send remained lines to the storage.
remained_lines.append(line)
return super().parse_lines(remained_lines)


def worker_process(buffer: syncstream.LineProcMirror):
'''Define the workder_process.'''
try:
with redirect_stdout(buffer):
for i in range(10):
time.sleep(0.5)
print('>>> Progress: {0:d}%.'.format(i * 10))
print('Message', 'item')
print('>>> Progress: 100%.')
except Exception as err:
buffer.send_error(err)
else:
buffer.send_eof()


if __name__ == '__main__':
pbuf = CustomedBuffer(10) # Initialization.
with multiprocessing.Pool(1) as pool:
pool.map_async(
worker_process,
tuple(pbuf.mirror for _ in range(1))
) # Run only one process.
# Move the wait method to the background.
thd = threading.Thread(target=pbuf.wait, daemon=True)
thd.start()
print('Start to wait.')
while pbuf.progress < 100:
time.sleep(0.1)
print('Progress is 100%.')
thd.join()

messages = pbuf.read() # Get results.
for mitem in messages:
print(mitem)

In this example, we override the parse_lines method of syncstream.LineProcBuffer. It is still important to run the pbuf.wait in the background. Because the buffer would not be updated unless pbuf.wait or pbuf.receive is called. We move this check to a daemon thread. Then we check the progress repeatly until we get 100%. The results does not contain any messages like >>> Progress ..., beause such messages have been already catched for updating the progress value.