Version: 3.1.0

Pushing a video stream

Preparation

ffserver was removed after FFmpeg 3.4 (see the docs here). In other words, FFmpeg cannot push a stream without a separate server program. The same applies to mpegCoder: users need to start a server program first. The server program keeps listening and waits for pushed streams. After that, mpegCoder can push the stream to the server with mpegCoder.MpegServer.

caution

Pushing a stream with MpegServer and receiving the same stream with mpegCoder.MpegClient at the same time is also supported. However, we recommend running MpegServer and MpegClient on different devices, because the encoder implemented in MpegServer may occupy a lot of system resources.

We recommend the following video server projects. Users can choose one of them according to their requirements.

Project | Windows | Linux
RTSP Simple Server
Matroska Server Mk2
Simple Realtime Server

Take RTSP Simple Server on Windows as an example. We only need to launch the server program with a single command:

Launch the RTSP Simple Server

When the server is listening, we can use the following addresses for testing:

rtsp://localhost:8554/
rtmp://localhost:1935/
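Before pushing, it can help to confirm that the server is actually accepting connections on these ports. The following is a minimal sketch using only the Python standard library. The helper name `is_listening` is hypothetical and not part of mpegCoder, and it only checks that a TCP connection can be opened, not that the server speaks RTSP or RTMP.

```python
import socket
from urllib.parse import urlsplit


def is_listening(url, timeout=1.0):
    """Return True if a TCP connection to the host and port of `url` succeeds.

    Hypothetical helper for illustration; it does not perform any RTSP/RTMP
    handshake, it only checks that something accepts connections on the port.
    """
    parts = urlsplit(url)
    if parts.hostname is None or parts.port is None:
        raise ValueError('The URL needs an explicit host and port: {0}'.format(url))
    try:
        with socket.create_connection((parts.hostname, parts.port), timeout=timeout):
            return True
    except OSError:  # Refused, timed out, unreachable, ...
        return False


# Check the two test addresses listed above.
for address in ('rtsp://localhost:8554/', 'rtmp://localhost:1935/'):
    print(address, 'is listening' if is_listening(address, timeout=0.5) else 'is not reachable')
```

If an address is reported as not reachable, the server program is probably not running yet, or it is configured to use a different port.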

Non-blocking example

This example is based on the non-blocking API MpegServer.ServeFrame(). Synchronization is an important problem when pushing a stream. If we keep calling ServeFrame() without pausing, the frames are sent as fast as possible, and newly arriving frames override the previously pushed ones. In some cases the server may even break, because it cannot accept so many frames.

To make the server work properly, we need to push the frames according to the video timestamps. When MpegServer.FFmpegSetup() is called, that time point is marked as the starting time, and MpegServer maintains a timer from then on. Every time users call MpegServer.getParameter('waitRef'), the method returns a waiting period in seconds. It reflects how far the pushed video stream is ahead of the playing time: the waiting period is half of that time lag. If we have pushed too many frames, we need to pause pushing for a while.
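To make the timing rule concrete, here is a small pure-Python model of it. This is only a sketch of the behavior described above, not the mpegCoder implementation: the class `PushClock`, its methods, the constant frame rate, and the clamping of negative values to zero are all assumptions made for illustration.

```python
import time


class PushClock:
    """Hypothetical model of the timer described above (not mpegCoder code)."""

    def __init__(self, frame_rate, now=time.monotonic):
        self._now = now
        self._start = now()  # Plays the role of the moment FFmpegSetup() is called.
        self._frame_rate = float(frame_rate)
        self._pushed = 0

    def push(self):
        """Record that one more frame has been pushed."""
        self._pushed += 1

    def wait_ref(self):
        """Return half of the lead of the pushed stream over the playing time (seconds)."""
        pushed_time = self._pushed / self._frame_rate  # Timestamp of the last pushed frame.
        playing_time = self._now() - self._start       # Time elapsed since the start.
        lead = pushed_time - playing_time
        return max(lead / 2.0, 0.0)  # Assumption: never ask for a negative wait.


clock = PushClock(frame_rate=25)
for _ in range(10):  # Push 10 frames (0.4 s of video) almost instantly.
    clock.push()
wait = clock.wait_ref()  # Close to 0.2 s, minus the time spent in the loop.
time.sleep(wait)
```

Sleeping for only half of the lead keeps the pusher slightly ahead of the playing time, so the stream does not starve while the script is busy decoding the next frames.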

server-non-blocking.py
import time
import mpegCoder

d = mpegCoder.MpegDecoder()
opened = d.FFmpegSetup('test-video.mp4')
e = mpegCoder.MpegServer()
e.setParameter(configDict=d.getParameter(), codecName='libx264', videoAddress='rtsp://localhost:8554/video')  # inherit most of parameters from the decoder.
opened = opened and e.FFmpegSetup()  # Load the pusher.
if opened:  # If the decoder and the pusher are not loaded successfully, do not continue.
    gop = True
    s = 0
    while gop is not None:
        gop = d.ExtractGOP()  # Extract current GOP.
        if gop is not None:
            for i in gop:  # Select every frame.
                e.ServeFrame(i)  # Serve current frame.
                s += 1
                if s == 10:  # Wait for synchronization for each 10 frames.
                    wait = e.getParameter('waitRef')
                    time.sleep(wait)
                    s = 0
    e.FFmpegClose()  # End encoding and pushing, and flush all frames in cache.
else:
    print(e)
e.clear()  # Close the pusher.
d.clear()  # Close the decoder.

Dual-process example

The above example is not an elegant implementation, because MpegDecoder and MpegServer share the same main thread. When decoding takes a long time, there is an obvious latency. Therefore, we suggest splitting MpegDecoder and MpegServer into two different sub-processes. The script server-dual-procs.py is implemented this way. The decoder and the streamer are synchronized by a shared queue. Instead of MpegServer.ServeFrame(), we use MpegServer.ServeFrameBlock() here. Each time this method is called, MpegServer first checks the current playing time and ensures that the timestamp of the incoming frame is not too far ahead of it. If the time lag between the new frame and the playing time is too long, the method waits until the lag becomes small enough.
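The blocking behavior described for ServeFrameBlock() can be pictured with the following pure-Python sketch. It is not the library's implementation: the function name `wait_until_not_ahead`, the threshold `max_lead`, and the polling interval are hypothetical values chosen for illustration.

```python
import time


def wait_until_not_ahead(frame_time, start, max_lead=1.0, poll=0.01,
                         now=time.monotonic, sleep=time.sleep):
    """Block while the frame timestamp is more than `max_lead` seconds ahead.

    frame_time: timestamp of the incoming frame, in seconds from the stream start.
    start:      clock value at which the stream started playing.
    Returns the number of polling sleeps that were needed.
    """
    polls = 0
    while frame_time - (now() - start) > max_lead:
        sleep(poll)  # Give the playing time a chance to catch up.
        polls += 1
    return polls


start = time.monotonic()
# A frame stamped 0.05 s into the stream, with no lead allowed:
# the call returns once roughly 0.05 s have elapsed since `start`.
wait_until_not_ahead(0.05, start, max_lead=0.0)
```

With such a check inside the serving call, the caller no longer needs its own sleep loop: the consumer process simply takes frames from the queue and serves them, and the bounded queue in turn slows down the decoder.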

server-dual-procs.py
import mpegCoder
import multiprocessing


class Decoder(multiprocessing.Process):
    def __init__(self, video_name='test-video.mp4', q_o=None, name=None, daemon=None):
        super().__init__(name=name, daemon=daemon)
        self.video_name = video_name
        self.q_o = q_o

    def run(self):
        d = mpegCoder.MpegDecoder()
        opened = d.FFmpegSetup(self.video_name)
        self.q_o.put(d.getParameter())
        if opened:
            gop = True
            while gop is not None:
                gop = d.ExtractGOP()  # Extract current GOP.
                if gop is not None:
                    for i in gop:  # Select every frame.
                        self.q_o.put(i)
                else:
                    self.q_o.put(None)
        else:
            print(d)
        d.clear()


class Encoder(multiprocessing.Process):
    def __init__(self, video_addr='rtsp://localhost:8554/video', q_i=None, name=None, daemon=None):
        super().__init__(name=name, daemon=daemon)
        self.video_addr = video_addr
        self.q_i = q_i

    def run(self):
        e = mpegCoder.MpegServer()
        config_dict = self.q_i.get()  # Get decoder configurations.
        e.setParameter(configDict=config_dict, codecName='libx264', maxBframe=16, videoAddress=self.video_addr)
        opened = e.FFmpegSetup()
        if opened:  # If pusher is not loaded successfully, do not continue.
            frame = True
            while frame is not None:
                frame = self.q_i.get()  # Get one frame.
                if frame is not None:
                    e.ServeFrameBlock(frame)  # Encode and serve the current frame.
            e.FFmpegClose()  # End encoding, and flush all frames in cache.
        else:
            print(e)
        e.clear()


if __name__ == '__main__':
    queue_data = multiprocessing.Queue(maxsize=20)
    proc_dec = Decoder(video_name='test-video.mp4', q_o=queue_data, daemon=True)
    proc_enc = Encoder(video_addr='rtsp://localhost:8554/video', q_i=queue_data, daemon=True)
    proc_dec.start()
    proc_enc.start()
    proc_enc.join()
    proc_dec.join()
caution

In the above examples, we pass configDict to MpegServer.setParameter(). The input value is a Python dict returned by MpegDecoder.getParameter(). This is equivalent to calling e.setParameter(decoder=d). However, in the dual-process example we have to use the configDict form, because mpegCoder classes cannot be pickled and therefore a decoder object cannot be sent to another process.
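The difference matters because multiprocessing.Queue pickles every item it transfers. The sketch below illustrates this with the standard library only. The dictionary keys are made up for illustration and are not the actual keys returned by MpegDecoder.getParameter(), and `NativeHandle` is a hypothetical stand-in for an object that wraps native resources.

```python
import pickle
import threading

# A plain dict of simple values survives a pickle round trip,
# so it can be sent through a multiprocessing.Queue.
config = {'width': 1920, 'height': 1080, 'frameRate': (25, 1)}  # Illustrative keys only.
restored = pickle.loads(pickle.dumps(config))
print(restored == config)


class NativeHandle:
    """Hypothetical stand-in for an object that owns a non-picklable resource."""

    def __init__(self):
        self._lock = threading.Lock()  # Locks cannot be pickled.


try:
    pickle.dumps(NativeHandle())
    picklable = True
except TypeError:
    picklable = False
print(picklable)
```

This is why the Decoder process puts the result of getParameter() into the queue first, and the Encoder process reads it before any frame.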