LineProcBuffer
Class
pbuf = syncstream.mproc.LineProcBuffer(maxlen: int = 20)
The process-safe line-based buffer.
A rotating buffer with a maximal storage length. This buffer is an extended version of the basic LineBuffer, intended for multi-processing. Use the shared queue of this buffer to synchronize messages among processes. To learn how to use this buffer, see the example on this page, or the tutorial.
Aliases
This class can be accessed by either of the following names:
import syncstream
syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer
Arguments
Requires
Argument | Type | Required | Description |
---|---|---|---|
maxlen | int | No | The maximal number of stored lines. This value has the same meaning as deque.maxlen. |
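Since maxlen is described as having the same meaning as deque.maxlen, the rotation can be previewed with a plain deque. This is only a sketch using the standard library; the deque stands in for the buffer's storage and is not the LineProcBuffer itself.

```python
from collections import deque

# A plain deque used as a stand-in for the buffer's storage.
storage = deque(maxlen=3)

# Append five lines; each append beyond the limit drops the oldest line.
for i in range(5):
    storage.append(f"line {i}")

# Only the three most recent lines remain.
latest = list(storage)
print(latest)
```

A larger maxlen keeps more history in the main process at the cost of memory; lines that rotate out are lost.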
Methods
clear
pbuf.clear()
Clear the whole buffer.
This method clears the storage and the last-line stream of this buffer. However, it does not clear any mirrors or copies of this object. This method is thread-safe and should always succeed.
stop_all_mirrors
pbuf.stop_all_mirrors()
Send stop signals to all mirrors.
This operation is used for terminating the sub-processes safely. However, this method does not trigger TERMINATE or KILL signals. Instead, each mirror that accepts the stop signal raises a StopIteration exception. The stop signal is checked each time write() is used by a mirror, so most programs are interrupted this way.
Since this method does not send TERMINATE or KILL signals, it does not guarantee that the processes are closed instantly. When using this method, please ensure that the StopIteration exception is caught and reported with LineProcMirror.send_error(). The error is not caught automatically. If it is not caught, the main process gets stuck at wait().
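The following is a minimal sketch of a worker that catches the stop exception itself. StandInMirror is a hypothetical class written only so the sketch runs without sub-processes; it imitates the documented write() check and records what would be sent. In real code the worker would receive pbuf.mirror instead. The sketch also assumes that reporting with send_error() takes the place of send_eof() for that mirror, which should be verified against the library.

```python
class StandInMirror:
    """Hypothetical stand-in for LineProcMirror, for illustration only."""

    def __init__(self, stop_after):
        self.stop_after = stop_after  # writes allowed before the "stop signal"
        self.n_writes = 0
        self.sent = []  # records what would be sent back to the buffer

    def write(self, data):
        # Imitate the documented behavior: the stop signal is checked on write().
        if self.n_writes >= self.stop_after:
            raise StopIteration("stop signal received")
        self.n_writes += 1
        return len(data)

    def send_error(self, err):
        self.sent.append(("error", err))

    def send_eof(self):
        self.sent.append(("close", None))


def worker(mirror, n_steps=5):
    """Worker body that reports a stop instead of letting it escape."""
    try:
        for step in range(n_steps):
            mirror.write(f"step {step}\n")
    except StopIteration as err:
        # Assumption: reporting the error also marks this mirror as finished,
        # so send_eof() is not called on this path.
        mirror.send_error(err)
    else:
        mirror.send_eof()


stopped = StandInMirror(stop_after=2)    # interrupted on the third write
worker(stopped)
finished = StandInMirror(stop_after=10)  # never interrupted
worker(finished)
print(stopped.sent[0][0], finished.sent[0][0])
```

Either way, the worker sends exactly one final message, which is what lets the main process leave wait().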
reset_states
pbuf.reset_states()
Reset the states of the buffer.
During the interaction with the mirrors, the states of the buffer may change. If users want to reuse the buffer and start a new group of sub-processes, please use this method.
receive
is_live: bool = pbuf.receive()
Receive one item from the mirror.
This method fetches one item from the process-safe queue and writes the result into the thread-safe buffer. The fetched result is saved in the storage.
Returns
Argument | Type | Description |
---|---|---|
is_live | bool | The finish flag. If this value is True, the buffer still needs to receive more messages from the mirrors. If False, the close signals from all mirrors have been received. |
wait
pbuf.wait()
The blocking wait method. This method is simply implemented by the following code:
while pbuf.receive():
    pass
The method keeps the current thread blocked until the buffer has received all messages from the mirrors.
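Because wait() is only a loop over receive(), the same loop can be written by hand when something should happen between messages. The sketch below is one possible variant, not part of the library: wait_with_progress and ScriptedBuffer are hypothetical names, and ScriptedBuffer merely replays a list of is_live flags so the loop can run without sub-processes.

```python
def wait_with_progress(pbuf, on_item):
    """Block like pbuf.wait(), calling on_item() after each received item."""
    n_items = 0
    while pbuf.receive():
        n_items += 1
        on_item(n_items)
    return n_items


class ScriptedBuffer:
    """Hypothetical stand-in whose receive() replays prepared is_live flags."""

    def __init__(self, flags):
        self._flags = iter(flags)

    def receive(self):
        # True: more messages are expected; False: all mirrors have closed.
        return next(self._flags)


seen = []
total = wait_with_progress(ScriptedBuffer([True, True, False]), seen.append)
print(total, seen)
```

The callback runs only for items received while the buffer is still live; the final item that makes receive() return False ends the loop without a callback.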
parse_lines
pbuf.parse_lines(lines: Sequence[str])
Parse the lines.
This method is triggered when new lines are received (exceptions and warnings are not included). The default behavior is adding the lines to the storage.
Users can subclass the buffer and override this method with a customized parsing method, such as a regular expression search. The default behavior is implemented by:
pbuf.storage.extend(lines)
Requires
Argument | Type | Required | Description |
---|---|---|---|
lines | Sequence[str] | Yes | A sequence of strs to be written to the storage. Users can catch any line item and extract specific information from it. |
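As an illustration of overriding parse_lines, the sketch below collects numeric values with a regular expression while keeping the default storage behavior. StandInBuffer is a hypothetical minimal base class that exposes only storage and the documented default parse_lines; in real code the subclass would derive from LineProcBuffer. The class name LossBuffer and the "loss=..." line format are made up for the example.

```python
import re
from collections import deque


class StandInBuffer:
    """Hypothetical stand-in exposing only storage and parse_lines()."""

    def __init__(self, maxlen=20):
        self.storage = deque(maxlen=maxlen)

    def parse_lines(self, lines):
        # The documented default behavior.
        self.storage.extend(lines)


class LossBuffer(StandInBuffer):
    """Keep every line, and additionally collect values like 'loss=0.25'."""

    pattern = re.compile(r"loss=([0-9.]+)")

    def __init__(self, maxlen=20):
        super().__init__(maxlen=maxlen)
        self.losses = []

    def parse_lines(self, lines):
        # Extract the number from each matching line.
        for line in lines:
            match = self.pattern.search(line)
            if match:
                self.losses.append(float(match.group(1)))
        # Fall back to the default behavior so read() still sees the lines.
        super().parse_lines(lines)


buf = LossBuffer()
buf.parse_lines(["epoch 1 loss=0.5", "saving checkpoint", "epoch 2 loss=0.25"])
print(buf.losses)
```

Calling the parent implementation at the end keeps the stored lines intact; omitting it would turn the buffer into a pure parser that stores nothing.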
read
lines: Sequence[str] = pbuf.read(size: int | None = None)
Fetch the stored record items from the buffer. Using the read() method is thread-safe and does not influence the cursor of the write() method.
If the current written line is not blank, the read() method regards it as the last record item.
Requires
Argument | Type | Required | Description |
---|---|---|---|
size | int or None | No | If set to None, the whole storage is returned. If set to an int value, the latest size items are returned. |
Returns
Argument | Type | Description |
---|---|---|
lines | Sequence[str] | The returned messages. The messages have already been split by line breaks. |
write
This method is prohibited for this class, because the write() method should only be used by the mirror objects.
Properties
mirror
mirror: LineProcMirror = pbuf.mirror
Get the mirror of this buffer.
The buffer should not be used in sub-processes directly. Instead, the buffer and its mirrors work in master/slave mode. The property pbuf.mirror is an instance of LineProcMirror. It provides the process-safe mirror of the buffer for use in the sub-process.
This property cannot be modified after the initialization.
n_mirrors
n_mirrors: int = pbuf.n_mirrors
The number of currently living mirrors. The buffer needs to keep receiving messages until n_mirrors becomes 0. Each time pbuf.mirror is accessed, this value is increased by 1.
Example
Synchronize messages among processes
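Codes
import contextlib
import multiprocessing

from syncstream.mproc import LineProcBuffer


def f(buffer):
    # Redirect everything printed in the sub-process to the mirror.
    with contextlib.redirect_stdout(buffer):
        print('example')
    # Tell the main buffer that this mirror has finished.
    buffer.send_eof()


if __name__ == '__main__':
    pbuf = LineProcBuffer(maxlen=10)
    with multiprocessing.Pool(4) as p:
        # Each access to pbuf.mirror registers one more mirror.
        p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
        pbuf.wait()  # Block until all four mirrors have sent EOF.
        print(pbuf.read())
Results
('example', 'example', 'example', 'example')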