LineHostBuffer
Class
hbuf = LineHostBuffer(
    api_route: str = "/sync-stream",
    endpoint: str | None = None,
    maxlen: int = 2,
)
The host service provider for the line-based buffer.
A rotating buffer with a maximal storage length. This buffer is an extended version of the basic LineBuffer. It is designed for the multi-device case: it supports the one-host-multi-clients mode and synchronizes messages through web services. To learn how to use this buffer, see the example on this page or the tutorial.
Aliases
This class can be accessed in either of the following ways:
import syncstream
syncstream.LineHostBuffer
syncstream.host.LineHostBuffer
Arguments
Argument | Type | Required | Description
---|---|---|---
api_route | str | No | The route path of the API.
endpoint | str \| None | No | The endpoint name of the API. If set to None, the endpoint name is inferred from api_route.
maxlen | int | No | The maximal number of stored lines. This value has the same meaning as deque.maxlen.
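The rotating behavior that maxlen refers to can be illustrated with the standard library alone. The sketch below uses collections.deque directly as a stand-in for the storage. It is not the internal implementation of LineHostBuffer, only a demonstration of the deque.maxlen semantics mentioned in the table.

```python
from collections import deque

# Stand-in for the line storage: a deque that keeps at most 2 items,
# matching the default maxlen=2 in the signature above.
storage = deque(maxlen=2)

for line in ("first", "second", "third"):
    storage.append(line)  # Appending to a full deque drops the oldest item.

print(list(storage))   # ['second', 'third']
print(storage.maxlen)  # 2
```

With the default maxlen=2 only the two most recent lines survive, so a larger value (such as maxlen=10 in the example on this page) is usually needed when several clients write at the same time.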
Methods
close
hbuf.close()
Close the IO. This method takes effect only once; a second call does nothing.
read_serialized
lines: tuple[str | GroupedMessage, ...] = hbuf.read_serialized(size: int | None = None)
Read the records (serialized).
It has the same functionality as read(...). However, all the data returned by this method is serialized and compatible with JSON encoding. This method should be used for providing query services.
Requires
Argument | Type | Required | Description
---|---|---|---
size | int \| None | No | If set to None, the whole storage is returned. If set to an int, the last size items are returned.
Returns
Argument | Type | Description
---|---|---
lines | tuple[str \| GroupedMessage, ...] | A sequence of fetched record items. Results are sorted in FIFO order.
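The meaning of size and of the FIFO ordering can be sketched without the library. In the snippet below, read_last is a hypothetical helper written for illustration and storage is a plain deque; neither is part of syncstream. It only mirrors the behavior described above: None returns everything, an int returns the last size items with the oldest first, and the result can be passed to a JSON encoder.

```python
import json
from collections import deque


def read_last(storage, size=None):
    """Return all items if size is None, otherwise the last `size` items, oldest first."""
    items = tuple(storage)
    if size is None:
        return items
    if size <= 0:
        return ()  # Guard: items[-0:] would otherwise return everything.
    return items[-size:]


# Hypothetical storage holding three plain-text lines.
storage = deque(["line 1", "line 2", "line 3"], maxlen=10)

latest = read_last(storage, size=2)
payload = json.dumps({"items": latest})  # Tuples are encoded as JSON arrays.

print(latest)   # ('line 2', 'line 3')
print(payload)  # {"items": ["line 2", "line 3"]}
```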
serve
hbuf.serve(api: flask.Flask)
Provide the service of the host buffer.
The service runs as an independent thread. Each time a request is received, the service is triggered and the results are saved in a thread-safe manner.
Requires
Argument | Type | Required | Description
---|---|---|---
api | flask.Flask | Yes | The Flask object. All web services related to this LineHostBuffer are registered to the Flask object.
stop_all_mirrors
flag: bool = hbuf.stop_all_mirrors()
Send stop signals to all mirrors.
This operation is used for terminating the mirrors safely. It does not guarantee that the processes are closed instantly: the stop check is triggered only when a mirror writes a new message.
If you use this method, ensure that the StopIteration error is caught by the process. The error is not sent back to the buffer.
Returns
Argument | Type | Description
---|---|---
flag | bool | Returns True if the stopping is successful and False if the stopping is rejected. Raises an error if anything goes wrong on the server.
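The requirement to catch StopIteration in the client process can be sketched as follows. StoppableWriter is a hypothetical stand-in for a mirror, written only for this illustration; it is assumed to raise StopIteration on the first write after a stop signal, which matches the description above but is not taken from the library source.

```python
class StoppableWriter:
    """Hypothetical stand-in for a mirror: raises StopIteration on write once stopped."""

    def __init__(self):
        self.stopped = False
        self.lines = []

    def write(self, line):
        # The stop check happens only when a new message is written.
        if self.stopped:
            raise StopIteration("stop signal received")
        self.lines.append(line)


def worker(writer, n_steps):
    """Write up to n_steps messages and exit cleanly when a stop signal arrives."""
    try:
        for step in range(n_steps):
            if step == 2:
                writer.stopped = True  # Simulate the host sending the stop signal.
            writer.write(f"step {step}")
    except StopIteration:
        return "stopped"
    return "finished"


writer = StoppableWriter()
status = worker(writer, n_steps=5)
print(status, writer.lines)  # stopped ['step 0', 'step 1']
```

Because the check runs only on a write, a process that never writes again after the signal would keep running, which is why the stop is not guaranteed to be instant.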
reset_states
flag: bool = hbuf.reset_states()
Reset the states of the buffer.
This method should be used if the buffer needs to be reused.
Returns
Argument | Type | Description
---|---|---
flag | bool | Returns True if the resetting is successful and False if the resetting is rejected. Raises an error if anything goes wrong on the server.
Operators
__len__
n_buffer_items: int = len(hbuf)
Number of lines/items in the buffer.
Services
When the Flask app is registered with this instance, the following services are provided. Arguments shown in bold are required.
<api_route>
The basic APIs. They are used for maintaining the storage of the message items.
Method | Description
---|---
GET | Get the message items. Similar to LineBuffer.read().
POST | Write the remote data into the storage. This method should not be used by users directly, because it is provided for the interaction with LineHostMirror.
DELETE | Clear the storage. The status of the buffer is not reset by this method.
<api_route>-state
The APIs used for maintaining the buffer status.
Method | Description
---|---
GET | Get a specific state value.
POST | Modify a specific state value.
DELETE | Reset the state to the default values.
The states are shared by different clients. Each time a client interacts with the host, it uses the GET method to renew its states. Some states can be used for controlling the behaviors of the clients. For example, POST with state=closed, value=true serves the same functionality as invoking LineProcBuffer.stop_all_mirrors().
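A request of the kind described above could be prepared with the standard library as in the following sketch. The host address and api_route are taken from the example on this page. How the state and value arguments are encoded is not stated here, so the form-encoded body is an assumption; check the service before relying on it.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Values borrowed from the example on this page.
host = "http://localhost:5000"
api_route = "/sync-stream"

# Assumption: the arguments are sent as form-encoded fields.
body = urlencode({"state": "closed", "value": "true"}).encode("utf-8")

# The state service lives at "<api_route>-state".
request = Request(host + api_route + "-state", data=body, method="POST")

print(request.get_method(), request.full_url)
print(body)

# To actually send it while the server is running:
#     urllib.request.urlopen(request, timeout=5)
```

The request is only constructed, not sent, so the snippet runs without a live server.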
Example
Synchronize messages by web services
import multiprocessing

import flask

from syncstream.host import LineHostMirror, LineHostBuffer


def f(address):
    # Each worker process redirects its output to the host through a mirror.
    with LineHostMirror(address=address, timeout=5):
        print('example')


app = flask.Flask(__name__)
hbuf = LineHostBuffer('/sync-stream', maxlen=10)
hbuf.serve(app)  # Register the buffer services to the Flask app.


@app.route('/test', methods=['GET'])
def another_service():
    address = 'http://localhost:5000/sync-stream'
    with multiprocessing.Pool(4) as p:
        p.map(f, tuple(address for _ in range(4)))
    hbuf_items = hbuf.read()
    return {'message': 'success', 'items': hbuf_items}, 200


if __name__ == '__main__':
    app.run(host='localhost', port=5000)  # Run the Flask service.
After the server is launched, use the following address to get the results:
http://localhost:5000/test
The response should be
{"items": ["example", "example", "example", "example"], "message": "success"}
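The result can also be fetched from a script instead of a browser. The sketch below uses only the standard library; parse_items and fetch_items are hypothetical helper names chosen for this illustration. The parsing step is checked offline against the response shown above, so the snippet runs even when the server is not up.

```python
import json
from urllib.request import urlopen


def parse_items(body):
    """Extract the list of buffered items from the JSON body of the /test response."""
    return json.loads(body)["items"]


def fetch_items(url="http://localhost:5000/test", timeout=30):
    """Query the running example server and return the collected items."""
    with urlopen(url, timeout=timeout) as response:
        return parse_items(response.read().decode("utf-8"))


# Offline check against the response documented above.
sample = '{"items": ["example", "example", "example", "example"], "message": "success"}'
print(parse_items(sample))  # ['example', 'example', 'example', 'example']
```

Calling fetch_items() requires the example server to be running on localhost:5000.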