跳到主要内容
版本:0.3.x

LineFileBuffer

源码

fbuf = syncstream.file.LineFileBuffer(
file_path: str,
maxlen: int = 20,
tmp_id: str = "tmp",
)

文件驱动的、以行为单元的缓存句柄。

该缓存为文字流提供了一个轮换存储空间。文字的存储单位不是字符,而是行。存储空间的最大行数是有限的。每条记录将会存储为一个独立的文件。同步透过文件锁实现。

加持了文件锁的句柄、可以共享在不同的进程之间,但这并不是推荐的做法。更好的处理方式,是在每个进程里(若有必要),独立地初始化各自的句柄。

警告

须知该句柄是进程安全、而非线程安全的。换言之,每个进程应当只维护一个独立的LineFileBuffer。该LineFileBuffer不宜在不同的线程、或不同的进程之间共享。

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineFileBuffer
syncstream.file.LineFileBuffer

参数

输入

参数类型必选
说明
file_pathstr所存储记录的基路径,其指向的位置既包括了存储内容、又包含文件锁。文件的后缀名不会自动设置为.log
maxlenint所记录行数的最大值。每行记录会保存为一个文件。
tmp_idstr临时文件的ID。对于每个进程,应当各自持有独立的ID。若不同的进程持有冲突的ID,则会导致不同进程的写入流之间相互干扰。

方法

clear

fbuf.clear()

清空整个缓存。

该方法会搜索、并移除所有的日志文件,也包括临时文件,但不会移除锁文件。要使用该方法,典型的做法是只在主进程中清空文件。


new_line

fbuf.new_line()

清空当前的临时缓存,并人为地触发一个“新行”信号。若当前的临时缓存包含数据,则会先将数据移动到存储区,然后再创建新行。若当前的临时缓存的已经是新行,则不做任何处理。

该方法等价于

if (last line is not empty):
write('\n')

flush

fbuf.flush()

刷新当前正在写入行的数据流(临时缓存)。

警告

该方法不会产生任何实际效果,因为基于文件的缓存不需要刷新临时缓存。


parse_lines

fbuf.parse_lines(lines: Sequence[str])

处理行。

每次write()方法将要向缓存写入新行时,调用此方法。默认的行为是向日志文件写入行。

用户自行定制形如“正则表达式搜索”的处理方法、并以此继承、重载该方法。该方法的默认行为透过以下私有方法实现:

fbuf.__update_records(lines)

输入

参数类型必选
说明
linesSequence[str]要写入日志文件的字符串序列。用户可以捕获任一行,并从中提取某种特定的信息。

read

lines: Sequence[str] = fbuf.read(size: int | None = None)

从缓存中获取所存储的各项记录。read()方法是进程安全的,并且不会干扰write()方法的指针。

若当前正在写入的行不为空,调用该方法时,则会将其视为最后一条记录。

输入

参数类型必选
说明
sizeint | None若设为None,则会返回整个存储区。若设为一个整数,则会返回最后size条信息。

输出

参数类型
说明
linesSequence[str]所返回的信息。这些信息已经按断行符分拆过。

write

n_bytes: int = fbuf.write(data: str)

写缓存的方法。源数据的形式与文字IO的数据形式相同。每当data包含断行符时,会向存储区置入一条新的记录。write()方法是进程安全的。

输入

参数类型必选
说明
datastr要写入的信息。会自动检测其中包含的断行符。

输出

参数类型
说明
n_bytesint所要写入缓存的byte数目。该值直接从输入data计算。

范例

在共享的日志文件之间同步信息

sync_by_files.py
import os
import contextlib
import multiprocessing
from syncstream.file import LineFileBuffer


def f(f_path, f_len=10):
fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=os.getpid())
with contextlib.redirect_stdout(fbuf):
print('example')
fbuf.new_line()


if __name__ == '__main__':
f_path = os.path.join('logs', 'test_file')
f_len = 10
with multiprocessing.Pool(4) as p:
p.starmap(f, tuple((f_path, f_len) for _ in range(4)))
fbuf = LineFileBuffer(f_path, maxlen=f_len, tmp_id=os.getpid())
print(fbuf.read())