LineProcBuffer
类源码
pbuf = LineProcBuffer(maxlen: int = 20)
进程安全的行缓存。
带有最大存储长度的轮换缓存。该缓存作为基础LineBuffer
的扩展版实现。其可用于多进程场景。使用该缓存提供的共享队列,可以确保信息在进程之间同步。要了解如何使用该缓存,可以查阅本页的范例,或是指南。
别名
该类可以按以下方式之一获取
import syncstream
syncstream.LineProcBuffer
syncstream.mproc.LineProcBuffer
参数
参数 | 类型 | 必选 | |
---|---|---|---|
maxlen | int | 所存储的最大行数。该值和deque.maxlen 的含义相同。 |
方法
close
pbuf.close()
关闭IO。该方法只会生效一次。第二次调用则无任何效果。
clear
pbuf.clear()
清空整个缓存。
该方法会清空该缓存内的整个存储空间,以及当前正在写的最后一行的流。 然而,该方法不会清空该缓存的任何镜像或拷贝。该方法是线程安全的,且应当永远可以成功执行。
stop_all_mirrors
pbuf.stop_all_mirrors()
向所有的mirror发送中断信号。
该操作用于安全中断子进程。然而,该方法并不会触发TERMINATE
或是KILL
信号。取而代之的是,每个接收到中断信号的mirror,都会触发一个StopIteartion
异常。针对中断信号的检查会在每次调用mirror的write()
方法时发生 。透过这种方法,绝大多数程序都会被中断。
由于该方法不会发送TERMINATE
或KILL
信号,它并不能保证进程会立刻结束。若用户决意使用该方法,请确保正确地使用LineProcMirror.send_error()
捕获了StopIteration
。该异常不会被自动捕获。若用户不去捕获该异常,主进程将会死锁在wait()
上。
force_stop
pbuf.force_stop()
强制停止缓存。
调用该方法,将会导致receive()
强迫返回False
。此后,所有向该缓存发送信息的镜像都会被分离(detached)。
需要注意的是,该方法并不安全,而且有可能造成带有镜像的子进程阻塞。如无必要,更推荐使用stop_all_mirrors()
、而非此方法。
调用该方法就意味着、该缓存的多进程功能被释放,除非再调用self.reset_states()
。
如果用户打算将所有的子进程透过Process.terminate()
或Process.kill()
中断,则建议使用该方法停止主缓存。
reset_states
pbuf.stop_all_mirrors()
重置缓存的状态。
在与mirror互动的过程中,缓存的状态有可能会发生变化。若用户想要复用缓存、并开启一组新的子进程,请使用该方法。
receive
is_live: bool = pbuf.receive()
从mirror接收一条信息。
该方法会从进程安全队列里获取一条信息,并将信息写入到线程安全缓存中。所获取的信息将会保存在存储区内。
输出
参数 | 类型 | |
---|---|---|
is_live | bool | 结束标志。若该值为True ,这代表该缓存仍然需要从mirror中接收更多信息。若为False ,则表明从所有mirror来的关闭信号,都已经收到。 |
wait
pbuf.wait()
阻塞的等待方法。该方法透过以下方式简单地实现:
while pbuf.receive():
pass
该方法会令当前线程保持阻塞,直到缓存从所有mirror中接收到了所有信息。
parse_lines
pbuf.parse_lines(lines: Sequence[str])
处理行。
每次write()
方法将要向缓存写入新行时,调用此方法(异常和警告地写入不会调用该方法)。默认的行为是向存储空间添加一条新的信息。
用户自行定制形如“正则表达式搜索”的处理方法、并以此继承、重载该方法。该方法的默认行为透过以下私有方法实现:
pbuf.storage.extend(lines)
输入
参数 | 类型 | 必选 | |
---|---|---|---|
lines | Sequence[str] | 要写入存储空间的字符串序列。用户可以捕获任一行,并从中提取某种特定的信息。 |
read
lines: Sequence[str] = pbuf.read(size: int | None = None)
从缓存中获取所存储的各项记录。read()
方法是线程安全的,并且不会干扰write()
方法的指针。
若当前正在写入的行不为空,调用该方法时,则会将其视为最后一条记录。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
size | int | None | 若设为None ,则会返回整个存储区。若设为一个整数,则会返回最后size 条信息。 |
输出
参数 | 类型 | |
---|---|---|
lines | Sequence[str] | 所返回的信息。这些信息已经按断行符分拆过。 |
write
该类不允许使用该方法,这是由于write()
只能由mirror使用。
属性
mirror
mirror: LineProcMirror = pbuf.mirror
获取缓存的一个mirror。
缓存不能直接在子进程中使用。取而代之的是,按照master/slave模式使用buffer/mirror。属性pbuf.mirror
是LineProcMirror
的实例。这种模式用来在子进程中提供缓存的进程安全镜像。
初始化之后,无法修改该属性。
n_mirrors
n_mirrors: int = pbuf.n_mirrors
当前存活的镜像。缓存需要持续地获取信息,直到n_mirrors
变为0。每当调用pbuf.mirror
一次时,该值就会自增1。
maxlen
maxlen: int | None = pbuf.maxlen
缓存中的最大长度(即行数)。
若该值为None
,则表明缓存没有最大长度限制。
closed
is_closed: bool = pbuf.closed
检查缓存是否已经被关闭。
运算符
__len__
n_buffer_items: int = len(pbuf)
缓存中的行/条目数。
范例
在进程之间同步信息
- 代码
- 结果
import multiprocessing
from syncstream.mproc import LineProcBuffer, LineProcMirror
def f(buffer: LineProcMirror):
with buffer:
print('example')
if __name__ == '__main__':
pbuf = LineProcBuffer(maxlen=10)
with multiprocessing.Pool(4) as p:
p.map_async(f, tuple(pbuf.mirror for _ in range(4)))
pbuf.wait()
print(pbuf.read())
('example', 'example', 'example', 'example')