Skip to main content
Version: 1.2.2

LineBuffer

ClassContextSource

tbuf = LineBuffer(maxlen: int = 20)

with tbuf:
...

The basic line-based buffer handle.

This buffer provides a rotating item stroage for the text-based stream. The text is stored not by length, but by lines. The maximal line number of the storage is limited.

Aliases

This class can be acquired by

import syncstream


syncstream.LineBuffer
syncstream.mproc.LineBuffer

Arguments

ArgumentTypeRequired
Description
maxlenintThe maximal number of stored lines. This value has the same meaning of deque.maxlen.

Methods

close

tbuf.close()

Close the IO. This method only takes effects once. The second call will do nothing.


clear

tbuf.clear()

Clear the whole buffer.

This method would clear the storage and the last line stream of this buffer. However, it would not clear any mirrors or copies of this object. This method is thread-safe and should always success.


new_line

tbuf.new_line()

Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.

This method is equivalent to

if tbuf.last_line.tell() > 0:
write('\n')

flush

tbuf.flush()

Flush the current written line stream (temporary buffer).


parse_lines

tbuf.parse_lines(lines)

Parse the lines.

This method would be triggered when the new lines are written by write() method. The default behavior is adding the item into the storage.

Users could inherit this method and override it with their customized parsing method, like regular expression searching. The default behavior is implemented by:

tbuf.storage.extend(lines: Sequence[str])

Requires

ArgumentTypeRequired
Description
linesSequence[str]A sequence of strs to be written in the storage. Users could catch any line item, and extract any specific information from it.

read

lines: Sequence[str] = tbuf.read(size: int | None = None)

Fetch the stored record items from the buffer. Using the read() method is thread-safe and would not influence the cursor of write() method.

If the current written line is not blank, the read() method would regard it as the last record item.

Requires

ArgumentTypeRequired
Description
sizeint | NoneIf set None, would return the whole storage. If set a int value, would return the latest size items.

Returns

ArgumentType
Description
linesSequence[str]The returned messages. The messages have been already split by the line breaks.

write

n_bytes: int = tbuf.write(data: str)

The writting method of the buffer. The source data is the same as that of a text-based IO. Each time when data contains a line break, a new record item would be pushed in the storage. The write() method is thread-safe.

Requires

ArgumentTypeRequired
Description
datastrThe written messages. The line breaks would be detected automatically.

Returns

ArgumentType
Description
n_bytesintThe number of bytes that would be written to the buffer. This value is counted from the input data directly.

fileno

tbuf.fileno() -> Never

Return the file ID.

This buffer will not use file ID, so this method will raise an OSError.


isatty

is_atty: False = tbuf.fileno()

Whether the stream is connected to terminal/TTY. Return False.

Returns

ArgumentType
Description
is_attyboolAlways False.

readable

is_readable: bool = tbuf.readable()

Whether the stream is readable. The stream is readable as long as the buffer is not closed.

If the stream is not readable, calling read() will raise an OSError.

Returns

ArgumentType
Description
is_readableboolReturn True if the buffer is not closed. Otherwise, return False.

writable

is_writable: bool = tbuf.writable()

Whether the stream is writable. The stream is writable as long as the buffer is not closed.

If the stream is not writable, calling write() will raise an OSError.

Returns

ArgumentType
Description
is_writableboolReturn True if the buffer is not closed. Otherwise, return False.

seekable

is_seekable: False = tbuf.seekable()

Whether the stream support random access. This buffer does not.

Returns

ArgumentType
Description
is_seekableboolAlways False.

seek

tbuf.seek() -> Never

Will raise an OSError since this buffer does not support random access.

Properties

maxlen

maxlen: int | None = tbuf.maxlen

The maximal length (number of lines) of the buffer.

When this value is None, it means that there is no maximal length limit.


closed

is_closed: bool = tbuf.closed

Check whether the buffer has been closed.

Operators

__len__

n_buffer_items: int = len(tbuf)

Number of lines/items in the buffer.

Example

Synchronize messages among threads

sync_among_threads.py
from syncstream import LineBuffer

tbuf = LineBuffer(5)

with tbuf:
for i in range(10):
print(i)

print(tbuf.read())