LineProcMirror
ClassPrivateContextSource
pbuf: syncstream.LineProcBuffer
buffer = pbuf.mirror
with buffer:
...
The mirror for the process-safe line-based buffer.
This mirror is initialized by LineProcBuffer
, and would be used for managing the lines written to the buffer. Users do not need, and should not use this class to initialize objects. Please always use pbuf.mirror
to get the mirror object.
Aliases
This class can be acquired by
import syncstream
syncstream.LineProcMirror
syncstream.mproc.LineProcMirror
Arguments
Argument | Type | Required | |
---|---|---|---|
q_maxsize | int | The maximal size of the queue. This value has the same meaning of queue.qsize . | |
aggressive | bool | If set True , the aggresive mode would be enabled, where the mirror would send messages to the buffer each time when new data is written. If False , the mirror would communicate with the buffer only when a new line is written. | |
timeout | int | None | The timeout configuration of the queue. This value has the same meaning of queue.put(timeout) . | |
_queue | Queue | None | The queue used for communicating with the buffer. This object could be an instance of multiprocessing.Queue() or given by multiprocessing.Manager() . The queue object should be created by the LineProcBuffer object. | |
_state | dict[str, Any] | None | A shared state dictionary given by multiprocessing.Manager() . The dict object should be created by the LineProcBuffer object. | |
_state_lock | Lock | None | The reading / writing lock of the state dictionary given by multiprocessing.Manager() . The lock object should be created by the LineProcBuffer object. |
Methods
close
buffer.close(exc: BaseException | None = None)
Close the IO. This method only takes effects once. The second call will do nothing.
Requires
Argument | Type | Required | |
---|---|---|---|
exc | BaseException | If exc is not None , will call send_error() before closing the buffer. Otherwise, call send_eof() . |
clear
buffer.clear()
Clear the whole buffer.
This method would clear the temporary buffer of the mirror. If the mirror works in the aggresive
mode, the temporary buffer would not be used. In this case,
this method would not exert any influences to the mirror.
This method is thread-safe. Mirrors in different processes would not share the temporary buffer. Note that the shared queue would not be cleared by this method.
new_line
buffer.new_line()
Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.
This method is equivalent to
if buffer.last_line.tell() > 0:
write('\n')
flush
buffer.flush()
Flush the current written line stream (temporary buffer).
send_eof
buffer.send_eof()
Send the safely close signal.
The signal should be used at the end of the sub-process. This method is used for informing the buffer that the sub-process has been finished safely.
- Users should always use
send_eof()
orsend_error()
at the end of a sub-process. Otherwise, a dead lock would be caused by the main thread (buffer). - Users should only invoke
send_eof()
orsend_error()
once for each sub-process. For example, ifsend_error()
has been invoked,send_eof()
should not be used. Sending more than one close signal from the same sub-process may cause fatal bugs.
send_error
buffer.send_error(error: BaseException)
Send the exception object to the main buffer.
Send the catched error object. This method should always be used in the try / except block. The error object would be captured as an item of the storage in the main buffer.
See send_eof()
.
Requires
Argument | Type | Required | |
---|---|---|---|
error | BaseException | The exception object that would be sent. |
send_warning
buffer.send_warning(warning: Warning)
Send the standard warning object to the main buffer.
Send the catched warning object. It is recommended that this method can be used when logging the warning objects. The warning object would be captured as an item of the storage in the main buffer.
This method could only catch the warning object defined by the stdlib. Some different warning objects, like logging.warning
messages would not be collected with correct configurations.
Requires
Argument | Type | Required | |
---|---|---|---|
warning | Warning | The warning object that would be sent. |
send_data
buffer.send_data(data: str)
Send the data to the main buffer.
This method is equivalent to call the main buffer (LineProcBuffer) by the following method protected by process-safe synchronization:
super(LineProcBuffer, pbuf).write(data)
This method is used by other methods implicitly, and should not be used by users.
Requires
Argument | Type | Required | |
---|---|---|---|
data | str | A str to be sent to the main buffer. |
read
line: str = buffer.read(size: int | None = None)
This method would only read the current bufferred values in the mirror storage. If the property aggressive
is True
, the read()
method would always return empty value.
Requires
Argument | Type | Required | |
---|---|---|---|
size | int | If set None , would return the whole storage. If set an int value, would return the last size items. |
Returns
Argument | Type | |
---|---|---|
line | str | Current storage of the mirror. The value in the storage has not been sent to the main buffer. |
write
n_bytes: int = buffer.write(data: str)
The writting method of the buffer. The source data is the same as that of a text-based IO. If aggressive
is True
, each call of write()
would make the stream value sent to the main buffer. If not, each time when data
contains a line break, the stream value would be sent to the main buffer.
The method is thread-safe.
Requires
Argument | Type | Required | |
---|---|---|---|
data | str | The written messages. The line breaks would be detected automatically. |
Returns
Argument | Type | |
---|---|---|
n_bytes | int | The number of bytes that would be written to the buffer. This value is counted from the input data directly. |
fileno
buffer.fileno() -> Never
Return the file ID.
This buffer will not use file ID, so this method will raise an OSError
.
isatty
is_atty: False = buffer.fileno()
Whether the stream is connected to terminal/TTY. Return False
.
Returns
Argument | Type | |
---|---|---|
is_atty | bool | Always False . |
readable
is_readable: bool = buffer.readable()
Whether the stream is readable. The stream is readable as long as the buffer is not closed.
If the stream is not readable, calling read()
will raise an OSError
.
Returns
Argument | Type | |
---|---|---|
is_readable | bool | Return True if the buffer is not closed. Otherwise, return False . |
writable
is_writable: bool = buffer.writable()
Whether the stream is writable. The stream is writable as long as the buffer is not closed.
If the stream is not writable, calling write()
will raise an OSError
.
Returns
Argument | Type | |
---|---|---|
is_writable | bool | Return True if the buffer is not closed. Otherwise, return False . |
seekable
is_seekable: False = buffer.seekable()
Whether the stream support random access. This buffer does not.
Returns
Argument | Type | |
---|---|---|
is_seekable | bool | Always False . |
seek
buffer.seek() -> Never
Will raise an OSError
since this buffer does not support random access.
Properties
aggressive
is_aggressive: bool = buffer.aggressive
The aggressive mode.
This mode could only by configured by the initialization. If set True
, the aggresive mode would be enabled, where the mirror would send messages to the buffer each time when new data is written. If False
, the mirror would communicate with the buffer only when a new line is written.
timeout
tiemout: int = buffer.timeout
The timeout configuration of the queue. This value has the same meaning of queue.put(timeout)
.
In current implementation, this value is always configured as None
. However, users could change this value when using the mirror.
closed
is_closed: bool = buffer.closed
Check whether the buffer has been closed.
Example
See LineProcBuffer
.