Monday, February 26, 2018

Decoupling GStreamer Pipelines

This post is best read with some prior familiarity with GStreamer pipelines. If you want to learn more about that, a good place to start is the tutorial Jan presented at LCA 2018.

Elevator Pitch


GStreamer was designed with modularity, pluggability, and ease of use in mind, and the structure was somewhat inspired by UNIX pipes. With GStreamer, you start with an idea of what your dataflow will look like, and the pipeline will map that quite closely.

This is true whether you're working with a simple and static pipeline:

source ! transform ! sink

Or if you need complex and dynamic pipelines with varying rates of data flow.


The inherent pluggability of the system allows for quick prototyping and makes a lot of changes simpler than they would be in other systems.

At the same time, efficient multimedia processing requires avoiding needless copying of data, excessive threading, and added latency. Other necessary features include varying rates of playback, seeking, branching, mixing, non-linear data flow, timing, and much more, but let's keep it simple for now.

Modular Multimedia Processing


A naive way to implement such a graph of processing nodes would be to have one thread (or process) per node, and use shared memory or message-passing between them. This can achieve high throughput if you use the right APIs for zerocopy message-passing, but because consumer operating systems do not provide realtime guarantees, latency becomes jittery and much harder to control.

So how does GStreamer solve these problems?

Let's take a look at a simple pipeline to understand how this works. We generate a sine wave, encode it with Opus, mux it into an Ogg container, and write it to disk.


$ gst-launch-1.0 -e audiotestsrc ! opusenc ! oggmux ! filesink location=out.ogg


How does data make it from one end of this pipeline to the other in GStreamer? The answer lies in source pads, sink pads and the chain function.

In this pipeline, the audiotestsrc element has one source pad. opusenc and oggmux have one source pad and one sink pad each, and filesink only has a sink pad. Buffers always move from source pads to sink pads. Every element that receives buffers (that is, every element with a sink pad) must implement a chain function to handle each incoming buffer.

Zooming in a bit more, to output buffers, an element will call gst_pad_push() on its source pad. This function will figure out what the corresponding sink pad is, and call the chain function of that element with a pointer to the buffer that was pushed earlier. This chain function can then apply a transformation to the buffer and push it (or a new buffer) onward with gst_pad_push() again.

The net effect of this is that all buffer handling from one end of this pipeline to the other happens in one series of chained function calls. This is a really important detail that allows GStreamer to be efficient by default.
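
To make that concrete, here's a minimal sketch of the chain-function pattern for a hypothetical filter element. The function name and the "src" pad lookup are illustrative, not taken from any real element:

#include <gst/gst.h>

/* Called (via gst_pad_push() upstream) with each buffer arriving on our sink pad */
static GstFlowReturn
my_chain (GstPad * sinkpad, GstObject * parent, GstBuffer * buf)
{
  /* Look up our own source pad; a real element would cache this pointer */
  GstPad *srcpad = gst_element_get_static_pad (GST_ELEMENT (parent), "src");
  GstFlowReturn ret;

  /* ... inspect or transform buf here ... */

  /* gst_pad_push() calls the downstream element's chain function directly,
   * so buffer handling continues in the same call stack */
  ret = gst_pad_push (srcpad, buf);

  gst_object_unref (srcpad);
  return ret;
}

/* Installed on the element's sink pad at init time with:
 *   gst_pad_set_chain_function (sinkpad, my_chain); */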

Pipeline Multithreading


Of course, sometimes you want to decouple parts of the pipeline, and that brings us to the simplest mechanism for doing so: the queue element. The most basic use-case for this element is to ensure that everything downstream of it runs in a new thread.
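
For example, adding a queue to the pipeline from earlier moves the muxer and the file writer into their own streaming thread:

$ gst-launch-1.0 -e audiotestsrc ! opusenc ! queue ! oggmux ! filesink location=out.ogg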

In some applications, you want even greater decoupling of parts of your pipeline. For instance, if you're reading data from the network, you don't want a network error to bring down your entire pipeline, or if you're working with a hotpluggable device, device removal should be recoverable without needing to restart the pipeline.

There are various mechanisms to achieve such decoupling: appsrc/appsink, fdsrc/fdsink, shmsrc/shmsink, ipcpipeline, and so on. However, each of those has its own limitations and complexities. In particular, events, negotiation, and synchronization usually need to be handled or serialized manually at the boundary.

Seamless Pipeline Decoupling


We recently merged a new plugin that makes this job much simpler: gstproxy. Essentially, you insert a proxysink element when you want to send data outside your pipeline, and use a proxysrc element to push that data into a different pipeline in the same process.

The interesting thing about this plugin is that everything is proxied, not just buffers. Events, queries, and hence caps negotiation all happen seamlessly. This is particularly useful when you want to do dynamic reconfiguration of your pipeline, and want the decoupled parts to reconfigure automatically.

Say you have a pipeline like this:


pulsesrc ! opusenc ! oggmux ! souphttpclientsink


Where the souphttpclientsink element is doing a PUT to a remote HTTP server. If the server suddenly closes the connection, you want to be able to immediately reconnect to the same server or a different one without interrupting the recording. One way to do this would be to use appsrc and appsink to split it into two pipelines:


pulsesrc ! opusenc ! oggmux ! appsink

appsrc ! souphttpclientsink


Now you need to write code to handle buffers that are received on the appsink and then manually push those into appsrc. With the proxy plugin, you split your pipeline like before:


pulsesrc ! opusenc ! oggmux ! proxysink

proxysrc ! souphttpclientsink


Next, we connect the proxysrc and proxysink elements, and GStreamer will automatically push buffers from the first pipeline to the second one.

g_object_set (psrc, "proxysink", psink, NULL);

proxysrc also contains a queue, so the second pipeline will always run in a separate thread.
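
Putting it all together, here's a minimal sketch in C of how the two pipelines from above could be wired up. The pipeline strings, the placeholder URL, and the (omitted) error handling are illustrative; see the bundled example for a complete version:

#include <gst/gst.h>

int
main (int argc, char *argv[])
{
  GstElement *pipe1, *pipe2, *psink, *psrc;

  gst_init (&argc, &argv);

  /* First pipeline: capture, encode and mux, ending in a proxysink */
  pipe1 = gst_parse_launch (
      "pulsesrc ! opusenc ! oggmux ! proxysink name=psink", NULL);
  psink = gst_bin_get_by_name (GST_BIN (pipe1), "psink");

  /* Second pipeline: starts at a proxysrc and uploads over HTTP
   * (the URL is just a placeholder) */
  pipe2 = gst_parse_launch (
      "proxysrc name=psrc ! souphttpclientsink location=http://example.com/rec.ogg",
      NULL);
  psrc = gst_bin_get_by_name (GST_BIN (pipe2), "psrc");

  /* Connect the two: buffers, events, and queries pushed into proxysink
   * now come out of proxysrc in the second pipeline */
  g_object_set (psrc, "proxysink", psink, NULL);

  /* Start both pipelines */
  gst_element_set_state (pipe2, GST_STATE_PLAYING);
  gst_element_set_state (pipe1, GST_STATE_PLAYING);

  /* ... run a GMainLoop and watch each pipeline's bus separately; an error
   * on one pipeline does not take down the other ... */

  gst_object_unref (psink);
  gst_object_unref (psrc);
  gst_object_unref (pipe1);
  gst_object_unref (pipe2);
  return 0;
}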

Another option is the inter plugin. If you use a pair of interaudiosink/interaudiosrc elements, buffers will be automatically moved between pipelines, but those only support raw audio or video, and drop events and queries at the boundary. The proxy elements push pointers to buffers without copying, and they do not care what the contents of the buffers are.

This example was a trivial one, but with more complex pipelines, you usually have bins that automatically reconfigure themselves according to the events and caps sent by upstream elements; for example, decodebin and webrtcbin. This metadata about the buffers is lost when using appsrc/appsink and similar elements, but is transparently proxied by the proxy elements.

The ipcpipeline elements also forward buffers, events, queries, etc. (not zerocopy, though it could be), but they are much more complicated since they were built for splitting pipelines across multiple processes, and are most often used in a security-sensitive context.

The proxy elements only work when all the split pipelines are within the same process, which makes them much simpler and, as a result, more efficient. They should be used when you want graceful recovery from element errors, and your elements are not a vector for security attacks.

For more details on how to use them, check out the documentation and example! The online docs will be generated from that when we're closer to the release of GStreamer 1.14. There are a few caveats, but a number of projects are already using it with great success.

2 comments:

  1. One little correction about ipcpipeline: it's not zero copy at all. Everything is sent over a Unix socket, so there are multiple copies of everything. It's been used with encoded content or other low-bitrate data.

    Also, the ipcpipeline elements have one big feature: they allow the downstream part of the pipeline to be controlled by the upstream part as if they were a single pipeline, even though they may be in separate processes.

  2. @ocrete, weird, I somehow imagined that I read somewhere that you use memfd to do zerocopy while passing fds over the unix socket. Will correct that, thanks!

    I didn't want to talk in too much detail about ipcpipeline because the blog post is already too long, but I did link to George's post, and I hope people click through and read about it. :)
