跳到主要内容
版本:1.2.2

LineHostBuffer

源码

hbuf = LineHostBuffer(
api_route: str = "/sync-stream",
endpoint: str | None = None,
maxlen: int = 2,
)

透过网络服务,实现并提供以行为单元的缓存。

轮换缓存需要设置一个最大存储区长度。该缓存是基础缓存LineBuffer、针对多设备所设计的扩展版。其支持单服务端-多客户端模式,透过网络服务实现同步。要了解如何使用该缓存,可以查阅本页的范例,或是指南

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineHostBuffer
syncstream.host.LineHostBuffer

参数

参数类型必选
说明
api_routestrAPI的路由路径。
endpointstr | NoneAPI的endpoint名称。若设为None,则会从api_route推断endpoint名称。
maxlenint所存储的最大行数。该值和deque.maxlen的含义相同。

方法

close

hbuf.close(exc: BaseException | None = None)

关闭 IO。该方法只会生效一次。第二次调用则无任何效果。


read_serialized

lines: tuple[str | SerializedMessage, ...] = hbuf.read_serialized(size: int | None = None)

读取信息(序列化)。

read(...)的功能相同。不过,该方法返回的所有数据都是已经序列化过、可以和JSON兼容的数据。

该方法用来提供查询服务。

输入

参数类型必选
说明
sizeint | None若设为None,则会返回整个存储区。若设为一个整数,则会返回最后size条信息。

输出

参数类型
说明
linestuple[str | SerializedMessage, ...]所获取的多项记录信息。结果按照先入先出的顺序排序。

serve

hbuf.serve(api: flask.Flask)

向Flask应用载入服务。

输入

参数类型必选
说明
apiflask.FlaskFlask对象。所有与该LineHostBuffer关联的网络服务、都会注册到Flask对象上。

stop_all_mirrors

flag: bool = hbuf.stop_all_mirrors()

向所有的mirror发送中断信号。

该操作用于安全中断进程。它并不能保证进程会立刻结束。针对每个mirror,在每次要写信息的时候,检查中断信号。

若用户打算使用该方法,请确保在进程中捕获了StopIteration。该异常不会回送到缓存中。

输出

参数类型
说明
flagbool若中断成功,返回True。若中断遭到了拒绝,返回False。若服务端有任何异常,抛出错误。

reset_states

flag: bool = hbuf.reset_states()

重置缓存的状态。

若需要复用缓存,则应当调用该方法。

输出

参数类型
说明
flagbool若重置成功,返回True。若重置遭到了拒绝,返回False。若服务端有任何异常,抛出错误。

运算符

__len__

n_buffer_items: int = len(hbuf)

缓存中的行/条目数。

服务

向Flask应用注册本实例后,则会启用以下服务,其中加粗的参数是必需的。

<api_route>

基础API。使用它们来维护用来保存各条信息的存储区域。

方法
输入
响应
描述
GET
  • n: 所要读取的信息条数。若不设置,返回整个存储区。
  • message: 文字响应。若该结果有效,其值应当为success
  • data: 返回信息构成的列表。
获取多条信息。与LineBuffer.read()相若。
POST
  • type: 信息的类型。可以是strerrorwarning,或close
  • data: 从LineHostMirror获取的信息数据字典。
  • message: 文字响应。若该结果有效,其值应当为success
将来自远端的的数据写入存储。用户不应当直接使用该方法,这是因为该方法是为了和LineHostMirror联动设计的。
DELETE
-
  • message: 文字响应。若该结果有效,其值应当为success
清理存储区,但该方法不会重置缓存的状态。

<api_route>-state

用来维护缓存状态的API。

方法
输入
响应
描述
GET
  • state: 要读取的状态名称。
  • message: 文字响应。若该结果有效,其值应当为success
  • data: 所需的状态值。
获取某个特定的状态值。
POST
  • state: 要写入的状态名称。
  • value: 用来修改状态的值。
  • message: 文字响应。若该结果有效,其值应当为success
修改某个特定的状态值。
DELETE
-
  • message: 文字响应。若该结果有效,其值应当为success
将全部状态重置为默认值。

不同的客户端可以共享相同的状态。每次客户端和服务端互动时,客户端会使用GET方法来更新其状态。某些状态可以用来控制客户端的行为。例如,发送一个带有state=closedvalue=truePOST,和调用LineProcBuffer.stop_all_mirrors()具有相同的意义。

范例

在网络服务之间同步信息

sync_by_web_services.py
import multiprocessing
import flask
from syncstream.host import LineHostMirror, LineHostBuffer


def f(address):
with LineHostMirror(address=address, timeout=5):
print('example')


app = flask.Flask(__name__)
hbuf = LineHostBuffer('/sync-stream', maxlen=10)
hbuf.serve(app)


@app.route('/test', methods=['GET'])
def another_service():
address = 'http://localhost:5000/sync-stream'
with multiprocessing.Pool(4) as p:
p.map(f, tuple(address for _ in range(4)))
hbuf_items = hbuf.read()
return {'message': 'success', 'items': hbuf_items}, 200


if __name__ == '__main__':
app.run(host='localhost', port=5000) # 运行Flask服务。