跳到主要内容
版本:0.3.x

LineProcBuffer

源码

pbuf = syncstream.mproc.LineProcBuffer(maxlen: int = 20)

进程安全的行缓存。

带有最大存储长度的轮换缓存。该缓存作为基础LineBuffer的扩展版实现。其可用于多进程场景。使用该缓存提供的共享队列,可以确保信息在进程之间同步。要了解如何使用该缓存,可以查阅本页的范例,或是指南

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer

参数

输入

参数类型必选
说明
maxlenint所存储的最大行数。该值和deque.maxlen的含义相同。

方法

clear

pbuf.clear()

清空整个缓存。

该方法会清空该缓存内的整个存储空间,以及当前正在写的最后一行的流。然而,该方法不会清空该缓存的任何镜像或拷贝。该方法是线程安全的,且应当永远可以成功执行。


stop_all_mirrors

pbuf.stop_all_mirrors()

向所有的mirror发送中断信号。

该操作用于安全中断子进程。然而,该方法并不会触发TERMINATE或是KILL信号。取而代之的是,每个接收到中断信号的mirror,都会触发一个StopIteartion异常。针对中断信号的检查会在每次调用mirror的write()方法时发生。透过这种方法,绝大多数程序都会被中断。

危险

由于该方法不会发送TERMINATEKILL信号,它并不能保证进程会立刻结束。若用户决意使用该方法,请确保正确地使用LineProcMirror.send_error()捕获了StopIteration。该异常不会被自动捕获。若用户不去捕获该异常,主进程将会死锁在wait()上。


reset_states

pbuf.stop_all_mirrors()

重置缓存的状态。

在与mirror互动的过程中,缓存的状态有可能会发生变化。若用户想要复用缓存、并开启一组新的子进程,请使用该方法。


receive

is_live: bool = pbuf.receive()

从mirror接收一条信息。

该方法会从进程安全队列里获取一条信息,并将信息写入到线程安全缓存中。所获取的信息将会保存在存储区内。

输出

参数类型
说明
is_livebool结束标志。若该值为True,这代表该缓存仍然需要从mirror中接收更多信息。若为False,则表明从所有mirror来的关闭信号,都已经收到。

wait

pbuf.wait()

阻塞的等待方法。该方法透过以下方式简单地实现:

while pbuf.receive():
pass

该方法会令当前线程保持阻塞,直到缓存从所有mirror中接收到了所有信息。


parse_lines

pbuf.parse_lines(lines: Sequence[str])

处理行。

每次write()方法将要向缓存写入新行时,调用此方法(异常和警告地写入不会调用该方法)。默认的行为是向存储空间添加一条新的信息。

用户自行定制形如“正则表达式搜索”的处理方法、并以此继承、重载该方法。该方法的默认行为透过以下私有方法实现:

pbuf.storage.extend(lines)

输入

参数类型必选
说明
linesSequence[str]要写入存储空间的字符串序列。用户可以捕获任一行,并从中提取某种特定的信息。

read

lines: Sequence[str] = pbuf.read(size: int | None = None)

从缓存中获取所存储的各项记录。read()方法是线程安全的,并且不会干扰write()方法的指针。

若当前正在写入的行不为空,调用该方法时,则会将其视为最后一条记录。

输入

参数类型必选
说明
sizeint | None若设为None,则会返回整个存储区。若设为一个整数,则会返回最后size条信息。

输出

参数类型
说明
linesSequence[str]所返回的信息。这些信息已经按断行符分拆过。

write

警告

该类不允许使用该方法,这是由于write()只能由mirror使用。

属性

mirror

mirror: LineProcMirror = pbuf.mirror

获取缓存的一个mirror。

缓存不能直接在子进程中使用。取而代之的是,按照master/slave模式使用buffer/mirror。属性pbuf.mirrorLineProcMirror的实例。这种模式用来在子进程中提供缓存的进程安全镜像。

初始化之后,无法修改该属性。


n_mirrors

n_mirrors: int = pbuf.n_mirrors

当前存活的镜像。缓存需要持续地获取信息,直到n_mirrors变为0。每当调用pbuf.mirror一次时,该值就会自增1。

范例

在进程之间同步信息

sync_among_processes.py
import contextlib
import multiprocessing
from syncstream.mproc import LineProcBuffer


def f(buffer):
with contextlib.redirect_stdout(buffer):
print('example')
buffer.send_eof()


if __name__ == '__main__':
pbuf = LineProcBuffer(maxlen=10)
with multiprocessing.Pool(4) as p:
p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
pbuf.wait()
print(pbuf.read())