2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <stdatomic.h>
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
32 #include "libavcodec/packet.h"
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
52 typedef struct SchWaiter
{
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
63 typedef struct SchTask
{
74 typedef struct SchDecOutput
{
76 uint8_t *dst_finished
;
80 typedef struct SchDec
{
85 SchDecOutput
*outputs
;
89 // Queue for receiving input packets, one stream.
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue
*queue_end_ts
;
96 // temporary storage used by sch_dec_send()
100 typedef struct SchSyncQueue
{
103 pthread_mutex_t lock
;
109 typedef struct SchEnc
{
110 const AVClass
*class;
114 uint8_t *dst_finished
;
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
137 int (*open_cb
)(void *opaque
, const AVFrame
*frame
);
141 // Queue for receiving input frames, one stream.
143 // tq_send() to queue returned EOF
146 // temporary storage used by sch_enc_send()
150 typedef struct SchDemuxStream
{
152 uint8_t *dst_finished
;
156 typedef struct SchDemux
{
157 const AVClass
*class;
159 SchDemuxStream
*streams
;
165 // temporary storage used by sch_demux_send()
168 // protected by schedule_lock
172 typedef struct PreMuxQueue
{
174 * Queue for buffering the packets before the muxer task can be started.
178 * Maximum number of packets in fifo.
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold
;
190 typedef struct SchMuxStream
{
192 SchedulerNode src_sched
;
194 unsigned *sub_heartbeat_dst
;
195 unsigned nb_sub_heartbeat_dst
;
197 PreMuxQueue pre_mux_queue
;
199 // an EOF was generated while flushing the pre-mux queue
202 ////////////////////////////////////////////////////////////
203 // The following are protected by Scheduler.schedule_lock //
205 /* dts+duration of the last packet sent to this stream
208 // this stream no longer accepts input
210 ////////////////////////////////////////////////////////////
213 typedef struct SchMux
{
214 const AVClass
*class;
216 SchMuxStream
*streams
;
218 unsigned nb_streams_ready
;
220 int (*init
)(void *arg
);
224 * Set to 1 after starting the muxer task and flushing the
226 * Set either before any tasks have started, or with
227 * Scheduler.mux_ready_lock held.
229 atomic_int mux_started
;
233 AVPacket
*sub_heartbeat_pkt
;
236 typedef struct SchFilterIn
{
238 SchedulerNode src_sched
;
240 int receive_finished
;
243 typedef struct SchFilterOut
{
247 typedef struct SchFilterGraph
{
248 const AVClass
*class;
252 atomic_uint nb_inputs_finished_send
;
253 unsigned nb_inputs_finished_receive
;
255 SchFilterOut
*outputs
;
259 // input queue, nb_inputs+1 streams
260 // last stream is control
264 // protected by schedule_lock
269 enum SchedulerState
{
276 const AVClass
*class;
284 unsigned nb_mux_ready
;
285 pthread_mutex_t mux_ready_lock
;
287 unsigned nb_mux_done
;
288 pthread_mutex_t mux_done_lock
;
289 pthread_cond_t mux_done_cond
;
298 SchSyncQueue
*sq_enc
;
301 SchFilterGraph
*filters
;
307 enum SchedulerState state
;
308 atomic_int terminate
;
309 atomic_int task_failed
;
311 pthread_mutex_t schedule_lock
;
313 atomic_int_least64_t last_dts
;
317 * Wait until this task is allowed to proceed.
319 * @retval 0 the caller should proceed
320 * @retval 1 the caller should terminate
// Block the calling thread while its waiter is "choked" (throttled by the
// scheduler). Returns nonzero when the scheduler requested termination.
322 static int waiter_wait(Scheduler
*sch
, SchWaiter
*w
)
// fast path: not choked, proceed immediately without taking the lock
326 if (!atomic_load(&w
->choked
))
329 pthread_mutex_lock(&w
->lock
);
// sleep until either we are unchoked or global termination is requested;
// waiter_set() signals the condition variable after flipping 'choked'
331 while (atomic_load(&w
->choked
) && !atomic_load(&sch
->terminate
))
332 pthread_cond_wait(&w
->cond
, &w
->lock
);
// sample the terminate flag under the lock so the caller gets a
// consistent answer for why the wait ended
334 terminate
= atomic_load(&sch
->terminate
);
336 pthread_mutex_unlock(&w
->lock
);
// Set a waiter's choked state and wake any thread blocked in waiter_wait().
// The store is done under w->lock so the waiting thread cannot miss the
// signal between its atomic_load() and pthread_cond_wait().
341 static void waiter_set(SchWaiter
*w
, int choked
)
343 pthread_mutex_lock(&w
->lock
);
345 atomic_store(&w
->choked
, choked
);
346 pthread_cond_signal(&w
->cond
);
348 pthread_mutex_unlock(&w
->lock
);
// Initialize a SchWaiter: unchoked by default, with its mutex and condition
// variable created. Returns 0 on success or an error code on failure
// (pthread_*_init results are checked in elided lines — see original).
351 static int waiter_init(SchWaiter
*w
)
355 atomic_init(&w
->choked
, 0);
357 ret
= pthread_mutex_init(&w
->lock
, NULL
);
361 ret
= pthread_cond_init(&w
->cond
, NULL
);
// Free the synchronization primitives owned by a SchWaiter.
// Counterpart of waiter_init(); must not be called while a thread may
// still be blocked in waiter_wait() on this waiter.
368 static void waiter_uninit(SchWaiter
*w
)
370 pthread_mutex_destroy(&w
->lock
);
371 pthread_cond_destroy(&w
->cond
);
// Allocate a ThreadQueue carrying either packets or frames for nb_streams
// streams. queue_size <= 0 selects a type-specific default length.
// Returns 0 on success, AVERROR(ENOMEM) on allocation failure.
374 static int queue_alloc(ThreadQueue
**ptq
, unsigned nb_streams
, unsigned queue_size
,
// pick the default queue length when the caller did not specify one
380 if (queue_size
<= 0) {
381 if (type
== QUEUE_FRAMES
)
382 queue_size
= DEFAULT_FRAME_THREAD_QUEUE_SIZE
;
384 queue_size
= DEFAULT_PACKET_THREAD_QUEUE_SIZE
;
387 if (type
== QUEUE_FRAMES
) {
388 // This queue length is used in the decoder code to ensure that
389 // there are enough entries in fixed-size frame pools to account
390 // for frames held in queues inside the ffmpeg utility. If this
391 // can ever dynamically change then the corresponding decode
392 // code needs to be updated as well.
393 av_assert0(queue_size
== DEFAULT_FRAME_THREAD_QUEUE_SIZE
);
// object pool matching the payload type (AVPacket vs AVFrame)
396 op
= (type
== QUEUE_PACKETS
) ? objpool_alloc_packets() :
397 objpool_alloc_frames();
399 return AVERROR(ENOMEM
);
// move callback matches the payload type as well
401 tq
= tq_alloc(nb_streams
, queue_size
, op
,
402 (type
== QUEUE_PACKETS
) ? pkt_move
: frame_move
);
405 return AVERROR(ENOMEM
);
// Forward declaration: common thread entry point that runs a SchTask's
// user function (defined later in the file).
412 static void *task_wrapper(void *arg
);
// Spawn the thread for a task. Must not be called twice on the same task
// (asserted below). Returns 0 on success or an error from pthread_create().
414 static int task_start(SchTask
*task
)
418 av_log(task
->func_arg
, AV_LOG_VERBOSE
, "Starting thread...\n");
420 av_assert0(!task
->thread_running
);
422 ret
= pthread_create(&task
->thread
, NULL
, task_wrapper
, task
);
424 av_log(task
->func_arg
, AV_LOG_ERROR
, "pthread_create() failed: %s\n",
// record success so sch_stop() knows there is a thread to join
// (presumably — the join site is outside this chunk; verify)
429 task
->thread_running
= 1;
// Fill in a SchTask's identity (node type + index within that type's array)
// and the user callback argument. Does not start the thread; see task_start().
433 static void task_init(Scheduler
*sch
, SchTask
*task
, enum SchedulerNodeType type
, unsigned idx
,
434 SchThreadFunc func
, void *func_arg
)
438 task
->node
.type
= type
;
439 task
->node
.idx
= idx
;
442 task
->func_arg
= func_arg
;
// Compute the smallest last_dts over all muxed streams, i.e. the position of
// the stream that is furthest behind. Streams whose source already finished
// are skipped unless count_finished is set. Returns AV_NOPTS_VALUE when any
// counted stream has not produced a dts yet, or when no stream was counted.
445 static int64_t trailing_dts(const Scheduler
*sch
, int count_finished
)
447 int64_t min_dts
= INT64_MAX
;
449 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
450 const SchMux
*mux
= &sch
->mux
[i
];
452 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
453 const SchMuxStream
*ms
= &mux
->streams
[j
];
// finished streams no longer constrain scheduling, unless requested
455 if (ms
->source_finished
&& !count_finished
)
// a stream with no dts yet makes the trailing position unknown
457 if (ms
->last_dts
== AV_NOPTS_VALUE
)
458 return AV_NOPTS_VALUE
;
460 min_dts
= FFMIN(min_dts
, ms
->last_dts
);
// INT64_MAX means the loop never accepted a stream - report "no dts"
464 return min_dts
== INT64_MAX
? AV_NOPTS_VALUE
: min_dts
;
467 void sch_free(Scheduler
**psch
)
469 Scheduler
*sch
= *psch
;
476 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
477 SchDemux
*d
= &sch
->demux
[i
];
479 for (unsigned j
= 0; j
< d
->nb_streams
; j
++) {
480 SchDemuxStream
*ds
= &d
->streams
[j
];
482 av_freep(&ds
->dst_finished
);
484 av_freep(&d
->streams
);
486 av_packet_free(&d
->send_pkt
);
488 waiter_uninit(&d
->waiter
);
490 av_freep(&sch
->demux
);
492 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
493 SchMux
*mux
= &sch
->mux
[i
];
495 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
496 SchMuxStream
*ms
= &mux
->streams
[j
];
498 if (ms
->pre_mux_queue
.fifo
) {
500 while (av_fifo_read(ms
->pre_mux_queue
.fifo
, &pkt
, 1) >= 0)
501 av_packet_free(&pkt
);
502 av_fifo_freep2(&ms
->pre_mux_queue
.fifo
);
505 av_freep(&ms
->sub_heartbeat_dst
);
507 av_freep(&mux
->streams
);
509 av_packet_free(&mux
->sub_heartbeat_pkt
);
511 tq_free(&mux
->queue
);
515 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
516 SchDec
*dec
= &sch
->dec
[i
];
518 tq_free(&dec
->queue
);
520 av_thread_message_queue_free(&dec
->queue_end_ts
);
522 for (unsigned j
= 0; j
< dec
->nb_outputs
; j
++) {
523 SchDecOutput
*o
= &dec
->outputs
[j
];
526 av_freep(&o
->dst_finished
);
529 av_freep(&dec
->outputs
);
531 av_frame_free(&dec
->send_frame
);
535 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
536 SchEnc
*enc
= &sch
->enc
[i
];
538 tq_free(&enc
->queue
);
540 av_packet_free(&enc
->send_pkt
);
543 av_freep(&enc
->dst_finished
);
547 for (unsigned i
= 0; i
< sch
->nb_sq_enc
; i
++) {
548 SchSyncQueue
*sq
= &sch
->sq_enc
[i
];
550 av_frame_free(&sq
->frame
);
551 pthread_mutex_destroy(&sq
->lock
);
552 av_freep(&sq
->enc_idx
);
554 av_freep(&sch
->sq_enc
);
556 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
557 SchFilterGraph
*fg
= &sch
->filters
[i
];
561 av_freep(&fg
->inputs
);
562 av_freep(&fg
->outputs
);
564 waiter_uninit(&fg
->waiter
);
566 av_freep(&sch
->filters
);
568 av_freep(&sch
->sdp_filename
);
570 pthread_mutex_destroy(&sch
->schedule_lock
);
572 pthread_mutex_destroy(&sch
->mux_ready_lock
);
574 pthread_mutex_destroy(&sch
->mux_done_lock
);
575 pthread_cond_destroy(&sch
->mux_done_cond
);
580 static const AVClass scheduler_class
= {
581 .class_name
= "Scheduler",
582 .version
= LIBAVUTIL_VERSION_INT
,
585 Scheduler
*sch_alloc(void)
590 sch
= av_mallocz(sizeof(*sch
));
594 sch
->class = &scheduler_class
;
597 ret
= pthread_mutex_init(&sch
->schedule_lock
, NULL
);
601 ret
= pthread_mutex_init(&sch
->mux_ready_lock
, NULL
);
605 ret
= pthread_mutex_init(&sch
->mux_done_lock
, NULL
);
609 ret
= pthread_cond_init(&sch
->mux_done_cond
, NULL
);
// Set (replacing any previous value) the filename that the SDP will be
// written to. Takes a private copy of the string.
// Returns 0 on success, AVERROR(ENOMEM) if the copy failed.
619 int sch_sdp_filename(Scheduler
*sch
, const char *sdp_filename
)
621 av_freep(&sch
->sdp_filename
);
622 sch
->sdp_filename
= av_strdup(sdp_filename
);
623 return sch
->sdp_filename
? 0 : AVERROR(ENOMEM
);
626 static const AVClass sch_mux_class
= {
627 .class_name
= "SchMux",
628 .version
= LIBAVUTIL_VERSION_INT
,
629 .parent_log_context_offset
= offsetof(SchMux
, task
.func_arg
),
632 int sch_add_mux(Scheduler
*sch
, SchThreadFunc func
, int (*init
)(void *),
633 void *arg
, int sdp_auto
, unsigned thread_queue_size
)
635 const unsigned idx
= sch
->nb_mux
;
640 ret
= GROW_ARRAY(sch
->mux
, sch
->nb_mux
);
644 mux
= &sch
->mux
[idx
];
645 mux
->class = &sch_mux_class
;
647 mux
->queue_size
= thread_queue_size
;
649 task_init(sch
, &mux
->task
, SCH_NODE_TYPE_MUX
, idx
, func
, arg
);
651 sch
->sdp_auto
&= sdp_auto
;
656 int sch_add_mux_stream(Scheduler
*sch
, unsigned mux_idx
)
663 av_assert0(mux_idx
< sch
->nb_mux
);
664 mux
= &sch
->mux
[mux_idx
];
666 ret
= GROW_ARRAY(mux
->streams
, mux
->nb_streams
);
669 stream_idx
= mux
->nb_streams
- 1;
671 ms
= &mux
->streams
[stream_idx
];
673 ms
->pre_mux_queue
.fifo
= av_fifo_alloc2(8, sizeof(AVPacket
*), 0);
674 if (!ms
->pre_mux_queue
.fifo
)
675 return AVERROR(ENOMEM
);
677 ms
->last_dts
= AV_NOPTS_VALUE
;
682 static const AVClass sch_demux_class
= {
683 .class_name
= "SchDemux",
684 .version
= LIBAVUTIL_VERSION_INT
,
685 .parent_log_context_offset
= offsetof(SchDemux
, task
.func_arg
),
688 int sch_add_demux(Scheduler
*sch
, SchThreadFunc func
, void *ctx
)
690 const unsigned idx
= sch
->nb_demux
;
695 ret
= GROW_ARRAY(sch
->demux
, sch
->nb_demux
);
699 d
= &sch
->demux
[idx
];
701 task_init(sch
, &d
->task
, SCH_NODE_TYPE_DEMUX
, idx
, func
, ctx
);
703 d
->class = &sch_demux_class
;
704 d
->send_pkt
= av_packet_alloc();
706 return AVERROR(ENOMEM
);
708 ret
= waiter_init(&d
->waiter
);
// Add one stream to an existing demuxer node.
// Returns the new stream's index on success, a negative error code on failure.
715 int sch_add_demux_stream(Scheduler
*sch
, unsigned demux_idx
)
720 av_assert0(demux_idx
< sch
->nb_demux
);
721 d
= &sch
->demux
[demux_idx
];
723 ret
= GROW_ARRAY(d
->streams
, d
->nb_streams
);
// on success the new stream is the last element of the grown array
724 return ret
< 0 ? ret
: d
->nb_streams
- 1;
// Add one output to an existing decoder node.
// Returns the new output's index on success, a negative error code on failure
// (the GROW_ARRAY failure path is in elided lines of the original).
727 int sch_add_dec_output(Scheduler
*sch
, unsigned dec_idx
)
732 av_assert0(dec_idx
< sch
->nb_dec
);
733 dec
= &sch
->dec
[dec_idx
];
735 ret
= GROW_ARRAY(dec
->outputs
, dec
->nb_outputs
);
739 return dec
->nb_outputs
- 1;
742 static const AVClass sch_dec_class
= {
743 .class_name
= "SchDec",
744 .version
= LIBAVUTIL_VERSION_INT
,
745 .parent_log_context_offset
= offsetof(SchDec
, task
.func_arg
),
748 int sch_add_dec(Scheduler
*sch
, SchThreadFunc func
, void *ctx
, int send_end_ts
)
750 const unsigned idx
= sch
->nb_dec
;
755 ret
= GROW_ARRAY(sch
->dec
, sch
->nb_dec
);
759 dec
= &sch
->dec
[idx
];
761 task_init(sch
, &dec
->task
, SCH_NODE_TYPE_DEC
, idx
, func
, ctx
);
763 dec
->class = &sch_dec_class
;
764 dec
->send_frame
= av_frame_alloc();
765 if (!dec
->send_frame
)
766 return AVERROR(ENOMEM
);
768 ret
= sch_add_dec_output(sch
, idx
);
772 ret
= queue_alloc(&dec
->queue
, 1, 0, QUEUE_PACKETS
);
777 ret
= av_thread_message_queue_alloc(&dec
->queue_end_ts
, 1, sizeof(Timestamp
));
785 static const AVClass sch_enc_class
= {
786 .class_name
= "SchEnc",
787 .version
= LIBAVUTIL_VERSION_INT
,
788 .parent_log_context_offset
= offsetof(SchEnc
, task
.func_arg
),
791 int sch_add_enc(Scheduler
*sch
, SchThreadFunc func
, void *ctx
,
792 int (*open_cb
)(void *opaque
, const AVFrame
*frame
))
794 const unsigned idx
= sch
->nb_enc
;
799 ret
= GROW_ARRAY(sch
->enc
, sch
->nb_enc
);
803 enc
= &sch
->enc
[idx
];
805 enc
->class = &sch_enc_class
;
806 enc
->open_cb
= open_cb
;
810 task_init(sch
, &enc
->task
, SCH_NODE_TYPE_ENC
, idx
, func
, ctx
);
812 enc
->send_pkt
= av_packet_alloc();
814 return AVERROR(ENOMEM
);
816 ret
= queue_alloc(&enc
->queue
, 1, 0, QUEUE_FRAMES
);
823 static const AVClass sch_fg_class
= {
824 .class_name
= "SchFilterGraph",
825 .version
= LIBAVUTIL_VERSION_INT
,
826 .parent_log_context_offset
= offsetof(SchFilterGraph
, task
.func_arg
),
829 int sch_add_filtergraph(Scheduler
*sch
, unsigned nb_inputs
, unsigned nb_outputs
,
830 SchThreadFunc func
, void *ctx
)
832 const unsigned idx
= sch
->nb_filters
;
837 ret
= GROW_ARRAY(sch
->filters
, sch
->nb_filters
);
840 fg
= &sch
->filters
[idx
];
842 fg
->class = &sch_fg_class
;
844 task_init(sch
, &fg
->task
, SCH_NODE_TYPE_FILTER_IN
, idx
, func
, ctx
);
847 fg
->inputs
= av_calloc(nb_inputs
, sizeof(*fg
->inputs
));
849 return AVERROR(ENOMEM
);
850 fg
->nb_inputs
= nb_inputs
;
854 fg
->outputs
= av_calloc(nb_outputs
, sizeof(*fg
->outputs
));
856 return AVERROR(ENOMEM
);
857 fg
->nb_outputs
= nb_outputs
;
860 ret
= waiter_init(&fg
->waiter
);
864 ret
= queue_alloc(&fg
->queue
, fg
->nb_inputs
+ 1, 0, QUEUE_FRAMES
);
871 int sch_add_sq_enc(Scheduler
*sch
, uint64_t buf_size_us
, void *logctx
)
876 ret
= GROW_ARRAY(sch
->sq_enc
, sch
->nb_sq_enc
);
879 sq
= &sch
->sq_enc
[sch
->nb_sq_enc
- 1];
881 sq
->sq
= sq_alloc(SYNC_QUEUE_FRAMES
, buf_size_us
, logctx
);
883 return AVERROR(ENOMEM
);
885 sq
->frame
= av_frame_alloc();
887 return AVERROR(ENOMEM
);
889 ret
= pthread_mutex_init(&sq
->lock
, NULL
);
893 return sq
- sch
->sq_enc
;
896 int sch_sq_add_enc(Scheduler
*sch
, unsigned sq_idx
, unsigned enc_idx
,
897 int limiting
, uint64_t max_frames
)
903 av_assert0(sq_idx
< sch
->nb_sq_enc
);
904 sq
= &sch
->sq_enc
[sq_idx
];
906 av_assert0(enc_idx
< sch
->nb_enc
);
907 enc
= &sch
->enc
[enc_idx
];
909 ret
= GROW_ARRAY(sq
->enc_idx
, sq
->nb_enc_idx
);
912 sq
->enc_idx
[sq
->nb_enc_idx
- 1] = enc_idx
;
914 ret
= sq_add_stream(sq
->sq
, limiting
);
918 enc
->sq_idx
[0] = sq_idx
;
919 enc
->sq_idx
[1] = ret
;
921 if (max_frames
!= INT64_MAX
)
922 sq_limit_frames(sq
->sq
, enc
->sq_idx
[1], max_frames
);
927 int sch_connect(Scheduler
*sch
, SchedulerNode src
, SchedulerNode dst
)
932 case SCH_NODE_TYPE_DEMUX
: {
935 av_assert0(src
.idx
< sch
->nb_demux
&&
936 src
.idx_stream
< sch
->demux
[src
.idx
].nb_streams
);
937 ds
= &sch
->demux
[src
.idx
].streams
[src
.idx_stream
];
939 ret
= GROW_ARRAY(ds
->dst
, ds
->nb_dst
);
943 ds
->dst
[ds
->nb_dst
- 1] = dst
;
945 // demuxed packets go to decoding or streamcopy
947 case SCH_NODE_TYPE_DEC
: {
950 av_assert0(dst
.idx
< sch
->nb_dec
);
951 dec
= &sch
->dec
[dst
.idx
];
953 av_assert0(!dec
->src
.type
);
957 case SCH_NODE_TYPE_MUX
: {
960 av_assert0(dst
.idx
< sch
->nb_mux
&&
961 dst
.idx_stream
< sch
->mux
[dst
.idx
].nb_streams
);
962 ms
= &sch
->mux
[dst
.idx
].streams
[dst
.idx_stream
];
964 av_assert0(!ms
->src
.type
);
969 default: av_assert0(0);
974 case SCH_NODE_TYPE_DEC
: {
978 av_assert0(src
.idx
< sch
->nb_dec
);
979 dec
= &sch
->dec
[src
.idx
];
981 av_assert0(src
.idx_stream
< dec
->nb_outputs
);
982 o
= &dec
->outputs
[src
.idx_stream
];
984 ret
= GROW_ARRAY(o
->dst
, o
->nb_dst
);
988 o
->dst
[o
->nb_dst
- 1] = dst
;
990 // decoded frames go to filters or encoding
992 case SCH_NODE_TYPE_FILTER_IN
: {
995 av_assert0(dst
.idx
< sch
->nb_filters
&&
996 dst
.idx_stream
< sch
->filters
[dst
.idx
].nb_inputs
);
997 fi
= &sch
->filters
[dst
.idx
].inputs
[dst
.idx_stream
];
999 av_assert0(!fi
->src
.type
);
1003 case SCH_NODE_TYPE_ENC
: {
1006 av_assert0(dst
.idx
< sch
->nb_enc
);
1007 enc
= &sch
->enc
[dst
.idx
];
1009 av_assert0(!enc
->src
.type
);
1013 default: av_assert0(0);
1018 case SCH_NODE_TYPE_FILTER_OUT
: {
1021 av_assert0(src
.idx
< sch
->nb_filters
&&
1022 src
.idx_stream
< sch
->filters
[src
.idx
].nb_outputs
);
1023 fo
= &sch
->filters
[src
.idx
].outputs
[src
.idx_stream
];
1025 av_assert0(!fo
->dst
.type
);
1028 // filtered frames go to encoding or another filtergraph
1030 case SCH_NODE_TYPE_ENC
: {
1033 av_assert0(dst
.idx
< sch
->nb_enc
);
1034 enc
= &sch
->enc
[dst
.idx
];
1036 av_assert0(!enc
->src
.type
);
1040 case SCH_NODE_TYPE_FILTER_IN
: {
1043 av_assert0(dst
.idx
< sch
->nb_filters
&&
1044 dst
.idx_stream
< sch
->filters
[dst
.idx
].nb_inputs
);
1045 fi
= &sch
->filters
[dst
.idx
].inputs
[dst
.idx_stream
];
1047 av_assert0(!fi
->src
.type
);
1051 default: av_assert0(0);
1057 case SCH_NODE_TYPE_ENC
: {
1060 av_assert0(src
.idx
< sch
->nb_enc
);
1061 enc
= &sch
->enc
[src
.idx
];
1063 ret
= GROW_ARRAY(enc
->dst
, enc
->nb_dst
);
1067 enc
->dst
[enc
->nb_dst
- 1] = dst
;
1069 // encoding packets go to muxing or decoding
1071 case SCH_NODE_TYPE_MUX
: {
1074 av_assert0(dst
.idx
< sch
->nb_mux
&&
1075 dst
.idx_stream
< sch
->mux
[dst
.idx
].nb_streams
);
1076 ms
= &sch
->mux
[dst
.idx
].streams
[dst
.idx_stream
];
1078 av_assert0(!ms
->src
.type
);
1083 case SCH_NODE_TYPE_DEC
: {
1086 av_assert0(dst
.idx
< sch
->nb_dec
);
1087 dec
= &sch
->dec
[dst
.idx
];
1089 av_assert0(!dec
->src
.type
);
1094 default: av_assert0(0);
1099 default: av_assert0(0);
// Start the muxer thread, then drain each stream's pre-mux queue into the
// muxer's thread queue. Packets buffered before the muxer was ready are
// forwarded here; ownership of each packet passes to tq_send()/av_packet_free.
1105 static int mux_task_start(SchMux
*mux
)
1109 ret
= task_start(&mux
->task
);
1113 /* flush the pre-muxing queues */
1114 for (unsigned i
= 0; i
< mux
->nb_streams
; i
++) {
1115 SchMuxStream
*ms
= &mux
->streams
[i
];
1118 while (av_fifo_read(ms
->pre_mux_queue
.fifo
, &pkt
, 1) >= 0) {
1121 ret
= tq_send(mux
->queue
, i
, pkt
);
1122 av_packet_free(&pkt
);
// EOF from tq_send means the receiving side already closed this stream
1123 if (ret
== AVERROR_EOF
)
1128 tq_send_finish(mux
->queue
, i
);
// from this point on senders go directly to the thread queue instead of
// the pre-mux fifo
1132 atomic_store(&mux
->mux_started
, 1);
1137 int print_sdp(const char *filename
);
1139 static int mux_init(Scheduler
*sch
, SchMux
*mux
)
1143 ret
= mux
->init(mux
->task
.func_arg
);
1147 sch
->nb_mux_ready
++;
1149 if (sch
->sdp_filename
|| sch
->sdp_auto
) {
1150 if (sch
->nb_mux_ready
< sch
->nb_mux
)
1153 ret
= print_sdp(sch
->sdp_filename
);
1155 av_log(sch
, AV_LOG_ERROR
, "Error writing the SDP.\n");
1159 /* SDP is written only after all the muxers are ready, so now we
1160 * start ALL the threads */
1161 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1162 ret
= mux_task_start(&sch
->mux
[i
]);
1167 ret
= mux_task_start(mux
);
// Configure pre-mux buffering limits for one muxed stream: max_packets caps
// the pre-mux fifo length, but (per PreMuxQueue docs) only once the queued
// data size exceeds data_threshold.
1175 void sch_mux_stream_buffering(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
1176 size_t data_threshold
, int max_packets
)
1181 av_assert0(mux_idx
< sch
->nb_mux
);
1182 mux
= &sch
->mux
[mux_idx
];
1184 av_assert0(stream_idx
< mux
->nb_streams
);
1185 ms
= &mux
->streams
[stream_idx
];
1187 ms
->pre_mux_queue
.max_packets
= max_packets
;
1188 ms
->pre_mux_queue
.data_threshold
= data_threshold
;
// Mark one muxed stream as fully configured. When the last stream of a muxer
// becomes ready AND the scheduler has already been started, the muxer is
// initialized (and its thread started) from here; otherwise sch_start() will
// do it later. Serialized by mux_ready_lock.
1191 int sch_mux_stream_ready(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
)
1196 av_assert0(mux_idx
< sch
->nb_mux
);
1197 mux
= &sch
->mux
[mux_idx
];
1199 av_assert0(stream_idx
< mux
->nb_streams
);
1201 pthread_mutex_lock(&sch
->mux_ready_lock
);
1203 av_assert0(mux
->nb_streams_ready
< mux
->nb_streams
);
1205 // this may be called during initialization - do not start
1206 // threads before sch_start() is called
1207 if (++mux
->nb_streams_ready
== mux
->nb_streams
&&
1208 sch
->state
>= SCH_STATE_STARTED
)
1209 ret
= mux_init(sch
, mux
);
1211 pthread_mutex_unlock(&sch
->mux_ready_lock
);
1216 int sch_mux_sub_heartbeat_add(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
1223 av_assert0(mux_idx
< sch
->nb_mux
);
1224 mux
= &sch
->mux
[mux_idx
];
1226 av_assert0(stream_idx
< mux
->nb_streams
);
1227 ms
= &mux
->streams
[stream_idx
];
1229 ret
= GROW_ARRAY(ms
->sub_heartbeat_dst
, ms
->nb_sub_heartbeat_dst
);
1233 av_assert0(dec_idx
< sch
->nb_dec
);
1234 ms
->sub_heartbeat_dst
[ms
->nb_sub_heartbeat_dst
- 1] = dec_idx
;
1236 if (!mux
->sub_heartbeat_pkt
) {
1237 mux
->sub_heartbeat_pkt
= av_packet_alloc();
1238 if (!mux
->sub_heartbeat_pkt
)
1239 return AVERROR(ENOMEM
);
1245 static void unchoke_for_stream(Scheduler
*sch
, SchedulerNode src
);
1247 // Unchoke any filter graphs that are downstream of this node, to prevent it
1248 // from getting stuck trying to push data to a full queue
1249 static void unchoke_downstream(Scheduler
*sch
, SchedulerNode
*dst
)
1254 switch (dst
->type
) {
1255 case SCH_NODE_TYPE_DEC
:
1256 dec
= &sch
->dec
[dst
->idx
];
1257 for (int i
= 0; i
< dec
->nb_outputs
; i
++)
1258 unchoke_downstream(sch
, dec
->outputs
[i
].dst
);
1260 case SCH_NODE_TYPE_ENC
:
1261 enc
= &sch
->enc
[dst
->idx
];
1262 for (int i
= 0; i
< enc
->nb_dst
; i
++)
1263 unchoke_downstream(sch
, &enc
->dst
[i
]);
1265 case SCH_NODE_TYPE_MUX
:
1266 // muxers are never choked
1268 case SCH_NODE_TYPE_FILTER_IN
:
1269 fg
= &sch
->filters
[dst
->idx
];
1270 if (fg
->best_input
== fg
->nb_inputs
) {
1271 fg
->waiter
.choked_next
= 0;
1273 // ensure that this filter graph is not stuck waiting for
1274 // input from a different upstream demuxer
1275 unchoke_for_stream(sch
, fg
->inputs
[fg
->best_input
].src_sched
);
1279 av_assert0(!"Invalid destination node type?");
1284 static void unchoke_for_stream(Scheduler
*sch
, SchedulerNode src
)
1289 // fed directly by a demuxer (i.e. not through a filtergraph)
1290 if (src
.type
== SCH_NODE_TYPE_DEMUX
) {
1291 SchDemux
*demux
= &sch
->demux
[src
.idx
];
1292 if (demux
->waiter
.choked_next
== 0)
1293 return; // prevent infinite loop
1294 demux
->waiter
.choked_next
= 0;
1295 for (int i
= 0; i
< demux
->nb_streams
; i
++)
1296 unchoke_downstream(sch
, demux
->streams
[i
].dst
);
1300 av_assert0(src
.type
== SCH_NODE_TYPE_FILTER_OUT
);
1301 fg
= &sch
->filters
[src
.idx
];
1303 // the filtergraph contains internal sources and
1304 // requested to be scheduled directly
1305 if (fg
->best_input
== fg
->nb_inputs
) {
1306 fg
->waiter
.choked_next
= 0;
1310 src
= fg
->inputs
[fg
->best_input
].src_sched
;
1314 static void schedule_update_locked(Scheduler
*sch
)
1317 int have_unchoked
= 0;
1319 // on termination request all waiters are choked,
1320 // we are not to unchoke them
1321 if (atomic_load(&sch
->terminate
))
1324 dts
= trailing_dts(sch
, 0);
1326 atomic_store(&sch
->last_dts
, dts
);
1328 // initialize our internal state
1329 for (unsigned type
= 0; type
< 2; type
++)
1330 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1331 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1332 w
->choked_prev
= atomic_load(&w
->choked
);
1336 // figure out the sources that are allowed to proceed
1337 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1338 SchMux
*mux
= &sch
->mux
[i
];
1340 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
1341 SchMuxStream
*ms
= &mux
->streams
[j
];
1343 // unblock sources for output streams that are not finished
1344 // and not too far ahead of the trailing stream
1345 if (ms
->source_finished
)
1347 if (dts
== AV_NOPTS_VALUE
&& ms
->last_dts
!= AV_NOPTS_VALUE
)
1349 if (dts
!= AV_NOPTS_VALUE
&& ms
->last_dts
- dts
>= SCHEDULE_TOLERANCE
)
1352 // resolve the source to unchoke
1353 unchoke_for_stream(sch
, ms
->src_sched
);
1358 // make sure to unchoke at least one source, if still available
1359 for (unsigned type
= 0; !have_unchoked
&& type
< 2; type
++)
1360 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1361 int exited
= type
? sch
->filters
[i
].task_exited
: sch
->demux
[i
].task_exited
;
1362 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1371 for (unsigned type
= 0; type
< 2; type
++)
1372 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1373 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1374 if (w
->choked_prev
!= w
->choked_next
)
1375 waiter_set(w
, w
->choked_next
);
1387 check_acyclic_for_output(const Scheduler
*sch
, SchedulerNode src
,
1388 uint8_t *filters_visited
, SchedulerNode
*filters_stack
)
1390 unsigned nb_filters_stack
= 0;
1392 memset(filters_visited
, 0, sch
->nb_filters
* sizeof(*filters_visited
));
1395 const SchFilterGraph
*fg
= &sch
->filters
[src
.idx
];
1397 filters_visited
[src
.idx
] = CYCLE_NODE_STARTED
;
1399 // descend into every input, depth first
1400 if (src
.idx_stream
< fg
->nb_inputs
) {
1401 const SchFilterIn
*fi
= &fg
->inputs
[src
.idx_stream
++];
1403 // connected to demuxer, no cycles possible
1404 if (fi
->src_sched
.type
== SCH_NODE_TYPE_DEMUX
)
1407 // otherwise connected to another filtergraph
1408 av_assert0(fi
->src_sched
.type
== SCH_NODE_TYPE_FILTER_OUT
);
1411 if (filters_visited
[fi
->src_sched
.idx
] == CYCLE_NODE_STARTED
)
1412 return AVERROR(EINVAL
);
1414 // place current position on stack and descend
1415 av_assert0(nb_filters_stack
< sch
->nb_filters
);
1416 filters_stack
[nb_filters_stack
++] = src
;
1417 src
= (SchedulerNode
){ .idx
= fi
->src_sched
.idx
, .idx_stream
= 0 };
1421 filters_visited
[src
.idx
] = CYCLE_NODE_DONE
;
1423 // previous search finished,
1424 if (nb_filters_stack
) {
1425 src
= filters_stack
[--nb_filters_stack
];
1432 static int check_acyclic(Scheduler
*sch
)
1434 uint8_t *filters_visited
= NULL
;
1435 SchedulerNode
*filters_stack
= NULL
;
1439 if (!sch
->nb_filters
)
1442 filters_visited
= av_malloc_array(sch
->nb_filters
, sizeof(*filters_visited
));
1443 if (!filters_visited
)
1444 return AVERROR(ENOMEM
);
1446 filters_stack
= av_malloc_array(sch
->nb_filters
, sizeof(*filters_stack
));
1447 if (!filters_stack
) {
1448 ret
= AVERROR(ENOMEM
);
1452 // trace the transcoding graph upstream from every filtegraph
1453 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1454 ret
= check_acyclic_for_output(sch
, (SchedulerNode
){ .idx
= i
},
1455 filters_visited
, filters_stack
);
1457 av_log(&sch
->filters
[i
], AV_LOG_ERROR
, "Transcoding graph has a cycle\n");
1463 av_freep(&filters_visited
);
1464 av_freep(&filters_stack
);
1468 static int start_prepare(Scheduler
*sch
)
1472 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
1473 SchDemux
*d
= &sch
->demux
[i
];
1475 for (unsigned j
= 0; j
< d
->nb_streams
; j
++) {
1476 SchDemuxStream
*ds
= &d
->streams
[j
];
1479 av_log(d
, AV_LOG_ERROR
,
1480 "Demuxer stream %u not connected to any sink\n", j
);
1481 return AVERROR(EINVAL
);
1484 ds
->dst_finished
= av_calloc(ds
->nb_dst
, sizeof(*ds
->dst_finished
));
1485 if (!ds
->dst_finished
)
1486 return AVERROR(ENOMEM
);
1490 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
1491 SchDec
*dec
= &sch
->dec
[i
];
1493 if (!dec
->src
.type
) {
1494 av_log(dec
, AV_LOG_ERROR
,
1495 "Decoder not connected to a source\n");
1496 return AVERROR(EINVAL
);
1499 for (unsigned j
= 0; j
< dec
->nb_outputs
; j
++) {
1500 SchDecOutput
*o
= &dec
->outputs
[j
];
1503 av_log(dec
, AV_LOG_ERROR
,
1504 "Decoder output %u not connected to any sink\n", j
);
1505 return AVERROR(EINVAL
);
1508 o
->dst_finished
= av_calloc(o
->nb_dst
, sizeof(*o
->dst_finished
));
1509 if (!o
->dst_finished
)
1510 return AVERROR(ENOMEM
);
1514 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
1515 SchEnc
*enc
= &sch
->enc
[i
];
1517 if (!enc
->src
.type
) {
1518 av_log(enc
, AV_LOG_ERROR
,
1519 "Encoder not connected to a source\n");
1520 return AVERROR(EINVAL
);
1523 av_log(enc
, AV_LOG_ERROR
,
1524 "Encoder not connected to any sink\n");
1525 return AVERROR(EINVAL
);
1528 enc
->dst_finished
= av_calloc(enc
->nb_dst
, sizeof(*enc
->dst_finished
));
1529 if (!enc
->dst_finished
)
1530 return AVERROR(ENOMEM
);
1533 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1534 SchMux
*mux
= &sch
->mux
[i
];
1536 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
1537 SchMuxStream
*ms
= &mux
->streams
[j
];
1539 switch (ms
->src
.type
) {
1540 case SCH_NODE_TYPE_ENC
: {
1541 SchEnc
*enc
= &sch
->enc
[ms
->src
.idx
];
1542 if (enc
->src
.type
== SCH_NODE_TYPE_DEC
) {
1543 ms
->src_sched
= sch
->dec
[enc
->src
.idx
].src
;
1544 av_assert0(ms
->src_sched
.type
== SCH_NODE_TYPE_DEMUX
);
1546 ms
->src_sched
= enc
->src
;
1547 av_assert0(ms
->src_sched
.type
== SCH_NODE_TYPE_FILTER_OUT
);
1551 case SCH_NODE_TYPE_DEMUX
:
1552 ms
->src_sched
= ms
->src
;
1555 av_log(mux
, AV_LOG_ERROR
,
1556 "Muxer stream #%u not connected to a source\n", j
);
1557 return AVERROR(EINVAL
);
1561 ret
= queue_alloc(&mux
->queue
, mux
->nb_streams
, mux
->queue_size
,
1567 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1568 SchFilterGraph
*fg
= &sch
->filters
[i
];
1570 for (unsigned j
= 0; j
< fg
->nb_inputs
; j
++) {
1571 SchFilterIn
*fi
= &fg
->inputs
[j
];
1574 if (!fi
->src
.type
) {
1575 av_log(fg
, AV_LOG_ERROR
,
1576 "Filtergraph input %u not connected to a source\n", j
);
1577 return AVERROR(EINVAL
);
1580 if (fi
->src
.type
== SCH_NODE_TYPE_FILTER_OUT
)
1581 fi
->src_sched
= fi
->src
;
1583 av_assert0(fi
->src
.type
== SCH_NODE_TYPE_DEC
);
1584 dec
= &sch
->dec
[fi
->src
.idx
];
1586 switch (dec
->src
.type
) {
1587 case SCH_NODE_TYPE_DEMUX
: fi
->src_sched
= dec
->src
; break;
1588 case SCH_NODE_TYPE_ENC
: fi
->src_sched
= sch
->enc
[dec
->src
.idx
].src
; break;
1589 default: av_assert0(0);
1594 for (unsigned j
= 0; j
< fg
->nb_outputs
; j
++) {
1595 SchFilterOut
*fo
= &fg
->outputs
[j
];
1597 if (!fo
->dst
.type
) {
1598 av_log(fg
, AV_LOG_ERROR
,
1599 "Filtergraph %u output %u not connected to a sink\n", i
, j
);
1600 return AVERROR(EINVAL
);
1605 // Check that the transcoding graph has no cycles.
1606 ret
= check_acyclic(sch
);
1613 int sch_start(Scheduler
*sch
)
1617 ret
= start_prepare(sch
);
1621 av_assert0(sch
->state
== SCH_STATE_UNINIT
);
1622 sch
->state
= SCH_STATE_STARTED
;
1624 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1625 SchMux
*mux
= &sch
->mux
[i
];
1627 if (mux
->nb_streams_ready
== mux
->nb_streams
) {
1628 ret
= mux_init(sch
, mux
);
1634 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
1635 SchEnc
*enc
= &sch
->enc
[i
];
1637 ret
= task_start(&enc
->task
);
1642 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1643 SchFilterGraph
*fg
= &sch
->filters
[i
];
1645 ret
= task_start(&fg
->task
);
1650 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
1651 SchDec
*dec
= &sch
->dec
[i
];
1653 ret
= task_start(&dec
->task
);
1658 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
1659 SchDemux
*d
= &sch
->demux
[i
];
1664 ret
= task_start(&d
->task
);
1669 pthread_mutex_lock(&sch
->schedule_lock
);
1670 schedule_update_locked(sch
);
1671 pthread_mutex_unlock(&sch
->schedule_lock
);
1675 sch_stop(sch
, NULL
);
1679 int sch_wait(Scheduler
*sch
, uint64_t timeout_us
, int64_t *transcode_ts
)
1683 // convert delay to absolute timestamp
1684 timeout_us
+= av_gettime();
1686 pthread_mutex_lock(&sch
->mux_done_lock
);
1688 if (sch
->nb_mux_done
< sch
->nb_mux
) {
1689 struct timespec tv
= { .tv_sec
= timeout_us
/ 1000000,
1690 .tv_nsec
= (timeout_us
% 1000000) * 1000 };
1691 pthread_cond_timedwait(&sch
->mux_done_cond
, &sch
->mux_done_lock
, &tv
);
1694 ret
= sch
->nb_mux_done
== sch
->nb_mux
;
1696 pthread_mutex_unlock(&sch
->mux_done_lock
);
1698 *transcode_ts
= atomic_load(&sch
->last_dts
);
1700 // abort transcoding if any task failed
1701 err
= atomic_load(&sch
->task_failed
);
1706 static int enc_open(Scheduler
*sch
, SchEnc
*enc
, const AVFrame
*frame
)
1710 ret
= enc
->open_cb(enc
->task
.func_arg
, frame
);
1714 // ret>0 signals audio frame size, which means sync queue must
1715 // have been enabled during encoder creation
1719 av_assert0(enc
->sq_idx
[0] >= 0);
1720 sq
= &sch
->sq_enc
[enc
->sq_idx
[0]];
1722 pthread_mutex_lock(&sq
->lock
);
1724 sq_frame_samples(sq
->sq
, enc
->sq_idx
[1], ret
);
1726 pthread_mutex_unlock(&sq
->lock
);
1732 static int send_to_enc_thread(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1737 tq_send_finish(enc
->queue
, 0);
1741 if (enc
->in_finished
)
1744 ret
= tq_send(enc
->queue
, 0, frame
);
1746 enc
->in_finished
= 1;
1751 static int send_to_enc_sq(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1753 SchSyncQueue
*sq
= &sch
->sq_enc
[enc
->sq_idx
[0]];
1756 // inform the scheduling code that no more input will arrive along this path;
1757 // this is necessary because the sync queue may not send an EOF downstream
1758 // until other streams finish
1759 // TODO: consider a cleaner way of passing this information through
1762 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
1766 if (enc
->dst
[i
].type
!= SCH_NODE_TYPE_MUX
)
1769 mux
= &sch
->mux
[enc
->dst
[i
].idx
];
1770 ms
= &mux
->streams
[enc
->dst
[i
].idx_stream
];
1772 pthread_mutex_lock(&sch
->schedule_lock
);
1774 ms
->source_finished
= 1;
1775 schedule_update_locked(sch
);
1777 pthread_mutex_unlock(&sch
->schedule_lock
);
1781 pthread_mutex_lock(&sq
->lock
);
1783 ret
= sq_send(sq
->sq
, enc
->sq_idx
[1], SQFRAME(frame
));
1790 // TODO: the SQ API should be extended to allow returning EOF
1791 // for individual streams
1792 ret
= sq_receive(sq
->sq
, -1, SQFRAME(sq
->frame
));
1794 ret
= (ret
== AVERROR(EAGAIN
)) ? 0 : ret
;
1798 enc
= &sch
->enc
[sq
->enc_idx
[ret
]];
1799 ret
= send_to_enc_thread(sch
, enc
, sq
->frame
);
1801 av_frame_unref(sq
->frame
);
1802 if (ret
!= AVERROR_EOF
)
1805 sq_send(sq
->sq
, enc
->sq_idx
[1], SQFRAME(NULL
));
1811 // close all encoders fed from this sync queue
1812 for (unsigned i
= 0; i
< sq
->nb_enc_idx
; i
++) {
1813 int err
= send_to_enc_thread(sch
, &sch
->enc
[sq
->enc_idx
[i
]], NULL
);
1815 // if the sync queue error is EOF and closing the encoder
1816 // produces a more serious error, make sure to pick the latter
1817 ret
= err_merge((ret
== AVERROR_EOF
&& err
< 0) ? 0 : ret
, err
);
1822 pthread_mutex_unlock(&sq
->lock
);
1827 static int send_to_enc(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1829 if (enc
->open_cb
&& frame
&& !enc
->opened
) {
1830 int ret
= enc_open(sch
, enc
, frame
);
1835 // discard empty frames that only carry encoder init parameters
1836 if (!frame
->buf
[0]) {
1837 av_frame_unref(frame
);
1842 return (enc
->sq_idx
[0] >= 0) ?
1843 send_to_enc_sq (sch
, enc
, frame
) :
1844 send_to_enc_thread(sch
, enc
, frame
);
1847 static int mux_queue_packet(SchMux
*mux
, SchMuxStream
*ms
, AVPacket
*pkt
)
1849 PreMuxQueue
*q
= &ms
->pre_mux_queue
;
1850 AVPacket
*tmp_pkt
= NULL
;
1853 if (!av_fifo_can_write(q
->fifo
)) {
1854 size_t packets
= av_fifo_can_read(q
->fifo
);
1855 size_t pkt_size
= pkt
? pkt
->size
: 0;
1856 int thresh_reached
= (q
->data_size
+ pkt_size
) > q
->data_threshold
;
1857 size_t max_packets
= thresh_reached
? q
->max_packets
: SIZE_MAX
;
1858 size_t new_size
= FFMIN(2 * packets
, max_packets
);
1860 if (new_size
<= packets
) {
1861 av_log(mux
, AV_LOG_ERROR
,
1862 "Too many packets buffered for output stream.\n");
1863 return AVERROR(ENOSPC
);
1865 ret
= av_fifo_grow2(q
->fifo
, new_size
- packets
);
1871 tmp_pkt
= av_packet_alloc();
1873 return AVERROR(ENOMEM
);
1875 av_packet_move_ref(tmp_pkt
, pkt
);
1876 q
->data_size
+= tmp_pkt
->size
;
1878 av_fifo_write(q
->fifo
, &tmp_pkt
, 1);
1883 static int send_to_mux(Scheduler
*sch
, SchMux
*mux
, unsigned stream_idx
,
1886 SchMuxStream
*ms
= &mux
->streams
[stream_idx
];
1887 int64_t dts
= (pkt
&& pkt
->dts
!= AV_NOPTS_VALUE
) ?
1888 av_rescale_q(pkt
->dts
+ pkt
->duration
, pkt
->time_base
, AV_TIME_BASE_Q
) :
1891 // queue the packet if the muxer cannot be started yet
1892 if (!atomic_load(&mux
->mux_started
)) {
1895 // the muxer could have started between the above atomic check and
1896 // locking the mutex, then this block falls through to normal send path
1897 pthread_mutex_lock(&sch
->mux_ready_lock
);
1899 if (!atomic_load(&mux
->mux_started
)) {
1900 int ret
= mux_queue_packet(mux
, ms
, pkt
);
1901 queued
= ret
< 0 ? ret
: 1;
1904 pthread_mutex_unlock(&sch
->mux_ready_lock
);
1909 goto update_schedule
;
1918 ret
= tq_send(mux
->queue
, stream_idx
, pkt
);
1922 tq_send_finish(mux
->queue
, stream_idx
);
1925 // TODO: use atomics to check whether this changes trailing dts
1926 // to avoid locking unnecesarily
1927 if (dts
!= AV_NOPTS_VALUE
|| !pkt
) {
1928 pthread_mutex_lock(&sch
->schedule_lock
);
1930 if (pkt
) ms
->last_dts
= dts
;
1931 else ms
->source_finished
= 1;
1933 schedule_update_locked(sch
);
1935 pthread_mutex_unlock(&sch
->schedule_lock
);
1942 demux_stream_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
1943 uint8_t *dst_finished
, AVPacket
*pkt
, unsigned flags
)
1950 if (pkt
&& dst
.type
== SCH_NODE_TYPE_MUX
&&
1951 (flags
& DEMUX_SEND_STREAMCOPY_EOF
)) {
1952 av_packet_unref(pkt
);
1959 ret
= (dst
.type
== SCH_NODE_TYPE_MUX
) ?
1960 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, pkt
) :
1961 tq_send(sch
->dec
[dst
.idx
].queue
, 0, pkt
);
1962 if (ret
== AVERROR_EOF
)
1968 if (dst
.type
== SCH_NODE_TYPE_MUX
)
1969 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, NULL
);
1971 tq_send_finish(sch
->dec
[dst
.idx
].queue
, 0);
1977 static int demux_send_for_stream(Scheduler
*sch
, SchDemux
*d
, SchDemuxStream
*ds
,
1978 AVPacket
*pkt
, unsigned flags
)
1980 unsigned nb_done
= 0;
1982 for (unsigned i
= 0; i
< ds
->nb_dst
; i
++) {
1983 AVPacket
*to_send
= pkt
;
1984 uint8_t *finished
= &ds
->dst_finished
[i
];
1988 // sending a packet consumes it, so make a temporary reference if needed
1989 if (pkt
&& i
< ds
->nb_dst
- 1) {
1990 to_send
= d
->send_pkt
;
1992 ret
= av_packet_ref(to_send
, pkt
);
1997 ret
= demux_stream_send_to_dst(sch
, ds
->dst
[i
], finished
, to_send
, flags
);
1999 av_packet_unref(to_send
);
2000 if (ret
== AVERROR_EOF
)
2006 return (nb_done
== ds
->nb_dst
) ? AVERROR_EOF
: 0;
2009 static int demux_flush(Scheduler
*sch
, SchDemux
*d
, AVPacket
*pkt
)
2011 Timestamp max_end_ts
= (Timestamp
){ .ts
= AV_NOPTS_VALUE
};
2013 av_assert0(!pkt
->buf
&& !pkt
->data
&& !pkt
->side_data_elems
);
2015 for (unsigned i
= 0; i
< d
->nb_streams
; i
++) {
2016 SchDemuxStream
*ds
= &d
->streams
[i
];
2018 for (unsigned j
= 0; j
< ds
->nb_dst
; j
++) {
2019 const SchedulerNode
*dst
= &ds
->dst
[j
];
2023 if (ds
->dst_finished
[j
] || dst
->type
!= SCH_NODE_TYPE_DEC
)
2026 dec
= &sch
->dec
[dst
->idx
];
2028 ret
= tq_send(dec
->queue
, 0, pkt
);
2032 if (dec
->queue_end_ts
) {
2034 ret
= av_thread_message_queue_recv(dec
->queue_end_ts
, &ts
, 0);
2038 if (max_end_ts
.ts
== AV_NOPTS_VALUE
||
2039 (ts
.ts
!= AV_NOPTS_VALUE
&&
2040 av_compare_ts(max_end_ts
.ts
, max_end_ts
.tb
, ts
.ts
, ts
.tb
) < 0))
2047 pkt
->pts
= max_end_ts
.ts
;
2048 pkt
->time_base
= max_end_ts
.tb
;
2053 int sch_demux_send(Scheduler
*sch
, unsigned demux_idx
, AVPacket
*pkt
,
2059 av_assert0(demux_idx
< sch
->nb_demux
);
2060 d
= &sch
->demux
[demux_idx
];
2062 terminate
= waiter_wait(sch
, &d
->waiter
);
2064 return AVERROR_EXIT
;
2066 // flush the downstreams after seek
2067 if (pkt
->stream_index
== -1)
2068 return demux_flush(sch
, d
, pkt
);
2070 av_assert0(pkt
->stream_index
< d
->nb_streams
);
2072 return demux_send_for_stream(sch
, d
, &d
->streams
[pkt
->stream_index
], pkt
, flags
);
2075 static int demux_done(Scheduler
*sch
, unsigned demux_idx
)
2077 SchDemux
*d
= &sch
->demux
[demux_idx
];
2080 for (unsigned i
= 0; i
< d
->nb_streams
; i
++) {
2081 int err
= demux_send_for_stream(sch
, d
, &d
->streams
[i
], NULL
, 0);
2082 if (err
!= AVERROR_EOF
)
2083 ret
= err_merge(ret
, err
);
2086 pthread_mutex_lock(&sch
->schedule_lock
);
2090 schedule_update_locked(sch
);
2092 pthread_mutex_unlock(&sch
->schedule_lock
);
2097 int sch_mux_receive(Scheduler
*sch
, unsigned mux_idx
, AVPacket
*pkt
)
2100 int ret
, stream_idx
;
2102 av_assert0(mux_idx
< sch
->nb_mux
);
2103 mux
= &sch
->mux
[mux_idx
];
2105 ret
= tq_receive(mux
->queue
, &stream_idx
, pkt
);
2106 pkt
->stream_index
= stream_idx
;
2110 void sch_mux_receive_finish(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
)
2114 av_assert0(mux_idx
< sch
->nb_mux
);
2115 mux
= &sch
->mux
[mux_idx
];
2117 av_assert0(stream_idx
< mux
->nb_streams
);
2118 tq_receive_finish(mux
->queue
, stream_idx
);
2120 pthread_mutex_lock(&sch
->schedule_lock
);
2121 mux
->streams
[stream_idx
].source_finished
= 1;
2123 schedule_update_locked(sch
);
2125 pthread_mutex_unlock(&sch
->schedule_lock
);
2128 int sch_mux_sub_heartbeat(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
2129 const AVPacket
*pkt
)
2134 av_assert0(mux_idx
< sch
->nb_mux
);
2135 mux
= &sch
->mux
[mux_idx
];
2137 av_assert0(stream_idx
< mux
->nb_streams
);
2138 ms
= &mux
->streams
[stream_idx
];
2140 for (unsigned i
= 0; i
< ms
->nb_sub_heartbeat_dst
; i
++) {
2141 SchDec
*dst
= &sch
->dec
[ms
->sub_heartbeat_dst
[i
]];
2144 ret
= av_packet_copy_props(mux
->sub_heartbeat_pkt
, pkt
);
2148 tq_send(dst
->queue
, 0, mux
->sub_heartbeat_pkt
);
2154 static int mux_done(Scheduler
*sch
, unsigned mux_idx
)
2156 SchMux
*mux
= &sch
->mux
[mux_idx
];
2158 pthread_mutex_lock(&sch
->schedule_lock
);
2160 for (unsigned i
= 0; i
< mux
->nb_streams
; i
++) {
2161 tq_receive_finish(mux
->queue
, i
);
2162 mux
->streams
[i
].source_finished
= 1;
2165 schedule_update_locked(sch
);
2167 pthread_mutex_unlock(&sch
->schedule_lock
);
2169 pthread_mutex_lock(&sch
->mux_done_lock
);
2171 av_assert0(sch
->nb_mux_done
< sch
->nb_mux
);
2174 pthread_cond_signal(&sch
->mux_done_cond
);
2176 pthread_mutex_unlock(&sch
->mux_done_lock
);
2181 int sch_dec_receive(Scheduler
*sch
, unsigned dec_idx
, AVPacket
*pkt
)
2186 av_assert0(dec_idx
< sch
->nb_dec
);
2187 dec
= &sch
->dec
[dec_idx
];
2189 // the decoder should have given us post-flush end timestamp in pkt
2190 if (dec
->expect_end_ts
) {
2191 Timestamp ts
= (Timestamp
){ .ts
= pkt
->pts
, .tb
= pkt
->time_base
};
2192 ret
= av_thread_message_queue_send(dec
->queue_end_ts
, &ts
, 0);
2196 dec
->expect_end_ts
= 0;
2199 ret
= tq_receive(dec
->queue
, &dummy
, pkt
);
2200 av_assert0(dummy
<= 0);
2202 // got a flush packet, on the next call to this function the decoder
2203 // will give us post-flush end timestamp
2204 if (ret
>= 0 && !pkt
->data
&& !pkt
->side_data_elems
&& dec
->queue_end_ts
)
2205 dec
->expect_end_ts
= 1;
2210 static int send_to_filter(Scheduler
*sch
, SchFilterGraph
*fg
,
2211 unsigned in_idx
, AVFrame
*frame
)
2214 return tq_send(fg
->queue
, in_idx
, frame
);
2216 if (!fg
->inputs
[in_idx
].send_finished
) {
2217 fg
->inputs
[in_idx
].send_finished
= 1;
2218 tq_send_finish(fg
->queue
, in_idx
);
2220 // close the control stream when all actual inputs are done
2221 if (atomic_fetch_add(&fg
->nb_inputs_finished_send
, 1) == fg
->nb_inputs
- 1)
2222 tq_send_finish(fg
->queue
, fg
->nb_inputs
);
2227 static int dec_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
2228 uint8_t *dst_finished
, AVFrame
*frame
)
2238 ret
= (dst
.type
== SCH_NODE_TYPE_FILTER_IN
) ?
2239 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, frame
) :
2240 send_to_enc(sch
, &sch
->enc
[dst
.idx
], frame
);
2241 if (ret
== AVERROR_EOF
)
2247 if (dst
.type
== SCH_NODE_TYPE_FILTER_IN
)
2248 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, NULL
);
2250 send_to_enc(sch
, &sch
->enc
[dst
.idx
], NULL
);
2257 int sch_dec_send(Scheduler
*sch
, unsigned dec_idx
,
2258 unsigned out_idx
, AVFrame
*frame
)
2263 unsigned nb_done
= 0;
2265 av_assert0(dec_idx
< sch
->nb_dec
);
2266 dec
= &sch
->dec
[dec_idx
];
2268 av_assert0(out_idx
< dec
->nb_outputs
);
2269 o
= &dec
->outputs
[out_idx
];
2271 for (unsigned i
= 0; i
< o
->nb_dst
; i
++) {
2272 uint8_t *finished
= &o
->dst_finished
[i
];
2273 AVFrame
*to_send
= frame
;
2275 // sending a frame consumes it, so make a temporary reference if needed
2276 if (i
< o
->nb_dst
- 1) {
2277 to_send
= dec
->send_frame
;
2279 // frame may sometimes contain props only,
2280 // e.g. to signal EOF timestamp
2281 ret
= frame
->buf
[0] ? av_frame_ref(to_send
, frame
) :
2282 av_frame_copy_props(to_send
, frame
);
2287 ret
= dec_send_to_dst(sch
, o
->dst
[i
], finished
, to_send
);
2289 av_frame_unref(to_send
);
2290 if (ret
== AVERROR_EOF
) {
2298 return (nb_done
== o
->nb_dst
) ? AVERROR_EOF
: 0;
2301 static int dec_done(Scheduler
*sch
, unsigned dec_idx
)
2303 SchDec
*dec
= &sch
->dec
[dec_idx
];
2306 tq_receive_finish(dec
->queue
, 0);
2308 // make sure our source does not get stuck waiting for end timestamps
2309 // that will never arrive
2310 if (dec
->queue_end_ts
)
2311 av_thread_message_queue_set_err_recv(dec
->queue_end_ts
, AVERROR_EOF
);
2313 for (unsigned i
= 0; i
< dec
->nb_outputs
; i
++) {
2314 SchDecOutput
*o
= &dec
->outputs
[i
];
2316 for (unsigned j
= 0; j
< o
->nb_dst
; j
++) {
2317 int err
= dec_send_to_dst(sch
, o
->dst
[j
], &o
->dst_finished
[j
], NULL
);
2318 if (err
< 0 && err
!= AVERROR_EOF
)
2319 ret
= err_merge(ret
, err
);
2326 int sch_enc_receive(Scheduler
*sch
, unsigned enc_idx
, AVFrame
*frame
)
2331 av_assert0(enc_idx
< sch
->nb_enc
);
2332 enc
= &sch
->enc
[enc_idx
];
2334 ret
= tq_receive(enc
->queue
, &dummy
, frame
);
2335 av_assert0(dummy
<= 0);
2340 static int enc_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
2341 uint8_t *dst_finished
, AVPacket
*pkt
)
2351 ret
= (dst
.type
== SCH_NODE_TYPE_MUX
) ?
2352 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, pkt
) :
2353 tq_send(sch
->dec
[dst
.idx
].queue
, 0, pkt
);
2354 if (ret
== AVERROR_EOF
)
2360 if (dst
.type
== SCH_NODE_TYPE_MUX
)
2361 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, NULL
);
2363 tq_send_finish(sch
->dec
[dst
.idx
].queue
, 0);
2370 int sch_enc_send(Scheduler
*sch
, unsigned enc_idx
, AVPacket
*pkt
)
2375 av_assert0(enc_idx
< sch
->nb_enc
);
2376 enc
= &sch
->enc
[enc_idx
];
2378 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
2379 uint8_t *finished
= &enc
->dst_finished
[i
];
2380 AVPacket
*to_send
= pkt
;
2382 // sending a packet consumes it, so make a temporary reference if needed
2383 if (i
< enc
->nb_dst
- 1) {
2384 to_send
= enc
->send_pkt
;
2386 ret
= av_packet_ref(to_send
, pkt
);
2391 ret
= enc_send_to_dst(sch
, enc
->dst
[i
], finished
, to_send
);
2393 av_packet_unref(to_send
);
2394 if (ret
== AVERROR_EOF
)
2403 static int enc_done(Scheduler
*sch
, unsigned enc_idx
)
2405 SchEnc
*enc
= &sch
->enc
[enc_idx
];
2408 tq_receive_finish(enc
->queue
, 0);
2410 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
2411 int err
= enc_send_to_dst(sch
, enc
->dst
[i
], &enc
->dst_finished
[i
], NULL
);
2412 if (err
< 0 && err
!= AVERROR_EOF
)
2413 ret
= err_merge(ret
, err
);
2419 int sch_filter_receive(Scheduler
*sch
, unsigned fg_idx
,
2420 unsigned *in_idx
, AVFrame
*frame
)
2424 av_assert0(fg_idx
< sch
->nb_filters
);
2425 fg
= &sch
->filters
[fg_idx
];
2427 av_assert0(*in_idx
<= fg
->nb_inputs
);
2429 // update scheduling to account for desired input stream, if it changed
2431 // this check needs no locking because only the filtering thread
2432 // updates this value
2433 if (*in_idx
!= fg
->best_input
) {
2434 pthread_mutex_lock(&sch
->schedule_lock
);
2436 fg
->best_input
= *in_idx
;
2437 schedule_update_locked(sch
);
2439 pthread_mutex_unlock(&sch
->schedule_lock
);
2442 if (*in_idx
== fg
->nb_inputs
) {
2443 int terminate
= waiter_wait(sch
, &fg
->waiter
);
2444 return terminate
? AVERROR_EOF
: AVERROR(EAGAIN
);
2450 ret
= tq_receive(fg
->queue
, &idx
, frame
);
2453 else if (ret
>= 0) {
2458 // disregard EOFs for specific streams - they should always be
2459 // preceded by an EOF frame
2463 void sch_filter_receive_finish(Scheduler
*sch
, unsigned fg_idx
, unsigned in_idx
)
2468 av_assert0(fg_idx
< sch
->nb_filters
);
2469 fg
= &sch
->filters
[fg_idx
];
2471 av_assert0(in_idx
< fg
->nb_inputs
);
2472 fi
= &fg
->inputs
[in_idx
];
2474 if (!fi
->receive_finished
) {
2475 fi
->receive_finished
= 1;
2476 tq_receive_finish(fg
->queue
, in_idx
);
2478 // close the control stream when all actual inputs are done
2479 if (++fg
->nb_inputs_finished_receive
== fg
->nb_inputs
)
2480 tq_receive_finish(fg
->queue
, fg
->nb_inputs
);
2484 int sch_filter_send(Scheduler
*sch
, unsigned fg_idx
, unsigned out_idx
, AVFrame
*frame
)
2489 av_assert0(fg_idx
< sch
->nb_filters
);
2490 fg
= &sch
->filters
[fg_idx
];
2492 av_assert0(out_idx
< fg
->nb_outputs
);
2493 dst
= fg
->outputs
[out_idx
].dst
;
2495 return (dst
.type
== SCH_NODE_TYPE_ENC
) ?
2496 send_to_enc (sch
, &sch
->enc
[dst
.idx
], frame
) :
2497 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, frame
);
2500 static int filter_done(Scheduler
*sch
, unsigned fg_idx
)
2502 SchFilterGraph
*fg
= &sch
->filters
[fg_idx
];
2505 for (unsigned i
= 0; i
<= fg
->nb_inputs
; i
++)
2506 tq_receive_finish(fg
->queue
, i
);
2508 for (unsigned i
= 0; i
< fg
->nb_outputs
; i
++) {
2509 SchedulerNode dst
= fg
->outputs
[i
].dst
;
2510 int err
= (dst
.type
== SCH_NODE_TYPE_ENC
) ?
2511 send_to_enc (sch
, &sch
->enc
[dst
.idx
], NULL
) :
2512 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, NULL
);
2514 if (err
< 0 && err
!= AVERROR_EOF
)
2515 ret
= err_merge(ret
, err
);
2518 pthread_mutex_lock(&sch
->schedule_lock
);
2520 fg
->task_exited
= 1;
2522 schedule_update_locked(sch
);
2524 pthread_mutex_unlock(&sch
->schedule_lock
);
2529 int sch_filter_command(Scheduler
*sch
, unsigned fg_idx
, AVFrame
*frame
)
2533 av_assert0(fg_idx
< sch
->nb_filters
);
2534 fg
= &sch
->filters
[fg_idx
];
2536 return send_to_filter(sch
, fg
, fg
->nb_inputs
, frame
);
2539 static int task_cleanup(Scheduler
*sch
, SchedulerNode node
)
2541 switch (node
.type
) {
2542 case SCH_NODE_TYPE_DEMUX
: return demux_done (sch
, node
.idx
);
2543 case SCH_NODE_TYPE_MUX
: return mux_done (sch
, node
.idx
);
2544 case SCH_NODE_TYPE_DEC
: return dec_done (sch
, node
.idx
);
2545 case SCH_NODE_TYPE_ENC
: return enc_done (sch
, node
.idx
);
2546 case SCH_NODE_TYPE_FILTER_IN
: return filter_done(sch
, node
.idx
);
2547 default: av_assert0(0);
2551 static void *task_wrapper(void *arg
)
2553 SchTask
*task
= arg
;
2554 Scheduler
*sch
= task
->parent
;
2558 ret
= task
->func(task
->func_arg
);
2560 av_log(task
->func_arg
, AV_LOG_ERROR
,
2561 "Task finished with error code: %d (%s)\n", ret
, av_err2str(ret
));
2563 err
= task_cleanup(sch
, task
->node
);
2564 ret
= err_merge(ret
, err
);
2566 // EOF is considered normal termination
2567 if (ret
== AVERROR_EOF
)
2570 atomic_store(&sch
->task_failed
, 1);
2572 av_log(task
->func_arg
, ret
< 0 ? AV_LOG_ERROR
: AV_LOG_VERBOSE
,
2573 "Terminating thread with return code %d (%s)\n", ret
,
2574 ret
< 0 ? av_err2str(ret
) : "success");
2576 return (void*)(intptr_t)ret
;
2579 static int task_stop(Scheduler
*sch
, SchTask
*task
)
2584 if (!task
->thread_running
)
2585 return task_cleanup(sch
, task
->node
);
2587 ret
= pthread_join(task
->thread
, &thread_ret
);
2588 av_assert0(ret
== 0);
2590 task
->thread_running
= 0;
2592 return (intptr_t)thread_ret
;
2595 int sch_stop(Scheduler
*sch
, int64_t *finish_ts
)
2599 if (sch
->state
!= SCH_STATE_STARTED
)
2602 atomic_store(&sch
->terminate
, 1);
2604 for (unsigned type
= 0; type
< 2; type
++)
2605 for (unsigned i
= 0; i
< (type
? sch
->nb_demux
: sch
->nb_filters
); i
++) {
2606 SchWaiter
*w
= type
? &sch
->demux
[i
].waiter
: &sch
->filters
[i
].waiter
;
2610 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
2611 SchDemux
*d
= &sch
->demux
[i
];
2613 err
= task_stop(sch
, &d
->task
);
2614 ret
= err_merge(ret
, err
);
2617 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
2618 SchDec
*dec
= &sch
->dec
[i
];
2620 err
= task_stop(sch
, &dec
->task
);
2621 ret
= err_merge(ret
, err
);
2624 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
2625 SchFilterGraph
*fg
= &sch
->filters
[i
];
2627 err
= task_stop(sch
, &fg
->task
);
2628 ret
= err_merge(ret
, err
);
2631 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
2632 SchEnc
*enc
= &sch
->enc
[i
];
2634 err
= task_stop(sch
, &enc
->task
);
2635 ret
= err_merge(ret
, err
);
2638 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
2639 SchMux
*mux
= &sch
->mux
[i
];
2641 err
= task_stop(sch
, &mux
->task
);
2642 ret
= err_merge(ret
, err
);
2646 *finish_ts
= trailing_dts(sch
, 1);
2648 sch
->state
= SCH_STATE_STOPPED
;