推送远端视频流
准备
鉴于ffserver
在FFMpeg 3.4
版本后就已经被移除(参见这里),FFMpeg无法在没有一个服务器程序协同的情况下单独完成推流工作。同样的问题也存在于mpegCoder
中。用户需要先启动一个服务器程序,该程序会持续侦听、等待推送的视频流。在此之后mpegCoder
就可以通过mpegCoder.MpegServer
推送视频了。
caution
实际上,你也可以在使用MpegServer
推送视频的同时,用mpegCoder.MpegClient
接收这个视频流。但是我们还是建议用户尽可能在两台不同的机器上运行MpegServer
和MpegClient
。因为MpegServer
自带的编码器会占用很多系统资源。
建议使用以下视频服务器项目。用户按自己的需求,从中选择一个。
项目 | Windows | Linux |
---|---|---|
简单RTSP服务器(RTSP Simple Server) | ||
马特罗斯卡服务Mk2(Matroska Server Mk2) | ||
简单实时服务器(Simple Realtime Server) |
以Windows平台和简单RTSP服务器(RTSP Simple Server)为例,我们只需要通过一行命令启动这个服务器程序即可:
当服务器处于侦听状态时,我们可以使用以下地址来进行推流测试。
rtsp://localhost:8554/
rtmp://localhost:1935/
范例:非阻塞式推流
此例基于非阻塞式API MpegServer.ServeFrame()
。在推流的过程中,确保数据同步是一个很重要的问题。如果我们一直不断地使用ServeFrame()
,那么我们就会尽可能地能推送多少帧、就推送多少帧。这些新推送的帧就会覆盖掉之前推送的帧。在一些情况下,服务器甚至会崩溃,因为服务器无法接收如此多的帧。
为了保证服务器能正常运转,我们需要按照视频的时间戳来推送帧。当MpegServer.FFmpegSetup()
成功调用时,将会设置一个开始时间戳。MpegServer
会维护一个计时器,每当用户调用MpegServer.getParemeter('waitRef')
时,该方法就会返回一个推荐等待时长,用来表示推送出去的视频帧已经比实际视频帧多出了多久。这个推荐等待时长就是上述的这个时间间隔的一半(单位为秒)。如果我们推送了过多帧,就可以利用这个参数让服务等待一会。
import timeimport mpegCoderd = mpegCoder.MpegDecoder()opened = d.FFmpegSetup('test-video.mp4')e = mpegCoder.MpegServer()e.setParameter(configDict=d.getParameter(), codecName='libx264', videoAddress='rtsp://localhost:8554/video') # 从解码器继承绝大多数设置。opened = opened and e.FFmpegSetup() # 加载推流器。if opened: # 如果推流器、解码器没有正常载入,停止后续步骤。 gop = True s = 0 while gop is not None: gop = d.ExtractGOP() # 提取当前的画面组。 if gop is not None: for i in gop: # 遍历每一帧。 e.ServeFrame(i) # 编码并推送当前帧。 s += 1 if s == 10: # 每过10帧,检查、并等待播放同步。 wait = e.getParameter('waitRef') time.sleep(wait) s = 0 e.FFmpegClose() # 结束编码和推流,并将缓存内的所有帧刷入视频流内。else: print(e)e.clear() # 清除推流器设置。d.clear() # 关闭解码器、并清除设置。
范例:双进程模式
以上的例子并不是一个优雅的实现,因为MpegDecoder
和MpegServer
同时抢占了主线程。如果解码器需要花费相当的时间,那么推流就会出现明显延迟。因此,建议将MpegDecoder
和MpegServer
分离到两个不同的子进程里。下面的代码就是通过这种方式实现的。解码器和推流器通过一个共享的数据队列实现同步。在此我们使用MpegServer.ServeFrameBlock()
取代MpegServer.ServeFrame()
。每当调用这个方法的时候,MpegServer
就会检查当前的播放时长,并自动确保新推送帧的时间戳不超过播放时长过多。如果新帧的时间戳和播放时长的差距过大,该方法就会阻塞所在的线程,直到这个差距小到可以接受为止。
import mpegCoderimport multiprocessingclass Decoder(multiprocessing.Process): def __init__(self, video_name='test-video.mp4', q_o=None, name=None, daemon=None): super().__init__(name=name, daemon=daemon) self.video_name = video_name self.q_o = q_o def run(self): d = mpegCoder.MpegDecoder() opened = d.FFmpegSetup(self.video_name) self.q_o.put(d.getParameter()) if opened: gop = True while gop is not None: gop = d.ExtractGOP() # 提取当前的画面组。 if gop is not None: for i in gop: # 遍历每一帧。 self.q_o.put(i) else: self.q_o.put(None) else: print(d) d.clear()class Encoder(multiprocessing.Process): def __init__(self, video_addr='rtsp://localhost:8554/video', q_i=None, name=None, daemon=None): super().__init__(name=name, daemon=daemon) self.video_addr = video_addr self.q_i = q_i def run(self): e = mpegCoder.MpegServer() config_dict = self.q_i.get() # 获取解码器的参数。 e.setParameter(configDict=config_dict, codecName='libx264', maxBframe=16, videoAddress=self.video_addr) opened = e.FFmpegSetup() if opened: # 如果推流器没有正常加载,就停止以下步骤。 frame = True while frame is not None: frame = self.q_i.get() # 获取一帧。 if frame is not None: e.ServeFrameBlock(frame) # 编码并推送当前帧。 e.FFmpegClose() # 结束编码和推流,并将缓存内的所有帧刷入视频流内。 else: print(e) e.clear()if __name__ == '__main__': queue_data = multiprocessing.Queue(maxsize=20) proc_dec = Decoder(video_name='test-video.mp4', q_o=queue_data, daemon=True) proc_enc = Encoder(video_addr='rtsp://localhost:8554/video', q_i=queue_data, daemon=True) proc_dec.start() proc_enc.start() proc_enc.join() proc_dec.join()
caution
在上例里,调用MpegServer.setParameter()
时使用了configDict
。这个输入值是MpegDecoder.getParameter()
返回的一个python字典。该用法等价与使用e.setParameter(decoder=d)
。然而,此例中我们必须使用这个等价用法,因为mpegCoder
所有的实例都无法被pickled。