跳到主要内容
版本:1.2.2

LineHostMirror

上下文源码

buffer = LineHostMirror(
address: str,
aggressive: bool = False,
timeout: int | None = None,
)

with buffer:
...

网络服务缓存的镜像。

该镜像是针对LineHostBuffer所提供服务的客户端。要使用该镜像,需要独立地进行初始化,并将其用于处理要写入缓存的行。不同于LineProcMirror,各个独立的mirror之间不存在一个共享的队列。

别名

该类可以按以下方式之一获取

import syncstream


syncstream.LineHostMirror
syncstream.host.LineHostMirror

参数

参数类型必选
说明
addressstr完整 URL,包括LineHostBuffer服务的API名。
aggressivebool若设为True,则启用积极模式,亦即是说,只要有新的数据写入该mirror,就会立即传递给缓存。若设为False,则只会在新写一整行的时候、才传递给缓存。
timeoutint | None网络同步事件的时限。若不设置,则执行同步操作将持续阻塞当前进程。

方法

close

buffer.close(exc: BaseException | None = None)

关闭IO。该方法只会生效一次。第二次调用则无任何效果。

输入

参数类型必选
说明
excBaseExceptionexcNone,则在关闭缓存前调用send_error()。否则,调用send_eof()

clear

buffer.clear()

清空整个缓存。

该方法会清空mirror的临时缓存。若mirror按aggresive模式工作,则临时缓存不会用到。在此情形下,该方法将不会对mirror产生任何影响。

该方法是线程安全的。且不同进程的镜像并不会共享相同的临时缓存。


new_line

buffer.new_line()

清空当前的临时缓存,并人为地触发一个“新行”信号。若当前的临时缓存包含数据,则会先将数据移动到存储区,然后再创建新行。若当前的临时缓存的已经是新行,则不做任何处理。

该方法等价于

if buffer.last_line.tell() > 0:
write('\n')

flush

buffer.flush()

刷新当前正在写入行的数据流(临时缓存)。


send_eof

buffer.send_eof()

发送安全关闭信号。

应当在子进程的结尾使用该信号。该方法用来通知缓存子进程的工作已经安全完成。

危险
  • 在每个子进程的结尾,应当总是使用send_eof()send_error()。否则,临时缓存里的数据可能不会正确地移交到(远端的)存储区内。
  • 在每个子进程的结尾,应当只使用一次send_eof()send_error()。例如,若调用过send_error(),则不应再调用send_eof()。从同一子进程发送超过一个关闭信号,可能引发严重故障。

send_error

buffer.send_error(error: Exception)

向主缓存发送异常对象。

发送缓存过的异常对象。该方法需要用于try / except块中。错误对象将会在主缓存中捕获为一条信息。

危险

参见send_eof()

输入

参数类型必选
说明
errorException要发送的异常对象。

send_warning

buffer.send_warning(warning: Warning)

向主缓存发送标准警告对象。

发送缓存过的警告对象。建议在每当需要写入警告日志时、使用该方法。警告对象将会在主缓存中捕获为一条信息。

警告

该方法只能捕获标准库定义的警告对象。某些不同的警告对象,例如logging.warning所写的warning信息,则无法被该方法收集。

输入

参数类型必选
说明
warningWarning要发送的警告对象。

send_data

buffer.send_data(data: str)

向主缓存发送数据。

该方法会触发主缓存的POST服务请求,并发送str数据。

该方法被其他方法隐式调用,故用户不应使用该方法。

输入

参数类型必选
说明
datastr要发送到主缓存的str

check_states

buffer.check_states()

检查当前的缓存状态。

目前,该方法只用来检查服务是否已关闭。

该方法被其他方法隐式调用,故用户不应使用该方法。


read

line: str = buffer.read(size: int | None = None)

该方法只会读取、当前保留在mirror的临时缓存内的数据。若属性aggressiveTrue,则read()方法将总是返回空值。

输入

参数类型必选
说明
sizeintIf set None, would return the whole storage. If set an int value, would return the last size items.

输出

参数类型
说明
linestr当前mirror的临时存储结果。在临时存储内的数据还没有发送到主缓存。

write

n_bytes: int = buffer.write(data: str)

写缓存的方法。源数据的形式与文字IO的数据形式相同。若属性aggressiveTrue,每次调用write()时,都将会使得流内的数据发送到主缓存。否则,只在每当data包含断行符的时候、才会将流的数据发送到主缓存。

该方法是线程安全的。但信息同步的过程是主机安全的。

输入

参数类型必选
说明
datastr要写入的信息。会自动检测其中包含的断行符。

输出

参数类型
说明
n_bytesint所要写入缓存的byte数目。该值直接从输入data计算。

fileno

buffer.fileno() -> Never

返回文件ID。

由于该缓存不使用文件ID,故而该方法会抛出OSError


isatty

is_atty: False = buffer.fileno()

流是否连接到了终端/TTY上。返回False

输出

参数类型
说明
is_attybool恒为False.

readable

is_readable: bool = buffer.readable()

流是否可读。只要流还未关闭,其就是可读的。

若流不再可读,调用read()会抛出OSError

输出

参数类型
说明
is_readablebool若缓存未关闭,返回True。否则,返回False

writable

is_writable: bool = buffer.writable()

流是否可写。只要流还未关闭,其就是可读的。

若流不再可写,调用read()会抛出OSError

输出

参数类型
说明
is_writablebool若缓存未关闭,返回True。否则,返回False

seekable

is_seekable: False = buffer.seekable()

流是否支持随机访问。该缓存不支持。

输出

参数类型
说明
is_seekablebool恒为False

seek

buffer.seek() -> Never

由于缓存不支持随机访问,调用该方法会抛出OSError

属性

aggressive

is_aggressive: bool = buffer.aggressive

积极模式。

该模式只能在初始化的时候设置。若设为True,则积极模式启用,mirror将会在每当收到新的数据时,将信息传递给主缓存。若设为False,则只会在新写一整行的时候、才传递给缓存。


headers

headers: ChainMap[str, str] = buffer.headers

网络连接的默认请求头。这些头信息将会用于每个POST方法中。


headers_get

headers_get: ChainMap[str, str] = buffer.headers_get

网络连接的默认请求头。这些头信息将会用于每个GET方法中。


closed

is_closed: bool = buffer.closed

检查缓存是否已经被关闭。

范例

使用mirror向服务端缓存写入数据

使用LineHostBuffer启动一个带有空缓存的服务。

start_a_server.py
import flask
from syncstream.host import LineHostBuffer


app = flask.Flask(__name__)
hbuf = LineHostBuffer("/sync-stream", maxlen=10)
hbuf.serve(app)


@app.route("/test", methods=["GET"])
def load_data():
hbuf_items = hbuf.read()
return {"message": "success", "items": hbuf_items}, 200


if __name__ == "__main__":
app.run(host="localhost", port=5000) # 运行Flask服务。