跳到正文
版本:3.1.0

推送远端视频流

准备

鉴于ffserver在FFMpeg 3.4版本后就已经被移除(参见这里),FFMpeg无法在没有一个服务器程序协同的情况下单独完成推流工作。同样的问题也存在于mpegCoder中。用户需要先启动一个服务器程序,该程序会持续侦听、等待推送的视频流。在此之后mpegCoder就可以通过mpegCoder.MpegServer推送视频了。

caution

实际上,你也可以在使用MpegServer推送视频的同时,用mpegCoder.MpegClient接收这个视频流。但是我们还是建议用户尽可能在两台不同的机器上运行MpegServerMpegClient。因为MpegServer自带的编码器会占用很多系统资源。

建议使用以下视频服务器项目。用户按自己的需求,从中选择一个。

项目WindowsLinux
简单RTSP服务器(RTSP Simple Server)
马特罗斯卡服务Mk2(Matroska Server Mk2)
简单实时服务器(Simple Realtime Server)

以Windows平台和简单RTSP服务器(RTSP Simple Server)为例,我们只需要通过一行命令启动这个服务器程序即可:

启动简单RTSP服务器

当服务器处于侦听状态时,我们可以使用以下地址来进行推流测试。

rtsp://localhost:8554/
rtmp://localhost:1935/

范例:非阻塞式推流

此例基于非阻塞式API MpegServer.ServeFrame()。在推流的过程中,确保数据同步是一个很重要的问题。如果我们一直不断地使用ServeFrame(),那么我们就会尽可能地能推送多少帧、就推送多少帧。这些新推送的帧就会覆盖掉之前推送的帧。在一些情况下,服务器甚至会崩溃,因为服务器无法接收如此多的帧。

为了保证服务器能正常运转,我们需要按照视频的时间戳来推送帧。当MpegServer.FFmpegSetup()成功调用时,将会设置一个开始时间戳。MpegServer会维护一个计时器,每当用户调用MpegServer.getParemeter('waitRef')时,该方法就会返回一个推荐等待时长,用来表示推送出去的视频帧已经比实际视频帧多出了多久。这个推荐等待时长就是上述的这个时间间隔的一半(单位为秒)。如果我们推送了过多帧,就可以利用这个参数让服务等待一会。

server-non-blocking.py
import time
import mpegCoder

d = mpegCoder.MpegDecoder()
opened = d.FFmpegSetup('test-video.mp4')
e = mpegCoder.MpegServer()
e.setParameter(configDict=d.getParameter(), codecName='libx264', videoAddress='rtsp://localhost:8554/video') # 从解码器继承绝大多数设置。
opened = opened and e.FFmpegSetup() # 加载推流器。
if opened: # 如果推流器、解码器没有正常载入,停止后续步骤。
gop = True
s = 0
while gop is not None:
gop = d.ExtractGOP() # 提取当前的画面组。
if gop is not None:
for i in gop: # 遍历每一帧。
e.ServeFrame(i) # 编码并推送当前帧。
s += 1
if s == 10: # 每过10帧,检查、并等待播放同步。
wait = e.getParameter('waitRef')
time.sleep(wait)
s = 0
e.FFmpegClose() # 结束编码和推流,并将缓存内的所有帧刷入视频流内。
else:
print(e)
e.clear() # 清除推流器设置。
d.clear() # 关闭解码器、并清除设置。

范例:双进程模式

以上的例子并不是一个优雅的实现,因为MpegDecoderMpegServer同时抢占了主线程。如果解码器需要花费相当的时间,那么推流就会出现明显延迟。因此,建议将MpegDecoderMpegServer分离到两个不同的子进程里。下面的代码就是通过这种方式实现的。解码器和推流器通过一个共享的数据队列实现同步。在此我们使用MpegServer.ServeFrameBlock()取代MpegServer.ServeFrame()。每当调用这个方法的时候,MpegServer就会检查当前的播放时长,并自动确保新推送帧的时间戳不超过播放时长过多。如果新帧的时间戳和播放时长的差距过大,该方法就会阻塞所在的线程,直到这个差距小到可以接受为止。

server-dual-procs.py
import mpegCoder
import multiprocessing


class Decoder(multiprocessing.Process):
def __init__(self, video_name='test-video.mp4', q_o=None, name=None, daemon=None):
super().__init__(name=name, daemon=daemon)
self.video_name = video_name
self.q_o = q_o

def run(self):
d = mpegCoder.MpegDecoder()
opened = d.FFmpegSetup(self.video_name)
self.q_o.put(d.getParameter())
if opened:
gop = True
while gop is not None:
gop = d.ExtractGOP() # 提取当前的画面组。
if gop is not None:
for i in gop: # 遍历每一帧。
self.q_o.put(i)
else:
self.q_o.put(None)
else:
print(d)
d.clear()


class Encoder(multiprocessing.Process):
def __init__(self, video_addr='rtsp://localhost:8554/video', q_i=None, name=None, daemon=None):
super().__init__(name=name, daemon=daemon)
self.video_addr = video_addr
self.q_i = q_i

def run(self):
e = mpegCoder.MpegServer()
config_dict = self.q_i.get() # 获取解码器的参数。
e.setParameter(configDict=config_dict, codecName='libx264', maxBframe=16, videoAddress=self.video_addr)
opened = e.FFmpegSetup()
if opened: # 如果推流器没有正常加载,就停止以下步骤。
frame = True
while frame is not None:
frame = self.q_i.get() # 获取一帧。
if frame is not None:
e.ServeFrameBlock(frame) # 编码并推送当前帧。
e.FFmpegClose() # 结束编码和推流,并将缓存内的所有帧刷入视频流内。
else:
print(e)
e.clear()


if __name__ == '__main__':
queue_data = multiprocessing.Queue(maxsize=20)
proc_dec = Decoder(video_name='test-video.mp4', q_o=queue_data, daemon=True)
proc_enc = Encoder(video_addr='rtsp://localhost:8554/video', q_i=queue_data, daemon=True)
proc_dec.start()
proc_enc.start()
proc_enc.join()
proc_dec.join()
caution

在上例里,调用MpegServer.setParameter()时使用了configDict。这个输入值是MpegDecoder.getParameter()返回的一个python字典。该用法等价与使用e.setParameter(decoder=d)。然而,此例中我们必须使用这个等价用法,因为mpegCoder所有的实例都无法被pickled。