跳到主要内容
版本:1.2.2

LineFileBuffer

上下文源码

fbuf = LineFileBuffer(
file_path: str | os.PathLike,
maxlen: int = 20,
tmp_id: str = "tmp",
)

with fbuf:
...

文件驱动的、以行为单元的缓存句柄。

该缓存为文字流提供了一个轮换存储空间。文字的存储单位不是字符,而是行。存储空间的最大行数是有限的。每条记录将会存储为一个独立的文件。同步透过文件锁实现。

加持了文件锁的句柄、可以共享在不同的进程之间,但这并不是推荐的做法。更好的处理方式,是在每个进程里(若有必要),独立地初始化各自的句柄。

警告

须知该句柄是进程安全、而非线程安全的。换言之,每个进程应当只维护一个独立的LineFileBuffer。该LineFileBuffer不宜在不同的线程、或不同的进程之间共享。

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineFileBuffer
syncstream.file.LineFileBuffer

参数

参数类型必选
说明
file_pathstr | os.PathLike所存储记录的基路径,其指向的位置既包括了存储内容、又包含文件锁。文件的后缀名不会自动设置为.log
maxlenint所记录行数的最大值。每行记录会保存为一个文件。
tmp_idstr临时文件的ID。对于每个进程,应当各自持有独立的ID。若不同的进程持有冲突的ID,则会导致不同进程的写入流之间相互干扰。

方法

close

fbuf.close(exc: BaseException | None = None)

关闭 IO。该方法只会生效一次。第二次调用则无任何效果。

输入

参数类型必选
说明
excBaseExceptionexcNone,则在关闭缓存前调用send_exc()。否则,调用new_line()

clear

fbuf.clear()

清空整个缓存。

该方法会搜索、并移除所有的日志文件,也包括临时文件,但不会移除锁文件。要使用该方法,典型的做法是只在主进程中清空文件。


new_line

fbuf.new_line()

清空当前的临时缓存,并人为地触发一个“新行”信号。若当前的临时缓存包含数据,则会先将数据移动到存储区,然后再创建新行。若当前的临时缓存的已经是新行,则不做任何处理。

该方法等价于

if (last line is not empty):
write('\n')

flush

fbuf.flush()

刷新当前正在写入行的数据流(临时缓存)。

警告

该方法不会产生任何实际效果,因为基于文件的缓存不需要刷新临时缓存。


parse_lines

fbuf.parse_lines(lines: Sequence[str])

处理行。

每次write()方法将要向缓存写入新行时,调用此方法。默认的行为是向日志文件写入行。

用户自行定制形如“正则表达式搜索”的处理方法、并以此继承、重载该方法。该方法的默认行为透过以下私有方法实现:

fbuf.__update_records(lines)

输入

参数类型必选
说明
linesSequence[str]要写入日志文件的字符串序列。用户可以捕获任一行,并从中提取某种特定的信息。

read

lines: Sequence[str] = fbuf.read(size: int | None = None)

从缓存中获取所存储的各项记录。read()方法是进程安全的,并且不会干扰write()方法的指针。

若当前正在写入的行不为空,调用该方法时,则会将其视为最后一条记录。

输入

参数类型必选
说明
sizeint | None若设为None,则会返回整个存储区。若设为一个整数,则会返回最后size条信息。

输出

参数类型
说明
linesSequence[str]所返回的信息。这些信息已经按断行符分拆过。

write

n_bytes: int = fbuf.write(data: str)

写缓存的方法。源数据的形式与文字 IO 的数据形式相同。每当data包含断行符时,会向存储区置入一条新的记录。write()方法是进程安全的。

输入

参数类型必选
说明
datastr要写入的信息。会自动检测其中包含的断行符。

输出

参数类型
说明
n_bytesint所要写入缓存的byte数目。该值直接从输入data计算。

send_exc

fbuf.send_exc(exc: BaseException)

向记录中发送一个异常或警告对象。

该对象将会和其回溯信息(traceback)一并作为长字符串写入记录中。即便这条记录中包含断行符,这整个对象的信息仍然被当作一条记录。

输入

参数类型必选
说明
excBaseException要写入记录的异常对象。

fileno

fbuf.fileno() -> Never

返回文件 ID。

由于该缓存不使用文件 ID,故而该方法会抛出OSError


isatty

is_atty: False = fbuf.fileno()

流是否连接到了终端/TTY 上。返回False

输出

参数类型
说明
is_attybool恒为False.

readable

is_readable: bool = fbuf.readable()

流是否可读。只要流还未关闭,其就是可读的。

若流不再可读,调用read()会抛出OSError

输出

参数类型
说明
is_readablebool若缓存未关闭,返回True。否则,返回False

writable

is_writable: bool = fbuf.writable()

流是否可写。只要流还未关闭,其就是可读的。

若流不再可写,调用read()会抛出OSError

输出

参数类型
说明
is_writablebool若缓存未关闭,返回True。否则,返回False

seekable

is_seekable: False = fbuf.seekable()

流是否支持随机访问。该缓存不支持。

输出

参数类型
说明
is_seekablebool恒为False

seek

fbuf.seek() -> Never

由于缓存不支持随机访问,调用该方法会抛出OSError

属性

maxlen

maxlen: int = fbuf.maxlen

缓存中的最大长度(即行数)。


closed

is_closed: bool = fbuf.closed

检查缓存是否已经被关闭。

运算符

__len__

n_buffer_items: int = len(fbuf)

缓存中的行/条目数。

范例

在共享的日志文件之间同步信息

sync_by_files.py
import os
import multiprocessing
from syncstream.file import LineFileBuffer


def f(f_path, f_len=10):
with LineFileBuffer(f_path, maxlen=f_len, tmp_id=os.getpid()):
print("example")


if __name__ == "__main__":
f_path = os.path.join("logs", "test_file")
f_len = 10
with multiprocessing.Pool(4) as p:
p.starmap(f, tuple((f_path, f_len) for _ in range(4)))
fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=os.getpid())
print(fbuf.read())