Version: 0.3.x

LineHostBuffer

Class

hbuf = syncstream.host.LineHostBuffer(
    api_route: str = "/sync-stream",
    endpoint: str | None = None,
    maxlen: int = 2,
)

The host service provider for the line-based buffer.

A rotating buffer with a maximum storage length. This buffer is an extended version of the basic LineBuffer, intended for use across multiple devices. It supports the one-host-multi-clients mode and synchronizes messages through web services. To learn how to use this buffer, see the example on this page, or the tutorial.

Aliases

This class can be accessed through either of the following names:

import syncstream


syncstream.LineHostBuffer
syncstream.host.LineHostBuffer

Arguments

Requires

  • api_route (str, optional): The route path of the API. It has the same meaning as the URL argument passed to Api.add_resource().
  • endpoint (str | None, optional): The endpoint name of the API. If set to None, the endpoint name is inferred from api_route. It has the same meaning as the endpoint argument of Api.add_resource().
  • maxlen (int, optional): The maximum number of stored lines. This value has the same meaning as deque.maxlen.
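
Because maxlen follows deque.maxlen, the oldest lines are dropped once the storage is full. The following minimal sketch illustrates that rotation with the standard-library collections.deque only; it does not use LineHostBuffer itself, and the line strings are made up for illustration.

```python
from collections import deque

# Stand-in for the buffer storage, using the default maxlen=2.
storage = deque(maxlen=2)

for line in ("line 1", "line 2", "line 3"):
    # Appending to a full deque silently discards the oldest item.
    storage.append(line)

remaining = list(storage)
print(remaining)  # ['line 2', 'line 3']
```

A larger maxlen keeps more history at the cost of memory on the host.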

Methods

serve

hbuf.serve(api: flask_restful.Api)

Register the services with the Flask app.

Requires

  • api (flask_restful.Api, required): The Flask Api object. All web services related to this LineHostBuffer are registered to this Api object.

Services

After this instance is registered with the Flask app, the following services are provided. Required arguments are shown in bold.

<api_route>

The basic APIs. They are used for maintaining the storage of the message items.

GET
  Requires:
    • n: The number of messages requested. If not set, the whole storage is returned.
  Response:
    • message: The response text. If the results are valid, this value should be success.
    • data: A list of returned messages.
  Description: Get the message items. Similar to LineBuffer.read().

POST
  Requires:
    • type: The type of the message. Could be str, error, warning, or close.
    • data: A dict of message data from LineHostMirror.
  Response:
    • message: The response text. If the results are valid, this value should be success.
  Description: Write the remote data into the storage. Users should not call this method directly, because it is provided for interaction with LineHostMirror.

DELETE
  Requires: -
  Response:
    • message: The response text. If the results are valid, this value should be success.
  Description: Clear the storage. The status of the buffer is not reset by this method.
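
As a rough sketch of how a client might consume the GET service, the helpers below build the request URL and unpack the documented response fields (message and data). The helper names build_read_url and unpack_read_response are hypothetical, and passing n as a query-string parameter is an assumption; this page does not state how the arguments are encoded.

```python
from urllib.parse import urlencode


def build_read_url(base_url, n=None):
    """Build the GET URL for <api_route>; omit n to request the whole storage."""
    if n is None:
        return base_url
    # Assumption: n is passed as a query-string parameter.
    query = urlencode({"n": n})
    return base_url + "?" + query


def unpack_read_response(payload):
    """Return the list of messages, or raise if the host did not report success."""
    if payload.get("message") != "success":
        raise RuntimeError("Unexpected response: {0!r}".format(payload.get("message")))
    return payload["data"]


url = build_read_url("http://localhost:5000/sync-stream", n=3)
# A hand-written payload standing in for a decoded JSON response from the host.
items = unpack_read_response({"message": "success", "data": ["example"]})
```

Checking message before reading data keeps a failed request from being mistaken for an empty buffer.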

<api_route>-state

The APIs used for maintaining the buffer status.

GET
  Requires:
    • state: The name of the state to be read.
  Response:
    • message: The response text. If the results are valid, this value should be success.
    • data: The value of the requested state.
  Description: Get a specific state value.

POST
  Requires:
    • state: The name of the state to be written.
    • value: The new value of the state.
  Response:
    • message: The response text. If the results are valid, this value should be success.
  Description: Modify a specific state value.

DELETE
  Requires: -
  Response:
    • message: The response text. If the results are valid, this value should be success.
  Description: Reset the states to their default values.

The states are shared by all clients. Each time a client interacts with the host, it uses the GET method to refresh its states. Some states can be used to control the behavior of the clients. For example, a POST with state=closed, value=true has the same effect as calling LineProcBuffer.stop_all_mirrors().
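
The POST with state=closed, value=true could be prepared as in the sketch below, which uses only the standard library. The helper name build_close_request is hypothetical, and sending the arguments as form-encoded fields is an assumption; the host may expect a different encoding. The request is only constructed here, not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_close_request(api_url):
    """Prepare (but do not send) a POST to <api_route>-state setting closed=true."""
    # Assumption: the arguments are sent as form-encoded fields.
    body = urlencode({"state": "closed", "value": "true"}).encode("utf-8")
    return Request(api_url + "-state", data=body, method="POST")


request = build_close_request("http://localhost:5000/sync-stream")
# urllib.request.urlopen(request) would send it to a running host.
```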

Example

Synchronize messages by web services

sync_by_web_services.py
import contextlib
import multiprocessing
import flask
from flask_restful import Api
from syncstream.host import LineHostMirror, LineHostBuffer


def f(address):
    # Each worker process writes to the host buffer through a mirror.
    buffer = LineHostMirror(address=address, timeout=5)
    with contextlib.redirect_stdout(buffer):
        print('example')
        buffer.send_eof()


app = flask.Flask(__name__)
api = Api(app)
hbuf = LineHostBuffer('/sync-stream', maxlen=10)
hbuf.serve(api)  # Register the buffer services with the Flask Api.


@app.route('/test', methods=['GET'])
def another_service():
    address = 'http://localhost:5000/sync-stream'
    # Run four workers; each one sends its output to the host buffer.
    with multiprocessing.Pool(4) as p:
        p.map(f, tuple(address for _ in range(4)))
    hbuf_items = hbuf.read()
    return {'message': 'success', 'items': hbuf_items}, 200


if __name__ == '__main__':
    app.run(host='localhost', port=5000)  # Run the Flask service.