/* fftools/ffmpeg_sched.c — inter-thread scheduling for the ffmpeg CLI */
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
// Kind of payload carried by a ThreadQueue; selects the object pool and
// move callback in queue_alloc().
enum QueueType {
    QUEUE_PACKETS,
    QUEUE_FRAMES,
};
51
/**
 * Synchronization primitive through which the scheduler pauses ("chokes")
 * and resumes source threads (demuxers, filtergraphs); see waiter_wait()
 * and waiter_set().
 */
typedef struct SchWaiter {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    // nonzero when the owning task should block in waiter_wait()
    atomic_int choked;

    // the following are internal state of schedule_update_locked() and must not
    // be accessed outside of it
    int choked_prev;
    int choked_next;
} SchWaiter;
62
/**
 * A single worker thread managed by the scheduler, running func(func_arg)
 * via task_wrapper().
 */
typedef struct SchTask {
    Scheduler *parent;
    // identifies this task's node (type + index) in the scheduler graph
    SchedulerNode node;

    SchThreadFunc func;
    void *func_arg;

    pthread_t thread;
    // set once pthread_create() succeeded in task_start()
    int thread_running;
} SchTask;
73
// One output of a decoder and the downstream nodes it feeds.
typedef struct SchDecOutput {
    SchedulerNode *dst;
    // per-destination flag: nonzero when that destination no longer
    // accepts input
    uint8_t *dst_finished;
    unsigned nb_dst;
} SchDecOutput;
79
// Scheduler-side state of one decoder.
typedef struct SchDec {
    const AVClass *class;

    // the node feeding this decoder packets (demuxer or encoder)
    SchedulerNode src;

    SchDecOutput *outputs;
    unsigned nb_outputs;

    SchTask task;
    // Queue for receiving input packets, one stream.
    ThreadQueue *queue;

    // Queue for sending post-flush end timestamps back to the source
    AVThreadMessageQueue *queue_end_ts;
    int expect_end_ts;

    // temporary storage used by sch_dec_send()
    AVFrame *send_frame;
} SchDec;
99
// A sync queue shared by one or more encoders, plus the lock and scratch
// frame used when feeding it.
typedef struct SchSyncQueue {
    SyncQueue *sq;
    // scratch frame used when moving data through the sync queue
    AVFrame *frame;
    // serializes access to sq from multiple encoder feeders
    pthread_mutex_t lock;

    // indices into Scheduler.enc of the encoders attached to this queue
    unsigned *enc_idx;
    unsigned nb_enc_idx;
} SchSyncQueue;
108
// Scheduler-side state of one encoder.
typedef struct SchEnc {
    const AVClass *class;

    // node feeding this encoder frames (decoder or filtergraph output)
    SchedulerNode src;
    // downstream nodes receiving this encoder's packets
    SchedulerNode *dst;
    // per-destination flag: nonzero when that destination is finished
    uint8_t *dst_finished;
    unsigned nb_dst;

    // [0] - index of the sync queue in Scheduler.sq_enc,
    // [1] - index of this encoder in the sq
    int sq_idx[2];

    /* Opening encoders is somewhat nontrivial due to their interaction with
     * sync queues, which are (among other things) responsible for maintaining
     * constant audio frame size, when it is required by the encoder.
     *
     * Opening the encoder requires stream parameters, obtained from the first
     * frame. However, that frame cannot be properly chunked by the sync queue
     * without knowing the required frame size, which is only available after
     * opening the encoder.
     *
     * This apparent circular dependency is resolved in the following way:
     * - the caller creating the encoder gives us a callback which opens the
     *   encoder and returns the required frame size (if any)
     * - when the first frame is sent to the encoder, the sending thread
     *   - calls this callback, opening the encoder
     *   - passes the returned frame size to the sync queue
     */
    int (*open_cb)(void *opaque, const AVFrame *frame);
    // nonzero once open_cb has been called
    int opened;

    SchTask task;
    // Queue for receiving input frames, one stream.
    ThreadQueue *queue;
    // tq_send() to queue returned EOF
    int in_finished;

    // temporary storage used by sch_enc_send()
    AVPacket *send_pkt;
} SchEnc;
149
// One demuxed stream and the downstream nodes it feeds.
typedef struct SchDemuxStream {
    SchedulerNode *dst;
    // per-destination flag: nonzero when that destination no longer
    // accepts input
    uint8_t *dst_finished;
    unsigned nb_dst;
} SchDemuxStream;
155
// Scheduler-side state of one demuxer.
typedef struct SchDemux {
    const AVClass *class;

    SchDemuxStream *streams;
    unsigned nb_streams;

    SchTask task;
    // used by the scheduler to pause/resume this demuxer
    SchWaiter waiter;

    // temporary storage used by sch_demux_send()
    AVPacket *send_pkt;

    // protected by schedule_lock
    int task_exited;
} SchDemux;
171
// Per-stream buffering of packets that arrive before the muxer task starts.
typedef struct PreMuxQueue {
    /**
     * Queue for buffering the packets before the muxer task can be started.
     */
    AVFifo *fifo;
    /**
     * Maximum number of packets in fifo.
     */
    int max_packets;
    /*
     * The size of the AVPackets' buffers in queue.
     * Updated when a packet is either pushed or pulled from the queue.
     */
    size_t data_size;
    /* Threshold after which max_packets will be in effect */
    size_t data_threshold;
} PreMuxQueue;
189
// Scheduler-side state of one output (muxed) stream.
typedef struct SchMuxStream {
    // node producing this stream's packets (demuxer or encoder)
    SchedulerNode src;
    // the source node used for choke/unchoke scheduling decisions
    SchedulerNode src_sched;

    // decoder indices to which subtitle heartbeat packets are sent
    unsigned *sub_heartbeat_dst;
    unsigned nb_sub_heartbeat_dst;

    PreMuxQueue pre_mux_queue;

    // an EOF was generated while flushing the pre-mux queue
    int init_eof;

    ////////////////////////////////////////////////////////////
    // The following are protected by Scheduler.schedule_lock //

    /* dts+duration of the last packet sent to this stream
       in AV_TIME_BASE_Q */
    int64_t last_dts;
    // this stream no longer accepts input
    int source_finished;
    ////////////////////////////////////////////////////////////
} SchMuxStream;
212
// Scheduler-side state of one muxer.
typedef struct SchMux {
    const AVClass *class;

    SchMuxStream *streams;
    unsigned nb_streams;
    // number of streams for which sch_mux_stream_ready() was called
    unsigned nb_streams_ready;

    // callback that writes the muxer header; invoked from mux_init()
    int (*init)(void *arg);

    SchTask task;
    /**
     * Set to 1 after starting the muxer task and flushing the
     * pre-muxing queues.
     * Set either before any tasks have started, or with
     * Scheduler.mux_ready_lock held.
     */
    atomic_int mux_started;
    ThreadQueue *queue;
    unsigned queue_size;

    AVPacket *sub_heartbeat_pkt;
} SchMux;
235
// One input of a filtergraph.
typedef struct SchFilterIn {
    // node feeding this input frames
    SchedulerNode src;
    // the source node used for choke/unchoke scheduling decisions
    SchedulerNode src_sched;
    // sending to this input returned EOF
    int send_finished;
    // receiving from this input returned EOF
    int receive_finished;
} SchFilterIn;

// One output of a filtergraph and the node it feeds.
typedef struct SchFilterOut {
    SchedulerNode dst;
} SchFilterOut;
246
// Scheduler-side state of one filtergraph.
typedef struct SchFilterGraph {
    const AVClass *class;

    SchFilterIn *inputs;
    unsigned nb_inputs;
    atomic_uint nb_inputs_finished_send;
    unsigned nb_inputs_finished_receive;

    SchFilterOut *outputs;
    unsigned nb_outputs;

    SchTask task;
    // input queue, nb_inputs+1 streams
    // last stream is control
    ThreadQueue *queue;
    // used by the scheduler to pause/resume this filtergraph
    SchWaiter waiter;

    // protected by schedule_lock
    // index of the input the graph wants data on next;
    // equal to nb_inputs when the graph is driven by internal sources
    unsigned best_input;
    int task_exited;
} SchFilterGraph;
268
// Lifecycle of the scheduler: allocated -> started -> stopped.
enum SchedulerState {
    SCH_STATE_UNINIT,
    SCH_STATE_STARTED,
    SCH_STATE_STOPPED,
};
274
// Top-level scheduler object tying together all demuxers, decoders,
// filtergraphs, encoders and muxers, and the threads that run them.
struct Scheduler {
    const AVClass *class;

    SchDemux *demux;
    unsigned nb_demux;

    SchMux *mux;
    unsigned nb_mux;

    // number of muxers whose init callback has completed;
    // protected by mux_ready_lock
    unsigned nb_mux_ready;
    pthread_mutex_t mux_ready_lock;

    // number of muxer tasks that have finished;
    // protected by mux_done_lock, signalled through mux_done_cond
    unsigned nb_mux_done;
    pthread_mutex_t mux_done_lock;
    pthread_cond_t mux_done_cond;


    SchDec *dec;
    unsigned nb_dec;

    SchEnc *enc;
    unsigned nb_enc;

    SchSyncQueue *sq_enc;
    unsigned nb_sq_enc;

    SchFilterGraph *filters;
    unsigned nb_filters;

    char *sdp_filename;
    // nonzero if an SDP should be written automatically;
    // ANDed with the per-muxer flag in sch_add_mux()
    int sdp_auto;

    enum SchedulerState state;
    atomic_int terminate;
    atomic_int task_failed;

    // protects the choke/unchoke state examined by schedule_update_locked()
    pthread_mutex_t schedule_lock;

    // most recent trailing dts, cf. trailing_dts()
    atomic_int_least64_t last_dts;
};
315
/**
 * Wait until this task is allowed to proceed.
 *
 * @retval 0 the caller should proceed
 * @retval 1 the caller should terminate
 */
static int waiter_wait(Scheduler *sch, SchWaiter *w)
{
    int terminate;

    // fast path - not choked, no need to take the lock
    if (!atomic_load(&w->choked))
        return 0;

    pthread_mutex_lock(&w->lock);

    // block until unchoked by waiter_set() or the scheduler terminates
    while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
        pthread_cond_wait(&w->cond, &w->lock);

    terminate = atomic_load(&sch->terminate);

    pthread_mutex_unlock(&w->lock);

    return terminate;
}
340
/**
 * Set the choke state of a waiter and wake its owning task so it can
 * observe the change.
 */
static void waiter_set(SchWaiter *w, int choked)
{
    pthread_mutex_lock(&w->lock);

    atomic_store(&w->choked, choked);
    pthread_cond_signal(&w->cond);

    pthread_mutex_unlock(&w->lock);
}
350
351 static int waiter_init(SchWaiter *w)
352 {
353 int ret;
354
355 atomic_init(&w->choked, 0);
356
357 ret = pthread_mutex_init(&w->lock, NULL);
358 if (ret)
359 return AVERROR(ret);
360
361 ret = pthread_cond_init(&w->cond, NULL);
362 if (ret)
363 return AVERROR(ret);
364
365 return 0;
366 }
367
368 static void waiter_uninit(SchWaiter *w)
369 {
370 pthread_mutex_destroy(&w->lock);
371 pthread_cond_destroy(&w->cond);
372 }
373
/**
 * Allocate a ThreadQueue carrying either packets or frames.
 *
 * @param ptq        on success, *ptq is set to the new queue
 * @param nb_streams number of logical streams multiplexed in the queue
 * @param queue_size queue length; 0 or negative selects the default for
 *                   the given queue type
 * @param type       QUEUE_PACKETS or QUEUE_FRAMES
 *
 * @return 0 on success, AVERROR(ENOMEM) on allocation failure
 */
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
                       enum QueueType type)
{
    ThreadQueue *tq;
    ObjPool *op;

    if (queue_size <= 0) {
        if (type == QUEUE_FRAMES)
            queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
        else
            queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
    }

    if (type == QUEUE_FRAMES) {
        // This queue length is used in the decoder code to ensure that
        // there are enough entries in fixed-size frame pools to account
        // for frames held in queues inside the ffmpeg utility. If this
        // can ever dynamically change then the corresponding decode
        // code needs to be updated as well.
        av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
    }

    op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
                                   objpool_alloc_frames();
    if (!op)
        return AVERROR(ENOMEM);

    tq = tq_alloc(nb_streams, queue_size, op,
                  (type == QUEUE_PACKETS) ? pkt_move : frame_move);
    if (!tq) {
        // the queue takes ownership of the pool only on success
        objpool_free(&op);
        return AVERROR(ENOMEM);
    }

    *ptq = tq;
    return 0;
}
411
412 static void *task_wrapper(void *arg);
413
/**
 * Spawn the thread for a task previously set up with task_init().
 *
 * @return 0 on success, a negative AVERROR code if thread creation failed
 */
static int task_start(SchTask *task)
{
    int ret;

    av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");

    // starting an already-running task is a programming error
    av_assert0(!task->thread_running);

    ret = pthread_create(&task->thread, NULL, task_wrapper, task);
    if (ret) {
        av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
               strerror(ret));
        return AVERROR(ret);
    }

    task->thread_running = 1;
    return 0;
}
432
/**
 * Fill in a task's bookkeeping fields; does not start the thread
 * (see task_start()).
 */
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
                      SchThreadFunc func, void *func_arg)
{
    task->parent    = sch;

    task->node.type = type;
    task->node.idx  = idx;

    task->func      = func;
    task->func_arg  = func_arg;
}
444
445 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
446 {
447 int64_t min_dts = INT64_MAX;
448
449 for (unsigned i = 0; i < sch->nb_mux; i++) {
450 const SchMux *mux = &sch->mux[i];
451
452 for (unsigned j = 0; j < mux->nb_streams; j++) {
453 const SchMuxStream *ms = &mux->streams[j];
454
455 if (ms->source_finished && !count_finished)
456 continue;
457 if (ms->last_dts == AV_NOPTS_VALUE)
458 return AV_NOPTS_VALUE;
459
460 min_dts = FFMIN(min_dts, ms->last_dts);
461 }
462 }
463
464 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
465 }
466
/**
 * Stop all tasks and free the scheduler with everything it owns.
 * A NULL *psch is a no-op; *psch is set to NULL on return.
 */
void sch_free(Scheduler **psch)
{
    Scheduler *sch = *psch;

    if (!sch)
        return;

    // make sure all the task threads have exited before tearing
    // anything down
    sch_stop(sch, NULL);

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];
            av_freep(&ds->dst);
            av_freep(&ds->dst_finished);
        }
        av_freep(&d->streams);

        av_packet_free(&d->send_pkt);

        waiter_uninit(&d->waiter);
    }
    av_freep(&sch->demux);

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // drain and free any packets still buffered for pre-muxing
            if (ms->pre_mux_queue.fifo) {
                AVPacket *pkt;
                while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
                    av_packet_free(&pkt);
                av_fifo_freep2(&ms->pre_mux_queue.fifo);
            }

            av_freep(&ms->sub_heartbeat_dst);
        }
        av_freep(&mux->streams);

        av_packet_free(&mux->sub_heartbeat_pkt);

        tq_free(&mux->queue);
    }
    av_freep(&sch->mux);

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        tq_free(&dec->queue);

        av_thread_message_queue_free(&dec->queue_end_ts);

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            av_freep(&o->dst);
            av_freep(&o->dst_finished);
        }

        av_freep(&dec->outputs);

        av_frame_free(&dec->send_frame);
    }
    av_freep(&sch->dec);

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        tq_free(&enc->queue);

        av_packet_free(&enc->send_pkt);

        av_freep(&enc->dst);
        av_freep(&enc->dst_finished);
    }
    av_freep(&sch->enc);

    for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
        SchSyncQueue *sq = &sch->sq_enc[i];
        sq_free(&sq->sq);
        av_frame_free(&sq->frame);
        pthread_mutex_destroy(&sq->lock);
        av_freep(&sq->enc_idx);
    }
    av_freep(&sch->sq_enc);

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        tq_free(&fg->queue);

        av_freep(&fg->inputs);
        av_freep(&fg->outputs);

        waiter_uninit(&fg->waiter);
    }
    av_freep(&sch->filters);

    av_freep(&sch->sdp_filename);

    pthread_mutex_destroy(&sch->schedule_lock);

    pthread_mutex_destroy(&sch->mux_ready_lock);

    pthread_mutex_destroy(&sch->mux_done_lock);
    pthread_cond_destroy(&sch->mux_done_cond);

    av_freep(psch);
}
579
// Log context class for messages emitted with the Scheduler as context.
static const AVClass scheduler_class = {
    .class_name = "Scheduler",
    .version    = LIBAVUTIL_VERSION_INT,
};
584
/**
 * Allocate a new scheduler and initialize its synchronization objects.
 *
 * @return the new scheduler, or NULL on failure
 */
Scheduler *sch_alloc(void)
{
    Scheduler *sch;
    int ret;

    sch = av_mallocz(sizeof(*sch));
    if (!sch)
        return NULL;

    sch->class    = &scheduler_class;
    sch->sdp_auto = 1;

    ret = pthread_mutex_init(&sch->schedule_lock, NULL);
    if (ret)
        goto fail;

    ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
    if (ret)
        goto fail;

    ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
    if (ret)
        goto fail;

    ret = pthread_cond_init(&sch->mux_done_cond, NULL);
    if (ret)
        goto fail;

    return sch;
fail:
    // NOTE(review): sch_free() destroys all four sync objects, including
    // any that failed to initialize above (they are zeroed by av_mallocz)
    // - confirm this is acceptable on all supported pthread
    // implementations
    sch_free(&sch);
    return NULL;
}
618
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621 av_freep(&sch->sdp_filename);
622 sch->sdp_filename = av_strdup(sdp_filename);
623 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625
// Log context class for muxer tasks; log messages are forwarded to the
// muxer's own context through task.func_arg.
static const AVClass sch_mux_class = {
    .class_name                = "SchMux",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchMux, task.func_arg),
};
631
/**
 * Register a new muxer with the scheduler.
 *
 * @param func              thread function running the muxer
 * @param init              callback writing the muxer header, invoked once
 *                          all its streams are ready
 * @param arg               opaque argument for func/init
 * @param sdp_auto          whether this muxer allows automatic SDP writing;
 *                          ANDed into the global flag
 * @param thread_queue_size length of the muxer's input queue (0 = default)
 *
 * @return index of the new muxer on success, a negative error code otherwise
 */
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
                void *arg, int sdp_auto, unsigned thread_queue_size)
{
    const unsigned idx = sch->nb_mux;

    SchMux *mux;
    int ret;

    ret = GROW_ARRAY(sch->mux, sch->nb_mux);
    if (ret < 0)
        return ret;

    mux             = &sch->mux[idx];
    mux->class      = &sch_mux_class;
    mux->init       = init;
    mux->queue_size = thread_queue_size;

    task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);

    // SDP is only written automatically when every muxer permits it
    sch->sdp_auto &= sdp_auto;

    return idx;
}
655
/**
 * Add an output stream to an existing muxer.
 *
 * @return index of the new stream within the muxer on success, a negative
 *         error code otherwise
 */
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
{
    SchMux *mux;
    SchMuxStream *ms;
    unsigned stream_idx;
    int ret;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    ret = GROW_ARRAY(mux->streams, mux->nb_streams);
    if (ret < 0)
        return ret;
    stream_idx = mux->nb_streams - 1;

    ms = &mux->streams[stream_idx];

    // buffers packets arriving before the muxer task is started
    ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
    if (!ms->pre_mux_queue.fifo)
        return AVERROR(ENOMEM);

    ms->last_dts = AV_NOPTS_VALUE;

    return stream_idx;
}
681
// Log context class for demuxer tasks; log messages are forwarded to the
// demuxer's own context through task.func_arg.
static const AVClass sch_demux_class = {
    .class_name                = "SchDemux",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
};
687
/**
 * Register a new demuxer with the scheduler.
 *
 * @param func thread function running the demuxer
 * @param ctx  opaque argument for func
 *
 * @return index of the new demuxer on success, a negative error code
 *         otherwise
 */
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
{
    const unsigned idx = sch->nb_demux;

    SchDemux *d;
    int ret;

    ret = GROW_ARRAY(sch->demux, sch->nb_demux);
    if (ret < 0)
        return ret;

    d = &sch->demux[idx];

    task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);

    d->class    = &sch_demux_class;
    d->send_pkt = av_packet_alloc();
    if (!d->send_pkt)
        return AVERROR(ENOMEM);

    ret = waiter_init(&d->waiter);
    if (ret < 0)
        return ret;

    return idx;
}
714
715 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717 SchDemux *d;
718 int ret;
719
720 av_assert0(demux_idx < sch->nb_demux);
721 d = &sch->demux[demux_idx];
722
723 ret = GROW_ARRAY(d->streams, d->nb_streams);
724 return ret < 0 ? ret : d->nb_streams - 1;
725 }
726
727 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729 SchDec *dec;
730 int ret;
731
732 av_assert0(dec_idx < sch->nb_dec);
733 dec = &sch->dec[dec_idx];
734
735 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736 if (ret < 0)
737 return ret;
738
739 return dec->nb_outputs - 1;
740 }
741
// Log context class for decoder tasks; log messages are forwarded to the
// decoder's own context through task.func_arg.
static const AVClass sch_dec_class = {
    .class_name                = "SchDec",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchDec, task.func_arg),
};
747
/**
 * Register a new decoder with the scheduler; one output is created
 * implicitly.
 *
 * @param func        thread function running the decoder
 * @param ctx         opaque argument for func
 * @param send_end_ts when nonzero, allocate a message queue for sending
 *                    post-flush end timestamps back to the source
 *
 * @return index of the new decoder on success, a negative error code
 *         otherwise
 */
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
{
    const unsigned idx = sch->nb_dec;

    SchDec *dec;
    int ret;

    ret = GROW_ARRAY(sch->dec, sch->nb_dec);
    if (ret < 0)
        return ret;

    dec = &sch->dec[idx];

    task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);

    dec->class      = &sch_dec_class;
    dec->send_frame = av_frame_alloc();
    if (!dec->send_frame)
        return AVERROR(ENOMEM);

    // every decoder starts out with a single output
    ret = sch_add_dec_output(sch, idx);
    if (ret < 0)
        return ret;

    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
    if (ret < 0)
        return ret;

    if (send_end_ts) {
        ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
        if (ret < 0)
            return ret;
    }

    return idx;
}
784
// Log context class for encoder tasks; log messages are forwarded to the
// encoder's own context through task.func_arg.
static const AVClass sch_enc_class = {
    .class_name                = "SchEnc",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
};
790
/**
 * Register a new encoder with the scheduler.
 *
 * @param func    thread function running the encoder
 * @param ctx     opaque argument for func
 * @param open_cb callback that opens the encoder on the first frame; see
 *                the comment on SchEnc.open_cb
 *
 * @return index of the new encoder on success, a negative error code
 *         otherwise
 */
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
                int (*open_cb)(void *opaque, const AVFrame *frame))
{
    const unsigned idx = sch->nb_enc;

    SchEnc *enc;
    int ret;

    ret = GROW_ARRAY(sch->enc, sch->nb_enc);
    if (ret < 0)
        return ret;

    enc = &sch->enc[idx];

    enc->class     = &sch_enc_class;
    enc->open_cb   = open_cb;
    // not attached to any sync queue yet; cf. sch_sq_add_enc()
    enc->sq_idx[0] = -1;
    enc->sq_idx[1] = -1;

    task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);

    enc->send_pkt = av_packet_alloc();
    if (!enc->send_pkt)
        return AVERROR(ENOMEM);

    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
    if (ret < 0)
        return ret;

    return idx;
}
822
// Log context class for filtergraph tasks; log messages are forwarded to
// the filtergraph's own context through task.func_arg.
static const AVClass sch_fg_class = {
    .class_name                = "SchFilterGraph",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
};
828
/**
 * Register a new filtergraph with the scheduler.
 *
 * @param nb_inputs  number of graph inputs (may be 0 for graphs with
 *                   internal sources)
 * @param nb_outputs number of graph outputs
 * @param func       thread function running the filtergraph
 * @param ctx        opaque argument for func
 *
 * @return index of the new filtergraph on success, a negative error code
 *         otherwise
 */
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
                        SchThreadFunc func, void *ctx)
{
    const unsigned idx = sch->nb_filters;

    SchFilterGraph *fg;
    int ret;

    ret = GROW_ARRAY(sch->filters, sch->nb_filters);
    if (ret < 0)
        return ret;
    fg = &sch->filters[idx];

    fg->class = &sch_fg_class;

    task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);

    if (nb_inputs) {
        fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
        if (!fg->inputs)
            return AVERROR(ENOMEM);
        fg->nb_inputs = nb_inputs;
    }

    if (nb_outputs) {
        fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
        if (!fg->outputs)
            return AVERROR(ENOMEM);
        fg->nb_outputs = nb_outputs;
    }

    ret = waiter_init(&fg->waiter);
    if (ret < 0)
        return ret;

    // one queue stream per input, plus one extra control stream
    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
    if (ret < 0)
        return ret;

    return idx;
}
870
/**
 * Create a new encoder sync queue.
 *
 * @param buf_size_us maximum buffering in the queue, in microseconds
 * @param logctx      logging context for the sync queue
 *
 * @return index of the new sync queue on success, a negative error code
 *         otherwise
 */
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
{
    SchSyncQueue *sq;
    int ret;

    ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
    if (ret < 0)
        return ret;
    sq = &sch->sq_enc[sch->nb_sq_enc - 1];

    sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
    if (!sq->sq)
        return AVERROR(ENOMEM);

    sq->frame = av_frame_alloc();
    if (!sq->frame)
        return AVERROR(ENOMEM);

    ret = pthread_mutex_init(&sq->lock, NULL);
    if (ret)
        return AVERROR(ret);

    // index of the new queue in sch->sq_enc
    return sq - sch->sq_enc;
}
895
/**
 * Attach an encoder to a sync queue.
 *
 * @param sq_idx     index of the sync queue (from sch_add_sq_enc())
 * @param enc_idx    index of the encoder (from sch_add_enc())
 * @param limiting   whether this stream limits the queue's output
 * @param max_frames maximum number of frames to pass to the encoder;
 *                   INT64_MAX means unlimited
 *
 * @return 0 on success, a negative error code otherwise
 */
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
                   int limiting, uint64_t max_frames)
{
    SchSyncQueue *sq;
    SchEnc *enc;
    int ret;

    av_assert0(sq_idx < sch->nb_sq_enc);
    sq = &sch->sq_enc[sq_idx];

    av_assert0(enc_idx < sch->nb_enc);
    enc = &sch->enc[enc_idx];

    ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
    if (ret < 0)
        return ret;
    sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;

    ret = sq_add_stream(sq->sq, limiting);
    if (ret < 0)
        return ret;

    // remember the queue and the encoder's stream index within it
    enc->sq_idx[0] = sq_idx;
    enc->sq_idx[1] = ret;

    if (max_frames != INT64_MAX)
        sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);

    return 0;
}
926
/**
 * Connect two nodes in the scheduler graph.
 *
 * Valid connections:
 * - demuxer stream  -> decoder or muxer stream (streamcopy)
 * - decoder output  -> filtergraph input or encoder
 * - filtergraph out -> encoder or another filtergraph's input
 * - encoder         -> muxer stream or decoder (transcode-to-decode loops)
 *
 * Both endpoints must already exist; any other combination aborts via
 * av_assert0(). Each destination may only have one source; connecting
 * a second source to it also aborts.
 *
 * @return 0 on success, a negative error code otherwise
 */
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
{
    int ret;

    switch (src.type) {
    case SCH_NODE_TYPE_DEMUX: {
        SchDemuxStream *ds;

        av_assert0(src.idx < sch->nb_demux &&
                   src.idx_stream < sch->demux[src.idx].nb_streams);
        ds = &sch->demux[src.idx].streams[src.idx_stream];

        ret = GROW_ARRAY(ds->dst, ds->nb_dst);
        if (ret < 0)
            return ret;

        ds->dst[ds->nb_dst - 1] = dst;

        // demuxed packets go to decoding or streamcopy
        switch (dst.type) {
        case SCH_NODE_TYPE_DEC: {
            SchDec *dec;

            av_assert0(dst.idx < sch->nb_dec);
            dec = &sch->dec[dst.idx];

            // a destination may only have a single source
            av_assert0(!dec->src.type);
            dec->src = src;
            break;
            }
        case SCH_NODE_TYPE_MUX: {
            SchMuxStream *ms;

            av_assert0(dst.idx < sch->nb_mux &&
                       dst.idx_stream < sch->mux[dst.idx].nb_streams);
            ms = &sch->mux[dst.idx].streams[dst.idx_stream];

            av_assert0(!ms->src.type);
            ms->src = src;

            break;
            }
        default: av_assert0(0);
        }

        break;
        }
    case SCH_NODE_TYPE_DEC: {
        SchDec *dec;
        SchDecOutput *o;

        av_assert0(src.idx < sch->nb_dec);
        dec = &sch->dec[src.idx];

        av_assert0(src.idx_stream < dec->nb_outputs);
        o = &dec->outputs[src.idx_stream];

        ret = GROW_ARRAY(o->dst, o->nb_dst);
        if (ret < 0)
            return ret;

        o->dst[o->nb_dst - 1] = dst;

        // decoded frames go to filters or encoding
        switch (dst.type) {
        case SCH_NODE_TYPE_FILTER_IN: {
            SchFilterIn *fi;

            av_assert0(dst.idx < sch->nb_filters &&
                       dst.idx_stream < sch->filters[dst.idx].nb_inputs);
            fi = &sch->filters[dst.idx].inputs[dst.idx_stream];

            av_assert0(!fi->src.type);
            fi->src = src;
            break;
            }
        case SCH_NODE_TYPE_ENC: {
            SchEnc *enc;

            av_assert0(dst.idx < sch->nb_enc);
            enc = &sch->enc[dst.idx];

            av_assert0(!enc->src.type);
            enc->src = src;
            break;
            }
        default: av_assert0(0);
        }

        break;
        }
    case SCH_NODE_TYPE_FILTER_OUT: {
        SchFilterOut *fo;

        av_assert0(src.idx < sch->nb_filters &&
                   src.idx_stream < sch->filters[src.idx].nb_outputs);
        fo = &sch->filters[src.idx].outputs[src.idx_stream];

        // a filtergraph output feeds exactly one destination
        av_assert0(!fo->dst.type);
        fo->dst = dst;

        // filtered frames go to encoding or another filtergraph
        switch (dst.type) {
        case SCH_NODE_TYPE_ENC: {
            SchEnc *enc;

            av_assert0(dst.idx < sch->nb_enc);
            enc = &sch->enc[dst.idx];

            av_assert0(!enc->src.type);
            enc->src = src;
            break;
            }
        case SCH_NODE_TYPE_FILTER_IN: {
            SchFilterIn *fi;

            av_assert0(dst.idx < sch->nb_filters &&
                       dst.idx_stream < sch->filters[dst.idx].nb_inputs);
            fi = &sch->filters[dst.idx].inputs[dst.idx_stream];

            av_assert0(!fi->src.type);
            fi->src = src;
            break;
            }
        default: av_assert0(0);
        }


        break;
        }
    case SCH_NODE_TYPE_ENC: {
        SchEnc *enc;

        av_assert0(src.idx < sch->nb_enc);
        enc = &sch->enc[src.idx];

        ret = GROW_ARRAY(enc->dst, enc->nb_dst);
        if (ret < 0)
            return ret;

        enc->dst[enc->nb_dst - 1] = dst;

        // encoding packets go to muxing or decoding
        switch (dst.type) {
        case SCH_NODE_TYPE_MUX: {
            SchMuxStream *ms;

            av_assert0(dst.idx < sch->nb_mux &&
                       dst.idx_stream < sch->mux[dst.idx].nb_streams);
            ms = &sch->mux[dst.idx].streams[dst.idx_stream];

            av_assert0(!ms->src.type);
            ms->src = src;

            break;
            }
        case SCH_NODE_TYPE_DEC: {
            SchDec *dec;

            av_assert0(dst.idx < sch->nb_dec);
            dec = &sch->dec[dst.idx];

            av_assert0(!dec->src.type);
            dec->src = src;

            break;
            }
        default: av_assert0(0);
        }

        break;
        }
    default: av_assert0(0);
    }

    return 0;
}
1104
/**
 * Start the muxer thread and forward to it everything buffered in the
 * per-stream pre-mux queues.
 *
 * A NULL entry in a pre-mux queue marks end-of-stream and translates to
 * tq_send_finish() on the corresponding queue stream.
 *
 * @return 0 on success, a negative error code otherwise
 */
static int mux_task_start(SchMux *mux)
{
    int ret = 0;

    ret = task_start(&mux->task);
    if (ret < 0)
        return ret;

    /* flush the pre-muxing queues */
    for (unsigned i = 0; i < mux->nb_streams; i++) {
        SchMuxStream *ms = &mux->streams[i];
        AVPacket *pkt;

        while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
            if (pkt) {
                // once a stream hit EOF during the flush, drop its
                // remaining packets but keep draining the fifo
                if (!ms->init_eof)
                    ret = tq_send(mux->queue, i, pkt);
                av_packet_free(&pkt);
                if (ret == AVERROR_EOF)
                    ms->init_eof = 1;
                else if (ret < 0)
                    return ret;
            } else
                tq_send_finish(mux->queue, i);
        }
    }

    atomic_store(&mux->mux_started, 1);

    return 0;
}
1136
1137 int print_sdp(const char *filename);
1138
/**
 * Run the muxer's init callback (writes the header) and start muxer
 * task(s).
 *
 * When an SDP is to be written, it must be written after ALL the muxers
 * have initialized, so muxer threads are started together only once the
 * last muxer becomes ready; otherwise just this muxer's thread is started.
 *
 * Called with Scheduler.mux_ready_lock held.
 *
 * @return 0 on success, a negative error code otherwise
 */
static int mux_init(Scheduler *sch, SchMux *mux)
{
    int ret;

    ret = mux->init(mux->task.func_arg);
    if (ret < 0)
        return ret;

    sch->nb_mux_ready++;

    if (sch->sdp_filename || sch->sdp_auto) {
        // wait for the remaining muxers before writing the SDP
        if (sch->nb_mux_ready < sch->nb_mux)
            return 0;

        ret = print_sdp(sch->sdp_filename);
        if (ret < 0) {
            av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
            return ret;
        }

        /* SDP is written only after all the muxers are ready, so now we
         * start ALL the threads */
        for (unsigned i = 0; i < sch->nb_mux; i++) {
            ret = mux_task_start(&sch->mux[i]);
            if (ret < 0)
                return ret;
        }
    } else {
        ret = mux_task_start(mux);
        if (ret < 0)
            return ret;
    }

    return 0;
}
1174
1175 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1176 size_t data_threshold, int max_packets)
1177 {
1178 SchMux *mux;
1179 SchMuxStream *ms;
1180
1181 av_assert0(mux_idx < sch->nb_mux);
1182 mux = &sch->mux[mux_idx];
1183
1184 av_assert0(stream_idx < mux->nb_streams);
1185 ms = &mux->streams[stream_idx];
1186
1187 ms->pre_mux_queue.max_packets = max_packets;
1188 ms->pre_mux_queue.data_threshold = data_threshold;
1189 }
1190
/**
 * Mark one of a muxer's streams as fully configured. When the last stream
 * of a muxer becomes ready and the scheduler is already running, the muxer
 * is initialized (header written, thread started).
 *
 * @return 0 on success, a negative error code otherwise
 */
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
{
    SchMux *mux;
    int ret = 0;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    av_assert0(stream_idx < mux->nb_streams);

    pthread_mutex_lock(&sch->mux_ready_lock);

    av_assert0(mux->nb_streams_ready < mux->nb_streams);

    // this may be called during initialization - do not start
    // threads before sch_start() is called
    if (++mux->nb_streams_ready == mux->nb_streams &&
        sch->state >= SCH_STATE_STARTED)
        ret = mux_init(sch, mux);

    pthread_mutex_unlock(&sch->mux_ready_lock);

    return ret;
}
1215
/**
 * Register a decoder to receive subtitle heartbeat packets triggered by
 * packets muxed on the given stream. Allocates the shared heartbeat packet
 * on first use.
 *
 * @return 0 on success, a negative error code otherwise
 */
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
                              unsigned dec_idx)
{
    SchMux *mux;
    SchMuxStream *ms;
    int ret = 0;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    av_assert0(stream_idx < mux->nb_streams);
    ms = &mux->streams[stream_idx];

    ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
    if (ret < 0)
        return ret;

    av_assert0(dec_idx < sch->nb_dec);
    ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;

    if (!mux->sub_heartbeat_pkt) {
        mux->sub_heartbeat_pkt = av_packet_alloc();
        if (!mux->sub_heartbeat_pkt)
            return AVERROR(ENOMEM);
    }

    return 0;
}
1244
1245 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
1246
1247 // Unchoke any filter graphs that are downstream of this node, to prevent it
1248 // from getting stuck trying to push data to a full queue
1249 static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
1250 {
1251 SchFilterGraph *fg;
1252 SchDec *dec;
1253 SchEnc *enc;
1254 switch (dst->type) {
1255 case SCH_NODE_TYPE_DEC:
1256 dec = &sch->dec[dst->idx];
1257 for (int i = 0; i < dec->nb_outputs; i++)
1258 unchoke_downstream(sch, dec->outputs[i].dst);
1259 break;
1260 case SCH_NODE_TYPE_ENC:
1261 enc = &sch->enc[dst->idx];
1262 for (int i = 0; i < enc->nb_dst; i++)
1263 unchoke_downstream(sch, &enc->dst[i]);
1264 break;
1265 case SCH_NODE_TYPE_MUX:
1266 // muxers are never choked
1267 break;
1268 case SCH_NODE_TYPE_FILTER_IN:
1269 fg = &sch->filters[dst->idx];
1270 if (fg->best_input == fg->nb_inputs) {
1271 fg->waiter.choked_next = 0;
1272 } else {
1273 // ensure that this filter graph is not stuck waiting for
1274 // input from a different upstream demuxer
1275 unchoke_for_stream(sch, fg->inputs[fg->best_input].src_sched);
1276 }
1277 break;
1278 default:
1279 av_assert0(!"Invalid destination node type?");
1280 break;
1281 }
1282 }
1283
1284 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1285 {
1286 while (1) {
1287 SchFilterGraph *fg;
1288
1289 // fed directly by a demuxer (i.e. not through a filtergraph)
1290 if (src.type == SCH_NODE_TYPE_DEMUX) {
1291 SchDemux *demux = &sch->demux[src.idx];
1292 if (demux->waiter.choked_next == 0)
1293 return; // prevent infinite loop
1294 demux->waiter.choked_next = 0;
1295 for (int i = 0; i < demux->nb_streams; i++)
1296 unchoke_downstream(sch, demux->streams[i].dst);
1297 return;
1298 }
1299
1300 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1301 fg = &sch->filters[src.idx];
1302
1303 // the filtergraph contains internal sources and
1304 // requested to be scheduled directly
1305 if (fg->best_input == fg->nb_inputs) {
1306 fg->waiter.choked_next = 0;
1307 return;
1308 }
1309
1310 src = fg->inputs[fg->best_input].src_sched;
1311 }
1312 }
1313
// Recompute which sources (demuxers and filtergraphs) are allowed to produce
// data, based on how far each muxed output stream is ahead of the trailing
// (least advanced) stream. Must be called with sch->schedule_lock held.
static void schedule_update_locked(Scheduler *sch)
{
    int64_t dts;
    int have_unchoked = 0;

    // on termination request all waiters are choked,
    // we are not to unchoke them
    if (atomic_load(&sch->terminate))
        return;

    dts = trailing_dts(sch, 0);

    atomic_store(&sch->last_dts, dts);

    // initialize our internal state
    // (type 0 iterates demuxers, type 1 iterates filtergraphs)
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            w->choked_prev = atomic_load(&w->choked);
            // start from "choked" and selectively unchoke below
            w->choked_next = 1;
        }

    // figure out the sources that are allowed to proceed
    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // unblock sources for output streams that are not finished
            // and not too far ahead of the trailing stream
            if (ms->source_finished)
                continue;
            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                continue;
            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
                continue;

            // resolve the source to unchoke
            unchoke_for_stream(sch, ms->src_sched);
            have_unchoked = 1;
        }
    }

    // make sure to unchoke at least one source, if still available
    for (unsigned type = 0; !have_unchoked && type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (!exited) {
                w->choked_next = 0;
                have_unchoked = 1;
                break;
            }
        }


    // apply the newly computed choke state, waking/blocking the waiters
    // only when it actually changed
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (w->choked_prev != w->choked_next)
                waiter_set(w, w->choked_next);
        }

}
1379
// DFS node colors used by check_acyclic_for_output():
// NEW = unvisited, STARTED = on the current DFS path, DONE = fully explored
enum {
    CYCLE_NODE_NEW = 0,
    CYCLE_NODE_STARTED,
    CYCLE_NODE_DONE,
};
1385
// Iterative depth-first search upstream from the given filtergraph, looking
// for a cycle among filtergraph connections. filters_stack is used as an
// explicit DFS stack, filters_visited holds the CYCLE_NODE_* color of each
// graph. Returns AVERROR(EINVAL) if a cycle is found, 0 otherwise.
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                         uint8_t *filters_visited, SchedulerNode *filters_stack)
{
    unsigned nb_filters_stack = 0;

    memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));

    while (1) {
        const SchFilterGraph *fg = &sch->filters[src.idx];

        filters_visited[src.idx] = CYCLE_NODE_STARTED;

        // descend into every input, depth first
        // (src.idx_stream doubles as the per-node iteration counter)
        if (src.idx_stream < fg->nb_inputs) {
            const SchFilterIn *fi = &fg->inputs[src.idx_stream++];

            // connected to demuxer, no cycles possible
            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
                continue;

            // otherwise connected to another filtergraph
            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);

            // found a cycle
            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
                return AVERROR(EINVAL);

            // place current position on stack and descend
            av_assert0(nb_filters_stack < sch->nb_filters);
            filters_stack[nb_filters_stack++] = src;
            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
            continue;
        }

        filters_visited[src.idx] = CYCLE_NODE_DONE;

        // previous search finished,
        if (nb_filters_stack) {
            src = filters_stack[--nb_filters_stack];
            continue;
        }
        return 0;
    }
}
1431
1432 static int check_acyclic(Scheduler *sch)
1433 {
1434 uint8_t *filters_visited = NULL;
1435 SchedulerNode *filters_stack = NULL;
1436
1437 int ret = 0;
1438
1439 if (!sch->nb_filters)
1440 return 0;
1441
1442 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1443 if (!filters_visited)
1444 return AVERROR(ENOMEM);
1445
1446 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1447 if (!filters_stack) {
1448 ret = AVERROR(ENOMEM);
1449 goto fail;
1450 }
1451
1452 // trace the transcoding graph upstream from every filtegraph
1453 for (unsigned i = 0; i < sch->nb_filters; i++) {
1454 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1455 filters_visited, filters_stack);
1456 if (ret < 0) {
1457 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1458 goto fail;
1459 }
1460 }
1461
1462 fail:
1463 av_freep(&filters_visited);
1464 av_freep(&filters_stack);
1465 return ret;
1466 }
1467
// Validate that the transcoding graph is fully connected, allocate the
// per-destination bookkeeping arrays, resolve each node's ultimate scheduling
// source (src_sched), allocate the muxer queues and check for cycles.
// Called once from sch_start() before any task threads are launched.
static int start_prepare(Scheduler *sch)
{
    int ret;

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];

            if (!ds->nb_dst) {
                av_log(d, AV_LOG_ERROR,
                       "Demuxer stream %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            // per-destination EOF flags, used by demux_stream_send_to_dst()
            ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
            if (!ds->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        if (!dec->src.type) {
            av_log(dec, AV_LOG_ERROR,
                   "Decoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            if (!o->nb_dst) {
                av_log(dec, AV_LOG_ERROR,
                       "Decoder output %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            // per-destination EOF flags, used by dec_send_to_dst()
            o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
            if (!o->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        if (!enc->src.type) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to a source\n");
            return AVERROR(EINVAL);
        }
        if (!enc->nb_dst) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to any sink\n");
            return AVERROR(EINVAL);
        }

        // per-destination EOF flags, used by enc_send_to_dst()
        enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
        if (!enc->dst_finished)
            return AVERROR(ENOMEM);
    }

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // resolve src_sched - the ultimate source node (demuxer or
            // filtergraph output) whose choking controls this stream
            switch (ms->src.type) {
            case SCH_NODE_TYPE_ENC: {
                SchEnc *enc = &sch->enc[ms->src.idx];
                if (enc->src.type == SCH_NODE_TYPE_DEC) {
                    ms->src_sched = sch->dec[enc->src.idx].src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
                } else {
                    ms->src_sched = enc->src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
                }
                break;
                }
            case SCH_NODE_TYPE_DEMUX:
                // streamcopy - the demuxer itself is the scheduling source
                ms->src_sched = ms->src;
                break;
            default:
                av_log(mux, AV_LOG_ERROR,
                       "Muxer stream #%u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }
        }

        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
                          QUEUE_PACKETS);
        if (ret < 0)
            return ret;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        for (unsigned j = 0; j < fg->nb_inputs; j++) {
            SchFilterIn *fi = &fg->inputs[j];
            SchDec *dec;

            if (!fi->src.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph input %u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }

            // resolve the ultimate scheduling source feeding this input
            if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
                fi->src_sched = fi->src;
            else {
                av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
                dec = &sch->dec[fi->src.idx];

                switch (dec->src.type) {
                case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;                    break;
                case SCH_NODE_TYPE_ENC:   fi->src_sched = sch->enc[dec->src.idx].src; break;
                default: av_assert0(0);
                }
            }
        }

        for (unsigned j = 0; j < fg->nb_outputs; j++) {
            SchFilterOut *fo = &fg->outputs[j];

            if (!fo->dst.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph %u output %u not connected to a sink\n", i, j);
                return AVERROR(EINVAL);
            }
        }
    }

    // Check that the transcoding graph has no cycles.
    ret = check_acyclic(sch);
    if (ret < 0)
        return ret;

    return 0;
}
1612
// Validate the graph and launch all task threads, then perform the initial
// scheduling pass. On any failure everything started so far is stopped.
// Note the start order - muxers, encoders, filters, decoders, demuxers -
// i.e. each consumer is running before its producers are started.
int sch_start(Scheduler *sch)
{
    int ret;

    ret = start_prepare(sch);
    if (ret < 0)
        return ret;

    av_assert0(sch->state == SCH_STATE_UNINIT);
    sch->state = SCH_STATE_STARTED;

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        // only muxers whose streams are all ready can be initialized now;
        // the others are initialized later, when their streams become ready
        if (mux->nb_streams_ready == mux->nb_streams) {
            ret = mux_init(sch, mux);
            if (ret < 0)
                goto fail;
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        ret = task_start(&enc->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        ret = task_start(&fg->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        ret = task_start(&dec->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        // demuxers with no connected streams have nothing to do
        if (!d->nb_streams)
            continue;

        ret = task_start(&d->task);
        if (ret < 0)
            goto fail;
    }

    // initial scheduling pass - unchoke the sources that may proceed
    pthread_mutex_lock(&sch->schedule_lock);
    schedule_update_locked(sch);
    pthread_mutex_unlock(&sch->schedule_lock);

    return 0;
fail:
    sch_stop(sch, NULL);
    return ret;
}
1678
// Wait up to timeout_us microseconds for the transcoding to finish.
// Returns nonzero if the caller should stop waiting - either all muxers are
// done or some task failed; *transcode_ts receives the current trailing DTS.
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
{
    int ret, err;

    // convert delay to absolute timestamp
    timeout_us += av_gettime();

    pthread_mutex_lock(&sch->mux_done_lock);

    if (sch->nb_mux_done < sch->nb_mux) {
        struct timespec tv = { .tv_sec  =  timeout_us / 1000000,
                               .tv_nsec = (timeout_us % 1000000) * 1000 };
        // timeout result deliberately ignored - nb_mux_done is re-checked below
        pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
    }

    ret = sch->nb_mux_done == sch->nb_mux;

    pthread_mutex_unlock(&sch->mux_done_lock);

    *transcode_ts = atomic_load(&sch->last_dts);

    // abort transcoding if any task failed
    err = atomic_load(&sch->task_failed);

    return ret || err;
}
1705
1706 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1707 {
1708 int ret;
1709
1710 ret = enc->open_cb(enc->task.func_arg, frame);
1711 if (ret < 0)
1712 return ret;
1713
1714 // ret>0 signals audio frame size, which means sync queue must
1715 // have been enabled during encoder creation
1716 if (ret > 0) {
1717 SchSyncQueue *sq;
1718
1719 av_assert0(enc->sq_idx[0] >= 0);
1720 sq = &sch->sq_enc[enc->sq_idx[0]];
1721
1722 pthread_mutex_lock(&sq->lock);
1723
1724 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1725
1726 pthread_mutex_unlock(&sq->lock);
1727 }
1728
1729 return 0;
1730 }
1731
1732 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1733 {
1734 int ret;
1735
1736 if (!frame) {
1737 tq_send_finish(enc->queue, 0);
1738 return 0;
1739 }
1740
1741 if (enc->in_finished)
1742 return AVERROR_EOF;
1743
1744 ret = tq_send(enc->queue, 0, frame);
1745 if (ret < 0)
1746 enc->in_finished = 1;
1747
1748 return ret;
1749 }
1750
// Feed one frame (or EOF when frame is NULL) into the sync queue shared by a
// group of encoders, then drain whatever the queue releases to the encoder
// threads it feeds.
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
{
    SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
    int ret = 0;

    // inform the scheduling code that no more input will arrive along this path;
    // this is necessary because the sync queue may not send an EOF downstream
    // until other streams finish
    // TODO: consider a cleaner way of passing this information through
    //       the pipeline
    if (!frame) {
        for (unsigned i = 0; i < enc->nb_dst; i++) {
            SchMux      *mux;
            SchMuxStream *ms;

            if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
                continue;

            mux = &sch->mux[enc->dst[i].idx];
            ms = &mux->streams[enc->dst[i].idx_stream];

            pthread_mutex_lock(&sch->schedule_lock);

            ms->source_finished = 1;
            schedule_update_locked(sch);

            pthread_mutex_unlock(&sch->schedule_lock);
        }
    }

    pthread_mutex_lock(&sq->lock);

    ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
    if (ret < 0)
        goto finish;

    // drain all the frames the sync queue is ready to release
    while (1) {
        SchEnc *enc;    // NB: shadows the parameter; refers to the encoder
                        // the sync queue released a frame for

        // TODO: the SQ API should be extended to allow returning EOF
        // for individual streams
        ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
        if (ret < 0) {
            // EAGAIN just means the queue is holding frames back for now
            ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
            break;
        }

        enc = &sch->enc[sq->enc_idx[ret]];
        ret = send_to_enc_thread(sch, enc, sq->frame);
        if (ret < 0) {
            av_frame_unref(sq->frame);
            if (ret != AVERROR_EOF)
                break;

            // this encoder is done - tell the sync queue and keep draining
            sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
            continue;
        }
    }

    if (ret < 0) {
        // close all encoders fed from this sync queue
        for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
            int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);

            // if the sync queue error is EOF and closing the encoder
            // produces a more serious error, make sure to pick the latter
            ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
        }
    }

finish:
    pthread_mutex_unlock(&sq->lock);

    return ret;
}
1826
1827 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1828 {
1829 if (enc->open_cb && frame && !enc->opened) {
1830 int ret = enc_open(sch, enc, frame);
1831 if (ret < 0)
1832 return ret;
1833 enc->opened = 1;
1834
1835 // discard empty frames that only carry encoder init parameters
1836 if (!frame->buf[0]) {
1837 av_frame_unref(frame);
1838 return 0;
1839 }
1840 }
1841
1842 return (enc->sq_idx[0] >= 0) ?
1843 send_to_enc_sq (sch, enc, frame) :
1844 send_to_enc_thread(sch, enc, frame);
1845 }
1846
1847 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1848 {
1849 PreMuxQueue *q = &ms->pre_mux_queue;
1850 AVPacket *tmp_pkt = NULL;
1851 int ret;
1852
1853 if (!av_fifo_can_write(q->fifo)) {
1854 size_t packets = av_fifo_can_read(q->fifo);
1855 size_t pkt_size = pkt ? pkt->size : 0;
1856 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1857 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1858 size_t new_size = FFMIN(2 * packets, max_packets);
1859
1860 if (new_size <= packets) {
1861 av_log(mux, AV_LOG_ERROR,
1862 "Too many packets buffered for output stream.\n");
1863 return AVERROR(ENOSPC);
1864 }
1865 ret = av_fifo_grow2(q->fifo, new_size - packets);
1866 if (ret < 0)
1867 return ret;
1868 }
1869
1870 if (pkt) {
1871 tmp_pkt = av_packet_alloc();
1872 if (!tmp_pkt)
1873 return AVERROR(ENOMEM);
1874
1875 av_packet_move_ref(tmp_pkt, pkt);
1876 q->data_size += tmp_pkt->size;
1877 }
1878 av_fifo_write(q->fifo, &tmp_pkt, 1);
1879
1880 return 0;
1881 }
1882
// Deliver a packet (or EOF when pkt is NULL) to one muxed stream, buffering
// it in the pre-mux queue while the muxer has not started yet, and update
// the schedule with the stream's new end timestamp.
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
                       AVPacket *pkt)
{
    SchMuxStream *ms = &mux->streams[stream_idx];
    // end timestamp of this packet in AV_TIME_BASE_Q, used for scheduling
    int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
                  av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
                  AV_NOPTS_VALUE;

    // queue the packet if the muxer cannot be started yet
    if (!atomic_load(&mux->mux_started)) {
        int queued = 0;

        // the muxer could have started between the above atomic check and
        // locking the mutex, then this block falls through to normal send path
        pthread_mutex_lock(&sch->mux_ready_lock);

        if (!atomic_load(&mux->mux_started)) {
            int ret = mux_queue_packet(mux, ms, pkt);
            queued = ret < 0 ? ret : 1;
        }

        pthread_mutex_unlock(&sch->mux_ready_lock);

        if (queued < 0)
            return queued;
        else if (queued)
            goto update_schedule;
    }

    if (pkt) {
        int ret;

        // streams that got an EOF during pre-muxing accept no more packets
        if (ms->init_eof)
            return AVERROR_EOF;

        ret = tq_send(mux->queue, stream_idx, pkt);
        if (ret < 0)
            return ret;
    } else
        tq_send_finish(mux->queue, stream_idx);

update_schedule:
    // TODO: use atomics to check whether this changes trailing dts
    // to avoid locking unnecessarily
    if (dts != AV_NOPTS_VALUE || !pkt) {
        pthread_mutex_lock(&sch->schedule_lock);

        if (pkt) ms->last_dts = dts;
        else     ms->source_finished = 1;

        schedule_update_locked(sch);

        pthread_mutex_unlock(&sch->schedule_lock);
    }

    return 0;
}
1940
1941 static int
1942 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1943 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1944 {
1945 int ret;
1946
1947 if (*dst_finished)
1948 return AVERROR_EOF;
1949
1950 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1951 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1952 av_packet_unref(pkt);
1953 pkt = NULL;
1954 }
1955
1956 if (!pkt)
1957 goto finish;
1958
1959 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1960 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1961 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1962 if (ret == AVERROR_EOF)
1963 goto finish;
1964
1965 return ret;
1966
1967 finish:
1968 if (dst.type == SCH_NODE_TYPE_MUX)
1969 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1970 else
1971 tq_send_finish(sch->dec[dst.idx].queue, 0);
1972
1973 *dst_finished = 1;
1974 return AVERROR_EOF;
1975 }
1976
// Fan a demuxed packet (or EOF when pkt is NULL) out to all destinations of
// one demuxed stream. Returns AVERROR_EOF once every destination is finished.
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
                                 AVPacket *pkt, unsigned flags)
{
    unsigned nb_done = 0;

    for (unsigned i = 0; i < ds->nb_dst; i++) {
        AVPacket *to_send = pkt;
        uint8_t *finished = &ds->dst_finished[i];

        int ret;

        // sending a packet consumes it, so make a temporary reference if needed
        // (the last destination receives the original packet)
        if (pkt && i < ds->nb_dst - 1) {
            to_send = d->send_pkt;

            ret = av_packet_ref(to_send, pkt);
            if (ret < 0)
                return ret;
        }

        ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
        if (to_send)
            av_packet_unref(to_send);
        if (ret == AVERROR_EOF)
            nb_done++;
        else if (ret < 0)
            return ret;
    }

    return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
}
2008
// Flush all decoders fed by this demuxer (used after a seek): send the empty
// flush packet to each of them and gather the largest post-flush end
// timestamp, which is returned to the caller in pkt->pts/time_base.
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
{
    Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };

    // the flush packet must carry no data
    av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);

    for (unsigned i = 0; i < d->nb_streams; i++) {
        SchDemuxStream *ds = &d->streams[i];

        for (unsigned j = 0; j < ds->nb_dst; j++) {
            const SchedulerNode *dst = &ds->dst[j];
            SchDec *dec;
            int ret;

            // only live decoder destinations take part in flushing
            if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
                continue;

            dec = &sch->dec[dst->idx];

            ret = tq_send(dec->queue, 0, pkt);
            if (ret < 0)
                return ret;

            if (dec->queue_end_ts) {
                // wait for the decoder to report its end timestamp,
                // keep the latest one seen so far
                Timestamp ts;
                ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
                if (ret < 0)
                    return ret;

                if (max_end_ts.ts == AV_NOPTS_VALUE ||
                    (ts.ts != AV_NOPTS_VALUE &&
                     av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
                    max_end_ts = ts;

            }
        }
    }

    // hand the merged end timestamp back to the demuxer
    pkt->pts       = max_end_ts.ts;
    pkt->time_base = max_end_ts.tb;

    return 0;
}
2052
2053 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2054 unsigned flags)
2055 {
2056 SchDemux *d;
2057 int terminate;
2058
2059 av_assert0(demux_idx < sch->nb_demux);
2060 d = &sch->demux[demux_idx];
2061
2062 terminate = waiter_wait(sch, &d->waiter);
2063 if (terminate)
2064 return AVERROR_EXIT;
2065
2066 // flush the downstreams after seek
2067 if (pkt->stream_index == -1)
2068 return demux_flush(sch, d, pkt);
2069
2070 av_assert0(pkt->stream_index < d->nb_streams);
2071
2072 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2073 }
2074
// Called when a demuxer task finishes: send EOF to all destinations of every
// stream, then mark the task as exited and rerun scheduling so other sources
// can be unchoked.
static int demux_done(Scheduler *sch, unsigned demux_idx)
{
    SchDemux *d = &sch->demux[demux_idx];
    int ret = 0;

    for (unsigned i = 0; i < d->nb_streams; i++) {
        int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
        // EOF here is the expected outcome, not an error
        if (err != AVERROR_EOF)
            ret = err_merge(ret, err);
    }

    pthread_mutex_lock(&sch->schedule_lock);

    d->task_exited = 1;

    schedule_update_locked(sch);

    pthread_mutex_unlock(&sch->schedule_lock);

    return ret;
}
2096
2097 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2098 {
2099 SchMux *mux;
2100 int ret, stream_idx;
2101
2102 av_assert0(mux_idx < sch->nb_mux);
2103 mux = &sch->mux[mux_idx];
2104
2105 ret = tq_receive(mux->queue, &stream_idx, pkt);
2106 pkt->stream_index = stream_idx;
2107 return ret;
2108 }
2109
// Called by a muxer task when it will accept no more packets for the given
// stream; closes the queue side and updates the schedule so upstream sources
// are not throttled by this stream anymore.
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
{
    SchMux *mux;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    av_assert0(stream_idx < mux->nb_streams);
    tq_receive_finish(mux->queue, stream_idx);

    pthread_mutex_lock(&sch->schedule_lock);
    mux->streams[stream_idx].source_finished = 1;

    schedule_update_locked(sch);

    pthread_mutex_unlock(&sch->schedule_lock);
}
2127
2128 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2129 const AVPacket *pkt)
2130 {
2131 SchMux *mux;
2132 SchMuxStream *ms;
2133
2134 av_assert0(mux_idx < sch->nb_mux);
2135 mux = &sch->mux[mux_idx];
2136
2137 av_assert0(stream_idx < mux->nb_streams);
2138 ms = &mux->streams[stream_idx];
2139
2140 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2141 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2142 int ret;
2143
2144 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2145 if (ret < 0)
2146 return ret;
2147
2148 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2149 }
2150
2151 return 0;
2152 }
2153
// Called when a muxer task finishes: close all its stream queues, mark every
// stream finished for scheduling purposes, and signal sch_wait() waiters via
// mux_done_cond.
static int mux_done(Scheduler *sch, unsigned mux_idx)
{
    SchMux *mux = &sch->mux[mux_idx];

    pthread_mutex_lock(&sch->schedule_lock);

    for (unsigned i = 0; i < mux->nb_streams; i++) {
        tq_receive_finish(mux->queue, i);
        mux->streams[i].source_finished = 1;
    }

    schedule_update_locked(sch);

    pthread_mutex_unlock(&sch->schedule_lock);

    pthread_mutex_lock(&sch->mux_done_lock);

    av_assert0(sch->nb_mux_done < sch->nb_mux);
    sch->nb_mux_done++;

    // wake up sch_wait(), which checks nb_mux_done
    pthread_cond_signal(&sch->mux_done_cond);

    pthread_mutex_unlock(&sch->mux_done_lock);

    return 0;
}
2180
// Entry point for decoder tasks: fetch the next packet for this decoder.
// Implements the post-seek flush handshake - after a flush packet was
// received, the next call is expected to carry the decoder's end timestamp
// in pkt->pts/time_base, which is relayed back to the demuxer thread.
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
{
    SchDec *dec;
    int ret, dummy;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    // the decoder should have given us post-flush end timestamp in pkt
    if (dec->expect_end_ts) {
        Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
        ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
        if (ret < 0)
            return ret;

        dec->expect_end_ts = 0;
    }

    ret = tq_receive(dec->queue, &dummy, pkt);
    av_assert0(dummy <= 0);

    // got a flush packet, on the next call to this function the decoder
    // will give us post-flush end timestamp
    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
        dec->expect_end_ts = 1;

    return ret;
}
2209
2210 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2211 unsigned in_idx, AVFrame *frame)
2212 {
2213 if (frame)
2214 return tq_send(fg->queue, in_idx, frame);
2215
2216 if (!fg->inputs[in_idx].send_finished) {
2217 fg->inputs[in_idx].send_finished = 1;
2218 tq_send_finish(fg->queue, in_idx);
2219
2220 // close the control stream when all actual inputs are done
2221 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2222 tq_send_finish(fg->queue, fg->nb_inputs);
2223 }
2224 return 0;
2225 }
2226
2227 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2228 uint8_t *dst_finished, AVFrame *frame)
2229 {
2230 int ret;
2231
2232 if (*dst_finished)
2233 return AVERROR_EOF;
2234
2235 if (!frame)
2236 goto finish;
2237
2238 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2239 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2240 send_to_enc(sch, &sch->enc[dst.idx], frame);
2241 if (ret == AVERROR_EOF)
2242 goto finish;
2243
2244 return ret;
2245
2246 finish:
2247 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2248 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2249 else
2250 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2251
2252 *dst_finished = 1;
2253
2254 return AVERROR_EOF;
2255 }
2256
// Entry point for decoder tasks: fan a decoded frame out to all destinations
// of the given decoder output. Returns AVERROR_EOF once every destination is
// finished, 0 otherwise.
int sch_dec_send(Scheduler *sch, unsigned dec_idx,
                 unsigned out_idx, AVFrame *frame)
{
    SchDec *dec;
    SchDecOutput *o;
    int ret;
    unsigned nb_done = 0;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    av_assert0(out_idx < dec->nb_outputs);
    o = &dec->outputs[out_idx];

    for (unsigned i = 0; i < o->nb_dst; i++) {
        uint8_t *finished = &o->dst_finished[i];
        AVFrame *to_send  = frame;

        // sending a frame consumes it, so make a temporary reference if needed
        // (the last destination receives the original frame)
        if (i < o->nb_dst - 1) {
            to_send = dec->send_frame;

            // frame may sometimes contain props only,
            // e.g. to signal EOF timestamp
            ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
                                  av_frame_copy_props(to_send, frame);
            if (ret < 0)
                return ret;
        }

        ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
        if (ret < 0) {
            av_frame_unref(to_send);
            if (ret == AVERROR_EOF) {
                nb_done++;
                continue;
            }
            return ret;
        }
    }

    return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
}
2300
// Called when a decoder task finishes: close its input queue, unblock any
// demuxer waiting for end timestamps, and propagate EOF to every destination
// of every output.
static int dec_done(Scheduler *sch, unsigned dec_idx)
{
    SchDec *dec = &sch->dec[dec_idx];
    int ret = 0;

    tq_receive_finish(dec->queue, 0);

    // make sure our source does not get stuck waiting for end timestamps
    // that will never arrive
    if (dec->queue_end_ts)
        av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);

    for (unsigned i = 0; i < dec->nb_outputs; i++) {
        SchDecOutput *o = &dec->outputs[i];

        for (unsigned j = 0; j < o->nb_dst; j++) {
            int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
            // EOF is the expected outcome here
            if (err < 0 && err != AVERROR_EOF)
                ret = err_merge(ret, err);
        }
    }

    return ret;
}
2325
2326 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2327 {
2328 SchEnc *enc;
2329 int ret, dummy;
2330
2331 av_assert0(enc_idx < sch->nb_enc);
2332 enc = &sch->enc[enc_idx];
2333
2334 ret = tq_receive(enc->queue, &dummy, frame);
2335 av_assert0(dummy <= 0);
2336
2337 return ret;
2338 }
2339
2340 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2341 uint8_t *dst_finished, AVPacket *pkt)
2342 {
2343 int ret;
2344
2345 if (*dst_finished)
2346 return AVERROR_EOF;
2347
2348 if (!pkt)
2349 goto finish;
2350
2351 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2352 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2353 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2354 if (ret == AVERROR_EOF)
2355 goto finish;
2356
2357 return ret;
2358
2359 finish:
2360 if (dst.type == SCH_NODE_TYPE_MUX)
2361 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2362 else
2363 tq_send_finish(sch->dec[dst.idx].queue, 0);
2364
2365 *dst_finished = 1;
2366
2367 return AVERROR_EOF;
2368 }
2369
2370 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2371 {
2372 SchEnc *enc;
2373 int ret;
2374
2375 av_assert0(enc_idx < sch->nb_enc);
2376 enc = &sch->enc[enc_idx];
2377
2378 for (unsigned i = 0; i < enc->nb_dst; i++) {
2379 uint8_t *finished = &enc->dst_finished[i];
2380 AVPacket *to_send = pkt;
2381
2382 // sending a packet consumes it, so make a temporary reference if needed
2383 if (i < enc->nb_dst - 1) {
2384 to_send = enc->send_pkt;
2385
2386 ret = av_packet_ref(to_send, pkt);
2387 if (ret < 0)
2388 return ret;
2389 }
2390
2391 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2392 if (ret < 0) {
2393 av_packet_unref(to_send);
2394 if (ret == AVERROR_EOF)
2395 continue;
2396 return ret;
2397 }
2398 }
2399
2400 return 0;
2401 }
2402
// Called when an encoder task finishes: close its input queue and propagate
// EOF to all of its destinations.
static int enc_done(Scheduler *sch, unsigned enc_idx)
{
    SchEnc *enc = &sch->enc[enc_idx];
    int ret = 0;

    tq_receive_finish(enc->queue, 0);

    for (unsigned i = 0; i < enc->nb_dst; i++) {
        int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
        // EOF is the expected outcome here
        if (err < 0 && err != AVERROR_EOF)
            ret = err_merge(ret, err);
    }

    return ret;
}
2418
// Entry point for filtergraph tasks: receive the next input frame.
// *in_idx carries the input the graph wants next (nb_inputs = no specific
// input, schedule the graph directly) and receives the index the returned
// frame actually belongs to.
int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
                       unsigned *in_idx, AVFrame *frame)
{
    SchFilterGraph *fg;

    av_assert0(fg_idx < sch->nb_filters);
    fg = &sch->filters[fg_idx];

    av_assert0(*in_idx <= fg->nb_inputs);

    // update scheduling to account for desired input stream, if it changed
    //
    // this check needs no locking because only the filtering thread
    // updates this value
    if (*in_idx != fg->best_input) {
        pthread_mutex_lock(&sch->schedule_lock);

        fg->best_input = *in_idx;
        schedule_update_locked(sch);

        pthread_mutex_unlock(&sch->schedule_lock);
    }

    // no specific input requested - just wait until the graph is unchoked
    if (*in_idx == fg->nb_inputs) {
        int terminate = waiter_wait(sch, &fg->waiter);
        return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
    }

    while (1) {
        int ret, idx;

        ret = tq_receive(fg->queue, &idx, frame);
        // idx < 0 means the whole queue is closed
        if (idx < 0)
            return AVERROR_EOF;
        else if (ret >= 0) {
            *in_idx = idx;
            return 0;
        }

        // disregard EOFs for specific streams - they should always be
        // preceded by an EOF frame
    }
}
2462
// Called by a filtergraph task when it will read no more frames from the
// given input; closes the receiving side of that input once, and closes the
// control stream when all actual inputs are done.
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
{
    SchFilterGraph *fg;
    SchFilterIn *fi;

    av_assert0(fg_idx < sch->nb_filters);
    fg = &sch->filters[fg_idx];

    av_assert0(in_idx < fg->nb_inputs);
    fi = &fg->inputs[in_idx];

    if (!fi->receive_finished) {
        fi->receive_finished = 1;
        tq_receive_finish(fg->queue, in_idx);

        // close the control stream when all actual inputs are done
        // (no atomics needed - only the filtering thread updates this counter)
        if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
            tq_receive_finish(fg->queue, fg->nb_inputs);
    }
}
2483
2484 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2485 {
2486 SchFilterGraph *fg;
2487 SchedulerNode dst;
2488
2489 av_assert0(fg_idx < sch->nb_filters);
2490 fg = &sch->filters[fg_idx];
2491
2492 av_assert0(out_idx < fg->nb_outputs);
2493 dst = fg->outputs[out_idx].dst;
2494
2495 return (dst.type == SCH_NODE_TYPE_ENC) ?
2496 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2497 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2498 }
2499
// Called when a filtergraph task finishes: close all input queues (including
// the control stream at index nb_inputs), propagate EOF to every output's
// sink, then mark the task as exited and rerun scheduling.
static int filter_done(Scheduler *sch, unsigned fg_idx)
{
    SchFilterGraph *fg = &sch->filters[fg_idx];
    int ret = 0;

    // <= also covers the control stream at index nb_inputs
    for (unsigned i = 0; i <= fg->nb_inputs; i++)
        tq_receive_finish(fg->queue, i);

    for (unsigned i = 0; i < fg->nb_outputs; i++) {
        SchedulerNode dst = fg->outputs[i].dst;
        int err = (dst.type == SCH_NODE_TYPE_ENC) ?
                  send_to_enc   (sch, &sch->enc[dst.idx], NULL) :
                  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);

        // EOF is the expected outcome here
        if (err < 0 && err != AVERROR_EOF)
            ret = err_merge(ret, err);
    }

    pthread_mutex_lock(&sch->schedule_lock);

    fg->task_exited = 1;

    schedule_update_locked(sch);

    pthread_mutex_unlock(&sch->schedule_lock);

    return ret;
}
2528
2529 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2530 {
2531 SchFilterGraph *fg;
2532
2533 av_assert0(fg_idx < sch->nb_filters);
2534 fg = &sch->filters[fg_idx];
2535
2536 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2537 }
2538
2539 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2540 {
2541 switch (node.type) {
2542 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2543 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2544 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2545 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2546 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2547 default: av_assert0(0);
2548 }
2549 }
2550
2551 static void *task_wrapper(void *arg)
2552 {
2553 SchTask *task = arg;
2554 Scheduler *sch = task->parent;
2555 int ret;
2556 int err = 0;
2557
2558 ret = task->func(task->func_arg);
2559 if (ret < 0)
2560 av_log(task->func_arg, AV_LOG_ERROR,
2561 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2562
2563 err = task_cleanup(sch, task->node);
2564 ret = err_merge(ret, err);
2565
2566 // EOF is considered normal termination
2567 if (ret == AVERROR_EOF)
2568 ret = 0;
2569 if (ret < 0)
2570 atomic_store(&sch->task_failed, 1);
2571
2572 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2573 "Terminating thread with return code %d (%s)\n", ret,
2574 ret < 0 ? av_err2str(ret) : "success");
2575
2576 return (void*)(intptr_t)ret;
2577 }
2578
2579 static int task_stop(Scheduler *sch, SchTask *task)
2580 {
2581 int ret;
2582 void *thread_ret;
2583
2584 if (!task->thread_running)
2585 return task_cleanup(sch, task->node);
2586
2587 ret = pthread_join(task->thread, &thread_ret);
2588 av_assert0(ret == 0);
2589
2590 task->thread_running = 0;
2591
2592 return (intptr_t)thread_ret;
2593 }
2594
2595 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2596 {
2597 int ret = 0, err;
2598
2599 if (sch->state != SCH_STATE_STARTED)
2600 return 0;
2601
2602 atomic_store(&sch->terminate, 1);
2603
2604 for (unsigned type = 0; type < 2; type++)
2605 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2606 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2607 waiter_set(w, 1);
2608 }
2609
2610 for (unsigned i = 0; i < sch->nb_demux; i++) {
2611 SchDemux *d = &sch->demux[i];
2612
2613 err = task_stop(sch, &d->task);
2614 ret = err_merge(ret, err);
2615 }
2616
2617 for (unsigned i = 0; i < sch->nb_dec; i++) {
2618 SchDec *dec = &sch->dec[i];
2619
2620 err = task_stop(sch, &dec->task);
2621 ret = err_merge(ret, err);
2622 }
2623
2624 for (unsigned i = 0; i < sch->nb_filters; i++) {
2625 SchFilterGraph *fg = &sch->filters[i];
2626
2627 err = task_stop(sch, &fg->task);
2628 ret = err_merge(ret, err);
2629 }
2630
2631 for (unsigned i = 0; i < sch->nb_enc; i++) {
2632 SchEnc *enc = &sch->enc[i];
2633
2634 err = task_stop(sch, &enc->task);
2635 ret = err_merge(ret, err);
2636 }
2637
2638 for (unsigned i = 0; i < sch->nb_mux; i++) {
2639 SchMux *mux = &sch->mux[i];
2640
2641 err = task_stop(sch, &mux->task);
2642 ret = err_merge(ret, err);
2643 }
2644
2645 if (finish_ts)
2646 *finish_ts = trailing_dts(sch, 1);
2647
2648 sch->state = SCH_STATE_STOPPED;
2649
2650 return ret;
2651 }