2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/channel_layout.h"
25 #include "libavutil/cpu.h"
26 #include "libavutil/error.h"
27 #include "libavutil/mathematics.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/samplefmt.h"
30 #include "libavutil/timestamp.h"
32 #include "sync_queue.h"
/*
 * How this works:
 * --------------
 * time:   0    1    2    3    4    5    6    7    8    9    10   11   12   13
 *         -------------------------------------------------------------------
 *         |    |    |    |    |    |    |    |    |    |    |    |    |    |
 *         |    ┌───┐┌────────┐┌───┐┌─────────────┐
 * stream 0|    │d=1││  d=2   ││d=1││    d=3      │
 *         |    └───┘└────────┘└───┘└─────────────┘
 *         ┌───┐               ┌───────────────────────┐
 * stream 1│d=1│               │         d=5           │
 *         └───┘               └───────────────────────┘
 *         |    ┌───┐┌───┐┌───┐┌───┐
 * stream 2|    │d=1││d=1││d=1││d=1│ <- stream 2 is the head stream of the queue
 *         |    └───┘└───┘└───┘└───┘
 *                  ^              ^
 *          [stream 2 tail] [stream 2 head]
 *
 * We have N streams (N=3 in the diagram), each stream is a FIFO. The *tail* of
 * each FIFO is the frame with smallest end time, the *head* is the frame with
 * the largest end time. Frames submitted to the queue with sq_send() are placed
 * after the head, frames returned to the caller with sq_receive() are taken
 * from the tail.
 *
 * The head stream of the whole queue (SyncQueue.head_stream) is the limiting
 * stream with the *smallest* head timestamp, i.e. the stream whose source lags
 * furthest behind all other streams. It determines which frames can be output
 * from the queue.
 *
 * In the diagram, the head stream is 2, because its head time is t=5, while
 * streams 0 and 1 end at t=8 and t=9 respectively. All frames that _end_ at
 * or before t=5 can be output, i.e. the first 3 frames from stream 0, first
 * frame from stream 1, and all 4 frames from stream 2.
 */
/*
 * Select the payload pointer matching the queue type: the frame pointer
 * (frame.f) when the queue type is SYNC_QUEUE_FRAMES, the packet pointer
 * (frame.p) otherwise. The result is converted to void*.
 *
 * Both arguments are parenthesized in the expansion, so arbitrary expressions
 * (e.g. &queue or *pframe) may be passed.
 */
#define SQPTR(sq, frame) (((sq)->type == SYNC_QUEUE_FRAMES) ? \
                          (void*)(frame).f : (void*)(frame).p)
72 typedef struct SyncQueueStream
{
73 AVContainerFifo
*fifo
;
76 /* number of audio samples in fifo */
77 uint64_t samples_queued
;
78 /* stream head: largest timestamp seen */
81 /* no more frames will be sent for this stream */
85 uint64_t samples_sent
;
91 enum SyncQueueType type
;
95 /* no more frames will be sent for any stream */
97 /* sync head: the stream with the _smallest_ head timestamp
98 * this stream determines which frames can be output */
100 /* the finished stream with the smallest finish timestamp or -1 */
101 int head_finished_stream
;
103 // maximum buffering duration in microseconds
106 SyncQueueStream
*streams
;
107 unsigned int nb_streams
;
111 uintptr_t align_mask
;
115 * Compute the end timestamp of a frame. If nb_samples is provided, consider
116 * the frame to have this number of audio samples, otherwise use frame duration.
118 static int64_t frame_end(const SyncQueue
*sq
, SyncQueueFrame frame
, int nb_samples
)
121 int64_t d
= av_rescale_q(nb_samples
, (AVRational
){ 1, frame
.f
->sample_rate
},
123 return frame
.f
->pts
+ d
;
126 return (sq
->type
== SYNC_QUEUE_PACKETS
) ?
127 frame
.p
->pts
+ frame
.p
->duration
:
128 frame
.f
->pts
+ frame
.f
->duration
;
131 static int frame_samples(const SyncQueue
*sq
, SyncQueueFrame frame
)
133 return (sq
->type
== SYNC_QUEUE_PACKETS
) ? 0 : frame
.f
->nb_samples
;
136 static int frame_null(const SyncQueue
*sq
, SyncQueueFrame frame
)
138 return (sq
->type
== SYNC_QUEUE_PACKETS
) ? (frame
.p
== NULL
) : (frame
.f
== NULL
);
141 static void tb_update(const SyncQueue
*sq
, SyncQueueStream
*st
,
142 const SyncQueueFrame frame
)
144 AVRational tb
= (sq
->type
== SYNC_QUEUE_PACKETS
) ?
145 frame
.p
->time_base
: frame
.f
->time_base
;
147 av_assert0(tb
.num
> 0 && tb
.den
> 0);
149 if (tb
.num
== st
->tb
.num
&& tb
.den
== st
->tb
.den
)
152 // timebase should not change after the first frame
153 av_assert0(!av_container_fifo_can_read(st
->fifo
));
155 if (st
->head_ts
!= AV_NOPTS_VALUE
)
156 st
->head_ts
= av_rescale_q(st
->head_ts
, st
->tb
, tb
);
161 static void finish_stream(SyncQueue
*sq
, unsigned int stream_idx
)
163 SyncQueueStream
*st
= &sq
->streams
[stream_idx
];
166 av_log(sq
->logctx
, AV_LOG_DEBUG
,
167 "sq: finish %u; head ts %s\n", stream_idx
,
168 av_ts2timestr(st
->head_ts
, &st
->tb
));
172 if (st
->limiting
&& st
->head_ts
!= AV_NOPTS_VALUE
) {
173 /* check if this stream is the new finished head */
174 if (sq
->head_finished_stream
< 0 ||
175 av_compare_ts(st
->head_ts
, st
->tb
,
176 sq
->streams
[sq
->head_finished_stream
].head_ts
,
177 sq
->streams
[sq
->head_finished_stream
].tb
) < 0) {
178 sq
->head_finished_stream
= stream_idx
;
181 /* mark as finished all streams that should no longer receive new frames,
182 * due to them being ahead of some finished stream */
183 st
= &sq
->streams
[sq
->head_finished_stream
];
184 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
185 SyncQueueStream
*st1
= &sq
->streams
[i
];
186 if (st
!= st1
&& st1
->head_ts
!= AV_NOPTS_VALUE
&&
187 av_compare_ts(st
->head_ts
, st
->tb
, st1
->head_ts
, st1
->tb
) <= 0) {
189 av_log(sq
->logctx
, AV_LOG_DEBUG
,
190 "sq: finish secondary %u; head ts %s\n", i
,
191 av_ts2timestr(st1
->head_ts
, &st1
->tb
));
198 /* mark the whole queue as finished if all streams are finished */
199 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
200 if (!sq
->streams
[i
].finished
)
205 av_log(sq
->logctx
, AV_LOG_DEBUG
, "sq: finish queue\n");
208 static void queue_head_update(SyncQueue
*sq
)
210 av_assert0(sq
->have_limiting
);
212 if (sq
->head_stream
< 0) {
213 unsigned first_limiting
= UINT_MAX
;
215 /* wait for one timestamp in each stream before determining
217 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
218 SyncQueueStream
*st
= &sq
->streams
[i
];
221 if (st
->head_ts
== AV_NOPTS_VALUE
)
223 if (first_limiting
== UINT_MAX
)
227 // placeholder value, correct one will be found below
228 av_assert0(first_limiting
< UINT_MAX
);
229 sq
->head_stream
= first_limiting
;
232 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
233 SyncQueueStream
*st_head
= &sq
->streams
[sq
->head_stream
];
234 SyncQueueStream
*st_other
= &sq
->streams
[i
];
235 if (st_other
->limiting
&& st_other
->head_ts
!= AV_NOPTS_VALUE
&&
236 av_compare_ts(st_other
->head_ts
, st_other
->tb
,
237 st_head
->head_ts
, st_head
->tb
) < 0)
242 /* update this stream's head timestamp */
243 static void stream_update_ts(SyncQueue
*sq
, unsigned int stream_idx
, int64_t ts
)
245 SyncQueueStream
*st
= &sq
->streams
[stream_idx
];
247 if (ts
== AV_NOPTS_VALUE
||
248 (st
->head_ts
!= AV_NOPTS_VALUE
&& st
->head_ts
>= ts
))
253 /* if this stream is now ahead of some finished stream, then
254 * this stream is also finished */
255 if (sq
->head_finished_stream
>= 0 &&
256 av_compare_ts(sq
->streams
[sq
->head_finished_stream
].head_ts
,
257 sq
->streams
[sq
->head_finished_stream
].tb
,
259 finish_stream(sq
, stream_idx
);
261 /* update the overall head timestamp if it could have changed */
263 (sq
->head_stream
< 0 || sq
->head_stream
== stream_idx
))
264 queue_head_update(sq
);
267 /* If the queue for the given stream (or all streams when stream_idx=-1)
268 * is overflowing, trigger a fake heartbeat on lagging streams.
270 * @return 1 if heartbeat triggered, 0 otherwise
272 static int overflow_heartbeat(SyncQueue
*sq
, int stream_idx
)
275 SyncQueueFrame frame
;
276 int64_t tail_ts
= AV_NOPTS_VALUE
;
278 /* if no stream specified, pick the one that is most ahead */
279 if (stream_idx
< 0) {
280 int64_t ts
= AV_NOPTS_VALUE
;
282 for (int i
= 0; i
< sq
->nb_streams
; i
++) {
283 st
= &sq
->streams
[i
];
284 if (st
->head_ts
!= AV_NOPTS_VALUE
&&
285 (ts
== AV_NOPTS_VALUE
||
286 av_compare_ts(ts
, sq
->streams
[stream_idx
].tb
,
287 st
->head_ts
, st
->tb
) < 0)) {
292 /* no stream has a timestamp yet -> nothing to do */
297 st
= &sq
->streams
[stream_idx
];
299 /* get the chosen stream's tail timestamp */
300 for (size_t i
= 0; tail_ts
== AV_NOPTS_VALUE
&&
301 av_container_fifo_peek(st
->fifo
, (void**)&frame
, i
) >= 0; i
++)
302 tail_ts
= frame_end(sq
, frame
, 0);
304 /* overflow triggers when the tail is over specified duration behind the head */
305 if (tail_ts
== AV_NOPTS_VALUE
|| tail_ts
>= st
->head_ts
||
306 av_rescale_q(st
->head_ts
- tail_ts
, st
->tb
, AV_TIME_BASE_Q
) < sq
->buf_size_us
)
309 /* signal a fake timestamp for all streams that prevent tail_ts from being output */
311 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
312 const SyncQueueStream
*st1
= &sq
->streams
[i
];
315 if (st
== st1
|| st1
->finished
||
316 (st1
->head_ts
!= AV_NOPTS_VALUE
&&
317 av_compare_ts(tail_ts
, st
->tb
, st1
->head_ts
, st1
->tb
) <= 0))
320 ts
= av_rescale_q(tail_ts
, st
->tb
, st1
->tb
);
321 if (st1
->head_ts
!= AV_NOPTS_VALUE
)
322 ts
= FFMAX(st1
->head_ts
+ 1, ts
);
324 av_log(sq
->logctx
, AV_LOG_DEBUG
, "sq: %u overflow heardbeat %s -> %s\n",
325 i
, av_ts2timestr(st1
->head_ts
, &st1
->tb
), av_ts2timestr(ts
, &st1
->tb
));
327 stream_update_ts(sq
, i
, ts
);
333 int sq_send(SyncQueue
*sq
, unsigned int stream_idx
, SyncQueueFrame frame
)
339 av_assert0(stream_idx
< sq
->nb_streams
);
340 st
= &sq
->streams
[stream_idx
];
342 if (frame_null(sq
, frame
)) {
343 av_log(sq
->logctx
, AV_LOG_DEBUG
, "sq: %u EOF\n", stream_idx
);
344 finish_stream(sq
, stream_idx
);
350 tb_update(sq
, st
, frame
);
352 nb_samples
= frame_samples(sq
, frame
);
353 // make sure frame duration is consistent with sample count
355 av_assert0(frame
.f
->sample_rate
> 0);
356 frame
.f
->duration
= av_rescale_q(nb_samples
, (AVRational
){ 1, frame
.f
->sample_rate
},
360 ts
= frame_end(sq
, frame
, 0);
362 av_log(sq
->logctx
, AV_LOG_DEBUG
, "sq: send %u ts %s\n", stream_idx
,
363 av_ts2timestr(ts
, &st
->tb
));
365 ret
= av_container_fifo_write(st
->fifo
, SQPTR(sq
, frame
), 0);
369 stream_update_ts(sq
, stream_idx
, ts
);
371 st
->samples_queued
+= nb_samples
;
372 st
->samples_sent
+= nb_samples
;
374 if (st
->frame_samples
)
375 st
->frames_sent
= st
->samples_sent
/ st
->frame_samples
;
379 if (st
->frames_sent
>= st
->frames_max
) {
380 av_log(sq
->logctx
, AV_LOG_DEBUG
, "sq: %u frames_max %"PRIu64
" reached\n",
381 stream_idx
, st
->frames_max
);
383 finish_stream(sq
, stream_idx
);
389 static void offset_audio(AVFrame
*f
, int nb_samples
)
391 const int planar
= av_sample_fmt_is_planar(f
->format
);
392 const int planes
= planar
? f
->ch_layout
.nb_channels
: 1;
393 const int bps
= av_get_bytes_per_sample(f
->format
);
394 const int offset
= nb_samples
* bps
* (planar
? 1 : f
->ch_layout
.nb_channels
);
397 av_assert0(nb_samples
< f
->nb_samples
);
399 for (int i
= 0; i
< planes
; i
++) {
400 f
->extended_data
[i
] += offset
;
401 if (i
< FF_ARRAY_ELEMS(f
->data
))
402 f
->data
[i
] = f
->extended_data
[i
];
404 f
->linesize
[0] -= offset
;
405 f
->nb_samples
-= nb_samples
;
406 f
->duration
= av_rescale_q(f
->nb_samples
, (AVRational
){ 1, f
->sample_rate
},
408 f
->pts
+= av_rescale_q(nb_samples
, (AVRational
){ 1, f
->sample_rate
},
412 static int frame_is_aligned(const SyncQueue
*sq
, const AVFrame
*frame
)
414 // only checks linesize[0], so only works for audio
415 av_assert0(frame
->nb_samples
> 0);
416 av_assert0(sq
->align_mask
);
418 // only check data[0], because we always offset all data pointers
419 // by the same offset, so if one is aligned, all are
420 if (!((uintptr_t)frame
->data
[0] & sq
->align_mask
) &&
421 !(frame
->linesize
[0] & sq
->align_mask
) &&
422 frame
->linesize
[0] > sq
->align_mask
)
428 static int receive_samples(SyncQueue
*sq
, SyncQueueStream
*st
,
429 AVFrame
*dst
, int nb_samples
)
434 av_assert0(st
->samples_queued
>= nb_samples
);
436 ret
= av_container_fifo_peek(st
->fifo
, (void**)&src
, 0);
437 av_assert0(ret
>= 0);
439 // peeked frame has enough samples and its data is aligned
440 // -> we can just make a reference and limit its sample count
441 if (src
.f
->nb_samples
> nb_samples
&& frame_is_aligned(sq
, src
.f
)) {
442 ret
= av_frame_ref(dst
, src
.f
);
446 dst
->nb_samples
= nb_samples
;
447 offset_audio(src
.f
, nb_samples
);
448 st
->samples_queued
-= nb_samples
;
453 // otherwise allocate a new frame and copy the data
454 ret
= av_channel_layout_copy(&dst
->ch_layout
, &src
.f
->ch_layout
);
458 dst
->format
= src
.f
->format
;
459 dst
->nb_samples
= nb_samples
;
461 ret
= av_frame_get_buffer(dst
, 0);
465 ret
= av_frame_copy_props(dst
, src
.f
);
470 while (dst
->nb_samples
< nb_samples
) {
473 ret
= av_container_fifo_peek(st
->fifo
, (void**)&src
, 0);
474 av_assert0(ret
>= 0);
476 to_copy
= FFMIN(nb_samples
- dst
->nb_samples
, src
.f
->nb_samples
);
478 av_samples_copy(dst
->extended_data
, src
.f
->extended_data
, dst
->nb_samples
,
479 0, to_copy
, dst
->ch_layout
.nb_channels
, dst
->format
);
481 if (to_copy
< src
.f
->nb_samples
)
482 offset_audio(src
.f
, to_copy
);
484 av_container_fifo_drain(st
->fifo
, 1);
486 st
->samples_queued
-= to_copy
;
488 dst
->nb_samples
+= to_copy
;
492 dst
->duration
= av_rescale_q(nb_samples
, (AVRational
){ 1, dst
->sample_rate
},
502 static int receive_for_stream(SyncQueue
*sq
, unsigned int stream_idx
,
503 SyncQueueFrame frame
)
505 const SyncQueueStream
*st_head
= sq
->head_stream
>= 0 ?
506 &sq
->streams
[sq
->head_stream
] : NULL
;
509 av_assert0(stream_idx
< sq
->nb_streams
);
510 st
= &sq
->streams
[stream_idx
];
512 if (av_container_fifo_can_read(st
->fifo
) &&
513 (st
->frame_samples
<= st
->samples_queued
|| st
->finished
)) {
514 int nb_samples
= st
->frame_samples
;
520 nb_samples
= FFMIN(nb_samples
, st
->samples_queued
);
522 av_container_fifo_peek(st
->fifo
, (void**)&peek
, 0);
523 ts
= frame_end(sq
, peek
, nb_samples
);
525 /* check if this stream's tail timestamp does not overtake
526 * the overall queue head */
527 if (ts
!= AV_NOPTS_VALUE
&& st_head
)
528 cmp
= av_compare_ts(ts
, st
->tb
, st_head
->head_ts
, st_head
->tb
);
530 /* We can release frames that do not end after the queue head.
531 * Frames with no timestamps are just passed through with no conditions.
532 * Frames are also passed through when there are no limiting streams.
534 if (cmp
<= 0 || ts
== AV_NOPTS_VALUE
|| !sq
->have_limiting
) {
536 (nb_samples
!= peek
.f
->nb_samples
|| !frame_is_aligned(sq
, peek
.f
))) {
537 int ret
= receive_samples(sq
, st
, frame
.f
, nb_samples
);
541 int ret
= av_container_fifo_read(st
->fifo
, SQPTR(sq
, frame
), 0);
542 av_assert0(ret
>= 0);
544 av_assert0(st
->samples_queued
>= frame_samples(sq
, frame
));
545 st
->samples_queued
-= frame_samples(sq
, frame
);
548 av_log(sq
->logctx
, AV_LOG_DEBUG
,
549 "sq: receive %u ts %s queue head %d ts %s\n", stream_idx
,
550 av_ts2timestr(frame_end(sq
, frame
, 0), &st
->tb
),
552 st_head
? av_ts2timestr(st_head
->head_ts
, &st_head
->tb
) : "N/A");
558 return (sq
->finished
|| (st
->finished
&& !av_container_fifo_can_read(st
->fifo
))) ?
559 AVERROR_EOF
: AVERROR(EAGAIN
);
562 static int receive_internal(SyncQueue
*sq
, int stream_idx
, SyncQueueFrame frame
)
567 /* read a frame for a specific stream */
568 if (stream_idx
>= 0) {
569 ret
= receive_for_stream(sq
, stream_idx
, frame
);
570 return (ret
< 0) ? ret
: stream_idx
;
573 /* read a frame for any stream with available output */
574 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++) {
575 ret
= receive_for_stream(sq
, i
, frame
);
576 if (ret
== AVERROR_EOF
|| ret
== AVERROR(EAGAIN
)) {
577 nb_eof
+= (ret
== AVERROR_EOF
);
580 return (ret
< 0) ? ret
: i
;
583 return (nb_eof
== sq
->nb_streams
) ? AVERROR_EOF
: AVERROR(EAGAIN
);
586 int sq_receive(SyncQueue
*sq
, int stream_idx
, SyncQueueFrame frame
)
588 int ret
= receive_internal(sq
, stream_idx
, frame
);
590 /* try again if the queue overflowed and triggered a fake heartbeat
591 * for lagging streams */
592 if (ret
== AVERROR(EAGAIN
) && overflow_heartbeat(sq
, stream_idx
))
593 ret
= receive_internal(sq
, stream_idx
, frame
);
598 int sq_add_stream(SyncQueue
*sq
, int limiting
)
600 SyncQueueStream
*tmp
, *st
;
602 tmp
= av_realloc_array(sq
->streams
, sq
->nb_streams
+ 1, sizeof(*sq
->streams
));
604 return AVERROR(ENOMEM
);
607 st
= &sq
->streams
[sq
->nb_streams
];
608 memset(st
, 0, sizeof(*st
));
610 st
->fifo
= (sq
->type
== SYNC_QUEUE_FRAMES
) ?
611 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
613 return AVERROR(ENOMEM
);
615 /* we set a valid default, so that a pathological stream that never
616 * receives even a real timebase (and no frames) won't stall all other
617 * streams forever; cf. overflow_heartbeat() */
618 st
->tb
= (AVRational
){ 1, 1 };
619 st
->head_ts
= AV_NOPTS_VALUE
;
620 st
->frames_max
= UINT64_MAX
;
621 st
->limiting
= limiting
;
623 sq
->have_limiting
|= limiting
;
625 return sq
->nb_streams
++;
628 void sq_limit_frames(SyncQueue
*sq
, unsigned int stream_idx
, uint64_t frames
)
632 av_assert0(stream_idx
< sq
->nb_streams
);
633 st
= &sq
->streams
[stream_idx
];
635 st
->frames_max
= frames
;
636 if (st
->frames_sent
>= st
->frames_max
)
637 finish_stream(sq
, stream_idx
);
640 void sq_frame_samples(SyncQueue
*sq
, unsigned int stream_idx
,
645 av_assert0(sq
->type
== SYNC_QUEUE_FRAMES
);
646 av_assert0(stream_idx
< sq
->nb_streams
);
647 st
= &sq
->streams
[stream_idx
];
649 st
->frame_samples
= frame_samples
;
651 sq
->align_mask
= av_cpu_max_align() - 1;
654 SyncQueue
*sq_alloc(enum SyncQueueType type
, int64_t buf_size_us
, void *logctx
)
656 SyncQueue
*sq
= av_mallocz(sizeof(*sq
));
662 sq
->buf_size_us
= buf_size_us
;
665 sq
->head_stream
= -1;
666 sq
->head_finished_stream
= -1;
671 void sq_free(SyncQueue
**psq
)
673 SyncQueue
*sq
= *psq
;
678 for (unsigned int i
= 0; i
< sq
->nb_streams
; i
++)
679 av_container_fifo_free(&sq
->streams
[i
].fifo
);
681 av_freep(&sq
->streams
);