Version: 1.2.2

LineProcMirror

Class · Private · Context · Source

pbuf: syncstream.LineProcBuffer
buffer = pbuf.mirror

with buffer:
    ...

The mirror for the process-safe line-based buffer.

danger

This mirror is created by LineProcBuffer and manages the lines written to the buffer. Users do not need to, and should not, instantiate this class directly. Always use pbuf.mirror to get the mirror object.

Aliases

This class can be acquired by

import syncstream


syncstream.LineProcMirror
syncstream.mproc.LineProcMirror

Arguments

| Argument | Type | Description |
| --- | --- | --- |
| q_maxsize | int | The maximal size of the queue. This value has the same meaning as the maxsize argument of queue.Queue. |
| aggressive | bool | If True, the aggressive mode is enabled: the mirror sends a message to the buffer every time new data is written. If False, the mirror communicates with the buffer only when a new line is written. |
| timeout | int \| None | The timeout configuration of the queue. This value has the same meaning as the timeout argument of queue.put(). |
| _queue | Queue \| None | The queue used for communicating with the buffer. This object can be an instance of multiprocessing.Queue() or a queue given by multiprocessing.Manager(). The queue object should be created by the LineProcBuffer object. |
| _state | dict[str, Any] \| None | A shared state dictionary given by multiprocessing.Manager(). The dict object should be created by the LineProcBuffer object. |
| _state_lock | Lock \| None | The read / write lock of the state dictionary, given by multiprocessing.Manager(). The lock object should be created by the LineProcBuffer object. |
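
The difference between the two modes can be illustrated with a minimal, stdlib-only sketch. It is not the library's implementation: ToyMirror and drain are hypothetical names, and a plain queue.Queue stands in for the shared queue that LineProcBuffer would normally create.

```python
import io
import queue


class ToyMirror:
    """Hypothetical stand-in for the mirror; not the syncstream implementation."""

    def __init__(self, aggressive: bool = False) -> None:
        self.aggressive = aggressive
        self.sent = queue.Queue()   # stands in for the shared queue
        self._line = io.StringIO()  # temporary buffer for the unfinished line

    def write(self, data: str) -> int:
        if self.aggressive:
            # Aggressive mode: forward every chunk immediately.
            self.sent.put(data)
            return len(data)
        # Line mode: forward only completed lines, keep the remainder locally.
        *complete, rest = (self._line.getvalue() + data).split("\n")
        for line in complete:
            self.sent.put(line + "\n")
        self._line = io.StringIO()
        self._line.write(rest)
        return len(data)


def drain(q: queue.Queue) -> list:
    """Collect everything currently waiting in the queue."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


lazy, eager = ToyMirror(aggressive=False), ToyMirror(aggressive=True)
for mirror in (lazy, eager):
    mirror.write("par")
    mirror.write("tial\nnext")

lazy_msgs = drain(lazy.sent)    # only the completed line was sent
eager_msgs = drain(eager.sent)  # every written chunk was sent
print(lazy_msgs)
print(eager_msgs)
```

In this sketch the line-based mirror sends one message per completed line and keeps "next" in its temporary buffer, while the aggressive mirror sends one message per write() call.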

Methods

close

buffer.close(exc: BaseException | None = None)

Close the IO. This method takes effect only once; a second call does nothing.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| exc | BaseException | If exc is not None, send_error() is called before the buffer is closed. Otherwise, send_eof() is called. |
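
The close behaviour described above can be sketched as follows. ToyCloser is a hypothetical stub that only records which signal would be sent; it assumes nothing about the real class beyond the documented rules (one effective call, send_error() when exc is given, send_eof() otherwise).

```python
class ToyCloser:
    """Hypothetical stub; records the close signal instead of sending it."""

    def __init__(self) -> None:
        self.closed = False
        self.signals = []

    def send_eof(self) -> None:
        self.signals.append("eof")

    def send_error(self, error: BaseException) -> None:
        self.signals.append(("error", type(error).__name__))

    def close(self, exc=None) -> None:
        if self.closed:
            return  # the second and later calls do nothing
        if exc is not None:
            self.send_error(exc)
        else:
            self.send_eof()
        self.closed = True


ok = ToyCloser()
ok.close()
ok.close()  # ignored

failed = ToyCloser()
failed.close(ValueError("boom"))
failed.close()  # ignored, so no extra "eof" is sent

print(ok.signals, failed.signals)
```

Guarding on the closed flag is what keeps a repeated close() from emitting a second close signal.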

clear

buffer.clear()

Clear the whole buffer.

This method clears the temporary buffer of the mirror. In the aggressive mode the temporary buffer is not used, so this method has no effect on the mirror.

This method is thread-safe. Mirrors in different processes do not share the temporary buffer. Note that the shared queue is not cleared by this method.


new_line

buffer.new_line()

Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.

This method is equivalent to

if buffer.last_line.tell() > 0:
    buffer.write('\n')
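
The equivalence above can be tried out with a small stand-in. This is only a sketch under stated assumptions: ToyLineBuffer is hypothetical, its last_line is an io.StringIO holding the unfinished line, and its storage is a plain list of finished lines.

```python
import io


class ToyLineBuffer:
    """Hypothetical stand-in for the temporary buffer; not the library code."""

    def __init__(self) -> None:
        self.last_line = io.StringIO()  # unfinished line
        self.storage = []               # finished lines

    def write(self, data: str) -> int:
        # Everything before the final "\n" is finished; the tail stays pending.
        *finished, pending = (self.last_line.getvalue() + data).split("\n")
        self.storage.extend(finished)
        self.last_line = io.StringIO()
        self.last_line.write(pending)
        return len(data)

    def new_line(self) -> None:
        # Only break the line when something has been written to it.
        if self.last_line.tell() > 0:
            self.write("\n")


buf = ToyLineBuffer()
buf.write("abc")
buf.new_line()  # "abc" is moved to the storage
buf.new_line()  # already at the start of a line: nothing happens
print(buf.storage)
```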

flush

buffer.flush()

Flush the current written line stream (temporary buffer).


send_eof

buffer.send_eof()

Send the safe-close signal.

This signal should be sent at the end of the sub-process. It informs the buffer that the sub-process has finished safely.

danger
  • Users should always call send_eof() or send_error() at the end of a sub-process. Otherwise, the main thread (buffer) will deadlock.
  • Users should invoke send_eof() or send_error() only once for each sub-process. For example, if send_error() has been invoked, send_eof() should not be used. Sending more than one close signal from the same sub-process may cause fatal bugs.
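
One way to satisfy both rules is a try / except / else wrapper around the worker body, so that exactly one of the two signals is sent on every path. The sketch below uses a hypothetical RecordingMirror stub and a hypothetical run_worker helper; with the real mirror, the same wrapper would be placed in the function executed by the sub-process.

```python
class RecordingMirror:
    """Hypothetical stub exposing the two close-signal methods."""

    def __init__(self) -> None:
        self.signals = []

    def write(self, data: str) -> int:
        return len(data)

    def send_eof(self) -> None:
        self.signals.append("eof")

    def send_error(self, error: BaseException) -> None:
        self.signals.append(("error", type(error).__name__))


def run_worker(mirror, job) -> None:
    """Run job(mirror) and send exactly one close signal."""
    try:
        job(mirror)
    except Exception as exc:
        mirror.send_error(exc)  # failure path: error signal only
    else:
        mirror.send_eof()       # success path: EOF signal only


def good_job(mirror) -> None:
    mirror.write("done\n")


def bad_job(mirror) -> None:
    raise RuntimeError("worker failed")


ok, failed = RecordingMirror(), RecordingMirror()
run_worker(ok, good_job)
run_worker(failed, bad_job)
print(ok.signals, failed.signals)
```

Using else rather than finally for send_eof() is deliberate: a finally clause would send the EOF signal after send_error() as well, which is the double-signal case the second rule warns about.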

send_error

buffer.send_error(error: BaseException)

Send the exception object to the main buffer.

Send the caught error object. This method should always be used in a try / except block. The error object is stored as an item in the storage of the main buffer.

danger

The close-signal rules listed under send_eof apply to send_error() as well.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| error | BaseException | The exception object to be sent. |

send_warning

buffer.send_warning(warning: Warning)

Send the standard warning object to the main buffer.

Send the caught warning object. It is recommended to use this method when logging warning objects. The warning object is stored as an item in the storage of the main buffer.

caution

This method only handles warning objects defined by the stdlib. Other kinds of warnings, such as logging.warning messages, may not be collected correctly.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| warning | Warning | The warning object to be sent. |
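
A possible way to obtain Warning objects for send_warning() is the standard warnings.catch_warnings(record=True) context. The sketch below is an illustration only: RecordingMirror is a hypothetical stub, and noisy is a made-up function that emits two stdlib warnings.

```python
import warnings


class RecordingMirror:
    """Hypothetical stub; stores the warnings instead of sending them."""

    def __init__(self) -> None:
        self.warnings = []

    def send_warning(self, warning: Warning) -> None:
        self.warnings.append(warning)


def noisy() -> None:
    warnings.warn("deprecated call", DeprecationWarning)
    warnings.warn("odd value", UserWarning)


mirror = RecordingMirror()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # do not let filters hide anything
    noisy()

for item in caught:
    # item is a warnings.WarningMessage; item.message is the Warning instance.
    mirror.send_warning(item.message)

names = [type(w).__name__ for w in mirror.warnings]
print(names)
```

Messages emitted through the logging module never pass through the warnings machinery, so they do not appear in caught.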

send_data

buffer.send_data(data: str)

Send the data to the main buffer.

Calling this method is equivalent to calling the following method of the main buffer (LineProcBuffer), protected by process-safe synchronization:

super(LineProcBuffer, pbuf).write(data)

This method is used by other methods implicitly, and should not be used by users.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| data | str | A str to be sent to the main buffer. |

read

line: str = buffer.read(size: int | None = None)

This method reads only the values currently buffered in the mirror storage. If the property aggressive is True, read() always returns an empty value.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| size | int | If None, the whole storage is returned. If an int value, the last size items are returned. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| line | str | Current storage of the mirror. The value in the storage has not been sent to the main buffer. |

write

n_bytes: int = buffer.write(data: str)

The write method of the buffer. The source data is the same as that of a text-based IO. If aggressive is True, each call of write() sends the stream value to the main buffer. Otherwise, the stream value is sent to the main buffer each time data contains a line break.

The method is thread-safe.

Requires

| Argument | Type | Description |
| --- | --- | --- |
| data | str | The written messages. Line breaks are detected automatically. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| n_bytes | int | The number of bytes that would be written to the buffer. This value is counted from the input data directly. |

fileno

buffer.fileno() -> Never

Return the file ID.

This buffer does not use a file ID, so this method always raises an OSError.
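
Code that accepts arbitrary streams can probe for a real file ID by catching OSError. The sketch below uses io.StringIO as a stand-in, since the standard in-memory streams follow the same convention (io.UnsupportedOperation is a subclass of OSError); has_real_fd is a hypothetical helper name.

```python
import io


def has_real_fd(stream) -> bool:
    """Return True only if the stream is backed by an OS-level file ID."""
    try:
        stream.fileno()
    except OSError:
        return False
    return True


stream = io.StringIO()  # stand-in for a stream without a file ID
result = has_real_fd(stream)
print(result)
```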


isatty

is_atty: False = buffer.isatty()

Whether the stream is connected to a terminal/TTY. Always returns False.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_atty | bool | Always False. |

readable

is_readable: bool = buffer.readable()

Whether the stream is readable. The stream is readable as long as the buffer is not closed.

If the stream is not readable, calling read() will raise an OSError.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_readable | bool | True if the buffer is not closed. Otherwise, False. |

writable

is_writable: bool = buffer.writable()

Whether the stream is writable. The stream is writable as long as the buffer is not closed.

If the stream is not writable, calling write() will raise an OSError.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_writable | bool | True if the buffer is not closed. Otherwise, False. |

seekable

is_seekable: False = buffer.seekable()

Whether the stream supports random access. This buffer does not.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| is_seekable | bool | Always False. |

seek

buffer.seek() -> Never

Will raise an OSError since this buffer does not support random access.

Properties

aggressive

is_aggressive: bool = buffer.aggressive

The aggressive mode.

This mode can only be configured at initialization. If True, the aggressive mode is enabled: the mirror sends a message to the buffer every time new data is written. If False, the mirror communicates with the buffer only when a new line is written.


timeout

timeout: int | None = buffer.timeout

The timeout configuration of the queue. This value has the same meaning as the timeout argument of queue.put().

In the current implementation, this value is always configured as None. However, users can change this value when using the mirror.
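
The effect of a put timeout can be seen with a plain queue.Queue, used here only as a stand-in for the shared queue. This is a sketch of the standard-library semantics, not of the mirror itself: with a full queue, put() waits for at most timeout seconds and then raises queue.Full, whereas timeout=None would block until a slot becomes free.

```python
import queue

q = queue.Queue(maxsize=1)          # a queue with a single slot
q.put("first line\n", timeout=0.1)  # succeeds: the slot is free

try:
    # The slot is still occupied, so this waits 0.1 s and then gives up.
    q.put("second line\n", timeout=0.1)
    outcome = "sent"
except queue.Full:
    outcome = "timed out"

print(outcome)
```

A finite timeout therefore turns a writer that would otherwise wait forever on an unread queue into one that fails with an exception the sub-process can handle.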


closed

is_closed: bool = buffer.closed

Check whether the buffer has been closed.

Example

See LineProcBuffer.