2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <stdatomic.h>
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
32 #include "libavcodec/packet.h"
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
52 typedef struct SchWaiter
{
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
63 typedef struct SchTask
{
74 typedef struct SchDecOutput
{
76 uint8_t *dst_finished
;
80 typedef struct SchDec
{
85 SchDecOutput
*outputs
;
89 // Queue for receiving input packets, one stream.
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue
*queue_end_ts
;
96 // temporary storage used by sch_dec_send()
100 typedef struct SchSyncQueue
{
103 pthread_mutex_t lock
;
109 typedef struct SchEnc
{
110 const AVClass
*class;
114 uint8_t *dst_finished
;
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
137 int (*open_cb
)(void *opaque
, const AVFrame
*frame
);
141 // Queue for receiving input frames, one stream.
143 // tq_send() to queue returned EOF
146 // temporary storage used by sch_enc_send()
150 typedef struct SchDemuxStream
{
152 uint8_t *dst_finished
;
156 typedef struct SchDemux
{
157 const AVClass
*class;
159 SchDemuxStream
*streams
;
165 // temporary storage used by sch_demux_send()
168 // protected by schedule_lock
172 typedef struct PreMuxQueue
{
174 * Queue for buffering the packets before the muxer task can be started.
178 * Maximum number of packets in fifo.
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold
;
190 typedef struct SchMuxStream
{
193 unsigned *sub_heartbeat_dst
;
194 unsigned nb_sub_heartbeat_dst
;
196 PreMuxQueue pre_mux_queue
;
198 // an EOF was generated while flushing the pre-mux queue
201 ////////////////////////////////////////////////////////////
202 // The following are protected by Scheduler.schedule_lock //
204 /* dts+duration of the last packet sent to this stream
207 // this stream no longer accepts input
209 ////////////////////////////////////////////////////////////
212 typedef struct SchMux
{
213 const AVClass
*class;
215 SchMuxStream
*streams
;
217 unsigned nb_streams_ready
;
219 int (*init
)(void *arg
);
223 * Set to 1 after starting the muxer task and flushing the
225 * Set either before any tasks have started, or with
226 * Scheduler.mux_ready_lock held.
228 atomic_int mux_started
;
232 AVPacket
*sub_heartbeat_pkt
;
235 typedef struct SchFilterIn
{
238 int receive_finished
;
241 typedef struct SchFilterOut
{
245 typedef struct SchFilterGraph
{
246 const AVClass
*class;
250 atomic_uint nb_inputs_finished_send
;
251 unsigned nb_inputs_finished_receive
;
253 SchFilterOut
*outputs
;
257 // input queue, nb_inputs+1 streams
258 // last stream is control
262 // protected by schedule_lock
267 enum SchedulerState
{
274 const AVClass
*class;
282 unsigned nb_mux_ready
;
283 pthread_mutex_t mux_ready_lock
;
285 unsigned nb_mux_done
;
286 unsigned task_failed
;
287 pthread_mutex_t finish_lock
;
288 pthread_cond_t finish_cond
;
297 SchSyncQueue
*sq_enc
;
300 SchFilterGraph
*filters
;
306 enum SchedulerState state
;
307 atomic_int terminate
;
309 pthread_mutex_t schedule_lock
;
311 atomic_int_least64_t last_dts
;
315 * Wait until this task is allowed to proceed.
317 * @retval 0 the caller should proceed
318 * @retval 1 the caller should terminate
320 static int waiter_wait(Scheduler
*sch
, SchWaiter
*w
)
324 if (!atomic_load(&w
->choked
))
327 pthread_mutex_lock(&w
->lock
);
329 while (atomic_load(&w
->choked
) && !atomic_load(&sch
->terminate
))
330 pthread_cond_wait(&w
->cond
, &w
->lock
);
332 terminate
= atomic_load(&sch
->terminate
);
334 pthread_mutex_unlock(&w
->lock
);
339 static void waiter_set(SchWaiter
*w
, int choked
)
341 pthread_mutex_lock(&w
->lock
);
343 atomic_store(&w
->choked
, choked
);
344 pthread_cond_signal(&w
->cond
);
346 pthread_mutex_unlock(&w
->lock
);
349 static int waiter_init(SchWaiter
*w
)
353 atomic_init(&w
->choked
, 0);
355 ret
= pthread_mutex_init(&w
->lock
, NULL
);
359 ret
= pthread_cond_init(&w
->cond
, NULL
);
366 static void waiter_uninit(SchWaiter
*w
)
368 pthread_mutex_destroy(&w
->lock
);
369 pthread_cond_destroy(&w
->cond
);
372 static int queue_alloc(ThreadQueue
**ptq
, unsigned nb_streams
, unsigned queue_size
,
377 if (queue_size
<= 0) {
378 if (type
== QUEUE_FRAMES
)
379 queue_size
= DEFAULT_FRAME_THREAD_QUEUE_SIZE
;
381 queue_size
= DEFAULT_PACKET_THREAD_QUEUE_SIZE
;
384 if (type
== QUEUE_FRAMES
) {
385 // This queue length is used in the decoder code to ensure that
386 // there are enough entries in fixed-size frame pools to account
387 // for frames held in queues inside the ffmpeg utility. If this
388 // can ever dynamically change then the corresponding decode
389 // code needs to be updated as well.
390 av_assert0(queue_size
<= DEFAULT_FRAME_THREAD_QUEUE_SIZE
);
393 tq
= tq_alloc(nb_streams
, queue_size
,
394 (type
== QUEUE_PACKETS
) ? THREAD_QUEUE_PACKETS
: THREAD_QUEUE_FRAMES
);
396 return AVERROR(ENOMEM
);
402 static void *task_wrapper(void *arg
);
404 static int task_start(SchTask
*task
)
411 av_log(task
->func_arg
, AV_LOG_VERBOSE
, "Starting thread...\n");
413 av_assert0(!task
->thread_running
);
415 ret
= pthread_create(&task
->thread
, NULL
, task_wrapper
, task
);
417 av_log(task
->func_arg
, AV_LOG_ERROR
, "pthread_create() failed: %s\n",
422 task
->thread_running
= 1;
426 static void task_init(Scheduler
*sch
, SchTask
*task
, enum SchedulerNodeType type
, unsigned idx
,
427 SchThreadFunc func
, void *func_arg
)
431 task
->node
.type
= type
;
432 task
->node
.idx
= idx
;
435 task
->func_arg
= func_arg
;
438 static int64_t trailing_dts(const Scheduler
*sch
, int count_finished
)
440 int64_t min_dts
= INT64_MAX
;
442 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
443 const SchMux
*mux
= &sch
->mux
[i
];
445 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
446 const SchMuxStream
*ms
= &mux
->streams
[j
];
448 if (ms
->source_finished
&& !count_finished
)
450 if (ms
->last_dts
== AV_NOPTS_VALUE
)
451 return AV_NOPTS_VALUE
;
453 min_dts
= FFMIN(min_dts
, ms
->last_dts
);
457 return min_dts
== INT64_MAX
? AV_NOPTS_VALUE
: min_dts
;
460 void sch_remove_filtergraph(Scheduler
*sch
, int idx
)
462 SchFilterGraph
*fg
= &sch
->filters
[idx
];
464 av_assert0(!fg
->task
.thread_running
);
465 memset(&fg
->task
, 0, sizeof(fg
->task
));
469 av_freep(&fg
->inputs
);
471 av_freep(&fg
->outputs
);
477 void sch_free(Scheduler
**psch
)
479 Scheduler
*sch
= *psch
;
486 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
487 SchDemux
*d
= &sch
->demux
[i
];
489 for (unsigned j
= 0; j
< d
->nb_streams
; j
++) {
490 SchDemuxStream
*ds
= &d
->streams
[j
];
492 av_freep(&ds
->dst_finished
);
494 av_freep(&d
->streams
);
496 av_packet_free(&d
->send_pkt
);
498 waiter_uninit(&d
->waiter
);
500 av_freep(&sch
->demux
);
502 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
503 SchMux
*mux
= &sch
->mux
[i
];
505 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
506 SchMuxStream
*ms
= &mux
->streams
[j
];
508 if (ms
->pre_mux_queue
.fifo
) {
510 while (av_fifo_read(ms
->pre_mux_queue
.fifo
, &pkt
, 1) >= 0)
511 av_packet_free(&pkt
);
512 av_fifo_freep2(&ms
->pre_mux_queue
.fifo
);
515 av_freep(&ms
->sub_heartbeat_dst
);
517 av_freep(&mux
->streams
);
519 av_packet_free(&mux
->sub_heartbeat_pkt
);
521 tq_free(&mux
->queue
);
525 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
526 SchDec
*dec
= &sch
->dec
[i
];
528 tq_free(&dec
->queue
);
530 av_thread_message_queue_free(&dec
->queue_end_ts
);
532 for (unsigned j
= 0; j
< dec
->nb_outputs
; j
++) {
533 SchDecOutput
*o
= &dec
->outputs
[j
];
536 av_freep(&o
->dst_finished
);
539 av_freep(&dec
->outputs
);
541 av_frame_free(&dec
->send_frame
);
545 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
546 SchEnc
*enc
= &sch
->enc
[i
];
548 tq_free(&enc
->queue
);
550 av_packet_free(&enc
->send_pkt
);
553 av_freep(&enc
->dst_finished
);
557 for (unsigned i
= 0; i
< sch
->nb_sq_enc
; i
++) {
558 SchSyncQueue
*sq
= &sch
->sq_enc
[i
];
560 av_frame_free(&sq
->frame
);
561 pthread_mutex_destroy(&sq
->lock
);
562 av_freep(&sq
->enc_idx
);
564 av_freep(&sch
->sq_enc
);
566 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
567 SchFilterGraph
*fg
= &sch
->filters
[i
];
571 av_freep(&fg
->inputs
);
572 av_freep(&fg
->outputs
);
574 waiter_uninit(&fg
->waiter
);
576 av_freep(&sch
->filters
);
578 av_freep(&sch
->sdp_filename
);
580 pthread_mutex_destroy(&sch
->schedule_lock
);
582 pthread_mutex_destroy(&sch
->mux_ready_lock
);
584 pthread_mutex_destroy(&sch
->finish_lock
);
585 pthread_cond_destroy(&sch
->finish_cond
);
590 static const AVClass scheduler_class
= {
591 .class_name
= "Scheduler",
592 .version
= LIBAVUTIL_VERSION_INT
,
595 Scheduler
*sch_alloc(void)
600 sch
= av_mallocz(sizeof(*sch
));
604 sch
->class = &scheduler_class
;
607 ret
= pthread_mutex_init(&sch
->schedule_lock
, NULL
);
611 ret
= pthread_mutex_init(&sch
->mux_ready_lock
, NULL
);
615 ret
= pthread_mutex_init(&sch
->finish_lock
, NULL
);
619 ret
= pthread_cond_init(&sch
->finish_cond
, NULL
);
629 int sch_sdp_filename(Scheduler
*sch
, const char *sdp_filename
)
631 av_freep(&sch
->sdp_filename
);
632 sch
->sdp_filename
= av_strdup(sdp_filename
);
633 return sch
->sdp_filename
? 0 : AVERROR(ENOMEM
);
636 static const AVClass sch_mux_class
= {
637 .class_name
= "SchMux",
638 .version
= LIBAVUTIL_VERSION_INT
,
639 .parent_log_context_offset
= offsetof(SchMux
, task
.func_arg
),
642 int sch_add_mux(Scheduler
*sch
, SchThreadFunc func
, int (*init
)(void *),
643 void *arg
, int sdp_auto
, unsigned thread_queue_size
)
645 const unsigned idx
= sch
->nb_mux
;
650 ret
= GROW_ARRAY(sch
->mux
, sch
->nb_mux
);
654 mux
= &sch
->mux
[idx
];
655 mux
->class = &sch_mux_class
;
657 mux
->queue_size
= thread_queue_size
;
659 task_init(sch
, &mux
->task
, SCH_NODE_TYPE_MUX
, idx
, func
, arg
);
661 sch
->sdp_auto
&= sdp_auto
;
666 int sch_add_mux_stream(Scheduler
*sch
, unsigned mux_idx
)
673 av_assert0(mux_idx
< sch
->nb_mux
);
674 mux
= &sch
->mux
[mux_idx
];
676 ret
= GROW_ARRAY(mux
->streams
, mux
->nb_streams
);
679 stream_idx
= mux
->nb_streams
- 1;
681 ms
= &mux
->streams
[stream_idx
];
683 ms
->pre_mux_queue
.fifo
= av_fifo_alloc2(8, sizeof(AVPacket
*), 0);
684 if (!ms
->pre_mux_queue
.fifo
)
685 return AVERROR(ENOMEM
);
687 ms
->last_dts
= AV_NOPTS_VALUE
;
692 static const AVClass sch_demux_class
= {
693 .class_name
= "SchDemux",
694 .version
= LIBAVUTIL_VERSION_INT
,
695 .parent_log_context_offset
= offsetof(SchDemux
, task
.func_arg
),
698 int sch_add_demux(Scheduler
*sch
, SchThreadFunc func
, void *ctx
)
700 const unsigned idx
= sch
->nb_demux
;
705 ret
= GROW_ARRAY(sch
->demux
, sch
->nb_demux
);
709 d
= &sch
->demux
[idx
];
711 task_init(sch
, &d
->task
, SCH_NODE_TYPE_DEMUX
, idx
, func
, ctx
);
713 d
->class = &sch_demux_class
;
714 d
->send_pkt
= av_packet_alloc();
716 return AVERROR(ENOMEM
);
718 ret
= waiter_init(&d
->waiter
);
725 int sch_add_demux_stream(Scheduler
*sch
, unsigned demux_idx
)
730 av_assert0(demux_idx
< sch
->nb_demux
);
731 d
= &sch
->demux
[demux_idx
];
733 ret
= GROW_ARRAY(d
->streams
, d
->nb_streams
);
734 return ret
< 0 ? ret
: d
->nb_streams
- 1;
737 int sch_add_dec_output(Scheduler
*sch
, unsigned dec_idx
)
742 av_assert0(dec_idx
< sch
->nb_dec
);
743 dec
= &sch
->dec
[dec_idx
];
745 ret
= GROW_ARRAY(dec
->outputs
, dec
->nb_outputs
);
749 return dec
->nb_outputs
- 1;
752 static const AVClass sch_dec_class
= {
753 .class_name
= "SchDec",
754 .version
= LIBAVUTIL_VERSION_INT
,
755 .parent_log_context_offset
= offsetof(SchDec
, task
.func_arg
),
758 int sch_add_dec(Scheduler
*sch
, SchThreadFunc func
, void *ctx
, int send_end_ts
)
760 const unsigned idx
= sch
->nb_dec
;
765 ret
= GROW_ARRAY(sch
->dec
, sch
->nb_dec
);
769 dec
= &sch
->dec
[idx
];
771 task_init(sch
, &dec
->task
, SCH_NODE_TYPE_DEC
, idx
, func
, ctx
);
773 dec
->class = &sch_dec_class
;
774 dec
->send_frame
= av_frame_alloc();
775 if (!dec
->send_frame
)
776 return AVERROR(ENOMEM
);
778 ret
= sch_add_dec_output(sch
, idx
);
782 ret
= queue_alloc(&dec
->queue
, 1, 0, QUEUE_PACKETS
);
787 ret
= av_thread_message_queue_alloc(&dec
->queue_end_ts
, 1, sizeof(Timestamp
));
795 static const AVClass sch_enc_class
= {
796 .class_name
= "SchEnc",
797 .version
= LIBAVUTIL_VERSION_INT
,
798 .parent_log_context_offset
= offsetof(SchEnc
, task
.func_arg
),
801 int sch_add_enc(Scheduler
*sch
, SchThreadFunc func
, void *ctx
,
802 int (*open_cb
)(void *opaque
, const AVFrame
*frame
))
804 const unsigned idx
= sch
->nb_enc
;
809 ret
= GROW_ARRAY(sch
->enc
, sch
->nb_enc
);
813 enc
= &sch
->enc
[idx
];
815 enc
->class = &sch_enc_class
;
816 enc
->open_cb
= open_cb
;
820 task_init(sch
, &enc
->task
, SCH_NODE_TYPE_ENC
, idx
, func
, ctx
);
822 enc
->send_pkt
= av_packet_alloc();
824 return AVERROR(ENOMEM
);
826 ret
= queue_alloc(&enc
->queue
, 1, 0, QUEUE_FRAMES
);
833 static const AVClass sch_fg_class
= {
834 .class_name
= "SchFilterGraph",
835 .version
= LIBAVUTIL_VERSION_INT
,
836 .parent_log_context_offset
= offsetof(SchFilterGraph
, task
.func_arg
),
839 int sch_add_filtergraph(Scheduler
*sch
, unsigned nb_inputs
, unsigned nb_outputs
,
840 SchThreadFunc func
, void *ctx
)
842 const unsigned idx
= sch
->nb_filters
;
847 ret
= GROW_ARRAY(sch
->filters
, sch
->nb_filters
);
850 fg
= &sch
->filters
[idx
];
852 fg
->class = &sch_fg_class
;
854 task_init(sch
, &fg
->task
, SCH_NODE_TYPE_FILTER_IN
, idx
, func
, ctx
);
857 fg
->inputs
= av_calloc(nb_inputs
, sizeof(*fg
->inputs
));
859 return AVERROR(ENOMEM
);
860 fg
->nb_inputs
= nb_inputs
;
864 fg
->outputs
= av_calloc(nb_outputs
, sizeof(*fg
->outputs
));
866 return AVERROR(ENOMEM
);
867 fg
->nb_outputs
= nb_outputs
;
870 ret
= waiter_init(&fg
->waiter
);
874 ret
= queue_alloc(&fg
->queue
, fg
->nb_inputs
+ 1, 0, QUEUE_FRAMES
);
881 int sch_add_sq_enc(Scheduler
*sch
, uint64_t buf_size_us
, void *logctx
)
886 ret
= GROW_ARRAY(sch
->sq_enc
, sch
->nb_sq_enc
);
889 sq
= &sch
->sq_enc
[sch
->nb_sq_enc
- 1];
891 sq
->sq
= sq_alloc(SYNC_QUEUE_FRAMES
, buf_size_us
, logctx
);
893 return AVERROR(ENOMEM
);
895 sq
->frame
= av_frame_alloc();
897 return AVERROR(ENOMEM
);
899 ret
= pthread_mutex_init(&sq
->lock
, NULL
);
903 return sq
- sch
->sq_enc
;
906 int sch_sq_add_enc(Scheduler
*sch
, unsigned sq_idx
, unsigned enc_idx
,
907 int limiting
, uint64_t max_frames
)
913 av_assert0(sq_idx
< sch
->nb_sq_enc
);
914 sq
= &sch
->sq_enc
[sq_idx
];
916 av_assert0(enc_idx
< sch
->nb_enc
);
917 enc
= &sch
->enc
[enc_idx
];
919 ret
= GROW_ARRAY(sq
->enc_idx
, sq
->nb_enc_idx
);
922 sq
->enc_idx
[sq
->nb_enc_idx
- 1] = enc_idx
;
924 ret
= sq_add_stream(sq
->sq
, limiting
);
928 enc
->sq_idx
[0] = sq_idx
;
929 enc
->sq_idx
[1] = ret
;
931 if (max_frames
!= INT64_MAX
)
932 sq_limit_frames(sq
->sq
, enc
->sq_idx
[1], max_frames
);
937 int sch_connect(Scheduler
*sch
, SchedulerNode src
, SchedulerNode dst
)
942 case SCH_NODE_TYPE_DEMUX
: {
945 av_assert0(src
.idx
< sch
->nb_demux
&&
946 src
.idx_stream
< sch
->demux
[src
.idx
].nb_streams
);
947 ds
= &sch
->demux
[src
.idx
].streams
[src
.idx_stream
];
949 ret
= GROW_ARRAY(ds
->dst
, ds
->nb_dst
);
953 ds
->dst
[ds
->nb_dst
- 1] = dst
;
955 // demuxed packets go to decoding or streamcopy
957 case SCH_NODE_TYPE_DEC
: {
960 av_assert0(dst
.idx
< sch
->nb_dec
);
961 dec
= &sch
->dec
[dst
.idx
];
963 av_assert0(!dec
->src
.type
);
967 case SCH_NODE_TYPE_MUX
: {
970 av_assert0(dst
.idx
< sch
->nb_mux
&&
971 dst
.idx_stream
< sch
->mux
[dst
.idx
].nb_streams
);
972 ms
= &sch
->mux
[dst
.idx
].streams
[dst
.idx_stream
];
974 av_assert0(!ms
->src
.type
);
979 default: av_assert0(0);
984 case SCH_NODE_TYPE_DEC
: {
988 av_assert0(src
.idx
< sch
->nb_dec
);
989 dec
= &sch
->dec
[src
.idx
];
991 av_assert0(src
.idx_stream
< dec
->nb_outputs
);
992 o
= &dec
->outputs
[src
.idx_stream
];
994 ret
= GROW_ARRAY(o
->dst
, o
->nb_dst
);
998 o
->dst
[o
->nb_dst
- 1] = dst
;
1000 // decoded frames go to filters or encoding
1002 case SCH_NODE_TYPE_FILTER_IN
: {
1005 av_assert0(dst
.idx
< sch
->nb_filters
&&
1006 dst
.idx_stream
< sch
->filters
[dst
.idx
].nb_inputs
);
1007 fi
= &sch
->filters
[dst
.idx
].inputs
[dst
.idx_stream
];
1009 av_assert0(!fi
->src
.type
);
1013 case SCH_NODE_TYPE_ENC
: {
1016 av_assert0(dst
.idx
< sch
->nb_enc
);
1017 enc
= &sch
->enc
[dst
.idx
];
1019 av_assert0(!enc
->src
.type
);
1023 default: av_assert0(0);
1028 case SCH_NODE_TYPE_FILTER_OUT
: {
1031 av_assert0(src
.idx
< sch
->nb_filters
&&
1032 src
.idx_stream
< sch
->filters
[src
.idx
].nb_outputs
);
1033 fo
= &sch
->filters
[src
.idx
].outputs
[src
.idx_stream
];
1035 av_assert0(!fo
->dst
.type
);
1038 // filtered frames go to encoding or another filtergraph
1040 case SCH_NODE_TYPE_ENC
: {
1043 av_assert0(dst
.idx
< sch
->nb_enc
);
1044 enc
= &sch
->enc
[dst
.idx
];
1046 av_assert0(!enc
->src
.type
);
1050 case SCH_NODE_TYPE_FILTER_IN
: {
1053 av_assert0(dst
.idx
< sch
->nb_filters
&&
1054 dst
.idx_stream
< sch
->filters
[dst
.idx
].nb_inputs
);
1055 fi
= &sch
->filters
[dst
.idx
].inputs
[dst
.idx_stream
];
1057 av_assert0(!fi
->src
.type
);
1061 default: av_assert0(0);
1067 case SCH_NODE_TYPE_ENC
: {
1070 av_assert0(src
.idx
< sch
->nb_enc
);
1071 enc
= &sch
->enc
[src
.idx
];
1073 ret
= GROW_ARRAY(enc
->dst
, enc
->nb_dst
);
1077 enc
->dst
[enc
->nb_dst
- 1] = dst
;
1079 // encoding packets go to muxing or decoding
1081 case SCH_NODE_TYPE_MUX
: {
1084 av_assert0(dst
.idx
< sch
->nb_mux
&&
1085 dst
.idx_stream
< sch
->mux
[dst
.idx
].nb_streams
);
1086 ms
= &sch
->mux
[dst
.idx
].streams
[dst
.idx_stream
];
1088 av_assert0(!ms
->src
.type
);
1093 case SCH_NODE_TYPE_DEC
: {
1096 av_assert0(dst
.idx
< sch
->nb_dec
);
1097 dec
= &sch
->dec
[dst
.idx
];
1099 av_assert0(!dec
->src
.type
);
1104 default: av_assert0(0);
1109 default: av_assert0(0);
1115 static int mux_task_start(SchMux
*mux
)
1119 ret
= task_start(&mux
->task
);
1123 /* flush the pre-muxing queues */
1125 int min_stream
= -1;
1126 Timestamp min_ts
= { .ts
= AV_NOPTS_VALUE
};
1130 // find the stream with the earliest dts or EOF in pre-muxing queue
1131 for (unsigned i
= 0; i
< mux
->nb_streams
; i
++) {
1132 SchMuxStream
*ms
= &mux
->streams
[i
];
1134 if (av_fifo_peek(ms
->pre_mux_queue
.fifo
, &pkt
, 1, 0) < 0)
1137 if (!pkt
|| pkt
->dts
== AV_NOPTS_VALUE
) {
1142 if (min_ts
.ts
== AV_NOPTS_VALUE
||
1143 av_compare_ts(min_ts
.ts
, min_ts
.tb
, pkt
->dts
, pkt
->time_base
) > 0) {
1145 min_ts
= (Timestamp
){ .ts
= pkt
->dts
, .tb
= pkt
->time_base
};
1149 if (min_stream
>= 0) {
1150 SchMuxStream
*ms
= &mux
->streams
[min_stream
];
1152 ret
= av_fifo_read(ms
->pre_mux_queue
.fifo
, &pkt
, 1);
1153 av_assert0(ret
>= 0);
1157 ret
= tq_send(mux
->queue
, min_stream
, pkt
);
1158 av_packet_free(&pkt
);
1159 if (ret
== AVERROR_EOF
)
1164 tq_send_finish(mux
->queue
, min_stream
);
1172 atomic_store(&mux
->mux_started
, 1);
1177 int print_sdp(const char *filename
);
1179 static int mux_init(Scheduler
*sch
, SchMux
*mux
)
1183 ret
= mux
->init(mux
->task
.func_arg
);
1187 sch
->nb_mux_ready
++;
1189 if (sch
->sdp_filename
|| sch
->sdp_auto
) {
1190 if (sch
->nb_mux_ready
< sch
->nb_mux
)
1193 ret
= print_sdp(sch
->sdp_filename
);
1195 av_log(sch
, AV_LOG_ERROR
, "Error writing the SDP.\n");
1199 /* SDP is written only after all the muxers are ready, so now we
1200 * start ALL the threads */
1201 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1202 ret
= mux_task_start(&sch
->mux
[i
]);
1207 ret
= mux_task_start(mux
);
1215 void sch_mux_stream_buffering(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
1216 size_t data_threshold
, int max_packets
)
1221 av_assert0(mux_idx
< sch
->nb_mux
);
1222 mux
= &sch
->mux
[mux_idx
];
1224 av_assert0(stream_idx
< mux
->nb_streams
);
1225 ms
= &mux
->streams
[stream_idx
];
1227 ms
->pre_mux_queue
.max_packets
= max_packets
;
1228 ms
->pre_mux_queue
.data_threshold
= data_threshold
;
1231 int sch_mux_stream_ready(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
)
1236 av_assert0(mux_idx
< sch
->nb_mux
);
1237 mux
= &sch
->mux
[mux_idx
];
1239 av_assert0(stream_idx
< mux
->nb_streams
);
1241 pthread_mutex_lock(&sch
->mux_ready_lock
);
1243 av_assert0(mux
->nb_streams_ready
< mux
->nb_streams
);
1245 // this may be called during initialization - do not start
1246 // threads before sch_start() is called
1247 if (++mux
->nb_streams_ready
== mux
->nb_streams
&&
1248 sch
->state
>= SCH_STATE_STARTED
)
1249 ret
= mux_init(sch
, mux
);
1251 pthread_mutex_unlock(&sch
->mux_ready_lock
);
1256 int sch_mux_sub_heartbeat_add(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
1263 av_assert0(mux_idx
< sch
->nb_mux
);
1264 mux
= &sch
->mux
[mux_idx
];
1266 av_assert0(stream_idx
< mux
->nb_streams
);
1267 ms
= &mux
->streams
[stream_idx
];
1269 ret
= GROW_ARRAY(ms
->sub_heartbeat_dst
, ms
->nb_sub_heartbeat_dst
);
1273 av_assert0(dec_idx
< sch
->nb_dec
);
1274 ms
->sub_heartbeat_dst
[ms
->nb_sub_heartbeat_dst
- 1] = dec_idx
;
1276 if (!mux
->sub_heartbeat_pkt
) {
1277 mux
->sub_heartbeat_pkt
= av_packet_alloc();
1278 if (!mux
->sub_heartbeat_pkt
)
1279 return AVERROR(ENOMEM
);
1285 static void unchoke_for_stream(Scheduler
*sch
, SchedulerNode src
);
1287 // Unchoke any filter graphs that are downstream of this node, to prevent it
1288 // from getting stuck trying to push data to a full queue
1289 static void unchoke_downstream(Scheduler
*sch
, SchedulerNode
*dst
)
1294 switch (dst
->type
) {
1295 case SCH_NODE_TYPE_DEC
:
1296 dec
= &sch
->dec
[dst
->idx
];
1297 for (int i
= 0; i
< dec
->nb_outputs
; i
++)
1298 unchoke_downstream(sch
, dec
->outputs
[i
].dst
);
1300 case SCH_NODE_TYPE_ENC
:
1301 enc
= &sch
->enc
[dst
->idx
];
1302 for (int i
= 0; i
< enc
->nb_dst
; i
++)
1303 unchoke_downstream(sch
, &enc
->dst
[i
]);
1305 case SCH_NODE_TYPE_MUX
:
1306 // muxers are never choked
1308 case SCH_NODE_TYPE_FILTER_IN
:
1309 fg
= &sch
->filters
[dst
->idx
];
1310 if (fg
->best_input
== fg
->nb_inputs
) {
1311 fg
->waiter
.choked_next
= 0;
1313 // ensure that this filter graph is not stuck waiting for
1314 // input from a different upstream demuxer
1315 unchoke_for_stream(sch
, fg
->inputs
[fg
->best_input
].src
);
1319 av_unreachable("Invalid destination node type?");
1324 static void unchoke_for_stream(Scheduler
*sch
, SchedulerNode src
)
1330 case SCH_NODE_TYPE_DEMUX
:
1331 // fed directly by a demuxer (i.e. not through a filtergraph)
1332 demux
= &sch
->demux
[src
.idx
];
1333 if (demux
->waiter
.choked_next
== 0)
1334 return; // prevent infinite loop
1335 demux
->waiter
.choked_next
= 0;
1336 for (int i
= 0; i
< demux
->nb_streams
; i
++)
1337 unchoke_downstream(sch
, demux
->streams
[i
].dst
);
1339 case SCH_NODE_TYPE_DEC
:
1340 src
= sch
->dec
[src
.idx
].src
;
1342 case SCH_NODE_TYPE_ENC
:
1343 src
= sch
->enc
[src
.idx
].src
;
1345 case SCH_NODE_TYPE_FILTER_OUT
:
1346 fg
= &sch
->filters
[src
.idx
];
1347 // the filtergraph contains internal sources and
1348 // requested to be scheduled directly
1349 if (fg
->best_input
== fg
->nb_inputs
) {
1350 fg
->waiter
.choked_next
= 0;
1353 src
= fg
->inputs
[fg
->best_input
].src
;
1356 av_unreachable("Invalid source node type?");
1362 static void choke_demux(const Scheduler
*sch
, int demux_id
, int choked
)
1364 av_assert1(demux_id
< sch
->nb_demux
);
1365 SchDemux
*demux
= &sch
->demux
[demux_id
];
1367 for (int i
= 0; i
< demux
->nb_streams
; i
++) {
1368 SchedulerNode
*dst
= demux
->streams
[i
].dst
;
1371 switch (dst
->type
) {
1372 case SCH_NODE_TYPE_DEC
:
1373 tq_choke(sch
->dec
[dst
->idx
].queue
, choked
);
1375 case SCH_NODE_TYPE_ENC
:
1376 tq_choke(sch
->enc
[dst
->idx
].queue
, choked
);
1378 case SCH_NODE_TYPE_MUX
:
1380 case SCH_NODE_TYPE_FILTER_IN
:
1381 fg
= &sch
->filters
[dst
->idx
];
1382 if (fg
->nb_inputs
== 1)
1383 tq_choke(fg
->queue
, choked
);
1386 av_unreachable("Invalid destination node type?");
1392 static void schedule_update_locked(Scheduler
*sch
)
1395 int have_unchoked
= 0;
1397 // on termination request all waiters are choked,
1398 // we are not to unchoke them
1399 if (atomic_load(&sch
->terminate
))
1402 dts
= trailing_dts(sch
, 0);
1404 atomic_store(&sch
->last_dts
, dts
);
1406 // initialize our internal state
1407 for (unsigned type
= 0; type
< 2; type
++)
1408 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1409 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1410 w
->choked_prev
= atomic_load(&w
->choked
);
1414 // figure out the sources that are allowed to proceed
1415 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1416 SchMux
*mux
= &sch
->mux
[i
];
1418 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
1419 SchMuxStream
*ms
= &mux
->streams
[j
];
1421 // unblock sources for output streams that are not finished
1422 // and not too far ahead of the trailing stream
1423 if (ms
->source_finished
)
1425 if (dts
== AV_NOPTS_VALUE
&& ms
->last_dts
!= AV_NOPTS_VALUE
)
1427 if (dts
!= AV_NOPTS_VALUE
&& ms
->last_dts
- dts
>= SCHEDULE_TOLERANCE
)
1430 // resolve the source to unchoke
1431 unchoke_for_stream(sch
, ms
->src
);
1436 // also unchoke any sources feeding into closed filter graph inputs, so
1437 // that they can observe the downstream EOF
1438 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1439 SchFilterGraph
*fg
= &sch
->filters
[i
];
1441 for (unsigned j
= 0; j
< fg
->nb_inputs
; j
++) {
1442 SchFilterIn
*fi
= &fg
->inputs
[j
];
1443 if (fi
->receive_finished
&& !fi
->send_finished
)
1444 unchoke_for_stream(sch
, fi
->src
);
1448 // make sure to unchoke at least one source, if still available
1449 for (unsigned type
= 0; !have_unchoked
&& type
< 2; type
++)
1450 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1451 int exited
= type
? sch
->filters
[i
].task_exited
: sch
->demux
[i
].task_exited
;
1452 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1460 for (unsigned type
= 0; type
< 2; type
++) {
1461 for (unsigned i
= 0; i
< (type
? sch
->nb_filters
: sch
->nb_demux
); i
++) {
1462 SchWaiter
*w
= type
? &sch
->filters
[i
].waiter
: &sch
->demux
[i
].waiter
;
1463 if (w
->choked_prev
!= w
->choked_next
) {
1464 waiter_set(w
, w
->choked_next
);
1466 choke_demux(sch
, i
, w
->choked_next
);
1479 // Finds the filtergraph or muxer upstream of a scheduler node
1480 static SchedulerNode
src_filtergraph(const Scheduler
*sch
, SchedulerNode src
)
1484 case SCH_NODE_TYPE_DEMUX
:
1485 case SCH_NODE_TYPE_FILTER_OUT
:
1487 case SCH_NODE_TYPE_DEC
:
1488 src
= sch
->dec
[src
.idx
].src
;
1490 case SCH_NODE_TYPE_ENC
:
1491 src
= sch
->enc
[src
.idx
].src
;
1494 av_unreachable("Invalid source node type?");
1495 return (SchedulerNode
) {0};
1501 check_acyclic_for_output(const Scheduler
*sch
, SchedulerNode src
,
1502 uint8_t *filters_visited
, SchedulerNode
*filters_stack
)
1504 unsigned nb_filters_stack
= 0;
1506 memset(filters_visited
, 0, sch
->nb_filters
* sizeof(*filters_visited
));
1509 const SchFilterGraph
*fg
= &sch
->filters
[src
.idx
];
1511 filters_visited
[src
.idx
] = CYCLE_NODE_STARTED
;
1513 // descend into every input, depth first
1514 if (src
.idx_stream
< fg
->nb_inputs
) {
1515 const SchFilterIn
*fi
= &fg
->inputs
[src
.idx_stream
++];
1516 SchedulerNode node
= src_filtergraph(sch
, fi
->src
);
1518 // connected to demuxer, no cycles possible
1519 if (node
.type
== SCH_NODE_TYPE_DEMUX
)
1522 // otherwise connected to another filtergraph
1523 av_assert0(node
.type
== SCH_NODE_TYPE_FILTER_OUT
);
1526 if (filters_visited
[node
.idx
] == CYCLE_NODE_STARTED
)
1527 return AVERROR(EINVAL
);
1529 // place current position on stack and descend
1530 av_assert0(nb_filters_stack
< sch
->nb_filters
);
1531 filters_stack
[nb_filters_stack
++] = src
;
1532 src
= (SchedulerNode
){ .idx
= node
.idx
, .idx_stream
= 0 };
1536 filters_visited
[src
.idx
] = CYCLE_NODE_DONE
;
1538 // previous search finished,
1539 if (nb_filters_stack
) {
1540 src
= filters_stack
[--nb_filters_stack
];
1547 static int check_acyclic(Scheduler
*sch
)
1549 uint8_t *filters_visited
= NULL
;
1550 SchedulerNode
*filters_stack
= NULL
;
1554 if (!sch
->nb_filters
)
1557 filters_visited
= av_malloc_array(sch
->nb_filters
, sizeof(*filters_visited
));
1558 if (!filters_visited
)
1559 return AVERROR(ENOMEM
);
1561 filters_stack
= av_malloc_array(sch
->nb_filters
, sizeof(*filters_stack
));
1562 if (!filters_stack
) {
1563 ret
= AVERROR(ENOMEM
);
1567 // trace the transcoding graph upstream from every filtegraph
1568 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1569 ret
= check_acyclic_for_output(sch
, (SchedulerNode
){ .idx
= i
},
1570 filters_visited
, filters_stack
);
1572 av_log(&sch
->filters
[i
], AV_LOG_ERROR
, "Transcoding graph has a cycle\n");
1578 av_freep(&filters_visited
);
1579 av_freep(&filters_stack
);
1583 static int start_prepare(Scheduler
*sch
)
1587 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
1588 SchDemux
*d
= &sch
->demux
[i
];
1590 for (unsigned j
= 0; j
< d
->nb_streams
; j
++) {
1591 SchDemuxStream
*ds
= &d
->streams
[j
];
1594 av_log(d
, AV_LOG_ERROR
,
1595 "Demuxer stream %u not connected to any sink\n", j
);
1596 return AVERROR(EINVAL
);
1599 ds
->dst_finished
= av_calloc(ds
->nb_dst
, sizeof(*ds
->dst_finished
));
1600 if (!ds
->dst_finished
)
1601 return AVERROR(ENOMEM
);
1605 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
1606 SchDec
*dec
= &sch
->dec
[i
];
1608 if (!dec
->src
.type
) {
1609 av_log(dec
, AV_LOG_ERROR
,
1610 "Decoder not connected to a source\n");
1611 return AVERROR(EINVAL
);
1614 for (unsigned j
= 0; j
< dec
->nb_outputs
; j
++) {
1615 SchDecOutput
*o
= &dec
->outputs
[j
];
1618 av_log(dec
, AV_LOG_ERROR
,
1619 "Decoder output %u not connected to any sink\n", j
);
1620 return AVERROR(EINVAL
);
1623 o
->dst_finished
= av_calloc(o
->nb_dst
, sizeof(*o
->dst_finished
));
1624 if (!o
->dst_finished
)
1625 return AVERROR(ENOMEM
);
1629 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
1630 SchEnc
*enc
= &sch
->enc
[i
];
1632 if (!enc
->src
.type
) {
1633 av_log(enc
, AV_LOG_ERROR
,
1634 "Encoder not connected to a source\n");
1635 return AVERROR(EINVAL
);
1638 av_log(enc
, AV_LOG_ERROR
,
1639 "Encoder not connected to any sink\n");
1640 return AVERROR(EINVAL
);
1643 enc
->dst_finished
= av_calloc(enc
->nb_dst
, sizeof(*enc
->dst_finished
));
1644 if (!enc
->dst_finished
)
1645 return AVERROR(ENOMEM
);
1648 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1649 SchMux
*mux
= &sch
->mux
[i
];
1651 for (unsigned j
= 0; j
< mux
->nb_streams
; j
++) {
1652 SchMuxStream
*ms
= &mux
->streams
[j
];
1654 if (!ms
->src
.type
) {
1655 av_log(mux
, AV_LOG_ERROR
,
1656 "Muxer stream #%u not connected to a source\n", j
);
1657 return AVERROR(EINVAL
);
1661 ret
= queue_alloc(&mux
->queue
, mux
->nb_streams
, mux
->queue_size
,
1667 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1668 SchFilterGraph
*fg
= &sch
->filters
[i
];
1670 for (unsigned j
= 0; j
< fg
->nb_inputs
; j
++) {
1671 SchFilterIn
*fi
= &fg
->inputs
[j
];
1673 if (!fi
->src
.type
) {
1674 av_log(fg
, AV_LOG_ERROR
,
1675 "Filtergraph input %u not connected to a source\n", j
);
1676 return AVERROR(EINVAL
);
1680 for (unsigned j
= 0; j
< fg
->nb_outputs
; j
++) {
1681 SchFilterOut
*fo
= &fg
->outputs
[j
];
1683 if (!fo
->dst
.type
) {
1684 av_log(fg
, AV_LOG_ERROR
,
1685 "Filtergraph %u output %u not connected to a sink\n", i
, j
);
1686 return AVERROR(EINVAL
);
1691 // Check that the transcoding graph has no cycles.
1692 ret
= check_acyclic(sch
);
1699 int sch_start(Scheduler
*sch
)
1703 ret
= start_prepare(sch
);
1707 av_assert0(sch
->state
== SCH_STATE_UNINIT
);
1708 sch
->state
= SCH_STATE_STARTED
;
1710 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
1711 SchMux
*mux
= &sch
->mux
[i
];
1713 if (mux
->nb_streams_ready
== mux
->nb_streams
) {
1714 ret
= mux_init(sch
, mux
);
1720 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
1721 SchEnc
*enc
= &sch
->enc
[i
];
1723 ret
= task_start(&enc
->task
);
1728 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
1729 SchFilterGraph
*fg
= &sch
->filters
[i
];
1731 ret
= task_start(&fg
->task
);
1736 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
1737 SchDec
*dec
= &sch
->dec
[i
];
1739 ret
= task_start(&dec
->task
);
1744 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
1745 SchDemux
*d
= &sch
->demux
[i
];
1750 ret
= task_start(&d
->task
);
1755 pthread_mutex_lock(&sch
->schedule_lock
);
1756 schedule_update_locked(sch
);
1757 pthread_mutex_unlock(&sch
->schedule_lock
);
1761 sch_stop(sch
, NULL
);
1765 int sch_wait(Scheduler
*sch
, uint64_t timeout_us
, int64_t *transcode_ts
)
1769 // convert delay to absolute timestamp
1770 timeout_us
+= av_gettime();
1772 pthread_mutex_lock(&sch
->finish_lock
);
1774 if (sch
->nb_mux_done
< sch
->nb_mux
) {
1775 struct timespec tv
= { .tv_sec
= timeout_us
/ 1000000,
1776 .tv_nsec
= (timeout_us
% 1000000) * 1000 };
1777 pthread_cond_timedwait(&sch
->finish_cond
, &sch
->finish_lock
, &tv
);
1780 // abort transcoding if any task failed
1781 ret
= sch
->nb_mux_done
== sch
->nb_mux
|| sch
->task_failed
;
1783 pthread_mutex_unlock(&sch
->finish_lock
);
1785 *transcode_ts
= atomic_load(&sch
->last_dts
);
1790 static int enc_open(Scheduler
*sch
, SchEnc
*enc
, const AVFrame
*frame
)
1794 ret
= enc
->open_cb(enc
->task
.func_arg
, frame
);
1798 // ret>0 signals audio frame size, which means sync queue must
1799 // have been enabled during encoder creation
1803 av_assert0(enc
->sq_idx
[0] >= 0);
1804 sq
= &sch
->sq_enc
[enc
->sq_idx
[0]];
1806 pthread_mutex_lock(&sq
->lock
);
1808 sq_frame_samples(sq
->sq
, enc
->sq_idx
[1], ret
);
1810 pthread_mutex_unlock(&sq
->lock
);
1816 static int send_to_enc_thread(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1821 tq_send_finish(enc
->queue
, 0);
1825 if (enc
->in_finished
)
1828 ret
= tq_send(enc
->queue
, 0, frame
);
1830 enc
->in_finished
= 1;
1835 static int send_to_enc_sq(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1837 SchSyncQueue
*sq
= &sch
->sq_enc
[enc
->sq_idx
[0]];
1840 // inform the scheduling code that no more input will arrive along this path;
1841 // this is necessary because the sync queue may not send an EOF downstream
1842 // until other streams finish
1843 // TODO: consider a cleaner way of passing this information through
1846 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
1850 if (enc
->dst
[i
].type
!= SCH_NODE_TYPE_MUX
)
1853 mux
= &sch
->mux
[enc
->dst
[i
].idx
];
1854 ms
= &mux
->streams
[enc
->dst
[i
].idx_stream
];
1856 pthread_mutex_lock(&sch
->schedule_lock
);
1858 ms
->source_finished
= 1;
1859 schedule_update_locked(sch
);
1861 pthread_mutex_unlock(&sch
->schedule_lock
);
1865 pthread_mutex_lock(&sq
->lock
);
1867 ret
= sq_send(sq
->sq
, enc
->sq_idx
[1], SQFRAME(frame
));
1874 // TODO: the SQ API should be extended to allow returning EOF
1875 // for individual streams
1876 ret
= sq_receive(sq
->sq
, -1, SQFRAME(sq
->frame
));
1878 ret
= (ret
== AVERROR(EAGAIN
)) ? 0 : ret
;
1882 enc
= &sch
->enc
[sq
->enc_idx
[ret
]];
1883 ret
= send_to_enc_thread(sch
, enc
, sq
->frame
);
1885 av_frame_unref(sq
->frame
);
1886 if (ret
!= AVERROR_EOF
)
1889 sq_send(sq
->sq
, enc
->sq_idx
[1], SQFRAME(NULL
));
1895 // close all encoders fed from this sync queue
1896 for (unsigned i
= 0; i
< sq
->nb_enc_idx
; i
++) {
1897 int err
= send_to_enc_thread(sch
, &sch
->enc
[sq
->enc_idx
[i
]], NULL
);
1899 // if the sync queue error is EOF and closing the encoder
1900 // produces a more serious error, make sure to pick the latter
1901 ret
= err_merge((ret
== AVERROR_EOF
&& err
< 0) ? 0 : ret
, err
);
1906 pthread_mutex_unlock(&sq
->lock
);
1911 static int send_to_enc(Scheduler
*sch
, SchEnc
*enc
, AVFrame
*frame
)
1913 if (enc
->open_cb
&& frame
&& !enc
->opened
) {
1914 int ret
= enc_open(sch
, enc
, frame
);
1919 // discard empty frames that only carry encoder init parameters
1920 if (!frame
->buf
[0]) {
1921 av_frame_unref(frame
);
1926 return (enc
->sq_idx
[0] >= 0) ?
1927 send_to_enc_sq (sch
, enc
, frame
) :
1928 send_to_enc_thread(sch
, enc
, frame
);
1931 static int mux_queue_packet(SchMux
*mux
, SchMuxStream
*ms
, AVPacket
*pkt
)
1933 PreMuxQueue
*q
= &ms
->pre_mux_queue
;
1934 AVPacket
*tmp_pkt
= NULL
;
1937 if (!av_fifo_can_write(q
->fifo
)) {
1938 size_t packets
= av_fifo_can_read(q
->fifo
);
1939 size_t pkt_size
= pkt
? pkt
->size
: 0;
1940 int thresh_reached
= (q
->data_size
+ pkt_size
) > q
->data_threshold
;
1941 size_t max_packets
= thresh_reached
? q
->max_packets
: SIZE_MAX
;
1942 size_t new_size
= FFMIN(2 * packets
, max_packets
);
1944 if (new_size
<= packets
) {
1945 av_log(mux
, AV_LOG_ERROR
,
1946 "Too many packets buffered for output stream.\n");
1947 return AVERROR_BUFFER_TOO_SMALL
;
1949 ret
= av_fifo_grow2(q
->fifo
, new_size
- packets
);
1955 tmp_pkt
= av_packet_alloc();
1957 return AVERROR(ENOMEM
);
1959 av_packet_move_ref(tmp_pkt
, pkt
);
1960 q
->data_size
+= tmp_pkt
->size
;
1962 av_fifo_write(q
->fifo
, &tmp_pkt
, 1);
1967 static int send_to_mux(Scheduler
*sch
, SchMux
*mux
, unsigned stream_idx
,
1970 SchMuxStream
*ms
= &mux
->streams
[stream_idx
];
1971 int64_t dts
= (pkt
&& pkt
->dts
!= AV_NOPTS_VALUE
) ?
1972 av_rescale_q(pkt
->dts
+ pkt
->duration
, pkt
->time_base
, AV_TIME_BASE_Q
) :
1975 // queue the packet if the muxer cannot be started yet
1976 if (!atomic_load(&mux
->mux_started
)) {
1979 // the muxer could have started between the above atomic check and
1980 // locking the mutex, then this block falls through to normal send path
1981 pthread_mutex_lock(&sch
->mux_ready_lock
);
1983 if (!atomic_load(&mux
->mux_started
)) {
1984 int ret
= mux_queue_packet(mux
, ms
, pkt
);
1985 queued
= ret
< 0 ? ret
: 1;
1988 pthread_mutex_unlock(&sch
->mux_ready_lock
);
1993 goto update_schedule
;
2002 ret
= tq_send(mux
->queue
, stream_idx
, pkt
);
2006 tq_send_finish(mux
->queue
, stream_idx
);
2009 // TODO: use atomics to check whether this changes trailing dts
2010 // to avoid locking unnecessarily
2011 if (dts
!= AV_NOPTS_VALUE
|| !pkt
) {
2012 pthread_mutex_lock(&sch
->schedule_lock
);
2014 if (pkt
) ms
->last_dts
= dts
;
2015 else ms
->source_finished
= 1;
2017 schedule_update_locked(sch
);
2019 pthread_mutex_unlock(&sch
->schedule_lock
);
2026 demux_stream_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
2027 uint8_t *dst_finished
, AVPacket
*pkt
, unsigned flags
)
2034 if (pkt
&& dst
.type
== SCH_NODE_TYPE_MUX
&&
2035 (flags
& DEMUX_SEND_STREAMCOPY_EOF
)) {
2036 av_packet_unref(pkt
);
2043 ret
= (dst
.type
== SCH_NODE_TYPE_MUX
) ?
2044 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, pkt
) :
2045 tq_send(sch
->dec
[dst
.idx
].queue
, 0, pkt
);
2046 if (ret
== AVERROR_EOF
)
2052 if (dst
.type
== SCH_NODE_TYPE_MUX
)
2053 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, NULL
);
2055 tq_send_finish(sch
->dec
[dst
.idx
].queue
, 0);
2061 static int demux_send_for_stream(Scheduler
*sch
, SchDemux
*d
, SchDemuxStream
*ds
,
2062 AVPacket
*pkt
, unsigned flags
)
2064 unsigned nb_done
= 0;
2066 for (unsigned i
= 0; i
< ds
->nb_dst
; i
++) {
2067 AVPacket
*to_send
= pkt
;
2068 uint8_t *finished
= &ds
->dst_finished
[i
];
2072 // sending a packet consumes it, so make a temporary reference if needed
2073 if (pkt
&& i
< ds
->nb_dst
- 1) {
2074 to_send
= d
->send_pkt
;
2076 ret
= av_packet_ref(to_send
, pkt
);
2081 ret
= demux_stream_send_to_dst(sch
, ds
->dst
[i
], finished
, to_send
, flags
);
2083 av_packet_unref(to_send
);
2084 if (ret
== AVERROR_EOF
)
2090 return (nb_done
== ds
->nb_dst
) ? AVERROR_EOF
: 0;
2093 static int demux_flush(Scheduler
*sch
, SchDemux
*d
, AVPacket
*pkt
)
2095 Timestamp max_end_ts
= (Timestamp
){ .ts
= AV_NOPTS_VALUE
};
2097 av_assert0(!pkt
->buf
&& !pkt
->data
&& !pkt
->side_data_elems
);
2099 for (unsigned i
= 0; i
< d
->nb_streams
; i
++) {
2100 SchDemuxStream
*ds
= &d
->streams
[i
];
2102 for (unsigned j
= 0; j
< ds
->nb_dst
; j
++) {
2103 const SchedulerNode
*dst
= &ds
->dst
[j
];
2107 if (ds
->dst_finished
[j
] || dst
->type
!= SCH_NODE_TYPE_DEC
)
2110 dec
= &sch
->dec
[dst
->idx
];
2112 ret
= tq_send(dec
->queue
, 0, pkt
);
2116 if (dec
->queue_end_ts
) {
2118 ret
= av_thread_message_queue_recv(dec
->queue_end_ts
, &ts
, 0);
2122 if (max_end_ts
.ts
== AV_NOPTS_VALUE
||
2123 (ts
.ts
!= AV_NOPTS_VALUE
&&
2124 av_compare_ts(max_end_ts
.ts
, max_end_ts
.tb
, ts
.ts
, ts
.tb
) < 0))
2131 pkt
->pts
= max_end_ts
.ts
;
2132 pkt
->time_base
= max_end_ts
.tb
;
2137 int sch_demux_send(Scheduler
*sch
, unsigned demux_idx
, AVPacket
*pkt
,
2143 av_assert0(demux_idx
< sch
->nb_demux
);
2144 d
= &sch
->demux
[demux_idx
];
2146 terminate
= waiter_wait(sch
, &d
->waiter
);
2148 return AVERROR_EXIT
;
2150 // flush the downstreams after seek
2151 if (pkt
->stream_index
== -1)
2152 return demux_flush(sch
, d
, pkt
);
2154 av_assert0(pkt
->stream_index
< d
->nb_streams
);
2156 return demux_send_for_stream(sch
, d
, &d
->streams
[pkt
->stream_index
], pkt
, flags
);
2159 static int demux_done(Scheduler
*sch
, unsigned demux_idx
)
2161 SchDemux
*d
= &sch
->demux
[demux_idx
];
2164 for (unsigned i
= 0; i
< d
->nb_streams
; i
++) {
2165 int err
= demux_send_for_stream(sch
, d
, &d
->streams
[i
], NULL
, 0);
2166 if (err
!= AVERROR_EOF
)
2167 ret
= err_merge(ret
, err
);
2170 pthread_mutex_lock(&sch
->schedule_lock
);
2174 schedule_update_locked(sch
);
2176 pthread_mutex_unlock(&sch
->schedule_lock
);
2181 int sch_mux_receive(Scheduler
*sch
, unsigned mux_idx
, AVPacket
*pkt
)
2184 int ret
, stream_idx
;
2186 av_assert0(mux_idx
< sch
->nb_mux
);
2187 mux
= &sch
->mux
[mux_idx
];
2189 ret
= tq_receive(mux
->queue
, &stream_idx
, pkt
);
2190 pkt
->stream_index
= stream_idx
;
2194 void sch_mux_receive_finish(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
)
2198 av_assert0(mux_idx
< sch
->nb_mux
);
2199 mux
= &sch
->mux
[mux_idx
];
2201 av_assert0(stream_idx
< mux
->nb_streams
);
2202 tq_receive_finish(mux
->queue
, stream_idx
);
2204 pthread_mutex_lock(&sch
->schedule_lock
);
2205 mux
->streams
[stream_idx
].source_finished
= 1;
2207 schedule_update_locked(sch
);
2209 pthread_mutex_unlock(&sch
->schedule_lock
);
2212 int sch_mux_sub_heartbeat(Scheduler
*sch
, unsigned mux_idx
, unsigned stream_idx
,
2213 const AVPacket
*pkt
)
2218 av_assert0(mux_idx
< sch
->nb_mux
);
2219 mux
= &sch
->mux
[mux_idx
];
2221 av_assert0(stream_idx
< mux
->nb_streams
);
2222 ms
= &mux
->streams
[stream_idx
];
2224 for (unsigned i
= 0; i
< ms
->nb_sub_heartbeat_dst
; i
++) {
2225 SchDec
*dst
= &sch
->dec
[ms
->sub_heartbeat_dst
[i
]];
2228 ret
= av_packet_copy_props(mux
->sub_heartbeat_pkt
, pkt
);
2232 tq_send(dst
->queue
, 0, mux
->sub_heartbeat_pkt
);
2238 static int mux_done(Scheduler
*sch
, unsigned mux_idx
)
2240 SchMux
*mux
= &sch
->mux
[mux_idx
];
2242 pthread_mutex_lock(&sch
->schedule_lock
);
2244 for (unsigned i
= 0; i
< mux
->nb_streams
; i
++) {
2245 tq_receive_finish(mux
->queue
, i
);
2246 mux
->streams
[i
].source_finished
= 1;
2249 schedule_update_locked(sch
);
2251 pthread_mutex_unlock(&sch
->schedule_lock
);
2253 pthread_mutex_lock(&sch
->finish_lock
);
2255 av_assert0(sch
->nb_mux_done
< sch
->nb_mux
);
2258 pthread_cond_signal(&sch
->finish_cond
);
2260 pthread_mutex_unlock(&sch
->finish_lock
);
2265 int sch_dec_receive(Scheduler
*sch
, unsigned dec_idx
, AVPacket
*pkt
)
2270 av_assert0(dec_idx
< sch
->nb_dec
);
2271 dec
= &sch
->dec
[dec_idx
];
2273 // the decoder should have given us post-flush end timestamp in pkt
2274 if (dec
->expect_end_ts
) {
2275 Timestamp ts
= (Timestamp
){ .ts
= pkt
->pts
, .tb
= pkt
->time_base
};
2276 ret
= av_thread_message_queue_send(dec
->queue_end_ts
, &ts
, 0);
2280 dec
->expect_end_ts
= 0;
2283 ret
= tq_receive(dec
->queue
, &dummy
, pkt
);
2284 av_assert0(dummy
<= 0);
2286 // got a flush packet, on the next call to this function the decoder
2287 // will give us post-flush end timestamp
2288 if (ret
>= 0 && !pkt
->data
&& !pkt
->side_data_elems
&& dec
->queue_end_ts
)
2289 dec
->expect_end_ts
= 1;
2294 static int send_to_filter(Scheduler
*sch
, SchFilterGraph
*fg
,
2295 unsigned in_idx
, AVFrame
*frame
)
2298 return tq_send(fg
->queue
, in_idx
, frame
);
2300 if (!fg
->inputs
[in_idx
].send_finished
) {
2301 fg
->inputs
[in_idx
].send_finished
= 1;
2302 tq_send_finish(fg
->queue
, in_idx
);
2304 // close the control stream when all actual inputs are done
2305 if (atomic_fetch_add(&fg
->nb_inputs_finished_send
, 1) == fg
->nb_inputs
- 1)
2306 tq_send_finish(fg
->queue
, fg
->nb_inputs
);
2311 static int dec_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
2312 uint8_t *dst_finished
, AVFrame
*frame
)
2322 ret
= (dst
.type
== SCH_NODE_TYPE_FILTER_IN
) ?
2323 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, frame
) :
2324 send_to_enc(sch
, &sch
->enc
[dst
.idx
], frame
);
2325 if (ret
== AVERROR_EOF
)
2331 if (dst
.type
== SCH_NODE_TYPE_FILTER_IN
)
2332 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, NULL
);
2334 send_to_enc(sch
, &sch
->enc
[dst
.idx
], NULL
);
2341 int sch_dec_send(Scheduler
*sch
, unsigned dec_idx
,
2342 unsigned out_idx
, AVFrame
*frame
)
2347 unsigned nb_done
= 0;
2349 av_assert0(dec_idx
< sch
->nb_dec
);
2350 dec
= &sch
->dec
[dec_idx
];
2352 av_assert0(out_idx
< dec
->nb_outputs
);
2353 o
= &dec
->outputs
[out_idx
];
2355 for (unsigned i
= 0; i
< o
->nb_dst
; i
++) {
2356 uint8_t *finished
= &o
->dst_finished
[i
];
2357 AVFrame
*to_send
= frame
;
2359 // sending a frame consumes it, so make a temporary reference if needed
2360 if (i
< o
->nb_dst
- 1) {
2361 to_send
= dec
->send_frame
;
2363 // frame may sometimes contain props only,
2364 // e.g. to signal EOF timestamp
2365 ret
= frame
->buf
[0] ? av_frame_ref(to_send
, frame
) :
2366 av_frame_copy_props(to_send
, frame
);
2371 ret
= dec_send_to_dst(sch
, o
->dst
[i
], finished
, to_send
);
2373 av_frame_unref(to_send
);
2374 if (ret
== AVERROR_EOF
) {
2382 return (nb_done
== o
->nb_dst
) ? AVERROR_EOF
: 0;
2385 static int dec_done(Scheduler
*sch
, unsigned dec_idx
)
2387 SchDec
*dec
= &sch
->dec
[dec_idx
];
2390 tq_receive_finish(dec
->queue
, 0);
2392 // make sure our source does not get stuck waiting for end timestamps
2393 // that will never arrive
2394 if (dec
->queue_end_ts
)
2395 av_thread_message_queue_set_err_recv(dec
->queue_end_ts
, AVERROR_EOF
);
2397 for (unsigned i
= 0; i
< dec
->nb_outputs
; i
++) {
2398 SchDecOutput
*o
= &dec
->outputs
[i
];
2400 for (unsigned j
= 0; j
< o
->nb_dst
; j
++) {
2401 int err
= dec_send_to_dst(sch
, o
->dst
[j
], &o
->dst_finished
[j
], NULL
);
2402 if (err
< 0 && err
!= AVERROR_EOF
)
2403 ret
= err_merge(ret
, err
);
2410 int sch_enc_receive(Scheduler
*sch
, unsigned enc_idx
, AVFrame
*frame
)
2415 av_assert0(enc_idx
< sch
->nb_enc
);
2416 enc
= &sch
->enc
[enc_idx
];
2418 ret
= tq_receive(enc
->queue
, &dummy
, frame
);
2419 av_assert0(dummy
<= 0);
2424 static int enc_send_to_dst(Scheduler
*sch
, const SchedulerNode dst
,
2425 uint8_t *dst_finished
, AVPacket
*pkt
)
2435 ret
= (dst
.type
== SCH_NODE_TYPE_MUX
) ?
2436 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, pkt
) :
2437 tq_send(sch
->dec
[dst
.idx
].queue
, 0, pkt
);
2438 if (ret
== AVERROR_EOF
)
2444 if (dst
.type
== SCH_NODE_TYPE_MUX
)
2445 send_to_mux(sch
, &sch
->mux
[dst
.idx
], dst
.idx_stream
, NULL
);
2447 tq_send_finish(sch
->dec
[dst
.idx
].queue
, 0);
2454 int sch_enc_send(Scheduler
*sch
, unsigned enc_idx
, AVPacket
*pkt
)
2459 av_assert0(enc_idx
< sch
->nb_enc
);
2460 enc
= &sch
->enc
[enc_idx
];
2462 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
2463 uint8_t *finished
= &enc
->dst_finished
[i
];
2464 AVPacket
*to_send
= pkt
;
2466 // sending a packet consumes it, so make a temporary reference if needed
2467 if (i
< enc
->nb_dst
- 1) {
2468 to_send
= enc
->send_pkt
;
2470 ret
= av_packet_ref(to_send
, pkt
);
2475 ret
= enc_send_to_dst(sch
, enc
->dst
[i
], finished
, to_send
);
2477 av_packet_unref(to_send
);
2478 if (ret
== AVERROR_EOF
)
2487 static int enc_done(Scheduler
*sch
, unsigned enc_idx
)
2489 SchEnc
*enc
= &sch
->enc
[enc_idx
];
2492 tq_receive_finish(enc
->queue
, 0);
2494 for (unsigned i
= 0; i
< enc
->nb_dst
; i
++) {
2495 int err
= enc_send_to_dst(sch
, enc
->dst
[i
], &enc
->dst_finished
[i
], NULL
);
2496 if (err
< 0 && err
!= AVERROR_EOF
)
2497 ret
= err_merge(ret
, err
);
2503 int sch_filter_receive(Scheduler
*sch
, unsigned fg_idx
,
2504 unsigned *in_idx
, AVFrame
*frame
)
2508 av_assert0(fg_idx
< sch
->nb_filters
);
2509 fg
= &sch
->filters
[fg_idx
];
2511 av_assert0(*in_idx
<= fg
->nb_inputs
);
2513 // update scheduling to account for desired input stream, if it changed
2515 // this check needs no locking because only the filtering thread
2516 // updates this value
2517 if (*in_idx
!= fg
->best_input
) {
2518 pthread_mutex_lock(&sch
->schedule_lock
);
2520 fg
->best_input
= *in_idx
;
2521 schedule_update_locked(sch
);
2523 pthread_mutex_unlock(&sch
->schedule_lock
);
2526 if (*in_idx
== fg
->nb_inputs
) {
2527 int terminate
= waiter_wait(sch
, &fg
->waiter
);
2528 return terminate
? AVERROR_EOF
: AVERROR(EAGAIN
);
2534 ret
= tq_receive(fg
->queue
, &idx
, frame
);
2537 else if (ret
>= 0) {
2542 // disregard EOFs for specific streams - they should always be
2543 // preceded by an EOF frame
2547 void sch_filter_receive_finish(Scheduler
*sch
, unsigned fg_idx
, unsigned in_idx
)
2552 av_assert0(fg_idx
< sch
->nb_filters
);
2553 fg
= &sch
->filters
[fg_idx
];
2555 av_assert0(in_idx
< fg
->nb_inputs
);
2556 fi
= &fg
->inputs
[in_idx
];
2558 pthread_mutex_lock(&sch
->schedule_lock
);
2560 if (!fi
->receive_finished
) {
2561 fi
->receive_finished
= 1;
2562 tq_receive_finish(fg
->queue
, in_idx
);
2564 // close the control stream when all actual inputs are done
2565 if (++fg
->nb_inputs_finished_receive
== fg
->nb_inputs
)
2566 tq_receive_finish(fg
->queue
, fg
->nb_inputs
);
2568 schedule_update_locked(sch
);
2571 pthread_mutex_unlock(&sch
->schedule_lock
);
2574 int sch_filter_send(Scheduler
*sch
, unsigned fg_idx
, unsigned out_idx
, AVFrame
*frame
)
2580 av_assert0(fg_idx
< sch
->nb_filters
);
2581 fg
= &sch
->filters
[fg_idx
];
2583 av_assert0(out_idx
< fg
->nb_outputs
);
2584 dst
= fg
->outputs
[out_idx
].dst
;
2586 if (dst
.type
== SCH_NODE_TYPE_ENC
) {
2587 ret
= send_to_enc(sch
, &sch
->enc
[dst
.idx
], frame
);
2588 if (ret
== AVERROR_EOF
)
2589 send_to_enc(sch
, &sch
->enc
[dst
.idx
], NULL
);
2591 ret
= send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, frame
);
2592 if (ret
== AVERROR_EOF
)
2593 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, NULL
);
2598 static int filter_done(Scheduler
*sch
, unsigned fg_idx
)
2600 SchFilterGraph
*fg
= &sch
->filters
[fg_idx
];
2603 for (unsigned i
= 0; i
<= fg
->nb_inputs
; i
++)
2604 tq_receive_finish(fg
->queue
, i
);
2606 for (unsigned i
= 0; i
< fg
->nb_outputs
; i
++) {
2607 SchedulerNode dst
= fg
->outputs
[i
].dst
;
2608 int err
= (dst
.type
== SCH_NODE_TYPE_ENC
) ?
2609 send_to_enc (sch
, &sch
->enc
[dst
.idx
], NULL
) :
2610 send_to_filter(sch
, &sch
->filters
[dst
.idx
], dst
.idx_stream
, NULL
);
2612 if (err
< 0 && err
!= AVERROR_EOF
)
2613 ret
= err_merge(ret
, err
);
2616 pthread_mutex_lock(&sch
->schedule_lock
);
2618 fg
->task_exited
= 1;
2620 schedule_update_locked(sch
);
2622 pthread_mutex_unlock(&sch
->schedule_lock
);
2627 int sch_filter_command(Scheduler
*sch
, unsigned fg_idx
, AVFrame
*frame
)
2631 av_assert0(fg_idx
< sch
->nb_filters
);
2632 fg
= &sch
->filters
[fg_idx
];
2634 return send_to_filter(sch
, fg
, fg
->nb_inputs
, frame
);
2637 void sch_filter_choke_inputs(Scheduler
*sch
, unsigned fg_idx
)
2640 av_assert0(fg_idx
< sch
->nb_filters
);
2641 fg
= &sch
->filters
[fg_idx
];
2643 pthread_mutex_lock(&sch
->schedule_lock
);
2644 fg
->best_input
= fg
->nb_inputs
;
2645 schedule_update_locked(sch
);
2646 pthread_mutex_unlock(&sch
->schedule_lock
);
2649 static int task_cleanup(Scheduler
*sch
, SchedulerNode node
)
2651 switch (node
.type
) {
2652 case SCH_NODE_TYPE_DEMUX
: return demux_done (sch
, node
.idx
);
2653 case SCH_NODE_TYPE_MUX
: return mux_done (sch
, node
.idx
);
2654 case SCH_NODE_TYPE_DEC
: return dec_done (sch
, node
.idx
);
2655 case SCH_NODE_TYPE_ENC
: return enc_done (sch
, node
.idx
);
2656 case SCH_NODE_TYPE_FILTER_IN
: return filter_done(sch
, node
.idx
);
2657 default: av_assert0(0);
2661 static void *task_wrapper(void *arg
)
2663 SchTask
*task
= arg
;
2664 Scheduler
*sch
= task
->parent
;
2668 ret
= task
->func(task
->func_arg
);
2670 av_log(task
->func_arg
, AV_LOG_ERROR
,
2671 "Task finished with error code: %d (%s)\n", ret
, av_err2str(ret
));
2673 err
= task_cleanup(sch
, task
->node
);
2674 ret
= err_merge(ret
, err
);
2676 // EOF is considered normal termination
2677 if (ret
== AVERROR_EOF
)
2680 pthread_mutex_lock(&sch
->finish_lock
);
2681 sch
->task_failed
= 1;
2682 pthread_cond_signal(&sch
->finish_cond
);
2683 pthread_mutex_unlock(&sch
->finish_lock
);
2686 av_log(task
->func_arg
, ret
< 0 ? AV_LOG_ERROR
: AV_LOG_VERBOSE
,
2687 "Terminating thread with return code %d (%s)\n", ret
,
2688 ret
< 0 ? av_err2str(ret
) : "success");
2690 return (void*)(intptr_t)ret
;
2693 static int task_stop(Scheduler
*sch
, SchTask
*task
)
2701 if (!task
->thread_running
)
2702 return task_cleanup(sch
, task
->node
);
2704 ret
= pthread_join(task
->thread
, &thread_ret
);
2705 av_assert0(ret
== 0);
2707 task
->thread_running
= 0;
2709 return (intptr_t)thread_ret
;
2712 int sch_stop(Scheduler
*sch
, int64_t *finish_ts
)
2716 if (sch
->state
!= SCH_STATE_STARTED
)
2719 atomic_store(&sch
->terminate
, 1);
2721 for (unsigned type
= 0; type
< 2; type
++)
2722 for (unsigned i
= 0; i
< (type
? sch
->nb_demux
: sch
->nb_filters
); i
++) {
2723 SchWaiter
*w
= type
? &sch
->demux
[i
].waiter
: &sch
->filters
[i
].waiter
;
2726 choke_demux(sch
, i
, 0); // unfreeze to allow draining
2729 for (unsigned i
= 0; i
< sch
->nb_demux
; i
++) {
2730 SchDemux
*d
= &sch
->demux
[i
];
2732 err
= task_stop(sch
, &d
->task
);
2733 ret
= err_merge(ret
, err
);
2736 for (unsigned i
= 0; i
< sch
->nb_dec
; i
++) {
2737 SchDec
*dec
= &sch
->dec
[i
];
2739 err
= task_stop(sch
, &dec
->task
);
2740 ret
= err_merge(ret
, err
);
2743 for (unsigned i
= 0; i
< sch
->nb_filters
; i
++) {
2744 SchFilterGraph
*fg
= &sch
->filters
[i
];
2746 err
= task_stop(sch
, &fg
->task
);
2747 ret
= err_merge(ret
, err
);
2750 for (unsigned i
= 0; i
< sch
->nb_enc
; i
++) {
2751 SchEnc
*enc
= &sch
->enc
[i
];
2753 err
= task_stop(sch
, &enc
->task
);
2754 ret
= err_merge(ret
, err
);
2757 for (unsigned i
= 0; i
< sch
->nb_mux
; i
++) {
2758 SchMux
*mux
= &sch
->mux
[i
];
2760 err
= task_stop(sch
, &mux
->task
);
2761 ret
= err_merge(ret
, err
);
2765 *finish_ts
= trailing_dts(sch
, 1);
2767 sch
->state
= SCH_STATE_STOPPED
;