Version: 0.3.x

LineProcBuffer

Class

pbuf = syncstream.mproc.LineProcBuffer(maxlen: int = 20)

The process-safe line-based buffer.

A rotating buffer with a maximal storage length. This buffer extends the basic LineBuffer for multi-processing use. It relies on a shared queue to synchronize messages among processes. To learn how to use this buffer, see the example on this page, or the tutorial.

Aliases

This class can be acquired by

import syncstream


syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer

Arguments

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| maxlen | int | No | The maximal number of stored lines. This value has the same meaning as deque.maxlen. |
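Since `maxlen` is described as having the same meaning as `deque.maxlen`, the rotating behavior can be illustrated with the standard library alone. The following is a minimal sketch that uses a plain `collections.deque` as a stand-in for the buffer storage; it does not call syncstream itself.

```python
from collections import deque

# Stand-in for the buffer storage: a deque that keeps at most 3 lines.
storage = deque(maxlen=3)

for i in range(5):
    storage.append(f"line {i}")

# The two oldest lines were discarded once the deque exceeded maxlen.
latest = list(storage)
print(latest)
```

With `maxlen=3`, only the three most recent lines survive, which is the behavior to expect when many sub-processes write more lines than the buffer can hold.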

Methods

clear

pbuf.clear()

Clear the whole buffer.

This method clears the storage and the last line stream of this buffer. However, it does not clear any mirrors or copies of this object. This method is thread-safe and should always succeed.


stop_all_mirrors

pbuf.stop_all_mirrors()

Send stop signals to all mirrors.

This operation is used to terminate the sub-processes safely. It does not trigger TERMINATE or KILL signals. Instead, each mirror that accepts the stop signal raises a StopIteration exception. The stop signal is checked each time a mirror calls write(). In this way, most programs can be interrupted.

danger

Since this method does not send TERMINATE or KILL signals, it does not guarantee that the processes are closed instantly. When using this method, ensure that the StopIteration exception is caught and reported through LineProcMirror.send_error(). The error is not caught automatically. If it is not caught, the main process gets stuck at wait().
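The cooperative stop described above can be sketched without the library. In the snippet below, `StoppableWriter` and `worker` are hypothetical stand-ins (not syncstream classes): the writer checks a stop flag on every `write()` and raises `StopIteration`, and the worker catches that exception explicitly instead of letting it escape.

```python
class StoppableWriter:
    """Hypothetical stand-in for a mirror; not the syncstream class."""

    def __init__(self):
        self.stop_requested = False
        self.lines = []

    def write(self, text):
        # The stop flag is checked on every write.
        if self.stop_requested:
            raise StopIteration("stop signal received")
        self.lines.append(text)


def worker(writer, n_steps):
    """Write up to n_steps lines; return True if stopped early."""
    try:
        for step in range(n_steps):
            if step == 2:
                # Simulate the main process requesting a stop mid-run.
                writer.stop_requested = True
            writer.write(f"step {step}")
    except StopIteration:
        # Catch the exception explicitly. With a real mirror, this is
        # where the error would be reported back to the main process.
        return True
    return False


writer = StoppableWriter()
stopped = worker(writer, n_steps=5)
print(stopped, writer.lines)
```

The worker only stops at its next `write()` call, which is why a stop signal of this kind cannot guarantee an instant shutdown.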


reset_states

pbuf.reset_states()

Reset the states of the buffer.

During the interaction with the mirrors, the states of the buffer may change. To reuse the buffer and start a new group of sub-processes, call this method first.


receive

is_live: bool = pbuf.receive()

Receive one item from the mirror.

This method fetches one item from the process-safe queue and writes the result to the thread-safe buffer. The fetched results are saved in the storage.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_live | bool | The finish flag. If this value is True, the buffer still needs to receive more messages from the mirrors. If False, the close signals from all mirrors have been received. |

wait

pbuf.wait()

The blocking wait method. It is implemented simply as:

while pbuf.receive():
    pass

This method keeps the current thread blocked until the buffer has received all messages from the mirrors.
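To make the relationship between receive() and wait() concrete, here is a simplified sketch built on `queue.Queue`. The `ReceiverSketch` class and the `EOF` sentinel are hypothetical and only mimic the documented contract (return True while some mirror is still live); the real buffer uses a process-safe queue and its own message format.

```python
import queue

# Hypothetical sentinel marking that one mirror has finished.
EOF = object()


class ReceiverSketch:
    """Simplified stand-in for the receive()/wait() loop."""

    def __init__(self, n_mirrors):
        self.inbox = queue.Queue()
        self.n_mirrors = n_mirrors
        self.storage = []

    def receive(self):
        # Block until one item arrives, then store it or count an EOF.
        item = self.inbox.get()
        if item is EOF:
            self.n_mirrors -= 1
        else:
            self.storage.append(item)
        # Still live while at least one mirror has not sent its EOF.
        return self.n_mirrors > 0

    def wait(self):
        while self.receive():
            pass


buf = ReceiverSketch(n_mirrors=2)
for item in ("a", EOF, "b", EOF):
    buf.inbox.put(item)
buf.wait()
print(buf.storage)
```

The loop ends only after every mirror has signaled completion, so a worker that never sends its close signal would keep `wait()` blocked.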


parse_lines

pbuf.parse_lines(lines: Sequence[str])

Parse the lines.

This method is triggered when new lines are received (exceptions and warnings are not included). The default behavior is to add the items to the storage.

Users can subclass this class and override this method with customized parsing, such as a regular expression search. The default behavior is implemented by:

pbuf.storage.extend(lines)

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| lines | Sequence[str] | Yes | A sequence of strs to be written to the storage. Users can inspect any line item and extract specific information from it. |
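As an illustration of overriding parse_lines, the sketch below extracts numbers with a regular expression while keeping the default storage behavior. `BufferStandIn` is a hypothetical stand-in that exposes only `storage` and `parse_lines`, so the snippet runs without the library; with syncstream the subclass would presumably derive from LineProcBuffer instead. The `loss=<number>` line format is also an assumption made for the example.

```python
import re
from collections import deque


class BufferStandIn:
    """Hypothetical stand-in exposing only storage and parse_lines."""

    def __init__(self, maxlen=20):
        self.storage = deque(maxlen=maxlen)

    def parse_lines(self, lines):
        # Default behavior: append the lines to the storage.
        self.storage.extend(lines)


class LossBuffer(BufferStandIn):
    """Keeps every line and additionally collects 'loss=<number>' values."""

    LOSS_PATTERN = re.compile(r"loss=([0-9]*\.?[0-9]+)")

    def __init__(self, maxlen=20):
        super().__init__(maxlen=maxlen)
        self.losses = []

    def parse_lines(self, lines):
        for line in lines:
            match = self.LOSS_PATTERN.search(line)
            if match:
                self.losses.append(float(match.group(1)))
        # Fall back to the default behavior so the lines stay readable.
        super().parse_lines(lines)


buf = LossBuffer()
buf.parse_lines(["epoch 1 loss=0.50", "saving checkpoint", "epoch 2 loss=0.25"])
print(buf.losses)
```

Calling the parent implementation at the end keeps the stored lines intact, so the extracted values are an addition to the normal record rather than a replacement.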

read

lines: Sequence[str] = pbuf.read(size: int | None = None)

Fetch the stored record items from the buffer. The read() method is thread-safe and does not influence the cursor of the write() method.

If the line currently being written is not blank, the read() method regards it as the last record item.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| size | int \| None | No | If None, the whole storage is returned. If an int, the latest size items are returned. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| lines | Sequence[str] | The returned messages. The messages have already been split by the line breaks. |
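The read() semantics described in this section (return everything or only the latest `size` items, and count a non-blank line still being written as the last record) can be approximated as follows. `read_sketch` is a hypothetical helper written for illustration, not the library's implementation; its handling of `size <= 0` in particular is an assumption.

```python
from collections import deque


def read_sketch(storage, last_line, size=None):
    """Hypothetical helper mimicking the described read() semantics."""
    items = list(storage)
    # A non-blank line still being written counts as the last record.
    if last_line:
        items.append(last_line)
    if size is None:
        return items
    # Guard size <= 0 (assumption): items[-0:] would return everything.
    return items[-size:] if size > 0 else []


storage = deque(["first", "second", "third"], maxlen=10)
everything = read_sketch(storage, last_line="partial")
latest_two = read_sketch(storage, last_line="partial", size=2)
no_pending = read_sketch(storage, last_line="", size=2)
print(everything, latest_two, no_pending)
```

Note that the helper builds a new list and never modifies `storage`, in line with the statement that reading does not influence the write cursor.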

write

caution

This method is prohibited for this class, because the write() method should only be used by the mirror objects.

Properties

mirror

mirror: LineProcMirror = pbuf.mirror

Get the mirror of this buffer.

The buffer should not be used in sub-processes directly. Instead, the buffer and its mirrors work in a master/slave mode. The property pbuf.mirror is an instance of LineProcMirror. It provides a process-safe mirror of the buffer for use in a sub-process.

This property cannot be modified after initialization.


n_mirrors

n_mirrors: int = pbuf.n_mirrors

The number of currently living mirrors. The buffer needs to keep receiving messages until n_mirrors becomes 0. Each time pbuf.mirror is accessed, this value increases by 1.

Example

Synchronize messages among processes

sync_among_processes.py
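import contextlib
import multiprocessing
from syncstream.mproc import LineProcBuffer


def f(buffer):
    # Redirect the stdout of this worker to the mirror.
    with contextlib.redirect_stdout(buffer):
        print('example')
    # Signal that this mirror has finished sending messages.
    buffer.send_eof()


if __name__ == '__main__':
    pbuf = LineProcBuffer(maxlen=10)
    with multiprocessing.Pool(4) as p:
        # Each access to pbuf.mirror provides one mirror for a worker.
        p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
        # Block until the close signals from all mirrors are received.
        pbuf.wait()
        print(pbuf.read())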