LineHostMirror
类上下文源码
buffer = LineHostMirror(
address: str,
aggressive: bool = False,
timeout: int | None = None,
)
with buffer:
...
网络服务缓存的镜像。
该镜像是针对LineHostBuffer
所提供服务的客户端。要使用该镜像,需要独立地进行初始化,并将其用于处理要写入缓存的行。不同于LineProcMirror
,各个独立的mirror之间不存在一个共享的队列。
别名
该类可以按以下方式之一获取
import syncstream
syncstream.LineHostMirror
syncstream.host.LineHostMirror
参数
参数 | 类型 | 必选 | |
---|---|---|---|
address | str | 完整 URL,包括LineHostBuffer 服务的API名。 | |
aggressive | bool | 若设为True ,则启用积极模式,亦即是说,只要有新的数据写入该mirror,就会立即传递给缓存。若设为False ,则只会在新写一整行的时候、才传递给缓存。 | |
timeout | int | None | 网络同步事件的时限。若不设置,则执行同步操作将持续阻塞当前进程。 |
方法
close
buffer.close(exc: BaseException | None = None)
关闭IO。该方法只会生效一次。第二次调用则无任何效果。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
exc | BaseException | 若exc 非None ,则在关闭缓存前调用send_error() 。否则,调用send_eof() 。 |
clear
buffer.clear()
清空整个缓存。
该方法会清空mirror的临时缓存。若mirror按aggresive
模式工作,则临时缓存不会用到。在此情形下,该方法将不会对mirror产生任何影响。
该方法是线程安全的。且不同进程的镜像并不会共享相同的临时缓存。
new_line
buffer.new_line()
清空当前的临时缓存,并人为地触发一个“新行”信号。若当前的临时缓存包含数据,则会先将数据移动到存储区,然后再创建新行。若当前的临时缓存的已经是新行,则不做任何处理。
该方法等价于
if buffer.last_line.tell() > 0:
write('\n')
flush
buffer.flush()
刷新当前正在写入行的数据流(临时缓存)。
send_eof
buffer.send_eof()
发送安全关闭信号。
应当在子进程的结尾使用该信号。该方法用来通知缓存子进程的工作已经安全完成。
- 在每个子进程的结尾,应当总是使用
send_eof()
或send_error()
。否则,临时缓存里的数据可能不会正确地移交到(远端的)存储区内。 - 在每个子进程的结尾,应当只使用一次
send_eof()
或send_error()
。例如,若调用过send_error()
,则不应再调用send_eof()
。从同一子进程发送超过一个关闭信号,可能引发严重故障。
send_error
buffer.send_error(error: Exception)
向主缓存发送异常对象。
发送缓存过的异常对象。该方法需要用于try / except块中。错误对象将会在主缓存中捕获为一条信息。
参见send_eof()
。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
error | Exception | 要发送的异常对象。 |
send_warning
buffer.send_warning(warning: Warning)
向主缓存发送标准警告对象。
发送缓存过的警告对象。建议在每当需要写入警告日志时、使用该方法。警告对象将会在主缓存中捕获为一条信息。
该方法只能捕获标准库定义的警告对象。某些不同的警告对象,例如logging.warning
所写的warning信息,则无法被该方法收集。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
warning | Warning | 要发送的警告对象。 |
send_data
buffer.send_data(data: str)
向主缓存发送数据。
该方法会触发主缓存的POST服务请求,并发送str
数据。
该方法被其他方法隐式调用,故用户不应使用该方法。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
data | str | 要发送到主缓存的str 。 |
check_states
buffer.check_states()
检查当前的缓存状态。
目前,该方法只用来检查服务是否已关闭。
该方法被其他方法隐式调用,故用户不应使用该方法。
read
line: str = buffer.read(size: int | None = None)
该方法只会读取、当前保留在mirror的临时缓存内的数据。若属性aggressive
为True
,则read()
方法将总是返回空值。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
size | int | If set None , would return the whole storage. If set an int value, would return the last size items. |
输出
参数 | 类型 | |
---|---|---|
line | str | 当前mirror的临时存储结果。在临时存储内的数据还没有发送到主缓存。 |
write
n_bytes: int = buffer.write(data: str)
写缓存的方法。源数据的形式与文字IO的数据形式相同。若属性aggressive
为True
,每次调用write()
时,都将会使得流内的数据发送到主缓存。否则,只在每当data
包含断行符的时候、才会将流的数据发送到主缓存。
该方法是线程安全的。但信息同步的过程是主机安全的。
输入
参数 | 类型 | 必选 | |
---|---|---|---|
data | str | 要写入的信息。会自动检测其中包含的断行符。 |
输出
参数 | 类型 | |
---|---|---|
n_bytes | int | 所要写入缓存的byte数目。该值直接从输入data 计算。 |
fileno
buffer.fileno() -> Never
返回文件ID。
由于该缓存不使用文件ID,故而该方法会抛出OSError
。
isatty
is_atty: False = buffer.fileno()
流是否连接到了终端/TTY上。返回False
。
输出
参数 | 类型 | |
---|---|---|
is_atty | bool | 恒为False . |
readable
is_readable: bool = buffer.readable()
流是否可读。只要流还未关闭,其就是可读的。
若流不再可读,调用read()
会抛出OSError
。
输出
参数 | 类型 | |
---|---|---|
is_readable | bool | 若缓存未关闭,返回True 。否则,返回False 。 |
writable
is_writable: bool = buffer.writable()
流是否可写。只要流还未关闭,其就是可读的。
若流不再可写,调用read()
会抛出OSError
。
输出
参数 | 类型 | |
---|---|---|
is_writable | bool | 若缓存未关闭,返回True 。否则,返回False 。 |
seekable
is_seekable: False = buffer.seekable()
流是否支持随机访问。该缓存不支持。
输出
参数 | 类型 | |
---|---|---|
is_seekable | bool | 恒为False 。 |
seek
buffer.seek() -> Never
由于缓存不支持随机访问,调用该方法会抛出OSError
。
属性
aggressive
is_aggressive: bool = buffer.aggressive
积极模式。
该模式只能在初始化的时候设置。若设为True
,则积极模式启用,mirror将会在每当收到新的数据时,将信息传递给主缓存。若设为False
,则只会在新写一整行的时候、才传递给缓存。
headers
headers: ChainMap[str, str] = buffer.headers
网络连接的默认请求头。这些头信息将会用于每个POST
方法中。
headers_get
headers_get: ChainMap[str, str] = buffer.headers_get
网络连接的默认请求头。这些头信息将会用于每个GET
方法中。
closed
is_closed: bool = buffer.closed
检查缓存是否已经被关闭。
范例
使用mirror向服务端缓存写入数据
- 代码(服务端)
- 代码(客户端)
- 结果
使用LineHostBuffer
启动一个带有空缓存的服务。
import flask
from syncstream.host import LineHostBuffer
app = flask.Flask(__name__)
hbuf = LineHostBuffer("/sync-stream", maxlen=10)
hbuf.serve(app)
@app.route("/test", methods=["GET"])
def load_data():
hbuf_items = hbuf.read()
return {"message": "success", "items": hbuf_items}, 200
if __name__ == "__main__":
app.run(host="localhost", port=5000) # 运行Flask服务。
当LineHostBuffer
正在运行时,可以使用以下代码来写入缓存。
from syncstream import LineHostMirror
with LineHostMirror('http://localhost:5000/sync-stream'):
print("test line 1")
print("test line 2")
print("test line 3")
访问
http://localhost:5000/test
响应应当为
{
"items": ["test line 1", "test line 2", "test line 3"],
"message": "success"
}