Version: 1.2.2

LineProcBuffer

Class

pbuf = LineProcBuffer(maxlen: int = 20)

The process-safe line-based buffer.

A rotating buffer with a maximal storage length. It extends the basic LineBuffer to the multi-processing case: the shared queue of this buffer keeps the messages from different processes synchronized. To learn how to use this buffer, see the example on this page or the tutorial.

Aliases

This class can be accessed by either of the following names:

import syncstream


syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer

Arguments

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| maxlen | int | No | The maximal number of stored lines. This value has the same meaning as deque.maxlen. |
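
Since maxlen follows the semantics of deque.maxlen, the rotation can be illustrated with the standard library alone. The snippet below is a minimal sketch that uses a plain collections.deque, not LineProcBuffer itself; the variable names are only for illustration.

```python
from collections import deque

# A plain deque stands in for the buffer storage here.
storage = deque(maxlen=3)

for i in range(5):
    # Once the deque is full, each append discards the oldest line.
    storage.append(f"line {i}")

kept = list(storage)
print(kept)  # ['line 2', 'line 3', 'line 4']
```

Only the three most recent lines survive, which is the behavior to expect from a buffer created with maxlen=3.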

Methods

close

pbuf.close()

Close the IO. This method takes effect only once; a second call does nothing.


clear

pbuf.clear()

Clear the whole buffer.

This method clears the storage and the last line stream of this buffer. However, it does not clear any mirrors or copies of this object. This method is thread-safe and should always succeed.


stop_all_mirrors

pbuf.stop_all_mirrors()

Send stop signals to all mirrors.

This operation is used for terminating the sub-processes safely. It does not trigger TERMINATE or KILL signals. Instead, each mirror that accepts the stop signal raises a StopIteration exception. The stop signal is checked each time a mirror calls write(), so most programs are interrupted in this way.

danger

Since this method does not send TERMINATE or KILL signals, it does not guarantee that the processes are closed instantly. When using this method, ensure that the StopIteration exception is caught and passed to LineProcMirror.send_error(). The error is not caught automatically. If it is not caught, the main process gets stuck at wait().
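
The handling described above can be sketched without any sub-process. In the following minimal sketch, ToyMirror is a hypothetical stand-in written for illustration, not the real LineProcMirror: it only imitates the documented behavior that write() raises StopIteration once a stop signal has arrived, and that the worker forwards the exception through send_error(). The stop_after parameter and the worker function are assumptions of this sketch.

```python
class ToyMirror:
    """Hypothetical stand-in for a mirror; NOT the syncstream class."""

    def __init__(self, stop_after: int):
        self.stop_after = stop_after  # simulate a stop signal after N writes
        self.lines = []
        self.errors = []

    def write(self, text: str) -> None:
        # The stop signal is checked on every write, as described above.
        if len(self.lines) >= self.stop_after:
            raise StopIteration("stop signal received")
        self.lines.append(text)

    def send_error(self, err: Exception) -> None:
        # The real mirror would forward the error to the main process.
        self.errors.append(err)


def worker(mirror: ToyMirror, n_steps: int) -> str:
    try:
        for i in range(n_steps):
            mirror.write(f"step {i}\n")
    except StopIteration as err:
        # Catch the stop explicitly and report it; it is not caught automatically.
        mirror.send_error(err)
        return "stopped"
    return "finished"


toy = ToyMirror(stop_after=2)
result = worker(toy, n_steps=5)
print(result, len(toy.lines), len(toy.errors))  # stopped 2 1
```

The point of the pattern is that the worker ends cleanly and reports the interruption, so the receiving side is not left waiting for messages that never come.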


force_stop

pbuf.force_stop()

Force the buffer to stop.

Calling this method forces receive() to exit and return False. After that, all mirrors previously sent to the sub-processes are detached.

Note that calling this method is unsafe and may leave the sub-processes that hold mirrors blocked. Unless it is necessary, prefer stop_all_mirrors() over this method.

Calling this method means that the multi-processing functionalities of this buffer are disposed, unless self.reset_states() is used.

It is recommended to use this method only when all sub-processes will be stopped by Process.terminate() or Process.kill().


reset_states

pbuf.reset_states()

Reset the states of the buffer.

During the interaction with the mirrors, the states of the buffer may change. To reuse the buffer and start a new group of sub-processes, call this method first.


receive

is_live: bool = pbuf.receive()

Receive one item from the mirror.

This method fetches one item from the process-safe queue and writes the result to the thread-safe buffer. The fetched result is saved in the storage.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_live | bool | The finish flag. If True, the buffer still needs to receive more messages from the mirrors. If False, the close signals from all mirrors have been received. |

wait

pbuf.wait()

The blocking wait method. It is implemented simply as:

while pbuf.receive():
    pass

This method keeps the current thread blocked until the buffer has received all messages from the mirrors.
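
To make the relation between receive(), wait(), and the number of living mirrors concrete, here is a toy model built on the standard library. It is a sketch under stated assumptions, not the library's implementation: ToyBuffer, add_mirror(), and the _CLOSE sentinel are hypothetical names, and a thread-safe queue.Queue stands in for the process-safe queue.

```python
import queue
from collections import deque

_CLOSE = object()  # hypothetical sentinel standing in for a mirror's close signal


class ToyBuffer:
    """Toy model of the receive/wait loop; NOT the syncstream class."""

    def __init__(self, maxlen: int = 20):
        self.storage = deque(maxlen=maxlen)
        self.n_mirrors = 0
        self._queue = queue.Queue()

    def add_mirror(self) -> queue.Queue:
        # Each handed-out mirror must later send one close signal.
        self.n_mirrors += 1
        return self._queue

    def receive(self) -> bool:
        if self.n_mirrors <= 0:
            return False  # nothing left to wait for
        item = self._queue.get()  # blocks until one item arrives
        if item is _CLOSE:
            self.n_mirrors -= 1
        else:
            self.storage.append(item)
        # True while at least one mirror has not closed yet.
        return self.n_mirrors > 0

    def wait(self) -> None:
        while self.receive():
            pass


toy_buf = ToyBuffer(maxlen=10)
q_a = toy_buf.add_mirror()
q_b = toy_buf.add_mirror()
q_a.put("a: hello")
q_a.put(_CLOSE)
q_b.put("b: world")
q_b.put(_CLOSE)

toy_buf.wait()
received = list(toy_buf.storage)
print(received)  # ['a: hello', 'b: world']
```

Calling receive() in a hand-written loop instead of wait() leaves room to do other work between items, such as refreshing a progress display.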


parse_lines

pbuf.parse_lines(lines: Sequence[str])

Parse the lines.

This method is triggered when new lines are received (exceptions and warnings are not included). The default behavior is to add the items to the storage.

Users can subclass this buffer and override this method with a customized parsing method, such as a regular expression search. The default behavior is implemented by:

pbuf.storage.extend(lines)

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| lines | Sequence[str] | Yes | A sequence of strs to be written to the storage. Users can catch any line item and extract specific information from it. |
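
The override described above might look like the following sketch. To keep it runnable without any sub-process, ToyLineBuffer is a hypothetical stand-in that reproduces only the documented default (storage.extend(lines)); with the real library, the subclass would presumably derive from LineProcBuffer instead. The LossBuffer class, its losses attribute, and the log format are assumptions made for illustration.

```python
import re
from collections import deque
from typing import List, Sequence


class ToyLineBuffer:
    """Hypothetical stand-in with the documented default parse_lines()."""

    def __init__(self, maxlen: int = 20):
        self.storage = deque(maxlen=maxlen)

    def parse_lines(self, lines: Sequence[str]) -> None:
        self.storage.extend(lines)  # the documented default behavior


class LossBuffer(ToyLineBuffer):
    """Collects numbers from lines such as 'epoch 1 loss=0.50'."""

    _pattern = re.compile(r"loss=([0-9.]+)")

    def __init__(self, maxlen: int = 20):
        super().__init__(maxlen)
        self.losses: List[float] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        for line in lines:
            match = self._pattern.search(line)
            if match:
                self.losses.append(float(match.group(1)))
        # Keep the default behavior so that the lines are still stored.
        super().parse_lines(lines)


loss_buf = LossBuffer()
loss_buf.parse_lines(["epoch 1 loss=0.50", "saving checkpoint", "epoch 2 loss=0.25"])
print(loss_buf.losses)  # [0.5, 0.25]
```

Calling the parent method at the end keeps the storage usable as before; an override that drops this call would store nothing unless it does so itself.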

read

lines: Sequence[str] = pbuf.read(size: int | None = None)

Fetch the stored record items from the buffer. The read() method is thread-safe and does not affect the cursor of the write() method.

If the line currently being written is not blank, read() treats it as the last record item.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| size | int \| None | No | If None, the whole storage is returned. If an int, the latest size items are returned. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| lines | Sequence[str] | The returned messages. The messages have already been split at the line breaks. |
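
The semantics of size and of the unfinished last line can be modeled in a few lines. This is only a sketch of the behavior described above, not the library's code: read_model is a hypothetical helper, and the treatment of non-positive size values is an assumption of this sketch.

```python
from collections import deque
from typing import Deque, List, Optional


def read_model(storage: Deque[str], last_line: str,
               size: Optional[int] = None) -> List[str]:
    """Toy model of read(); NOT the syncstream implementation."""
    items = list(storage)
    if last_line:
        # A non-blank line still being written counts as the last item.
        items.append(last_line)
    if size is None:
        return items
    # Assumption of this sketch: a non-positive size returns nothing.
    return items[-size:] if size > 0 else []


stored = deque(["a", "b", "c"], maxlen=10)

print(read_model(stored, "partial"))     # ['a', 'b', 'c', 'partial']
print(read_model(stored, "partial", 2))  # ['c', 'partial']
print(read_model(stored, "", 2))         # ['b', 'c']
```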

write

caution

This method is prohibited for this class, because write() should only be used by the mirror objects.

Properties

mirror

mirror: LineProcMirror = pbuf.mirror

Get the mirror of this buffer.

The buffer should not be used in sub-processes directly. Instead, the buffer and its mirrors work in master/slave mode. The property pbuf.mirror is an instance of LineProcMirror, which provides a process-safe mirror of the buffer for use in a sub-process.

This property cannot be modified after the initialization.


n_mirrors

n_mirrors: int = pbuf.n_mirrors

The number of living mirrors. The buffer needs to keep receiving messages until n_mirrors becomes 0. Each time pbuf.mirror is accessed, this value increases by 1.


maxlen

maxlen: int | None = pbuf.maxlen

The maximal length (number of lines) of the buffer.

When this value is None, it means that there is no maximal length limit.


closed

is_closed: bool = pbuf.closed

Check whether the buffer has been closed.

Operators

__len__

n_buffer_items: int = len(pbuf)

Number of lines/items in the buffer.

Example

Synchronize messages among processes

sync_among_processes.py
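import multiprocessing
from syncstream.mproc import LineProcBuffer, LineProcMirror


def f(buffer: LineProcMirror):
    # Inside the context, printed text is redirected to the mirror.
    with buffer:
        print('example')


if __name__ == '__main__':
    pbuf = LineProcBuffer(maxlen=10)
    with multiprocessing.Pool(4) as p:
        # Each access to pbuf.mirror creates one more mirror to wait for.
        p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
        # Block until all four mirrors have sent their close signals.
        pbuf.wait()
        print(pbuf.read())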