Version: 1.2.2

LineHostBuffer


hbuf = LineHostBuffer(
    api_route: str = "/sync-stream",
    endpoint: str | None = None,
    maxlen: int = 2,
)

The host service provider for the line-based buffer.

A rotating buffer with a maximal storage length. It extends the basic LineBuffer for use across multiple devices: it supports the one-host-multi-clients mode and synchronizes messages through web services. To learn how to use this buffer, see the example on this page, or the tutorial.

Aliases

This class can be accessed through either of the following names:

import syncstream


syncstream.LineHostBuffer
syncstream.host.LineHostBuffer

Arguments

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| api_route | str | No | The route path of the API. |
| endpoint | str \| None | No | The endpoint name of the API. If set to None, the endpoint name is inferred from api_route. |
| maxlen | int | No | The maximal number of stored lines. This value has the same meaning as deque.maxlen. |
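
Since maxlen is described as having the same meaning as deque.maxlen, the rotating behavior can be illustrated with a plain collections.deque. This is only a sketch of the standard-library semantics; the name `storage` is illustrative, and nothing here is claimed about how LineHostBuffer is implemented internally.

```python
from collections import deque

# A stand-in for the buffer storage, using the default maxlen shown above.
storage = deque(maxlen=2)

for line in ("first", "second", "third"):
    storage.append(line)  # once the deque is full, the oldest item is dropped

remaining = list(storage)
print(remaining)  # ['second', 'third']
```

With maxlen=2, only the two most recent lines survive, so a larger value is usually preferable when several clients write at the same time.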

Methods

close

hbuf.close()

Close the IO. This method takes effect only once; any later call does nothing.
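
The "takes effect only once" behavior is a common idempotent-close pattern. The sketch below shows the pattern with a hypothetical class, `ClosableOnce`, which is not part of syncstream and only mimics the documented behavior.

```python
class ClosableOnce:
    """Hypothetical stand-in that mimics a close() taking effect only once."""

    def __init__(self):
        self.closed = False
        self.close_count = 0  # how many times the cleanup actually ran

    def close(self):
        if self.closed:
            return  # later calls are no-ops
        self.closed = True
        self.close_count += 1


io_obj = ClosableOnce()
io_obj.close()
io_obj.close()  # safe to call again; nothing happens
```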


read_serialized

lines: tuple[str | GroupedMessage, ...] = hbuf.read_serialized(size: int | None = None)

Read the records (serialized).

It has the same functionality as read(...). However, all data returned by this method has been serialized and is compatible with JSON encoding.

This method should be used for providing query services.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| size | int \| None | No | If set to None, the whole storage is returned. If set to an int value, the last size items are returned. |

Returns

| Argument | Type | Description |
| --- | --- | --- |
| lines | tuple[str \| GroupedMessage, ...] | A sequence of fetched record items. Results are sorted in the FIFO order. |
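
The size semantics described above (whole storage for None, otherwise the last size items in FIFO order) can be sketched with the standard library. The helper `read_last` is hypothetical, and its handling of non-positive sizes is an assumption; it does not call syncstream.

```python
import json
from collections import deque


def read_last(storage, size=None):
    """Return the whole storage, or only the last `size` items, oldest first."""
    items = tuple(storage)
    if size is None:
        return items
    if size <= 0:
        return ()  # assumption: a non-positive size yields nothing
    return items[-size:]


storage = deque(["a", "b", "c", "d"], maxlen=10)
whole = read_last(storage)
last_two = read_last(storage, 2)

# Plain strings can be passed to a JSON encoder directly, which is the
# property that read_serialized() is documented to guarantee for its results.
payload = json.dumps({"message": "success", "data": list(last_two)})
```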

serve

hbuf.serve(api: flask.Flask)

Provide the service of the host buffer.

The service runs as an independent thread. Each time a request is received, the service is triggered and the results are saved in a thread-safe way.

Requires

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| api | flask.Flask | Yes | The Flask object. All web services related to this LineHostBuffer would be registered to the Flask object. |

stop_all_mirrors

flag: bool = hbuf.stop_all_mirrors()

Send stop signals to all mirrors.

This operation is used for terminating the mirrors safely. It does not guarantee that the processes are closed instantly: the stop signal is checked each time a mirror writes a new message.

If you use this method, make sure that the StopIteration error is caught by the mirror process. The error is not sent back to the buffer.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| flag | bool | True if the stopping is successful; False if the stopping is rejected. An error is raised if anything goes wrong on the server. |
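
The note about catching StopIteration can be sketched as follows. `FakeMirror` and `worker` are hypothetical stand-ins, not syncstream classes: the fake mirror raises StopIteration on the next write after a stop has been requested, which imitates the check-on-write behavior described above.

```python
class FakeMirror:
    """Hypothetical stand-in for a mirror; not part of syncstream."""

    def __init__(self):
        self.stop_requested = False
        self.written = []

    def write(self, text):
        # The stop flag is checked only when a new message is written.
        if self.stop_requested:
            raise StopIteration("stop signal received")
        self.written.append(text)


def worker(mirror, n_steps, stop_at=None):
    """Write n_steps messages, exiting cleanly if the stop signal arrives."""
    try:
        for i in range(n_steps):
            if i == stop_at:
                mirror.stop_requested = True  # simulates the host's stop signal
            mirror.write(f"step {i}")
    except StopIteration:
        return "stopped"
    return "finished"


result_full = worker(FakeMirror(), 3)
mirror = FakeMirror()
result_stopped = worker(mirror, 5, stop_at=2)
```

Wrapping the whole body of the process function in the try block keeps the shutdown path in one place, so a stop signal ends the process without an unhandled exception.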

reset_states

flag: bool = hbuf.reset_states()

Reset the states of the buffer.

This method should be used if the buffer needs to be reused.

Returns

| Argument | Type | Description |
| --- | --- | --- |
| flag | bool | True if the resetting is successful; False if the resetting is rejected. An error is raised if anything goes wrong on the server. |

Operators

__len__

n_buffer_items: int = len(hbuf)

Number of lines/items in the buffer.

Services

After the Flask app is registered with this instance, the following services are provided. Required arguments are shown in bold.

<api_route>

The basic APIs. They are used for maintaining the storage of the message items.

GET
  Requires:
  • n: The number of messages required. If not set, the whole storage is returned.
  Response:
  • message: The response text. If the results are valid, this value should be success.
  • data: A list of returned messages.
  Description: Get the message items. Similar to LineBuffer.read().

POST
  Requires:
  • type: The type of the message. Could be str, error, warning, or close.
  • data: A dict of message data from LineHostMirror.
  Response:
  • message: The response text. If the results are valid, this value should be success.
  Description: Write the remote data into the storage. Users should not call this method directly, because it is provided for the interaction with LineHostMirror.

DELETE
  Requires: -
  Response:
  • message: The response text. If the results are valid, this value should be success.
  Description: Clear the storage. The status of the buffer is not reset by this method.
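
A client-side sketch of the GET service is given below, using only the standard library. The helpers `build_read_url` and `extract_messages` are hypothetical, and passing n as a URL query parameter is an assumption; only the field names n, message, and data come from the description above.

```python
import json
from urllib.parse import urlencode


def build_read_url(base_url, n=None):
    """Build the GET URL; `n` is omitted to request the whole storage."""
    if n is None:
        return base_url
    return f"{base_url}?{urlencode({'n': n})}"


def extract_messages(body):
    """Parse a JSON response body and return its data if the call succeeded."""
    payload = json.loads(body)
    if payload.get("message") != "success":
        raise RuntimeError(f"unexpected response: {payload.get('message')!r}")
    return payload["data"]


url = build_read_url("http://localhost:5000/sync-stream", n=5)
# A hand-written response body, used here instead of a live server.
items = extract_messages('{"message": "success", "data": ["example"]}')
```

Against a running host, the body could be fetched with `urllib.request.urlopen(url).read()` and passed to `extract_messages`.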

<api_route>-state

The APIs used for maintaining the buffer status.

GET
  Requires:
  • state: The name of the state to be read.
  Response:
  • message: The response text. If the results are valid, this value should be success.
  • data: The value of the required state.
  Description: Get a specific state value.

POST
  Requires:
  • state: The name of the state to be written.
  • value: The value used for changing the state.
  Response:
  • message: The response text. If the results are valid, this value should be success.
  Description: Modify a specific state value.

DELETE
  Requires: -
  Response:
  • message: The response text. If the results are valid, this value should be success.
  Description: Reset the states to their default values.

The states are shared by all clients. Each time a client interacts with the host, it uses the GET method to renew its states. Some states can be used for controlling the behaviors of the clients. For example, a POST with state=closed, value=true has the same functionality as invoking LineProcBuffer.stop_all_mirrors().
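
The state=closed example can be sketched as an HTTP request built with the standard library. The helper `build_stop_request` is hypothetical, and sending the arguments as form-encoded data is an assumption; the state route is formed by appending -state to the API route, as described above. The request is only constructed here, not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_stop_request(api_url):
    """Build (but do not send) a POST that sets the `closed` state to true."""
    body = urlencode({"state": "closed", "value": "true"}).encode("utf-8")
    return Request(f"{api_url}-state", data=body, method="POST")


req = build_stop_request("http://localhost:5000/sync-stream")
```

To actually send it against a running host, pass `req` to `urllib.request.urlopen`.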

Example

Synchronize messages by web services

sync_by_web_services.py
import multiprocessing
import flask
from syncstream.host import LineHostMirror, LineHostBuffer


def f(address):
    with LineHostMirror(address=address, timeout=5):
        print('example')


app = flask.Flask(__name__)
hbuf = LineHostBuffer('/sync-stream', maxlen=10)
hbuf.serve(app)


@app.route('/test', methods=['GET'])
def another_service():
    address = 'http://localhost:5000/sync-stream'
    with multiprocessing.Pool(4) as p:
        p.map(f, tuple(address for _ in range(4)))
    hbuf_items = hbuf.read()
    return {'message': 'success', 'items': hbuf_items}, 200


if __name__ == '__main__':
    app.run(host='localhost', port=5000)  # Run the Flask service.