07.多线程与 Tee Element
2026/1/19 · 大约 4 分钟 · multithreading · tee
GStreamer学习笔记:07.多线程与 Tee Element
本示例尝试使用 GStreamer 的多线程能力,试图通过 tee 元素将音频流分发到多个分支,实现音频和视频的同时播放。
核心概念
1. GStreamer 的多线程模型
- GStreamer 是一个多线程的框架,在内部,它根据需要创建和销毁线程
- 插件可以自由创建线程来处理它们的任务
- 例如:视频解码器可以创建四个线程以充分利用 CPU 的四个核
- 可以利用 queue 元素使得 pipeline 的不同分支运行在不同的线程上
2. Tee 元素
tee 元素可以将输入的数据流复制并分发到多个输出 Pad:
tee (复制数据)
|
+-----+-----+
| | |
src src1 src2
|     |     ...

3. 转换 Element 的必要性
转换 element(audioconvert、audioresample 和 videoconvert)是必须的,可以保证 pipeline 可以正确地连接:
- 音频和视频元素的 sink 的 Caps 是由硬件确定的
- 如果 audiotestsrc 和 wavescope 的 Caps 能够匹配
- 这些 element 的行为就类似于直通——对信号不做任何修改,这对于性能的影响基本可以忽略不计
4. Pipeline 结构
audio_source (线程1) -> tee
tee.src_0 -> audio_queue (线程2) -> audio_convert -> audio_resample -> audio_sink
tee.src_1 -> video_queue (线程3) -> visual -> video_convert -> video_sink

- queue 元素会创建新的线程
- 整条 pipeline 有三个线程:主线程(线程1)、线程2、线程3
核心代码
1. 创建元素
audio_source = gst_element_factory_make("audiotestsrc", "audio_source");
tee = gst_element_factory_make("tee", "tee");
audio_queue = gst_element_factory_make("queue", "audio_queue");
audio_convert = gst_element_factory_make("audioconvert", "audio_convert");
audio_resample = gst_element_factory_make("audioresample", "audio_resample");
audio_sink = gst_element_factory_make("autoaudiosink", "audio_sink");
video_queue = gst_element_factory_make("queue", "video_queue");
visual = gst_element_factory_make("wavescope", "visual"); // 消费一个音频信号并且将它渲染成波形,简易的示波器
video_convert = gst_element_factory_make("videoconvert", "csp");
video_sink = gst_element_factory_make("autovideosink", "video_sink");

2. 配置元素属性
g_object_set(audio_source, "freq", 215.0f, NULL);
g_object_set(visual, "shader", 0, "style", 1, NULL);

3. 添加元素到 Pipeline
gst_bin_add_many(GST_BIN(pipeline),
audio_source, tee, audio_queue, audio_convert, audio_resample, audio_sink,
video_queue, visual, video_convert, video_sink,
NULL
);

4. 链接 "Always" Pads
if (
gst_element_link_many(audio_source, tee, NULL) != TRUE ||
gst_element_link_many(audio_queue, audio_convert, audio_resample, audio_sink, NULL) != TRUE ||
gst_element_link_many(video_queue, visual, video_convert, video_sink, NULL) != TRUE
)
{
g_printerr("Elements could not be linked.\n");
gst_object_unref(pipeline);
return -1;
}

5. 手动链接 "Request" Pads
tee 元素的 src pad 是 Request Pads,这类 pad 是按需创建的,需要手动创建。
// 获取 queue.sink 具名插槽
queue_audio_pad = gst_element_get_static_pad(audio_queue, "sink");
queue_video_pad = gst_element_get_static_pad(video_queue, "sink");
// 手动申请 tee.src_0 和 tee.src_1
tee_audio_pad = gst_element_request_pad_simple(tee, "src_%u");
tee_video_pad = gst_element_request_pad_simple(tee, "src_%u");
g_print("Obtained request pad %s for audio branch.\n", gst_pad_get_name(tee_audio_pad));
g_print("Obtained request pad %s for video branch.\n", gst_pad_get_name(tee_video_pad));
// 手动连接 tee 元素
if (gst_pad_link(tee_audio_pad, queue_audio_pad) != GST_PAD_LINK_OK ||
gst_pad_link(tee_video_pad, queue_video_pad) != GST_PAD_LINK_OK)
{
g_printerr("Tee could not be linked.\n");
gst_object_unref(pipeline);
return -1;
}
// 释放结构体内存(并不会释放 pad)
gst_object_unref(queue_audio_pad);
gst_object_unref(queue_video_pad);

6. 释放 Request Pads
gst_element_release_request_pad(tee, tee_audio_pad); // 释放申请的 tee.src_0 pad 插槽
gst_element_release_request_pad(tee, tee_video_pad); // 释放申请的 tee.src_1 pad 插槽
gst_object_unref(tee_audio_pad); // 释放结构体内存
gst_object_unref(tee_video_pad); // 释放结构体内存

完整代码
#include <gst/gst.h>
int main(int argc, char *argv[])
{
GstElement *pipeline, *audio_source, *tee, *audio_queue, *audio_convert, *audio_resample, *audio_sink;
GstElement *video_queue, *visual, *video_convert, *video_sink;
GstBus *bus;
GstMessage *msg;
GstPad *tee_audio_pad, *tee_video_pad;
GstPad *queue_audio_pad, *queue_video_pad;
gst_init(&argc, &argv);
/* Create the elements */
audio_source = gst_element_factory_make("audiotestsrc", "audio_source");
tee = gst_element_factory_make("tee", "tee");
audio_queue = gst_element_factory_make("queue", "audio_queue");
audio_convert = gst_element_factory_make("audioconvert", "audio_convert");
audio_resample = gst_element_factory_make("audioresample", "audio_resample");
audio_sink = gst_element_factory_make("autoaudiosink", "audio_sink");
video_queue = gst_element_factory_make("queue", "video_queue");
visual = gst_element_factory_make("wavescope", "visual");
video_convert = gst_element_factory_make("videoconvert", "csp");
video_sink = gst_element_factory_make("autovideosink", "video_sink");
pipeline = gst_pipeline_new("test-pipeline");
if (!pipeline || !audio_source || !tee || !audio_queue || !audio_convert || !audio_resample || !audio_sink ||
!video_queue || !visual || !video_convert || !video_sink)
{
g_printerr("Not all elements could be created.\n");
return -1;
}
g_object_set(audio_source, "freq", 215.0f, NULL);
g_object_set(visual, "shader", 0, "style", 1, NULL);
gst_bin_add_many(GST_BIN(pipeline), audio_source, tee, audio_queue, audio_convert, audio_resample, audio_sink,
video_queue, visual, video_convert, video_sink, NULL);
if (
gst_element_link_many(audio_source, tee, NULL) != TRUE ||
gst_element_link_many(audio_queue, audio_convert, audio_resample, audio_sink, NULL) != TRUE ||
gst_element_link_many(video_queue, visual, video_convert, video_sink, NULL) != TRUE
)
{
g_printerr("Elements could not be linked.\n");
gst_object_unref(pipeline);
return -1;
}
queue_audio_pad = gst_element_get_static_pad(audio_queue, "sink");
queue_video_pad = gst_element_get_static_pad(video_queue, "sink");
tee_audio_pad = gst_element_request_pad_simple(tee, "src_%u");
tee_video_pad = gst_element_request_pad_simple(tee, "src_%u");
g_print("Obtained request pad %s for audio branch.\n", gst_pad_get_name(tee_audio_pad));
g_print("Obtained request pad %s for video branch.\n", gst_pad_get_name(tee_video_pad));
if (gst_pad_link(tee_audio_pad, queue_audio_pad) != GST_PAD_LINK_OK ||
gst_pad_link(tee_video_pad, queue_video_pad) != GST_PAD_LINK_OK)
{
g_printerr("Tee could not be linked.\n");
gst_object_unref(pipeline);
return -1;
}
gst_object_unref(queue_audio_pad);
gst_object_unref(queue_video_pad);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bus = gst_element_get_bus(pipeline);
msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS);
gst_element_release_request_pad(tee, tee_audio_pad);
gst_element_release_request_pad(tee, tee_video_pad);
gst_object_unref(tee_audio_pad);
gst_object_unref(tee_video_pad);
if (msg != NULL)
gst_message_unref(msg);
gst_object_unref(bus);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
return 0;
}编译和运行
gcc main.c -o main.out $(pkg-config --cflags --libs gstreamer-1.0)
./main.out

总结
本示例展示了:
- 多线程 Pipeline:使用 queue 元素创建多线程分支
- Tee Element:将数据流复制并分发到多个分支
- Request Pads:手动申请和释放 request pads
- 转换 Elements:保证 pipeline 正确连接的必要元素
- 资源管理:正确释放 request pads
多线程 Pipeline 可以充分利用多核 CPU 的性能,实现复杂的媒体处理任务,如同时进行音频播放、视频渲染和数据分析。