Pushing a video stream
Preparation
ffserver was removed after FFmpeg 3.4 (see the docs here), so FFmpeg can no longer serve a stream without a separate server program. The same applies to mpegCoder: users need to start a server program first, and the server then keeps listening and waiting for any pushed streams. After that, mpegCoder pushes the stream to the server through mpegCoder.MpegServer.
caution
Pushing a stream with MpegServer and receiving the same stream with mpegCoder.MpegClient at the same time is also supported. However, we recommend running MpegServer and MpegClient on different devices, because the encoder implemented in MpegServer may occupy a lot of system resources.
We recommend the following video server projects. Users can choose one of them according to their requirements.
Project | Windows | Linux |
---|---|---|
RTSP Simple Server | ||
Matroska Server Mk2 | ||
Simple Realtime Server |
Take RTSP Simple Server on Windows as an example. We only need to launch the server program with a single command:
When the server is listening, we can use the following addresses for testing:
rtsp://localhost:8554/
rtmp://localhost:1935/
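Before pushing, it can help to confirm that the server is actually accepting connections on these ports. The following is a minimal sketch that uses only the Python standard library. The helper name `is_listening` is hypothetical and is not part of mpegCoder. It only checks that a TCP connection can be opened, not that the server will accept a particular stream path.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, timed out, or unreachable: treat all of them as "not listening".
        return False


if __name__ == '__main__':
    # Ports taken from the test addresses listed above.
    for scheme, port in (('rtsp', 8554), ('rtmp', 1935)):
        state = 'listening' if is_listening('localhost', port) else 'not reachable'
        print('{0}://localhost:{1}/ is {2}'.format(scheme, port, state))
```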
Non-blocking example
This example is based on the non-blocking API MpegServer.ServeFrame(). Synchronization is an important problem when pushing a stream. If we keep calling ServeFrame() without pausing, the frames are sent as fast as possible, and newly incoming frames override the previously pushed ones. In some cases the server may even break, because it cannot accept so many frames.
To make the server work properly, we need to push the frames according to the video timestamps. When MpegServer.FFmpegSetup() is called, that time point is marked as the starting time, and MpegServer maintains a timer from then on. Every time users call MpegServer.getParameter('waitRef'), the method returns a waiting period that indicates how far the pushed video stream is ahead of the playing time. The waiting period is half of this time lag, and the returned value is in seconds. If we have pushed too many frames, we need to wait for a while before pushing more.
import time
import mpegCoder

d = mpegCoder.MpegDecoder()
opened = d.FFmpegSetup('test-video.mp4')
e = mpegCoder.MpegServer()
e.setParameter(configDict=d.getParameter(), codecName='libx264', videoAddress='rtsp://localhost:8554/video')  # inherit most of parameters from the decoder.
opened = opened and e.FFmpegSetup()  # Load the pusher.
if opened:  # If the decoder and the pusher are not loaded successfully, do not continue.
    gop = True
    s = 0
    while gop is not None:
        gop = d.ExtractGOP()  # Extract current GOP.
        if gop is not None:
            for i in gop:  # Select every frame.
                e.ServeFrame(i)  # Serve current frame.
                s += 1
                if s == 10:  # Wait for synchronization for each 10 frames.
                    wait = e.getParameter('waitRef')
                    time.sleep(wait)
                    s = 0
    e.FFmpegClose()  # End encoding and pushing, and flush all frames in cache.
else:
    print(e)
e.clear()  # Close the pusher.
d.clear()  # Close the decoder.
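To make the relationship between the timer and the waiting period concrete, here is a small pure-Python model of the rule described in this section. It is an illustrative assumption, not the internals of MpegServer. The function name `wait_ref` and its arguments are hypothetical. The only facts taken from the text are that the lag is measured against the time elapsed since setup and that the returned wait is half of that lag, in seconds. Clamping the result at zero is also an assumption.

```python
def wait_ref(frames_pushed, fps, elapsed):
    """Model of the waiting period (hypothetical helper, not mpegCoder API).

    frames_pushed: number of frames served so far.
    fps: frame rate of the stream.
    elapsed: seconds of wall-clock time since the setup call.
    """
    pushed_time = frames_pushed / fps  # Media time already pushed, in seconds.
    lag = pushed_time - elapsed        # How far the stream is ahead of playback.
    return max(lag, 0.0) / 2.0         # Half of the lag; assumed never negative.


if __name__ == '__main__':
    # 50 frames at 25 fps is 2.0 s of video; after 0.5 s the stream is 1.5 s ahead.
    print(wait_ref(50, 25.0, 0.5))
```

Under this model, waiting for only half of the lag keeps a small buffer of frames ahead of the playing time instead of draining it completely.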
Dual-process example
The above example is not an elegant implementation, because MpegDecoder and MpegServer share the same main thread. When the decoder takes a lot of time, there is an obvious latency. Therefore, we suggest splitting MpegDecoder and MpegServer into two different sub-processes, as the following code does. The decoder and the streamer are synchronized by a shared queue. Instead of MpegServer.ServeFrame(), we use MpegServer.ServeFrameBlock() here. Each time this method is called, MpegServer first checks the current playing time and ensures that the timestamp of the newly incoming frame is not too far ahead of it. If the time lag between the new frame and the playing time is too long, the method waits until the lag becomes small enough.
import mpegCoder
import multiprocessing


class Decoder(multiprocessing.Process):
    def __init__(self, video_name='test-video.mp4', q_o=None, name=None, daemon=None):
        super().__init__(name=name, daemon=daemon)
        self.video_name = video_name
        self.q_o = q_o

    def run(self):
        d = mpegCoder.MpegDecoder()
        opened = d.FFmpegSetup(self.video_name)
        self.q_o.put(d.getParameter())
        if opened:
            gop = True
            while gop is not None:
                gop = d.ExtractGOP()  # Extract current GOP.
                if gop is not None:
                    for i in gop:  # Select every frame.
                        self.q_o.put(i)
                else:
                    self.q_o.put(None)
        else:
            print(d)
        d.clear()


class Encoder(multiprocessing.Process):
    def __init__(self, video_addr='rtsp://localhost:8554/video', q_i=None, name=None, daemon=None):
        super().__init__(name=name, daemon=daemon)
        self.video_addr = video_addr
        self.q_i = q_i

    def run(self):
        e = mpegCoder.MpegServer()
        config_dict = self.q_i.get()  # Get decoder configurations.
        e.setParameter(configDict=config_dict, codecName='libx264', maxBframe=16, videoAddress=self.video_addr)
        opened = e.FFmpegSetup()
        if opened:  # If pusher is not loaded successfully, do not continue.
            frame = True
            while frame is not None:
                frame = self.q_i.get()  # Get one frame.
                if frame is not None:
                    e.ServeFrameBlock(frame)  # Encode and serve the current frame.
            e.FFmpegClose()  # End encoding, and flush all frames in cache.
        else:
            print(e)
        e.clear()


if __name__ == '__main__':
    queue_data = multiprocessing.Queue(maxsize=20)
    proc_dec = Decoder(video_name='test-video.mp4', q_o=queue_data, daemon=True)
    proc_enc = Encoder(video_addr='rtsp://localhost:8554/video', q_i=queue_data, daemon=True)
    proc_dec.start()
    proc_enc.start()
    proc_enc.join()
    proc_dec.join()
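The synchronization pattern in the example above (a bounded queue plus a None sentinel) can be tried without mpegCoder or a running server. The following is a minimal sketch under stated assumptions: integers stand in for frames, and threads with `queue.Queue` stand in for the two processes only to keep the sketch short, since `queue.Queue` offers the same `put`/`get` interface as `multiprocessing.Queue`. The names `producer`, `consumer` and `run_pipeline` are hypothetical.

```python
import queue
import threading


def producer(q_o, n_frames):
    """Stand-in for the decoder: put frames, then a None sentinel."""
    for i in range(n_frames):
        q_o.put(i)  # Blocks while the queue is full, which throttles the producer.
    q_o.put(None)   # Tell the consumer that the stream has ended.


def consumer(q_i, served):
    """Stand-in for the pusher: read frames until the sentinel arrives."""
    while True:
        frame = q_i.get()  # Blocks while the queue is empty.
        if frame is None:
            break
        served.append(frame)


def run_pipeline(n_frames=100, maxsize=20):
    """Run both sides and return the frames in the order they were served."""
    q = queue.Queue(maxsize=maxsize)
    served = []
    t_dec = threading.Thread(target=producer, args=(q, n_frames))
    t_enc = threading.Thread(target=consumer, args=(q, served))
    t_dec.start()
    t_enc.start()
    t_dec.join()
    t_enc.join()
    return served


if __name__ == '__main__':
    print(len(run_pipeline()))
```

The bounded queue keeps the decoder from running arbitrarily far ahead of the pusher, and the sentinel gives the consumer a clear signal to flush and close.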
caution
In the above examples, we pass configDict to MpegServer.setParameter(). The input value is a Python dict returned by MpegDecoder.getParameter(). This API is equivalent to e.setParameter(decoder=d). However, in the dual-process example we have to use configDict, because none of the mpegCoder classes can be pickled, so the decoder object itself cannot be sent through the queue.
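The difference between a plain dict and an unpicklable object can be checked with the standard `pickle` module, which `multiprocessing.Queue` uses to transfer items between processes. This is a minimal sketch: the dict keys are hypothetical and do not reflect the real output of MpegDecoder.getParameter(), and a `threading.Lock` is used only as a stand-in for an object that cannot be pickled.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if obj survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# Hypothetical configuration values, for illustration only.
config = {'width': 1280, 'height': 720}
restored = pickle.loads(pickle.dumps(config))

if __name__ == '__main__':
    print(can_pickle(config))            # A plain dict can cross a process boundary.
    print(can_pickle(threading.Lock()))  # A lock cannot be pickled.
```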