LineHostMirror
ClassContextSource
buffer = LineHostMirror(
address: str,
aggressive: bool = False,
timeout: int | None = None,
)
with buffer:
...
The mirror for the host-safe line-based buffer.
This mirror is the client of the services from LineHostBuffer
. It should be initialized independently, and would be used for managing the lines written to the buffer. Different from LineProcMirror
, the independent mirror does not require shared queue.
Aliases
This class can be acquired by
import syncstream
syncstream.LineHostMirror
syncstream.host.LineHostMirror
Arguments
Argument | Type | Required | |
---|---|---|---|
address | str | The full url, including the API name of the LineHostBuffer services. | |
aggressive | bool | If set True , the aggresive mode would be enabled, where the mirror would send messages to the buffer each time when new data is written. If False , the mirror would communicate with the buffer only when a new line is written. | |
timeout | int | None | The timeout of the web syncholizing events. If not set, the synchronization would block the current process. |
Methods
close
buffer.close(exc: BaseException | None = None)
Close the IO. This method only takes effects once. The second call will do nothing.
Requires
Argument | Type | Required | |
---|---|---|---|
exc | BaseException | If exc is not None , will call send_error() before closing the buffer. Otherwise, call send_eof() . |
clear
buffer.clear()
Clear the whole buffer.
This method would clear the temporary buffer of the mirror. If the mirror works in the aggresive
mode, the temporary buffer would not be used. In this case, this method would not exert any influences to the mirror.
This method is thread-safe. Mirrors in different processes would not share the temporary buffer.
new_line
buffer.new_line()
Clear the current temporary buffer, and manually trigger a new line signal. If the current temporary buffer contains data, the data would be moved to the storage, and a new line would be created. If the current temporary buffer is already a new line, do nothing.
This method is equivalent to
if buffer.last_line.tell() > 0:
write('\n')
flush
buffer.flush()
Flush the current written line stream (temporary buffer).
send_eof
buffer.send_eof()
Send the safely close signal.
The signal should be used at the end of the sub-process. This method is used for informing the buffer that the sub-process has been finished safely.
- Users should always use
send_eof()
orsend_error()
at the end of a sub-process. Otherwise, the temporary data may not be moved into the storage correctly. - Users should only invoke
send_eof()
orsend_error()
once for each sub-process. For example, ifsend_error()
has been invoked,send_eof()
should not be used. Sending more than one close signal from the same sub-process may cause fatal bugs.
send_error
buffer.send_error(error: BaseException)
Send the exception object to the main buffer.
Send the catched error object. This method should always be used in the try / except block. The error object would be captured as an item of the storage in the main buffer.
See send_eof()
.
Requires
Argument | Type | Required | |
---|---|---|---|
error | BaseException | The exception object that would be sent. |
send_warning
buffer.send_warning(warning: Warning)
Send the standard warning object to the main buffer.
Send the catched warning object. It is recommended that this method can be used when logging the warning objects. The warning object would be captured as an item of the storage in the main buffer.
This method could only catch the warning object defined by the stdlib. Some different warning objects, like logging.warning
messages would not be collected by this method.
Requires
Argument | Type | Required | |
---|---|---|---|
warning | Warning | The warning object that would be sent. |
send_data
buffer.send_data(data: str)
Send the data to the main buffer.
This method would fire a POST service of the main buffer, and send the str
data.
This method is used by other methods implicitly, and should not be used by users.
Requires
Argument | Type | Required | |
---|---|---|---|
data | str | An str to be sent to the main buffer. |
check_states
buffer.check_states()
Check the current buffer states.
Currently, this method in only used for checking whether the service is closed.
This method is used by other methods implicitly, and should not be used by users.
read
line: str = buffer.read(size: int | None = None)
This method would only read the current bufferred values in the mirror storage. If the property aggressive
is True
, the read()
method would always return empty value.
Requires
Argument | Type | Required | |
---|---|---|---|
size | int | If set None , would return the whole storage. If set an int value, would return the last size items. |
Returns
Argument | Type | |
---|---|---|
line | str | Current storage of the mirror. The value in the storage has not been sent to the main buffer. |
write
n_bytes: int = buffer.write(data: str)
The writting method of the buffer. The source data is the same as that of a text-based IO. If aggressive
is True
, each call of write()
would make the stream value sent to the main buffer. If not, each time when data
contains a line break, the stream value would be sent to the main buffer.
The method is thread-safe, but the message synchronization is host-safe.
Requires
Argument | Type | Required | |
---|---|---|---|
data | str | The written messages. The line breaks would be detected automatically. |
Returns
Argument | Type | |
---|---|---|
n_bytes | int | The number of bytes that would be written to the buffer. This value is counted from the input data directly. |
fileno
buffer.fileno() -> Never
Return the file ID.
This buffer will not use file ID, so this method will raise an OSError
.
isatty
is_atty: False = buffer.fileno()
Whether the stream is connected to terminal/TTY. Return False
.
Returns
Argument | Type | |
---|---|---|
is_atty | bool | Always False . |
readable
is_readable: bool = buffer.readable()
Whether the stream is readable. The stream is readable as long as the buffer is not closed.
If the stream is not readable, calling read()
will raise an OSError
.
Returns
Argument | Type | |
---|---|---|
is_readable | bool | Return True if the buffer is not closed. Otherwise, return False . |
writable
is_writable: bool = buffer.writable()
Whether the stream is writable. The stream is writable as long as the buffer is not closed.
If the stream is not writable, calling write()
will raise an OSError
.
Returns
Argument | Type | |
---|---|---|
is_writable | bool | Return True if the buffer is not closed. Otherwise, return False . |
seekable
is_seekable: False = buffer.seekable()
Whether the stream support random access. This buffer does not.
Returns
Argument | Type | |
---|---|---|
is_seekable | bool | Always False . |
seek
buffer.seek() -> Never
Will raise an OSError
since this buffer does not support random access.
Properties
aggressive
is_aggressive: bool = buffer.aggressive
The aggressive mode.
This mode could only by configured by the initialization. If set True
, the aggresive mode would be enabled, where the mirror would send messages to the buffer each time when new data is written. If False
, the mirror would communicate with the buffer only when a new line is written.
headers
headers: ChainMap[str, str] = buffer.headers
Get the default headers (post) of the mirror.
headers_get
headers_get: ChainMap[str, str] = buffer.headers_get
Get the default headers (get) of the mirror.
closed
is_closed: bool = buffer.closed
Check whether the buffer has been closed.
Example
Write the server-side buffer with the mirror
- Codes (server)
- Codes (client)
- Results
Use LineHostBuffer
to start a server with an empty buffer.
import flask
from syncstream.host import LineHostBuffer
app = flask.Flask(__name__)
hbuf = LineHostBuffer("/sync-stream", maxlen=10)
hbuf.serve(app)
@app.route("/test", methods=["GET"])
def load_data():
hbuf_items = hbuf.read()
return {"message": "success", "items": hbuf_items}, 200
if __name__ == "__main__":
app.run(host="localhost", port=5000) # Run the Flask service.
While the LineHostBuffer
is running, run the following codes to write the buffer.
from syncstream import LineHostMirror
with LineHostMirror('http://localhost:5000/sync-stream'):
print("test line 1")
print("test line 2")
print("test line 3")
Access
http://localhost:5000/test
The response should be
{
"items": ["test line 1", "test line 2", "test line 3"],
"message": "success"
}