跳到主要内容
版本:1.2.2

LineProcBuffer

源码

pbuf = LineProcBuffer(maxlen: int = 20)

进程安全的行缓存。

带有最大存储长度的轮换缓存。该缓存作为基础LineBuffer的扩展版实现。其可用于多进程场景。使用该缓存提供的共享队列,可以确保信息在进程之间同步。要了解如何使用该缓存,可以查阅本页的范例,或是指南

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer

参数

参数类型必选
说明
maxlenint所存储的最大行数。该值和deque.maxlen的含义相同。

方法

close

pbuf.close()

关闭IO。该方法只会生效一次。第二次调用则无任何效果。


clear

pbuf.clear()

清空整个缓存。

该方法会清空该缓存内的整个存储空间,以及当前正在写的最后一行的流。然而,该方法不会清空该缓存的任何镜像或拷贝。该方法是线程安全的,且应当永远可以成功执行。


stop_all_mirrors

pbuf.stop_all_mirrors()

向所有的mirror发送中断信号。

该操作用于安全中断子进程。然而,该方法并不会触发TERMINATE或是KILL信号。取而代之的是,每个接收到中断信号的mirror,都会触发一个StopIteartion异常。针对中断信号的检查会在每次调用mirror的write()方法时发生。透过这种方法,绝大多数程序都会被中断。

危险

由于该方法不会发送TERMINATEKILL信号,它并不能保证进程会立刻结束。若用户决意使用该方法,请确保正确地使用LineProcMirror.send_error()捕获了StopIteration。该异常不会被自动捕获。若用户不去捕获该异常,主进程将会死锁在wait()上。


force_stop

pbuf.force_stop()

强制停止缓存。

调用该方法,将会导致receive()强迫返回False。此后,所有向该缓存发送信息的镜像都会被分离(detached)。

需要注意的是,该方法并不安全,而且有可能造成带有镜像的子进程阻塞。如无必要,更推荐使用stop_all_mirrors()、而非此方法。

调用该方法就意味着、该缓存的多进程功能被释放,除非再调用self.reset_states()

如果用户打算将所有的子进程透过Process.terminate()Process.kill()中断,则建议使用该方法停止主缓存。


reset_states

pbuf.stop_all_mirrors()

重置缓存的状态。

在与mirror互动的过程中,缓存的状态有可能会发生变化。若用户想要复用缓存、并开启一组新的子进程,请使用该方法。


receive

is_live: bool = pbuf.receive()

从mirror接收一条信息。

该方法会从进程安全队列里获取一条信息,并将信息写入到线程安全缓存中。所获取的信息将会保存在存储区内。

输出

参数类型
说明
is_livebool结束标志。若该值为True,这代表该缓存仍然需要从mirror中接收更多信息。若为False,则表明从所有mirror来的关闭信号,都已经收到。

wait

pbuf.wait()

阻塞的等待方法。该方法透过以下方式简单地实现:

while pbuf.receive():
pass

该方法会令当前线程保持阻塞,直到缓存从所有mirror中接收到了所有信息。


parse_lines

pbuf.parse_lines(lines: Sequence[str])

处理行。

每次write()方法将要向缓存写入新行时,调用此方法(异常和警告地写入不会调用该方法)。默认的行为是向存储空间添加一条新的信息。

用户自行定制形如“正则表达式搜索”的处理方法、并以此继承、重载该方法。该方法的默认行为透过以下私有方法实现:

pbuf.storage.extend(lines)

输入

参数类型必选
说明
linesSequence[str]要写入存储空间的字符串序列。用户可以捕获任一行,并从中提取某种特定的信息。

read

lines: Sequence[str] = pbuf.read(size: int | None = None)

从缓存中获取所存储的各项记录。read()方法是线程安全的,并且不会干扰write()方法的指针。

若当前正在写入的行不为空,调用该方法时,则会将其视为最后一条记录。

输入

参数类型必选
说明
sizeint | None若设为None,则会返回整个存储区。若设为一个整数,则会返回最后size条信息。

输出

参数类型
说明
linesSequence[str]所返回的信息。这些信息已经按断行符分拆过。

write

警告

该类不允许使用该方法,这是由于write()只能由mirror使用。

属性

mirror

mirror: LineProcMirror = pbuf.mirror

获取缓存的一个mirror。

缓存不能直接在子进程中使用。取而代之的是,按照master/slave模式使用buffer/mirror。属性pbuf.mirrorLineProcMirror的实例。这种模式用来在子进程中提供缓存的进程安全镜像。

初始化之后,无法修改该属性。


n_mirrors

n_mirrors: int = pbuf.n_mirrors

当前存活的镜像。缓存需要持续地获取信息,直到n_mirrors变为0。每当调用pbuf.mirror一次时,该值就会自增1。


maxlen

maxlen: int | None = pbuf.maxlen

缓存中的最大长度(即行数)。

若该值为None,则表明缓存没有最大长度限制。


closed

is_closed: bool = pbuf.closed

检查缓存是否已经被关闭。

运算符

__len__

n_buffer_items: int = len(pbuf)

缓存中的行/条目数。

范例

在进程之间同步信息

sync_among_processes.py
import multiprocessing
from syncstream.mproc import LineProcBuffer, LineProcMirror


def f(buffer: LineProcMirror):
with buffer:
print('example')


if __name__ == '__main__':
pbuf = LineProcBuffer(maxlen=10)
with multiprocessing.Pool(4) as p:
p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
pbuf.wait()
print(pbuf.read())