LineHostReader
Class, Context, Decorator
reader = LineHostReader(
    address: str, timeout: int | None = None,
)

with reader:
    ...

@reader
def func(): ...
The reader for the host-service line-based buffer (host.LineHostBuffer).
This class is a convenience API for reading data from a specified host. It can read the buffer and the service states, but it does not provide any way to write to the buffer or the states. We recommend using this reader as a context, so that the scope of the HTTP connection is explicit.
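To illustrate what scoping the connection means, here is a minimal stand-alone sketch built on `contextlib.ContextDecorator`. `ScopedConnection` and its `events` log are hypothetical stand-ins, not part of syncstream. They only model how one object can serve both as a `with` context and as a decorator, opening a resource on entry and releasing it on exit.

```python
from contextlib import ContextDecorator


class ScopedConnection(ContextDecorator):
    """Hypothetical stand-in that records when a connection scope opens and closes."""

    def __init__(self) -> None:
        self.events = []

    def __enter__(self) -> "ScopedConnection":
        self.events.append("open")
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.events.append("close")
        return False  # do not swallow exceptions raised inside the scope


conn = ScopedConnection()

# Context form: the scope covers the body of the with block.
with conn:
    conn.events.append("work in context")


# Decorator form: the scope covers each call of the function.
@conn
def task() -> None:
    conn.events.append("work in function")


task()
print(conn.events)
```

In both forms the "open" and "close" events bracket the work, which is the property the recommendation above relies on.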
Aliases
This class can be accessed in either of the following ways:
import syncstream
syncstream.LineHostReader
syncstream.host.LineHostReader
Arguments
Argument | Type | Required | Description |
---|---|---|---|
address | str | Yes | The address of the LineHostBuffer. The data will be read from this service. |
timeout | int \| None | No | The timeout of the web synchronization events. If not set, the synchronization blocks the current process. |
Methods
clear
flag: bool = reader.clear()
Clear the whole buffer.
This method clears the storage and the last line stream of this buffer. However, it does not clear any mirrors or copies of this object. This method is thread-safe and should always succeed.
Returns
Argument | Type | Description |
---|---|---|
flag | bool | Returns True if the deletion succeeds and False if it is rejected. Raises an error if anything goes wrong on the server. |
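A minimal sketch of handling the three documented outcomes of `clear()`. The helper name `clear_buffer` is hypothetical, and catching the generic `Exception` is an assumption, because this page does not name the exact error class raised on a server failure. The function accepts any object exposing `clear()`, such as a `LineHostReader`. The `SimpleNamespace` objects are stand-ins so that the sketch runs without a server.

```python
from types import SimpleNamespace


def clear_buffer(reader) -> str:
    """Call reader.clear() and summarize the outcome as a short status string."""
    try:
        flag = reader.clear()
    except Exception as err:  # assumption: the exact error class is not documented here
        return "server error: {0}".format(err)
    return "cleared" if flag else "rejected"


# Offline demonstration with stand-in objects instead of a real reader.
accepted = SimpleNamespace(clear=lambda: True)
refused = SimpleNamespace(clear=lambda: False)
print(clear_buffer(accepted))
print(clear_buffer(refused))
```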
reset_states
flag: bool = reader.reset_states()
Reset the states of the buffer.
This method should be used if the buffer needs to be reused.
Returns
Argument | Type | Description |
---|---|---|
flag | bool | Returns True if the reset succeeds and False if it is rejected. Raises an error if anything goes wrong on the server. |
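When a buffer is reused between runs, one plausible sequence is to clear the stored records and then reset the states. The sketch below assumes that order, which this page does not prescribe, and the helper name `prepare_for_reuse` is hypothetical. It works with any object exposing `clear()` and `reset_states()`. The stand-in object exists only so that the sketch runs without a server.

```python
from types import SimpleNamespace


def prepare_for_reuse(reader) -> bool:
    """Clear the records, then reset the states; report whether both were accepted."""
    if not reader.clear():
        return False  # clearing was rejected, so leave the states untouched
    return bool(reader.reset_states())


# Offline demonstration with a stand-in object instead of a real reader.
calls = []
fake = SimpleNamespace(
    clear=lambda: calls.append("clear") or True,
    reset_states=lambda: calls.append("reset_states") or True,
)
ready = prepare_for_reuse(fake)
print(ready, calls)
```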
stop_all_mirrors
flag: bool = reader.stop_all_mirrors()
Send stop signals to all mirrors.
This operation is used to terminate the mirrors safely. It does not guarantee that the processes are closed instantly: a check is triggered each time a mirror writes a new message.
Before using this method, ensure that the StopIteration error is caught by the process. The error is not sent back to the buffer.
Returns
Argument | Type | Description |
---|---|---|
flag | bool | Returns True if the stop request succeeds and False if it is rejected. Raises an error if anything goes wrong on the server. |
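The mirror side of this interplay can be sketched as follows. `FakeMirror` is a hypothetical stand-in that models only the documented behavior (a check on every write that raises `StopIteration` once a stop signal has been sent). It is not the syncstream mirror class. The point of the sketch is the `try`/`except` in the worker.

```python
class FakeMirror:
    """Hypothetical stand-in: checks for a stop signal each time a message is written."""

    def __init__(self) -> None:
        self.stop_requested = False
        self.written = []

    def write(self, message: str) -> None:
        if self.stop_requested:
            raise StopIteration("stop signal received")
        self.written.append(message)


def worker(mirror: FakeMirror, n_steps: int) -> str:
    """Write one message per step and exit cleanly when the stop signal arrives."""
    try:
        for step in range(n_steps):
            if step == 2:
                # Simulates reader.stop_all_mirrors() being called from elsewhere.
                mirror.stop_requested = True
            mirror.write("step {0}".format(step))
    except StopIteration:
        return "stopped"
    return "finished"


mirror = FakeMirror()
status = worker(mirror, n_steps=5)
print(status, mirror.written)
```

The worker is a plain function on purpose: inside a generator, an uncaught `StopIteration` is converted to `RuntimeError` by Python itself, so the error should be caught before it can cross a generator boundary.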
read
lines: tuple[str | GroupedMessage, ...] = reader.read(size: int | None = None)
Read the records.
Fetch the stored record items from the buffer. The read() method is thread-safe and does not influence the cursor of the write() method.
If the current written line is not blank, the read() method regards it as the last record item.
Requires
Argument | Type | Required | Description |
---|---|---|---|
size | int \| None | No | If None, returns the whole storage. If an int value, returns the last size items. |
Returns
Argument | Type | Description |
---|---|---|
lines | tuple[str \| GroupedMessage, ...] | A sequence of fetched record items. Results are sorted in FIFO order. |
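The `size` semantics can be modeled offline. `read_model` below is a hypothetical pure-Python model of the documented behavior (the whole storage for `None`, otherwise the last `size` items, oldest first). It is not the library's implementation, and its treatment of non-positive sizes is an assumption.

```python
from typing import Optional


def read_model(storage: tuple, size: Optional[int] = None) -> tuple:
    """Return the whole storage, or only its last `size` items in FIFO order."""
    if size is None:
        return tuple(storage)
    if size <= 0:
        return ()  # assumption: a non-positive size yields nothing
    return tuple(storage[-size:])


records = ("first", "second", "third")
print(read_model(records))
print(read_model(records, size=2))
```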
Properties
headers
headers: ChainMap[str, str] = reader.headers
Get the default headers that the reader uses for GET requests.
headers_post
headers_post: ChainMap[str, str] = reader.headers_post
Get the default headers that the reader uses for POST requests.
maxlen
maxlen: int = reader.maxlen
Get the maximal length of the buffer.
closed
is_closed: bool = reader.closed
Check whether the service has been closed.
Operators
__len__
n_buffer_items: int = len(reader)
Get the current length of the buffer.
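The properties and `len()` combine naturally into a one-line status report. The helper name `describe` is hypothetical. It relies only on `maxlen`, `closed`, and `__len__` as listed above, so any object exposing them works. The `FakeReader` class is a stand-in that exists only so the sketch runs without a server.

```python
class FakeReader:
    """Hypothetical stand-in exposing the same attributes as the reader."""

    maxlen = 10
    closed = False

    def __len__(self) -> int:
        return 3


def describe(reader) -> str:
    """Summarize buffer usage and service state in one line."""
    state = "closed" if reader.closed else "open"
    return "{0}/{1} items, service {2}".format(len(reader), reader.maxlen, state)


print(describe(FakeReader()))
```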
Example
Read the server data and status with the reader
Use LineHostBuffer to start a server with an empty buffer.
import flask
import syncstream
app = flask.Flask('test')
syncstream.LineHostBuffer(api_route='/sync-stream', maxlen=10).serve(app)
if __name__ == '__main__':
    app.run('localhost', port=5000)
While the LineHostBuffer is running, the following code can be used to get the status of the buffer.
from syncstream import LineHostReader
with LineHostReader('http://localhost:5000/sync-stream') as hreader:
    n_maxlen = hreader.maxlen
    is_closed = hreader.closed
    print("States: maxlen={0}, closed={1}.".format(n_maxlen, is_closed))
    if not is_closed:
        print(hreader.read())
This reader can also be used outside a context. In that case, a temporary HTTP pool is established and destroyed every time the service is used.
from syncstream import LineHostReader
hreader = LineHostReader('http://localhost:5000/sync-stream')
n_maxlen = hreader.maxlen
is_closed = hreader.closed
print("States: maxlen={0}, closed={1}.".format(n_maxlen, is_closed))
if not is_closed:
    print(hreader.read())
Run against the freshly started server, either client snippet prints the following results:
States: maxlen=10, closed=False.
()