GStreamer Study Notes: 08. Appsrc and Appsink
This example uses the appsrc and appsink elements to implement a custom data source and to receive data back from the pipeline. This is very useful when GStreamer has to be integrated into an existing application.
Core Concepts
1. Appsrc - Custom Data Source
appsrc lets the application inject its own data into a GStreamer pipeline.
Configuring Appsrc
GstAudioInfo info;
GstCaps *audio_caps;
// Describe the audio format
gst_audio_info_set_format(&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
// Convert the info structure into caps
audio_caps = gst_audio_info_to_caps(&info);
// Configure the appsrc element's properties
g_object_set(data.app_src,
"caps", audio_caps, // capabilities of the data it will produce
"format", GST_FORMAT_TIME, // time format: buffers are scheduled by the nanosecond timestamps we supply
NULL
);
// Connect the data-flow signal handlers
g_signal_connect(data.app_src, "need-data", G_CALLBACK(start_feed), &data);
g_signal_connect(data.app_src, "enough-data", G_CALLBACK(stop_feed), &data);
gst_caps_unref(audio_caps);
Appsrc Signals
- need-data: emitted when appsrc's internal queue is running low, asking the application to provide data
- enough-data: emitted when appsrc has queued enough data, telling the application it can stop pushing
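Connecting to the signals, as done here, is the simplest route. The same notifications are also available as plain function callbacks through the gst-app helper library, which is a bit lighter-weight than signal emission. A minimal sketch, not used in this example, assuming the program additionally links against gstreamer-app-1.0 (the names on_need_data, on_enough_data and setup_appsrc_callbacks are illustrative):
#include <gst/app/gstappsrc.h>

static void on_need_data(GstAppSrc *src, guint length, gpointer user_data)
{
    /* appsrc is running low: start (or resume) pushing buffers */
}

static void on_enough_data(GstAppSrc *src, gpointer user_data)
{
    /* appsrc has queued enough data: pause pushing */
}

static void setup_appsrc_callbacks(GstElement *app_src, gpointer user_data)
{
    GstAppSrcCallbacks callbacks = { on_need_data, on_enough_data, NULL };
    gst_app_src_set_callbacks(GST_APP_SRC(app_src), &callbacks, user_data, NULL);
}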
2. Appsink - Receiving Data
appsink lets the application receive data from a GStreamer pipeline.
Configuring Appsink
g_object_set(data.app_sink,
"emit-signals", TRUE, // allow signals such as `new-sample` to be emitted
"caps", audio_caps, // capabilities of the data it accepts
NULL
);
g_signal_connect(data.app_sink, "new-sample", G_CALLBACK(new_sample), &data);
Appsink Signals
- new-sample: emitted when appsink receives a new sample
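The handler used in this example (shown in section 4) only prints a * for every sample. If the application actually needs the audio bytes, it can map the buffer inside the pulled sample for reading. A minimal sketch of such a handler (read_sample is an illustrative name, not part of the example):
#include <gst/gst.h>

static GstFlowReturn read_sample(GstElement *sink, gpointer user_data)
{
    GstSample *sample = NULL;
    // Pull the sample out of appsink
    g_signal_emit_by_name(sink, "pull-sample", &sample);
    if (!sample)
        return GST_FLOW_ERROR;

    GstBuffer *buffer = gst_sample_get_buffer(sample);
    GstMapInfo map;
    if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
        // map.data points at map.size bytes of raw S16 audio
        g_print("received %" G_GSIZE_FORMAT " bytes\n", map.size);
        gst_buffer_unmap(buffer, &map);
    }
    gst_sample_unref(sample);
    return GST_FLOW_OK;
}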
3. The Data Push Mechanism
The data-push function
static gboolean push_data(CustomData *data)
{
GstBuffer *buffer;
GstFlowReturn ret;
GstMapInfo map;
gint16 *raw;
gint num_samples = CHUNK_SIZE / 2; // each sample is 16 bits (2 bytes)
gfloat freq;
// Allocate the buffer
buffer = gst_buffer_new_and_alloc(CHUNK_SIZE);
// Set its timestamp and duration
// buffer->pts = samples generated so far * GST_SECOND (ns) / sample rate (Hz)
GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(data->num_samples, GST_SECOND, SAMPLE_RATE);
// buffer->duration = samples in this buffer * GST_SECOND (ns) / sample rate (Hz)
GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(num_samples, GST_SECOND, SAMPLE_RATE);
// Map the buffer so we can write into it
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
raw = (gint16 *)map.data;
// Generate the waveform
data->c += data->d;
data->d -= data->c / 1000;
freq = 1100 + 1000 * data->d;
for (int i = 0; i < num_samples; i++)
{
data->a += data->b;
data->b -= data->a / freq;
raw[i] = (gint16)(500 * data->a);
}
gst_buffer_unmap(buffer, &map);
data->num_samples += num_samples;
// Push the buffer into appsrc
g_signal_emit_by_name(data->app_src, "push-buffer", buffer, &ret);
// Drop our reference to the buffer
gst_buffer_unref(buffer);
return ret == GST_FLOW_OK;
}
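Pushing through the "push-buffer" action signal, as above, needs no extra headers. The gst-app library offers an equivalent direct call; a sketch, assuming gstreamer-app-1.0 is linked (note the different ownership rule):
#include <gst/app/gstappsrc.h>

// Variant of the push step: gst_app_src_push_buffer() takes ownership of the
// buffer, so gst_buffer_unref() must NOT be called afterwards.
static gboolean push_one_buffer(GstElement *app_src, GstBuffer *buffer)
{
    GstFlowReturn ret = gst_app_src_push_buffer(GST_APP_SRC(app_src), buffer);
    return ret == GST_FLOW_OK;
}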
Timestamp Calculation
// gst_util_uint64_scale(a, b, c) computes res = a * b / c in 64-bit arithmetic
// Example: mapping A ∈ [0, 2π] onto B ∈ [0, MAX_INT32]
// B = A / 2π * MAX_INT32 = A * MAX_INT32 / 2π
// B = gst_util_uint64_scale(A, MAX_INT32, 2π)
// Timestamp (PTS) of each buffer
GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(data->num_samples, GST_SECOND, SAMPLE_RATE);
// Duration of each buffer
GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(num_samples, GST_SECOND, SAMPLE_RATE);
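Plugging in the constants used in this example (CHUNK_SIZE = 1024 bytes, i.e. 512 samples of 16 bits, SAMPLE_RATE = 44100 Hz) gives a feel for the values:
// Duration of one buffer: 512 * GST_SECOND / 44100 ≈ 11,609,977 ns ≈ 11.6 ms
// PTS of the buffer starting at sample 44100 (exactly one second of audio):
//   44100 * GST_SECOND / 44100 = 1,000,000,000 ns = 1 s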
4. Signal Callback Handling
Start Feeding Data
static void start_feed(GstElement *source, guint size, CustomData *data)
{
if (data->sourceid == 0)
{
g_print("Start feeding\n");
// Register a GLib idle handler: push_data is called whenever the main loop is idle
data->sourceid = g_idle_add((GSourceFunc)push_data, data);
}
}
Stop Feeding Data
static void stop_feed(GstElement *source, CustomData *data)
{
if (data->sourceid != 0)
{
g_print("Stop feeding\n");
g_source_remove(data->sourceid);
data->sourceid = 0;
}
}
Receiving a Sample
static GstFlowReturn new_sample(GstElement *sink, CustomData *data)
{
GstSample *sample;
g_signal_emit_by_name(sink, "pull-sample", &sample);
if (sample)
{
g_print("*"); // 输出 * 表示接收到一个 buffer
gst_sample_unref(sample);
return GST_FLOW_OK;
}
return GST_FLOW_ERROR;
}
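Handling Errors
The complete listing below also connects an error_cb handler to the pipeline bus, but only forward-declares it. A minimal sketch of such a handler: it prints the error carried by the message and quits the main loop, which is what makes g_main_loop_run() return:
static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data)
{
    GError *err;
    gchar *debug_info;
    // Print the error details carried by the bus message
    gst_message_parse_error(msg, &err, &debug_info);
    g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
    g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error(&err);
    g_free(debug_info);
    // Stop the main loop so the program can shut the pipeline down
    g_main_loop_quit(data->main_loop);
}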
5. The GLib Main Loop
data.main_loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(data.main_loop);
- Creates the GLib main loop
- A NULL context means the default main context
- The main loop waits for events and invokes the handlers bound to them
- Similar to the event-handling model of Node.js
6. Pipeline Structure
app_src -> tee
tee.src_1 -> audio_queue -> audio_convert1 -> audio_resample -> audio_sink
tee.src_2 -> video_queue -> audio_convert2 -> visual -> video_convert -> video_sink
tee.src_3 -> app_queue -> app_sink
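For a quick look at the same topology on the command line, the graph can be approximated with gst-launch-1.0; since appsrc and appsink cannot be fed from the shell, audiotestsrc and fakesink stand in for them in this sketch:
gst-launch-1.0 audiotestsrc ! tee name=t \
  t. ! queue ! audioconvert ! audioresample ! autoaudiosink \
  t. ! queue ! audioconvert ! wavescope ! videoconvert ! autovideosink \
  t. ! queue ! fakesink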
Complete Code
#include <gst/gst.h>
#include <gst/audio/audio.h>
#include <string.h>
#define CHUNK_SIZE 1024
#define SAMPLE_RATE 44100
typedef struct _CustomData
{
GstElement *pipeline;
GstElement *app_src, *tee;
GstElement *audio_queue, *audio_convert1, *audio_resample, *audio_sink;
GstElement *video_queue, *audio_convert2, *visual, *video_convert, *video_sink;
GstElement *app_queue, *app_sink;
guint64 num_samples;
gfloat a, b, c, d;
guint sourceid;
GMainLoop *main_loop;
} CustomData;
static gboolean push_data(CustomData *data);
static void start_feed(GstElement *source, guint size, CustomData *data);
static void stop_feed(GstElement *source, CustomData *data);
static GstFlowReturn new_sample(GstElement *sink, CustomData *data);
static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data);
int main(int argc, char *argv[])
{
CustomData data;
GstPad *tee_pad_1, *tee_pad_2, *tee_pad_3;
GstPad *queue_audio_pad, *queue_video_pad, *queue_app_pad;
GstAudioInfo info;
GstCaps *audio_caps;
GstBus *bus;
memset(&data, 0, sizeof(data));
data.b = 1;
data.d = 1;
gst_init(&argc, &argv);
data.app_src = gst_element_factory_make("appsrc", "audio_source");
data.tee = gst_element_factory_make("tee", "tee");
data.audio_queue = gst_element_factory_make("queue", "audio_queue");
data.audio_convert1 = gst_element_factory_make("audioconvert", "audio_convert1");
data.audio_resample = gst_element_factory_make("audioresample", "audio_resample");
data.audio_sink = gst_element_factory_make("autoaudiosink", "audio_sink");
data.video_queue = gst_element_factory_make("queue", "video_queue");
data.audio_convert2 = gst_element_factory_make("audioconvert", "audio_convert2");
data.visual = gst_element_factory_make("wavescope", "visual");
data.video_convert = gst_element_factory_make("videoconvert", "video_convert");
data.video_sink = gst_element_factory_make("autovideosink", "video_sink");
data.app_queue = gst_element_factory_make("queue", "app_queue");
data.app_sink = gst_element_factory_make("appsink", "app_sink");
data.pipeline = gst_pipeline_new("test-pipeline");
if (!data.pipeline || !data.app_src || !data.tee ||
!data.audio_queue || !data.audio_convert1 || !data.audio_resample || !data.audio_sink ||
!data.video_queue || !data.audio_convert2 || !data.visual || !data.video_convert || !data.video_sink ||
!data.app_queue || !data.app_sink)
{
g_printerr("Not all elements could be created.\n");
return -1;
}
g_object_set(data.visual, "shader", 0, "style", 0, NULL);
gst_audio_info_set_format(&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
audio_caps = gst_audio_info_to_caps(&info);
g_object_set(data.app_src, "caps", audio_caps, "format", GST_FORMAT_TIME, NULL);
g_signal_connect(data.app_src, "need-data", G_CALLBACK(start_feed), &data);
g_signal_connect(data.app_src, "enough-data", G_CALLBACK(stop_feed), &data);
g_object_set(data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
g_signal_connect(data.app_sink, "new-sample", G_CALLBACK(new_sample), &data);
gst_caps_unref(audio_caps);
gst_bin_add_many(GST_BIN(data.pipeline),
data.app_src, data.tee,
data.audio_queue, data.audio_convert1, data.audio_resample, data.audio_sink,
data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sink,
data.app_queue, data.app_sink,
NULL
);
if (gst_element_link_many(data.app_src, data.tee, NULL) != TRUE ||
gst_element_link_many(data.audio_queue, data.audio_convert1, data.audio_resample, data.audio_sink, NULL) != TRUE ||
gst_element_link_many(data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sink, NULL) != TRUE ||
gst_element_link_many(data.app_queue, data.app_sink, NULL) != TRUE)
{
g_printerr("Elements could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
queue_audio_pad = gst_element_get_static_pad(data.audio_queue, "sink");
queue_video_pad = gst_element_get_static_pad(data.video_queue, "sink");
queue_app_pad = gst_element_get_static_pad(data.app_queue, "sink");
tee_pad_1 = gst_element_request_pad_simple(data.tee, "src_%u");
tee_pad_2 = gst_element_request_pad_simple(data.tee, "src_%u");
tee_pad_3 = gst_element_request_pad_simple(data.tee, "src_%u");
if (gst_pad_link(tee_pad_1, queue_audio_pad) != GST_PAD_LINK_OK ||
gst_pad_link(tee_pad_2, queue_video_pad) != GST_PAD_LINK_OK ||
gst_pad_link(tee_pad_3, queue_app_pad) != GST_PAD_LINK_OK)
{
g_printerr("Tee could not be linked\n");
gst_object_unref(data.pipeline);
return -1;
}
gst_object_unref(queue_audio_pad);
gst_object_unref(queue_video_pad);
gst_object_unref(queue_app_pad);
bus = gst_element_get_bus(data.pipeline);
gst_bus_add_signal_watch(bus);
g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, &data);
gst_object_unref(bus);
gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
data.main_loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(data.main_loop);
gst_element_release_request_pad(data.tee, tee_pad_1);
gst_element_release_request_pad(data.tee, tee_pad_2);
gst_element_release_request_pad(data.tee, tee_pad_3);
gst_object_unref(tee_pad_1);
gst_object_unref(tee_pad_2);
gst_object_unref(tee_pad_3);
gst_element_set_state(data.pipeline, GST_STATE_NULL);
gst_object_unref(data.pipeline);
return 0;
}
Build and Run
gcc main.c -o main.out $(pkg-config --cflags --libs gstreamer-1.0 gstreamer-audio-1.0)
./main.out
Summary
This example demonstrates:
- Appsrc: a custom data source that injects data into the pipeline
- Appsink: receiving the processed data back out of the pipeline
- Timestamp management: setting each buffer's PTS and duration correctly
- Signal mechanism: using need-data and enough-data to control when data is pushed
- GLib main loop: event-driven asynchronous processing
- Multi-branch pipeline: using tee to fan data out to several branches
Appsrc and Appsink are the key pieces for integrating GStreamer with an application: they let GStreamer's media-processing capabilities be embedded in any program.