跳到正文
版本:预览版

推送远端视频流

准备

鉴于ffserver在FFMpeg 3.4版本后就已经被移除(参见这里),FFMpeg无法在没有一个服务器程序协同的情况下单独完成推流工作。同样的问题也存在于mpegCoder中。用户需要先启动一个服务器程序,该程序会持续侦听、等待推送的视频流。在此之后mpegCoder就可以通过mpegCoder.MpegServer推送视频了。

caution

实际上,你也可以在使用MpegServer推送视频的同时,用mpegCoder.MpegClient接收这个视频流。但是我们还是建议用户尽可能在两台不同的机器上运行MpegServerMpegClient。因为MpegServer自带的编码器会占用很多系统资源。

建议使用以下视频服务器项目。用户按自己的需求,从中选择一个。

项目WindowsLinux
简单RTSP服务器(RTSP Simple Server)
马特罗斯卡服务Mk2(Matroska Server Mk2)
简单实时服务器(Simple Realtime Server)

以Windows平台和简单RTSP服务器(RTSP Simple Server)为例,我们只需要通过一行命令启动这个服务器程序即可:

启动简单RTSP服务器

当服务器处于侦听状态时,我们可以使用以下地址来进行推流测试。

rtsp://localhost:8554/
rtmp://localhost:1935/

范例:非阻塞式推流

此例基于非阻塞式API MpegServer.ServeFrame()。在推流的过程中,确保数据同步是一个很重要的问题。如果我们一直不断地使用ServeFrame(),那么我们就会尽可能地能推送多少帧、就推送多少帧。这些新推送的帧就会覆盖掉之前推送的帧。在一些情况下,服务器甚至会崩溃,因为服务器无法接收如此多的帧。

为了保证服务器能正常运转,我们需要按照视频的时间戳来推送帧。当MpegServer.FFmpegSetup()成功调用时,将会设置一个开始时间戳。MpegServer会维护一个计时器,每当用户调用MpegServer.getParemeter('waitRef')时,该方法就会返回一个推荐等待时长,用来表示推送出去的视频帧已经比实际视频帧多出了多久。这个推荐等待时长就是上述的这个时间间隔的一半(单位为秒)。如果我们推送了过多帧,就可以利用这个参数让服务等待一会。

server-non-blocking.py
import timeimport mpegCoderd = mpegCoder.MpegDecoder()opened = d.FFmpegSetup('test-video.mp4')e = mpegCoder.MpegServer()e.setParameter(configDict=d.getParameter(), codecName='libx264', videoAddress='rtsp://localhost:8554/video')  # 从解码器继承绝大多数设置。opened = opened and e.FFmpegSetup()  # 加载推流器。if opened:  # 如果推流器、解码器没有正常载入,停止后续步骤。    gop = True    s = 0    while gop is not None:        gop = d.ExtractGOP()  # 提取当前的画面组。        if gop is not None:            for i in gop:  # 遍历每一帧。                e.ServeFrame(i)  # 编码并推送当前帧。                s += 1                if s == 10:  # 每过10帧,检查、并等待播放同步。                    wait = e.getParameter('waitRef')                    time.sleep(wait)                    s = 0    e.FFmpegClose()  # 结束编码和推流,并将缓存内的所有帧刷入视频流内。else:    print(e)e.clear()  # 清除推流器设置。d.clear()  # 关闭解码器、并清除设置。

范例:双进程模式

以上的例子并不是一个优雅的实现,因为MpegDecoderMpegServer同时抢占了主线程。如果解码器需要花费相当的时间,那么推流就会出现明显延迟。因此,建议将MpegDecoderMpegServer分离到两个不同的子进程里。下面的代码就是通过这种方式实现的。解码器和推流器通过一个共享的数据队列实现同步。在此我们使用MpegServer.ServeFrameBlock()取代MpegServer.ServeFrame()。每当调用这个方法的时候,MpegServer就会检查当前的播放时长,并自动确保新推送帧的时间戳不超过播放时长过多。如果新帧的时间戳和播放时长的差距过大,该方法就会阻塞所在的线程,直到这个差距小到可以接受为止。

server-dual-procs.py
import mpegCoderimport multiprocessingclass Decoder(multiprocessing.Process):    def __init__(self, video_name='test-video.mp4', q_o=None, name=None, daemon=None):        super().__init__(name=name, daemon=daemon)        self.video_name = video_name        self.q_o = q_o    def run(self):        d = mpegCoder.MpegDecoder()        opened = d.FFmpegSetup(self.video_name)        self.q_o.put(d.getParameter())        if opened:            gop = True            while gop is not None:                gop = d.ExtractGOP()  # 提取当前的画面组。                if gop is not None:                    for i in gop:  # 遍历每一帧。                        self.q_o.put(i)                else:                    self.q_o.put(None)        else:            print(d)        d.clear()class Encoder(multiprocessing.Process):    def __init__(self, video_addr='rtsp://localhost:8554/video', q_i=None, name=None, daemon=None):        super().__init__(name=name, daemon=daemon)        self.video_addr = video_addr        self.q_i = q_i    def run(self):        e = mpegCoder.MpegServer()        config_dict = self.q_i.get()  # 获取解码器的参数。        e.setParameter(configDict=config_dict, codecName='libx264', maxBframe=16, videoAddress=self.video_addr)        opened = e.FFmpegSetup()        if opened:  # 如果推流器没有正常加载,就停止以下步骤。            frame = True            while frame is not None:                frame = self.q_i.get()  # 获取一帧。                if frame is not None:                    e.ServeFrameBlock(frame)  # 编码并推送当前帧。            e.FFmpegClose()  # 结束编码和推流,并将缓存内的所有帧刷入视频流内。        else:            print(e)        e.clear()if __name__ == '__main__':    queue_data = multiprocessing.Queue(maxsize=20)    proc_dec = Decoder(video_name='test-video.mp4', q_o=queue_data, daemon=True)    proc_enc = Encoder(video_addr='rtsp://localhost:8554/video', q_i=queue_data, daemon=True)    proc_dec.start()    proc_enc.start()    proc_enc.join()    proc_dec.join()
caution

在上例里,调用MpegServer.setParameter()时使用了configDict。这个输入值是MpegDecoder.getParameter()返回的一个python字典。该用法等价与使用e.setParameter(decoder=d)。然而,此例中我们必须使用这个等价用法,因为mpegCoder所有的实例都无法被pickled。