LineFileBuffer
fbuf = LineFileBuffer(
    file_path: str | os.PathLike,
    maxlen: int = 20,
    tmp_id: str = "tmp",
)

with fbuf:
    ...
The file-driven line-based buffer handle.
This buffer provides rotating item storage for a text-based stream. The text is stored by lines rather than by length, and the maximal number of stored lines is limited. Each record item is saved as an independent file. Synchronization is implemented by file locks.
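The rotation described above can be pictured with an in-memory analogy. This is a minimal sketch, not the library's implementation: it uses `collections.deque` in place of the per-record files and file locks, and the name `records` is only illustrative.

```python
from collections import deque

# A bounded deque drops the oldest item once maxlen is reached,
# which mirrors the "rotating" behavior of a line-limited storage.
records = deque(maxlen=3)
for i in range(5):
    records.append(f"line {i}")

print(tuple(records))
```

Only the latest three lines survive, regardless of how long each line is.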
The file-locked handle could be shared by different processes, but we do not recommend doing so. A better way to use this handle is to initialize it in each sub-process (if needed).
Note that this handle is process-safe, not thread-safe. In other words, each process should maintain only one INDEPENDENT LineFileBuffer. The LineFileBuffer should not be shared by either different threads or different processes.
Aliases
This class can be acquired by
import syncstream
syncstream.LineFileBuffer
syncstream.file.LineFileBuffer
Arguments
Argument | Type | Required | Description
---|---|---|---
file_path | str \| os.PathLike | Yes | The base path of the saved records, including the saved storage and the file locks. The file suffix is automatically set to .log .
maxlen | int | No | The maximal number of records. Each record is saved as one file.
tmp_id | str | No | The identifier for the temporary file. Each process should hold one unique id. Conflicting ids may cause the written flows from different processes to interrupt each other.
Methods
close
fbuf.close(exc: BaseException | None = None)
Close the IO. This method takes effect only once. A second call does nothing.
Requires
Argument | Type | Required | Description
---|---|---|---
exc | BaseException | No | If exc is not None , send_exc() is called before closing the buffer. Otherwise, new_line() is called.
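The control flow documented for close() can be sketched as follows. `ClosableSketch` is a hypothetical stand-in that only records which method was called. It is not the library's class and it does no file handling.

```python
from typing import List, Optional


class ClosableSketch:
    """Hypothetical stand-in showing the documented close() control flow."""

    def __init__(self) -> None:
        self.closed = False
        self.calls: List[str] = []

    def send_exc(self, exc: BaseException) -> None:
        self.calls.append(f"send_exc:{type(exc).__name__}")

    def new_line(self) -> None:
        self.calls.append("new_line")

    def close(self, exc: Optional[BaseException] = None) -> None:
        if self.closed:  # a repeated call does nothing
            return
        if exc is not None:
            self.send_exc(exc)
        else:
            self.new_line()
        self.closed = True


buf = ClosableSketch()
buf.close(ValueError("boom"))
buf.close()  # ignored: the buffer is already closed
print(buf.calls)
```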
clear
fbuf.clear()
Clear the whole buffer.
This method searches for and removes all log files, including the temporary file. However, the lock files are not removed. Typically, this method is called only in the main process.
new_line
fbuf.new_line()
Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.
This method is equivalent to
if (last line is not empty):
    write('\n')
flush
fbuf.flush()
Flush the current written line stream (temporary buffer).
This method does nothing, because the file-based buffer does not need to flush the temporary buffer.
parse_lines
fbuf.parse_lines(lines: Sequence[str])
Parse the lines.
This method is triggered when new lines are written by the write() method. The default behavior is writing the lines to the log files.
Users could subclass this class and override this method with a customized parser, such as a regular-expression search. The default behavior is implemented by the following private method:
fbuf.__update_records(lines)
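A minimal sketch of such an override is given below. To stay runnable without the library, it subclasses a hypothetical `StandInBuffer` instead of LineFileBuffer, and the class names, the `error_codes` attribute and the regular expression are illustrative assumptions. Because a double-underscore method name is mangled with its defining class name in Python, a subclass cannot reach it as `self.__update_records`, so the sketch delegates through `super().parse_lines(lines)` to keep the default behavior.

```python
import re
from typing import List, Sequence


class StandInBuffer:
    """Hypothetical stand-in for LineFileBuffer; keeps records in a list."""

    def __init__(self) -> None:
        self.records: List[str] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        # Default behavior of the stand-in: store every line.
        self.records.extend(lines)


class ErrorTrackingBuffer(StandInBuffer):
    """Collects error codes while keeping the default storage behavior."""

    pattern = re.compile(r"ERROR (\d+)")

    def __init__(self) -> None:
        super().__init__()
        self.error_codes: List[int] = []

    def parse_lines(self, lines: Sequence[str]) -> None:
        for line in lines:
            match = self.pattern.search(line)
            if match:
                self.error_codes.append(int(match.group(1)))
        # Delegate to the parent so the lines are still recorded.
        super().parse_lines(lines)


buf = ErrorTrackingBuffer()
buf.parse_lines(["start", "ERROR 404 not found", "done"])
print(buf.error_codes)
```

With the real class, the same pattern would presumably apply by replacing the stand-in base class with LineFileBuffer.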
Requires
Argument | Type | Required | Description
---|---|---|---
lines | Sequence[str] | Yes | A sequence of strs to be written to the log files. Users could catch any line item and extract specific information from it.
read
lines: Sequence[str] = fbuf.read(size: int | None = None)
Fetch the stored record items from the buffer. Using the read() method is process-safe and does not influence the cursor of the write() method.
If the current written line is not blank, the read() method regards it as the last record item.
Requires
Argument | Type | Required | Description
---|---|---|---
size | int \| None | No | If set to None , the whole storage is returned. If set to an int value, the latest size items are returned.
Returns
Argument | Type | Description
---|---|---
lines | Sequence[str] | The returned messages. The messages have already been split by the line breaks.
write
n_bytes: int = fbuf.write(data: str)
The writing method of the buffer. The source data is the same as that of a text-based IO. Each time data contains a line break, a new record item is pushed into the storage. The write() method is process-safe.
Requires
Argument | Type | Required | Description
---|---|---|---
data | str | Yes | The written messages. The line breaks are detected automatically.
Returns
Argument | Type | Description
---|---|---
n_bytes | int | The number of bytes that would be written to the buffer. This value is counted from the input data directly.
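The line-break handling described for write() can be illustrated with a small pure-Python sketch. The helper `split_written` and the variables `records` and `pending` are hypothetical names. The sketch only splits on "\n" and keeps everything in memory, so it may differ from how the library treats other line endings.

```python
from typing import List, Tuple


def split_written(pending: str, data: str) -> Tuple[List[str], str]:
    """Return (completed lines, new pending text) after writing `data`."""
    *complete, rest = (pending + data).split("\n")
    return complete, rest


records: List[str] = []
pending = ""
for chunk in ("hel", "lo\nwor", "ld\n"):
    lines, pending = split_written(pending, chunk)
    records.extend(lines)  # a record is pushed only when a line break arrives

print(records, repr(pending))
```

Text written without a trailing line break stays in `pending` until a later write completes the line.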
send_exc
fbuf.send_exc(exc: BaseException)
Send an exception/warning object to the records.
The object is written as one long string together with its traceback. Even if the string contains line breaks, the item is still treated as a single record.
Requires
Argument | Type | Required | Description
---|---|---|---
exc | BaseException | Yes | The exception object to be saved in the records.
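One way to turn an exception and its traceback into a single multi-line string is the standard `traceback` module. The sketch below assumes nothing about how the library formats the record. The helper `format_exc_record` is a hypothetical name.

```python
import traceback


def format_exc_record(exc: BaseException) -> str:
    """Render an exception with its traceback as one multi-line string."""
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )


try:
    1 / 0
except ZeroDivisionError as err:
    record = format_exc_record(err)

records = [record]  # stored whole, not split on line breaks
print(len(records), "ZeroDivisionError" in record)
```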
fileno
fbuf.fileno() -> Never
Return the file ID.
This buffer does not use a file ID, so this method raises an OSError.
isatty
is_atty: False = fbuf.isatty()
Whether the stream is connected to a terminal/TTY. Always returns False.
Returns
Argument | Type | Description
---|---|---
is_atty | bool | Always False .
readable
is_readable: bool = fbuf.readable()
Whether the stream is readable. The stream is readable as long as the buffer is not closed.
If the stream is not readable, calling read() will raise an OSError.
Returns
Argument | Type | Description
---|---|---
is_readable | bool | Return True if the buffer is not closed. Otherwise, return False .
writable
is_writable: bool = fbuf.writable()
Whether the stream is writable. The stream is writable as long as the buffer is not closed.
If the stream is not writable, calling write() will raise an OSError.
Returns
Argument | Type | Description
---|---|---
is_writable | bool | Return True if the buffer is not closed. Otherwise, return False .
seekable
is_seekable: False = fbuf.seekable()
Whether the stream supports random access. This buffer does not.
Returns
Argument | Type | Description
---|---|---
is_seekable | bool | Always False .
seek
fbuf.seek() -> Never
Will raise an OSError since this buffer does not support random access.
Properties
maxlen
maxlen: int = fbuf.maxlen
The maximal length (number of lines) of the buffer.
closed
is_closed: bool = fbuf.closed
Check whether the buffer has been closed.
Operators
__len__
n_buffer_items: int = len(fbuf)
Number of lines/items in the buffer.
Example
Synchronize messages by shared log files
Codes
import os
import multiprocessing

from syncstream.file import LineFileBuffer


def f(f_path, f_len=10):
    # Each process uses its own PID as a unique temporary-file id.
    with LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid())):
        print("example")


if __name__ == "__main__":
    f_path = os.path.join("logs", "test_file")
    f_len = 10
    with multiprocessing.Pool(4) as p:
        p.starmap(f, tuple((f_path, f_len) for _ in range(4)))
    # Read the shared records back in the main process.
    fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=str(os.getpid()))
    print(fbuf.read())
Results
('example', 'example', 'example', 'example')