LineFileBuffer
Class
fbuf = syncstream.file.LineFileBuffer(
file_path: str,
maxlen: int = 20,
tmp_id: str = "tmp",
)
The file-driven line-based buffer handle.
This buffer provides rotating item storage for a text-based stream. The text is stored by lines rather than by length, and the maximal number of stored lines is limited. Each record item is saved as an independent file. Synchronization is implemented with file locks.
The file-locked handle could be shared by different processes, but we do not recommend doing so. A better way to use this handle is to initialize it in each subprocess (if needed).
Note that this handle is process-safe, not thread-safe. In other words, each process should maintain only one INDEPENDENT `LineFileBuffer`. A `LineFileBuffer` should be shared neither by different threads nor by different processes.
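The text above only states that synchronization relies on file locks. As a rough illustration of process-level locking in general, here is a minimal sketch that appends one line to a shared log while holding an exclusive lock on a separate lock file. It assumes a Unix-like system (`fcntl.flock` is not available on Windows); the helper `append_line_locked` and the file names are hypothetical, and `LineFileBuffer` may use a different locking mechanism internally.

```python
import fcntl
import os
import tempfile


def append_line_locked(log_path: str, lock_path: str, line: str) -> None:
    """Hypothetical helper: append one line to log_path under a lock on lock_path."""
    # A separate lock file guards the data file, so the data file itself
    # can be rotated or removed without losing the lock.
    with open(lock_path, "a") as lock_file:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)  # block until acquired
        try:
            with open(log_path, "a", encoding="utf-8") as log_file:
                log_file.write(line + "\n")
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)  # release the lock


folder = tempfile.mkdtemp()
log_path = os.path.join(folder, "demo.log")
lock_path = os.path.join(folder, "demo.lock")
append_line_locked(log_path, lock_path, "first")
append_line_locked(log_path, lock_path, "second")
with open(log_path, encoding="utf-8") as f:
    lines = f.read().splitlines()
print(lines)  # ['first', 'second']
```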
Aliases
This class can be accessed via
import syncstream
syncstream.LineFileBuffer
syncstream.file.LineFileBuffer
Arguments
Requires
Argument | Type | Description
---|---|---
file_path | str | The base path of the saved records, used for both the record storage and the file locks. The file suffix is automatically set to `.log`.
maxlen | int | The maximal number of records. Each record is saved as one file.
tmp_id | str | The identifier of the temporary file. Each process should hold a unique id; conflicting ids may cause the streams written by different processes to interfere with each other.
Methods
clear
fbuf.clear()
Clear the whole buffer.
This method searches for and removes all log files, including the temporary file. The lock files, however, are not removed. A typical usage is to call this method only in the main process.
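The exact file names used by `LineFileBuffer` are not documented here. The sketch below assumes, purely for illustration, that every record and temporary file matches `<file_path>*.log` while lock files use a different suffix, and shows the "remove log files, keep lock files" behaviour with the standard `glob` module. `clear_log_files` is a hypothetical helper, not part of syncstream.

```python
import glob
import os
import tempfile
from typing import List


def clear_log_files(base_path: str) -> List[str]:
    """Hypothetical helper: delete '<base_path>*.log' files, keep everything else."""
    removed = []
    # glob.escape protects any wildcard characters inside the base path itself.
    for path in glob.glob(glob.escape(base_path) + "*.log"):
        os.remove(path)
        removed.append(os.path.basename(path))
    return sorted(removed)


folder = tempfile.mkdtemp()
base = os.path.join(folder, "test_file")
# Assumed naming scheme: two log files and one lock file.
for name in ("test_file-0.log", "test_file-tmp.log", "test_file.lock"):
    open(os.path.join(folder, name), "w").close()

removed = clear_log_files(base)
remaining = os.listdir(folder)
print(removed)    # ['test_file-0.log', 'test_file-tmp.log']
print(remaining)  # ['test_file.lock']
```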
new_line
fbuf.new_line()
Clear the current temporary buffer and manually trigger a new-line signal. If the current temporary buffer contains data, the data is moved to the storage and a new line is started. If the current temporary buffer is already an empty new line, nothing happens.
This method is equivalent to

```python
if last_line_is_not_empty:  # pseudocode condition
    fbuf.write('\n')
```
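To make the equivalence above concrete, here is an in-memory model in which the storage is a list and the unfinished line is a string. The function `new_line` and this data layout are hypothetical; the real handle keeps this state in files.

```python
from typing import List, Tuple


def new_line(records: List[str], tmp: str) -> Tuple[List[str], str]:
    """Hypothetical model of new_line(): records is the storage, tmp the current line."""
    if tmp:  # the last line is not empty, so finish it (same as writing '\n')
        records = records + [tmp]
    return records, ""  # afterwards the current line is always empty


moved = new_line(["first"], "partial")
unchanged = new_line(["first"], "")
print(moved)      # (['first', 'partial'], '')
print(unchanged)  # (['first'], '')
```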
flush
fbuf.flush()
Flush the current written line stream (temporary buffer).
This method does nothing, because the file-based buffer does not need to flush the temporary buffer.
parse_lines
fbuf.parse_lines(lines: Sequence[str])
Parse the lines.
This method is triggered when new lines are written by the `write()` method. The default behavior is writing the lines to the log files.
Users can subclass `LineFileBuffer` and override this method with a customized parsing method, such as regular-expression searching. The default behavior is implemented by the following private method:
fbuf.__update_records(lines)
Requires
Argument | Type | Description
---|---|---
lines | Sequence[str] | A sequence of strings to be written to the log files. Users can inspect any line item and extract specific information from it.
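A minimal sketch of the override pattern described above. To keep it runnable without syncstream, `StubLineBuffer` is a hypothetical stand-in that simply collects lines; with the real library you would presumably subclass `LineFileBuffer` instead and call `super().parse_lines(lines)` to keep the default file writing. The `loss=` log format is invented for the example.

```python
import re
from typing import List, Sequence


class StubLineBuffer:
    """Hypothetical stand-in for LineFileBuffer: stores parsed lines in a list."""

    def __init__(self) -> None:
        self.records: List[str] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        self.records.extend(lines)


class LossTrackingBuffer(StubLineBuffer):
    """Override parse_lines to extract numbers from lines such as 'loss=0.25'."""

    LOSS_PATTERN = re.compile(r"loss=([0-9.]+)")

    def __init__(self) -> None:
        super().__init__()
        self.losses: List[float] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        for line in lines:
            match = self.LOSS_PATTERN.search(line)
            if match:
                self.losses.append(float(match.group(1)))
        super().parse_lines(lines)  # keep the default storage behaviour


buf = LossTrackingBuffer()
buf.parse_lines(["epoch 1 loss=0.5", "saving checkpoint", "epoch 2 loss=0.25"])
print(buf.losses)        # [0.5, 0.25]
print(len(buf.records))  # 3
```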
read
lines: Sequence[str] = fbuf.read(size: int | None = None)
Fetch the stored record items from the buffer. Calling `read()` is process-safe and does not affect the cursor of the `write()` method.
If the current written line is not blank, `read()` regards it as the last record item.
Requires
Argument | Type | Description
---|---|---
size | int or None | If `None`, the whole storage is returned. If an int, the latest `size` items are returned.
Returns
Argument | Type | Description
---|---|---
lines | Sequence[str] | The returned messages. The messages have already been split by line breaks.
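The `read()` rules above (an unfinished line counts as the newest record; `size` selects the latest items) can be sketched as a pure function that leaves its inputs untouched, mirroring the fact that reading does not move the write cursor. `read_records` is hypothetical, and returning an empty list for `size <= 0` is a choice of this sketch that the documentation does not specify.

```python
from typing import List, Optional, Sequence


def read_records(records: Sequence[str], tmp: str, size: Optional[int] = None) -> List[str]:
    """Hypothetical model of read(): a non-empty current line counts as the last record."""
    items = list(records)  # copy, so the stored state is never modified
    if tmp:  # the unfinished line is reported as the newest record
        items.append(tmp)
    if size is None:  # None means "return the whole storage"
        return items
    # Slicing with -0 would return everything, so size <= 0 is handled explicitly.
    return items[-size:] if size > 0 else []


everything = read_records(["a", "b", "c"], "d")
latest_two = read_records(["a", "b", "c"], "d", size=2)
no_partial = read_records(["a", "b", "c"], "", size=2)
print(everything)  # ['a', 'b', 'c', 'd']
print(latest_two)  # ['c', 'd']
print(no_partial)  # ['b', 'c']
```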
write
n_bytes: int = fbuf.write(data: str)
The writing method of the buffer. The source data is the same as that of a text-based IO. Each time `data` contains a line break, a new record item is pushed into the storage. The `write()` method is process-safe.
Requires
Argument | Type | Description
---|---|---
data | str | The written messages. Line breaks are detected automatically.
Returns
Argument | Type | Description
---|---|---
n_bytes | int | The number of bytes written to the buffer. This value is counted directly from the input data.
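The line-splitting and rotation rules of `write()` can be modelled in memory. This is a sketch under stated assumptions, not the library's code: `LineRingModel` is hypothetical, it keeps records in a `collections.deque` with `maxlen` instead of one file per record, and it returns `len(data)` (a character count, as `io.TextIOBase.write` does); whether syncstream counts characters or encoded bytes is not confirmed here.

```python
from collections import deque
from typing import Deque


class LineRingModel:
    """Hypothetical in-memory model of line-based rotating storage (no files, no locks)."""

    def __init__(self, maxlen: int = 20) -> None:
        self.storage: Deque[str] = deque(maxlen=maxlen)  # oldest records drop out
        self.tmp = ""  # the unfinished current line

    def write(self, data: str) -> int:
        # Everything before the last '\n' is complete; the remainder stays pending.
        *complete, rest = (self.tmp + data).split("\n")
        self.storage.extend(complete)  # each finished line becomes one record
        self.tmp = rest
        return len(data)


buf = LineRingModel(maxlen=3)
n1 = buf.write("line 1\nline 2\nli")
n2 = buf.write("ne 3\nline 4\n")
print(list(buf.storage))  # ['line 2', 'line 3', 'line 4']
print(repr(buf.tmp))      # ''
```

The partial text `li` waits in the temporary line until the next line break arrives, and once `maxlen` records exist, each new record pushes out the oldest one.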
Example
Synchronize messages by shared log files
```python
import os
import contextlib
import multiprocessing
from syncstream.file import LineFileBuffer


def f(f_path, f_len=10):
    # Each worker creates its own handle; tmp_id is documented as a str.
    fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid()))
    with contextlib.redirect_stdout(fbuf):
        print('example')
    fbuf.new_line()


if __name__ == '__main__':
    f_path = os.path.join('logs', 'test_file')
    f_len = 10
    os.makedirs('logs', exist_ok=True)  # make sure the log folder exists
    with multiprocessing.Pool(4) as p:
        p.starmap(f, tuple((f_path, f_len) for _ in range(4)))
    fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid()))
    print(fbuf.read())
```

Results

```
('example', 'example', 'example', 'example')
```