Version: 0.3.x

LineFileBuffer

Class

fbuf = syncstream.file.LineFileBuffer(
    file_path: str,
    maxlen: int = 20,
    tmp_id: str = "tmp",
)

The file-driven line-based buffer handle.

This buffer provides rotating item storage for a text-based stream. The text is stored by lines rather than by length, and the maximal number of stored lines is limited. Each record item is saved as an independent file. Synchronization is implemented with file locks.
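
The rotation rule can be illustrated without touching the file system. The following is a minimal in-memory sketch, not the library's implementation: `RotatingLines` is a hypothetical stand-in that uses `collections.deque` in place of the per-record files and omits the file locks entirely.

```python
from collections import deque


class RotatingLines:
    """Hypothetical in-memory stand-in for a line-based rotating storage."""

    def __init__(self, maxlen: int = 20) -> None:
        # deque drops the oldest item automatically once maxlen is reached.
        self.storage = deque(maxlen=maxlen)

    def push(self, text: str) -> None:
        # Records are counted by lines, not by characters.
        for line in text.splitlines():
            self.storage.append(line)


buf = RotatingLines(maxlen=3)
buf.push("a\nb\nc\nd")
kept = list(buf.storage)  # only the three most recent lines remain
```

With `maxlen=3`, pushing four lines leaves the oldest line out of the storage, which mirrors how the maximal line number limits the records.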

The file-locked handle could be shared by different processes, but doing so is not recommended. A better way to use this handle is to initialize it in each sub-process (if needed).

caution

Note that this handle is process-safe, not thread-safe. In other words, each process should maintain only one INDEPENDENT LineFileBuffer. A LineFileBuffer should not be shared by either different threads or different processes.

Aliases

This class can be accessed through either of the following aliases:

import syncstream


syncstream.LineFileBuffer
syncstream.file.LineFileBuffer

Arguments

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| file_path | str | Yes | The base path of the saved records, including the saved storage and the file locks. The file suffix is automatically set to .log. |
| maxlen | int | No | The maximal number of records. Each record is saved as one file. |
| tmp_id | str | No | The identifier for the temporary file. Each process should hold one unique id. Conflicting ids may cause the streams written by different processes to interrupt each other. |

Methods

clear

fbuf.clear()

Clear the whole buffer.

This method searches for and removes all log files, including the temporary file. However, the lock files are not removed. A typical usage is to call this method only in the main process.


new_line

fbuf.new_line()

Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.

This method is equivalent to

if (last line is not empty):
    write('\n')
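
The equivalence above can be sketched as runnable code. `LineSketch` is a hypothetical in-memory stand-in written only for illustration: it keeps one pending line and a list of finished lines, and it has neither files nor locks, so it should not be read as the library's implementation.

```python
from typing import List


class LineSketch:
    """Hypothetical stand-in: one pending line plus a list of finished lines."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.pending = ""

    def write(self, data: str) -> int:
        # Every line break moves the text before it into the records.
        *finished, last = (self.pending + data).split("\n")
        self.records.extend(finished)
        self.pending = last
        return len(data)

    def new_line(self) -> None:
        # Only emit a line break when the pending line holds data.
        if self.pending:
            self.write("\n")


sketch = LineSketch()
sketch.write("partial")
sketch.new_line()  # the pending data is committed as a record
sketch.new_line()  # already on a fresh line, so nothing happens
```

Calling `new_line()` twice in a row therefore adds only one record, which is the behavior described for a temporary buffer that is already a new line.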

flush

fbuf.flush()

Flush the current written line stream (temporary buffer).

caution

This method does nothing, because the file-based buffer does not need to flush the temporary buffer.


parse_lines

fbuf.parse_lines(lines: Sequence[str])

Parse the lines.

This method is triggered when new lines are written by the write() method. The default behavior is writing the lines to the log files.

Users could subclass LineFileBuffer and override this method with a customized parsing method, such as regular-expression searching. The default behavior is implemented by the following private method:

fbuf.__update_records(lines)

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| lines | Sequence[str] | Yes | A sequence of strs to be written to the log files. Users could catch any line item and extract any specific information from it. |
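
The override pattern can be sketched as follows. To keep the example runnable without the library, `BufferStandIn` is a hypothetical stand-in whose write() forwards completed lines to parse_lines(); `ErrorCatcher` and its `ERROR:` pattern are also made up for illustration. With the real class, one would presumably subclass LineFileBuffer instead and call `super().parse_lines(lines)` to keep the default file writing, but that is an assumption rather than something verified here.

```python
import re
from typing import List, Sequence


class BufferStandIn:
    """Hypothetical stand-in for a buffer whose write() calls parse_lines()."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self._pending = ""

    def write(self, data: str) -> int:
        # Split off the completed lines; the remainder stays pending.
        *finished, self._pending = (self._pending + data).split("\n")
        if finished:
            self.parse_lines(finished)
        return len(data)

    def parse_lines(self, lines: Sequence[str]) -> None:
        # Default behavior in this sketch: store every completed line.
        self.records.extend(lines)


class ErrorCatcher(BufferStandIn):
    """Collects messages of matching lines, then stores all lines as usual."""

    pattern = re.compile(r"^ERROR: (.*)$")

    def __init__(self) -> None:
        super().__init__()
        self.errors: List[str] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        for line in lines:
            match = self.pattern.match(line)
            if match:
                self.errors.append(match.group(1))
        # Fall back to the default handling so no line is lost.
        super().parse_lines(lines)


catcher = ErrorCatcher()
catcher.write("ok\nERROR: disk full\npart")
```

The unfinished text `part` is not passed to parse_lines() in this sketch, because only completed lines trigger the hook.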

read

lines: Sequence[str] = fbuf.read(size: int | None = None)

Fetch the stored record items from the buffer. The read() method is process-safe and does not influence the cursor of the write() method.

If the current written line is not blank, the read() method would regard it as the last record item.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| size | int \| None | No | If set to None, the whole storage is returned. If set to an int value, the latest size items are returned. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| lines | Sequence[str] | The returned messages. The messages have already been split by the line breaks. |
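
The selection rule of read() can be sketched with a small pure function. `read_latest` is a hypothetical helper, not part of the library; it takes the finished records and the line currently being written as plain arguments. Returning an empty list for a non-positive size is an assumption of this sketch, since the behavior for that case is not documented here.

```python
from typing import List, Optional, Sequence


def read_latest(
    records: Sequence[str], pending: str, size: Optional[int] = None
) -> List[str]:
    """Hypothetical helper mirroring the described read() semantics."""
    # A non-blank line still being written counts as the last record item.
    items = list(records) + ([pending] if pending else [])
    if size is None:
        return items
    # Guard size <= 0 explicitly: items[-0:] would return everything.
    return items[-size:] if size > 0 else []


latest_two = read_latest(["a", "b", "c"], "d", size=2)
everything = read_latest(["a", "b"], "")
```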

write

n_bytes: int = fbuf.write(data: str)

The writing method of the buffer. The source data is the same as that of a text-based IO. Each time data contains a line break, a new record item is pushed into the storage. The write() method is process-safe.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| data | str | Yes | The written messages. The line breaks are detected automatically. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| n_bytes | int | The number of bytes that would be written to the buffer. This value is counted from the input data directly. |

Example

Synchronize messages by shared log files

sync_by_files.py
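import contextlib
import multiprocessing
import os

from syncstream.file import LineFileBuffer


def f(f_path, f_len=10):
    # Each worker creates its own buffer. tmp_id is declared as str,
    # so the process id is converted explicitly.
    fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid()))
    with contextlib.redirect_stdout(fbuf):
        print('example')
        # Commit any unfinished line (a no-op here, since print() ends the line).
        fbuf.new_line()


if __name__ == '__main__':
    f_path = os.path.join('logs', 'test_file')
    f_len = 10
    with multiprocessing.Pool(4) as p:
        p.starmap(f, tuple((f_path, f_len) for _ in range(4)))
    # The main process reads the shared records through its own handle.
    fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid()))
    print(fbuf.read())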