Skip to main content
Version: 0.3.x

LineBuffer

ClassSource

tbuf = syncstream.mproc.LineBuffer(maxlen: int = 20)

The basic line-based buffer handle.

This buffer provides a rotating item stroage for the text-based stream. The text is stored not by length, but by lines. The maximal line number of the storage is limited.

Aliases

This class can be acquired by

import syncstream


syncstream.LineBuffer
syncstream.mproc.LineBuffer

Arguments

Requires

ArgumentTypeRequired
Description
maxlenintThe maximal number of stored lines. This value has the same meaning of deque.maxlen.

Methods

clear

tbuf.clear()

Clear the whole buffer.

This method would clear the storage and the last line stream of this buffer. However, it would not clear any mirrors or copies of this object. This method is thread-safe and should always success.


new_line

tbuf.new_line()

Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.

This method is equivalent to

if tbuf.last_line.tell() > 0:
write('\n')

flush

tbuf.flush()

Flush the current written line stream (temporary buffer).


parse_lines

tbuf.parse_lines(lines)

Parse the lines.

This method would be triggered when the new lines are written by write() method. The default behavior is adding the item into the storage.

Users could inherit this method and override it with their customized parsing method, like regular expression searching. The default behavior is implemented by:

tbuf.storage.extend(lines: Sequence[str])

Requires

ArgumentTypeRequired
Description
linesSequence[str]A sequence of strs to be written in the storage. Users could catch any line item, and extract any specific information from it.

read

lines: Sequence[str] = tbuf.read(size: int | None = None)

Fetch the stored record items from the buffer. Using the read() method is thread-safe and would not influence the cursor of write() method.

If the current written line is not blank, the read() method would regard it as the last record item.

Requires

ArgumentTypeRequired
Description
sizeint | NoneIf set None, would return the whole storage. If set a int value, would return the latest size items.

Returns

ArgumentType
Description
linesSequence[str]The returned messages. The messages have been already split by the line breaks.

write

n_bytes: int = tbuf.write(data: str)

The writting method of the buffer. The source data is the same as that of a text-based IO. Each time when data contains a line break, a new record item would be pushed in the storage. The write() method is thread-safe.

Requires

ArgumentTypeRequired
Description
datastrThe written messages. The line breaks would be detected automatically.

Returns

ArgumentType
Description
n_bytesintThe number of bytes that would be written to the buffer. This value is counted from the input data directly.

Example

Synchronize messages among threads

sync_among_threads.py
import contextlib
from syncstream.mproc import LineBuffer

tbuf = LineBuffer(5)

with contextlib.redirect_stdout(tbuf):
for i in range(10):
print(i)

print(tbuf.read())