3 * Copyright (c) 2016 Jan Sebechlebsky
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "libavutil/opt.h"
23 #include "libavutil/time.h"
24 #include "libavutil/thread.h"
25 #include "libavutil/threadmessage.h"
/* Default values referenced by the AVOption table of this muxer:
 * queue_size, max_recovery_attempts and recovery_wait_time respectively.
 * A max_recovery_attempts of 0 disables the cap: the recovery code only
 * enforces the limit when fifo->max_recovery_attempts is non-zero. */
29 #define FIFO_DEFAULT_QUEUE_SIZE 60
30 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
31 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
/* Private data of the fifo muxer (the priv_data of the outer AVFormatContext).
 * Fields named in the option table are filled in from user options; the rest
 * is runtime state used by both the caller's thread and the writer thread.
 * NOTE(review): other code in this file also reads fifo->avf, fifo->format,
 * fifo->queue_size and fifo->attempt_recovery, whose declarations (and the
 * closing of the typedef) are not present in this text -- confirm they exist. */
33 typedef struct FifoContext
{
38 char *format_options_str
;
39 AVDictionary
*format_options
;
42 AVThreadMessageQueue
*queue
;
44 pthread_t writer_thread
;
46 /* Return value of last write_trailer_call */
47 int write_trailer_ret
;
/* NOTE(review): the next comment has no closing delimiter of its own in this
 * text, so lexically it runs on through the recovery_wait_time declaration
 * until the end of the following comment -- restore its terminator. */
49 /* Time to wait before next recovery attempt
50 * This can refer to the time in processed stream,
52 int64_t recovery_wait_time
;
54 /* Maximal number of unsuccessful successive recovery attempts */
55 int max_recovery_attempts
;
57 /* Whether to attempt recovery from failure */
60 /* If >0 stream time will be used when waiting
61 * for the recovery attempt instead of real time */
62 int recovery_wait_streamtime
;
64 /* If >0 recovery will be attempted regardless of error code
65 * (except AVERROR_EXIT, so exit request is never ignored) */
66 int recover_any_error
;
68 /* Whether to drop packets in case the queue is full. */
69 int drop_pkts_on_overflow
;
71 /* Whether to wait for keyframe when recovering
72 * from failure or queue overflow */
73 int restart_with_keyframe
;
/* Protects overflow_flag; taken by fifo_write_packet() when the queue is full
 * and by the writer thread when it checks and clears the flag. */
75 pthread_mutex_t overflow_flag_lock
;
76 /* Value > 0 signals queue overflow */
77 volatile uint8_t overflow_flag
;
/* State used only by the writer thread; fifo_consumer_thread() keeps one
 * zero-initialized instance as a local variable and passes it to the
 * fifo_thread_*() helpers.
 * NOTE(review): the helpers also use ctx->avf and ctx->recovery_nr, whose
 * declarations are not present in this text -- confirm they exist. */
81 typedef struct FifoThreadContext
{
84 /* Timestamp of last failure.
85 * This is either pts in case stream time is used,
86 * or microseconds as returned by av_gettime_relative() */
87 int64_t last_recovery_ts
;
89 /* Number of current recovery process
90 * Value > 0 means we are in recovery process */
93 /* If > 0 all frames will be dropped until keyframe is received */
94 uint8_t drop_until_keyframe
;
96 /* Value > 0 means that the previous write_header call was successful
97 * so finalization by calling write_trailer and ff_io_close must be done
98 * before exiting / reinitialization of underlying muxer */
99 uint8_t header_written
;
/* Kind of request carried by a FifoMessage. The enumerators used by the code
 * in this file are FIFO_WRITE_HEADER, FIFO_WRITE_PACKET and FIFO_FLUSH_OUTPUT
 * (the enumerator list itself is not present in this text). */
102 typedef enum FifoMessageType
{
/* Element stored in the thread message queue (the queue is allocated with
 * sizeof(FifoMessage) as element size). Besides the type tag, the code in this
 * file accesses a pkt member, which free_message() only releases for
 * FIFO_WRITE_PACKET messages (its declaration is not present in this text). */
108 typedef struct FifoMessage
{
109 FifoMessageType type
;
/* Open the output of the underlying muxer and write its header.
 *
 * Runs on the writer thread. Works on a private copy of fifo->format_options,
 * which is handed to ff_format_output_open() and avformat_write_header() and
 * freed before leaving, so the stored option set itself is not modified here.
 * Entries still left in the copy after avformat_write_header() are logged as
 * unknown options and turn the result into AVERROR(EINVAL).
 *
 * ctx: writer-thread state; ctx->avf is the outer (fifo) context and
 *      ctx->header_written is set after the header has been written. */
113 static int fifo_thread_write_header(FifoThreadContext
*ctx
)
115 AVFormatContext
*avf
= ctx
->avf
;
116 FifoContext
*fifo
= avf
->priv_data
;
117 AVFormatContext
*avf2
= fifo
->avf
;
118 AVDictionary
*format_options
= NULL
;
121 ret
= av_dict_copy(&format_options
, fifo
->format_options
, 0);
125 ret
= ff_format_output_open(avf2
, avf
->filename
, &format_options
);
127 av_log(avf
, AV_LOG_ERROR
, "Error opening %s: %s\n", avf
->filename
,
/* Reset the dts bookkeeping of every inner stream before (re)writing the
 * header; this function is also reached again after a failure, because
 * fifo_thread_dispatch_message() calls it whenever no header is written. */
132 for (i
= 0;i
< avf2
->nb_streams
; i
++)
133 avf2
->streams
[i
]->cur_dts
= 0;
135 ret
= avformat_write_header(avf2
, &format_options
);
137 ctx
->header_written
= 1;
139 // Check for options unrecognized by underlying muxer
140 if (format_options
) {
141 AVDictionaryEntry
*entry
= NULL
;
142 while ((entry
= av_dict_get(format_options
, "", entry
, AV_DICT_IGNORE_SUFFIX
)))
143 av_log(avf2
, AV_LOG_ERROR
, "Unknown option '%s'\n", entry
->key
);
144 ret
= AVERROR(EINVAL
);
148 av_dict_free(&format_options
);
152 static int fifo_thread_flush_output(FifoThreadContext
*ctx
)
154 AVFormatContext
*avf
= ctx
->avf
;
155 FifoContext
*fifo
= avf
->priv_data
;
156 AVFormatContext
*avf2
= fifo
->avf
;
158 return av_write_frame(avf2
, NULL
);
/* Write one queued packet to the underlying muxer (writer thread).
 *
 * While ctx->drop_until_keyframe is set, the packet's AV_PKT_FLAG_KEY flag is
 * examined: a key packet clears the flag ("Keyframe received, recovering..."),
 * any other packet is logged as dropped and unreferenced.
 * Otherwise the packet timestamps are rescaled from the time base of the outer
 * stream to the time base of the matching inner stream, the packet is passed
 * to av_write_frame() and av_packet_unref() is called on it.
 *
 * ctx: writer-thread state; pkt: packet taken from the FifoMessage. */
161 static int fifo_thread_write_packet(FifoThreadContext
*ctx
, AVPacket
*pkt
)
163 AVFormatContext
*avf
= ctx
->avf
;
164 FifoContext
*fifo
= avf
->priv_data
;
165 AVFormatContext
*avf2
= fifo
->avf
;
166 AVRational src_tb
, dst_tb
;
169 if (ctx
->drop_until_keyframe
) {
170 if (pkt
->flags
& AV_PKT_FLAG_KEY
) {
171 ctx
->drop_until_keyframe
= 0;
172 av_log(avf
, AV_LOG_VERBOSE
, "Keyframe received, recovering...\n");
174 av_log(avf
, AV_LOG_VERBOSE
, "Dropping non-keyframe packet\n");
175 av_packet_unref(pkt
);
/* The inner streams are created one per outer stream, in the same order
 * (see fifo_mux_init()), so the same index is used for both contexts. */
180 s_idx
= pkt
->stream_index
;
181 src_tb
= avf
->streams
[s_idx
]->time_base
;
182 dst_tb
= avf2
->streams
[s_idx
]->time_base
;
183 av_packet_rescale_ts(pkt
, src_tb
, dst_tb
);
185 ret
= av_write_frame(avf2
, pkt
);
187 av_packet_unref(pkt
);
/* Finalize the underlying muxer (writer thread).
 *
 * Checks ctx->header_written first (presumably returning early when no header
 * was written -- the statement after the check is not present in this text),
 * then calls av_write_trailer() on the inner context and closes its I/O
 * context with ff_format_io_close().
 * This function does not reset ctx->header_written itself;
 * fifo_thread_attempt_recovery() clears it after calling this. */
191 static int fifo_thread_write_trailer(FifoThreadContext
*ctx
)
193 AVFormatContext
*avf
= ctx
->avf
;
194 FifoContext
*fifo
= avf
->priv_data
;
195 AVFormatContext
*avf2
= fifo
->avf
;
198 if (!ctx
->header_written
)
201 ret
= av_write_trailer(avf2
);
202 ff_format_io_close(avf2
, &avf2
->pb
);
/* Execute one queue message on the writer thread.
 *
 * If no header is currently written (first message, or after a failure made
 * the recovery code finalize the output), fifo_thread_write_header() is called
 * first. The message type then selects the action:
 *   FIFO_WRITE_PACKET -> fifo_thread_write_packet(ctx, &msg->pkt)
 *   FIFO_FLUSH_OUTPUT -> fifo_thread_flush_output(ctx)
 * A type not handled by the switch ends in AVERROR(EINVAL). */
207 static int fifo_thread_dispatch_message(FifoThreadContext
*ctx
, FifoMessage
*msg
)
211 if (!ctx
->header_written
) {
212 ret
= fifo_thread_write_header(ctx
);
218 case FIFO_WRITE_HEADER
:
220 case FIFO_WRITE_PACKET
:
221 return fifo_thread_write_packet(ctx
, &msg
->pkt
);
222 case FIFO_FLUSH_OUTPUT
:
223 return fifo_thread_flush_output(ctx
);
226 return AVERROR(EINVAL
);
/* Decide whether error code err_no qualifies for a recovery attempt.
 *
 * fifo->attempt_recovery is checked first. With fifo->recover_any_error set,
 * every error except AVERROR_EXIT is treated as recoverable. Otherwise a
 * switch singles out AVERROR(EINVAL), AVERROR(ENOSYS) and AVERROR_PATCHWELCOME
 * (presumably as the non-recoverable cases -- the return statements of the
 * switch are not present in this text, confirm). */
229 static int is_recoverable(const FifoContext
*fifo
, int err_no
) {
230 if (!fifo
->attempt_recovery
)
233 if (fifo
->recover_any_error
)
234 return err_no
!= AVERROR_EXIT
;
237 case AVERROR(EINVAL
):
238 case AVERROR(ENOSYS
):
241 case AVERROR_PATCHWELCOME
:
248 static void free_message(void *msg
)
250 FifoMessage
*fifo_msg
= msg
;
252 if (fifo_msg
->type
== FIFO_WRITE_PACKET
)
253 av_packet_unref(&fifo_msg
->pkt
);
/* Bookkeeping after a recovery attempt has failed (writer thread).
 *
 * Called from fifo_thread_attempt_recovery() as (ctx, pkt, error code).
 * Logs "Recovery failed", then records the time of the failure in
 * ctx->last_recovery_ts: the packet pts when recovery_wait_streamtime is set,
 * av_gettime_relative() otherwise. If max_recovery_attempts is non-zero and
 * ctx->recovery_nr has reached it, an error is logged; the path that merely
 * schedules another attempt yields AVERROR(EAGAIN). */
256 static int fifo_thread_process_recovery_failure(FifoThreadContext
*ctx
, AVPacket
*pkt
,
259 AVFormatContext
*avf
= ctx
->avf
;
260 FifoContext
*fifo
= avf
->priv_data
;
263 av_log(avf
, AV_LOG_INFO
, "Recovery failed: %s\n",
266 if (fifo
->recovery_wait_streamtime
) {
/* NOTE(review): unlike the other log messages in this file, the warning
 * below does not end with a newline -- confirm whether "\n" is missing.
 * In this case last_recovery_ts becomes AV_NOPTS_VALUE, which
 * fifo_thread_attempt_recovery() later tests for. */
267 if (pkt
->pts
== AV_NOPTS_VALUE
)
268 av_log(avf
, AV_LOG_WARNING
, "Packet does not contain presentation"
269 " timestamp, recovery will be attempted immediately");
270 ctx
->last_recovery_ts
= pkt
->pts
;
272 ctx
->last_recovery_ts
= av_gettime_relative();
/* A max_recovery_attempts of 0 means "no limit". */
275 if (fifo
->max_recovery_attempts
&&
276 ctx
->recovery_nr
>= fifo
->max_recovery_attempts
) {
277 av_log(avf
, AV_LOG_ERROR
,
278 "Maximal number of %d recovery attempts reached.\n",
279 fifo
->max_recovery_attempts
);
282 ret
= AVERROR(EAGAIN
);
/* Perform (or postpone) one recovery attempt for a failed message.
 *
 * err_no is the error that triggered recovery; it is tested with
 * is_recoverable() first. If a header is currently written, the output is
 * finalized with fifo_thread_write_trailer() (result stored in
 * fifo->write_trailer_ret) and header_written is cleared, so the next
 * dispatch writes the header again.
 * On the first attempt (recovery_nr == 0) last_recovery_ts is initialized;
 * on later attempts the time elapsed since the last failure is computed, in
 * stream time or real time, and AVERROR(EAGAIN) is returned while it is
 * smaller than fifo->recovery_wait_time.
 * The message is then re-dispatched; a recoverable failure is handed to
 * fifo_thread_process_recovery_failure(), success resets recovery_nr to 0. */
288 static int fifo_thread_attempt_recovery(FifoThreadContext
*ctx
, FifoMessage
*msg
, int err_no
)
290 AVFormatContext
*avf
= ctx
->avf
;
291 FifoContext
*fifo
= avf
->priv_data
;
292 AVPacket
*pkt
= &msg
->pkt
;
293 int64_t time_since_recovery
;
296 if (!is_recoverable(fifo
, err_no
)) {
301 if (ctx
->header_written
) {
302 fifo
->write_trailer_ret
= fifo_thread_write_trailer(ctx
);
303 ctx
->header_written
= 0;
306 if (!ctx
->recovery_nr
) {
307 ctx
->last_recovery_ts
= fifo
->recovery_wait_streamtime
?
310 if (fifo
->recovery_wait_streamtime
) {
/* NOTE(review): this test looks inverted. As written, pkt->pts minus
 * last_recovery_ts is rescaled exactly when last_recovery_ts is
 * AV_NOPTS_VALUE (a meaningless difference), while the assignment marked
 * "Enforce recovery immediately" appears to belong to the other branch.
 * fifo_thread_process_recovery_failure() stores AV_NOPTS_VALUE and warns
 * that recovery will be attempted immediately, which suggests the intended
 * operator is != -- confirm and fix. */
311 if (ctx
->last_recovery_ts
== AV_NOPTS_VALUE
) {
312 AVRational tb
= avf
->streams
[pkt
->stream_index
]->time_base
;
313 time_since_recovery
= av_rescale_q(pkt
->pts
- ctx
->last_recovery_ts
,
316 /* Enforce recovery immediately */
317 time_since_recovery
= fifo
->recovery_wait_time
;
320 time_since_recovery
= av_gettime_relative() - ctx
->last_recovery_ts
;
/* Not enough time has passed yet: tell the caller to try again later. */
323 if (time_since_recovery
< fifo
->recovery_wait_time
)
324 return AVERROR(EAGAIN
);
329 if (fifo
->max_recovery_attempts
) {
330 av_log(avf
, AV_LOG_VERBOSE
, "Recovery attempt #%d/%d\n",
331 ctx
->recovery_nr
, fifo
->max_recovery_attempts
);
333 av_log(avf
, AV_LOG_VERBOSE
, "Recovery attempt #%d\n",
/* Waiting for a keyframe is only armed here when packets may be dropped. */
337 if (fifo
->restart_with_keyframe
&& fifo
->drop_pkts_on_overflow
)
338 ctx
->drop_until_keyframe
= 1;
340 ret
= fifo_thread_dispatch_message(ctx
, msg
);
342 if (is_recoverable(fifo
, ret
)) {
343 return fifo_thread_process_recovery_failure(ctx
, pkt
, ret
);
348 av_log(avf
, AV_LOG_INFO
, "Recovery successful\n");
349 ctx
->recovery_nr
= 0;
/* Drive the recovery of one failed message (writer thread).
 *
 * Calls fifo_thread_attempt_recovery() in a loop that repeats while the
 * result is AVERROR(EAGAIN) and packets are not allowed to be dropped. When
 * real time is used for waiting and a recovery is already in progress, each
 * iteration sleeps for the remaining wait time, capped at 10000 us, before
 * the attempt. With drop_pkts_on_overflow set, an AVERROR(EAGAIN) result does
 * not loop; the packet of a FIFO_WRITE_PACKET message is unreferenced
 * (dropped) instead. */
359 static int fifo_thread_recover(FifoThreadContext
*ctx
, FifoMessage
*msg
, int err_no
)
361 AVFormatContext
*avf
= ctx
->avf
;
362 FifoContext
*fifo
= avf
->priv_data
;
366 if (!fifo
->recovery_wait_streamtime
&& ctx
->recovery_nr
> 0) {
367 int64_t time_since_recovery
= av_gettime_relative() - ctx
->last_recovery_ts
;
368 int64_t time_to_wait
= FFMAX(0, fifo
->recovery_wait_time
- time_since_recovery
);
/* Sleep in slices of at most 10 ms rather than for the whole interval. */
370 av_usleep(FFMIN(10000, time_to_wait
));
373 ret
= fifo_thread_attempt_recovery(ctx
, msg
, err_no
);
374 } while (ret
== AVERROR(EAGAIN
) && !fifo
->drop_pkts_on_overflow
);
376 if (ret
== AVERROR(EAGAIN
) && fifo
->drop_pkts_on_overflow
) {
377 if (msg
->type
== FIFO_WRITE_PACKET
)
378 av_packet_unref(&msg
->pkt
);
/* Entry point of the writer thread started by fifo_write_header().
 *
 * data is the outer AVFormatContext. The thread keeps its own
 * FifoThreadContext and starts with a synthetic FIFO_WRITE_HEADER message.
 * For each message: it is dispatched unless a recovery is in progress; on
 * failure, or while recovering, fifo_thread_recover() is called and its
 * result can be reported to the sending side with
 * av_thread_message_queue_set_err_send(). The overflow flag is then handled
 * and the next message is received from the queue (flags 0). Before the
 * thread ends, the result of fifo_thread_write_trailer() is stored in
 * fifo->write_trailer_ret for fifo_write_trailer() to return. */
385 static void *fifo_consumer_thread(void *data
)
387 AVFormatContext
*avf
= data
;
388 FifoContext
*fifo
= avf
->priv_data
;
389 AVThreadMessageQueue
*queue
= fifo
->queue
;
390 FifoMessage msg
= {FIFO_WRITE_HEADER
, {0}};
393 FifoThreadContext fifo_thread_ctx
;
394 memset(&fifo_thread_ctx
, 0, sizeof(FifoThreadContext
));
395 fifo_thread_ctx
.avf
= avf
;
398 uint8_t just_flushed
= 0;
/* While recovering, the message is handled by the recovery path only. */
400 if (!fifo_thread_ctx
.recovery_nr
)
401 ret
= fifo_thread_dispatch_message(&fifo_thread_ctx
, &msg
);
403 if (ret
< 0 || fifo_thread_ctx
.recovery_nr
> 0) {
404 int rec_ret
= fifo_thread_recover(&fifo_thread_ctx
, &msg
, ret
);
406 av_thread_message_queue_set_err_send(queue
, rec_ret
);
411 /* If the queue is full at the moment when fifo_write_packet
412 * attempts to insert new message (packet) to the queue,
413 * it sets the fifo->overflow_flag to 1 and drops packet.
414 * Here in consumer thread, the flag is checked and if it is
415 * set, the queue is flushed and flag cleared. */
416 pthread_mutex_lock(&fifo
->overflow_flag_lock
);
417 if (fifo
->overflow_flag
) {
418 av_thread_message_flush(queue
);
419 if (fifo
->restart_with_keyframe
)
420 fifo_thread_ctx
.drop_until_keyframe
= 1;
421 fifo
->overflow_flag
= 0;
424 pthread_mutex_unlock(&fifo
->overflow_flag_lock
);
/* Logging is done after the mutex has been released. */
427 av_log(avf
, AV_LOG_INFO
, "FIFO queue flushed\n");
429 ret
= av_thread_message_queue_recv(queue
, &msg
, 0);
431 av_thread_message_queue_set_err_send(queue
, ret
);
436 fifo
->write_trailer_ret
= fifo_thread_write_trailer(&fifo_thread_ctx
);
/* Create and configure the inner muxer context for output format oformat.
 *
 * The new context takes over the interrupt callback, max_delay, a copy of the
 * metadata, opaque, io_close, io_open and flags of the outer context avf.
 * One inner stream is created for every outer stream, in the same order, and
 * the encoding parameters are copied with ff_stream_encode_params_copy().
 * AVERROR(ENOMEM) is returned from inside the stream loop (presumably when
 * avformat_new_stream() fails -- the check is not present in this text). */
441 static int fifo_mux_init(AVFormatContext
*avf
, AVOutputFormat
*oformat
)
443 FifoContext
*fifo
= avf
->priv_data
;
444 AVFormatContext
*avf2
;
447 ret
= avformat_alloc_output_context2(&avf2
, oformat
, NULL
, NULL
);
453 avf2
->interrupt_callback
= avf
->interrupt_callback
;
454 avf2
->max_delay
= avf
->max_delay
;
455 ret
= av_dict_copy(&avf2
->metadata
, avf
->metadata
, 0);
458 avf2
->opaque
= avf
->opaque
;
459 avf2
->io_close
= avf
->io_close
;
460 avf2
->io_open
= avf
->io_open
;
461 avf2
->flags
= avf
->flags
;
463 for (i
= 0; i
< avf
->nb_streams
; ++i
) {
464 AVStream
*st
= avformat_new_stream(avf2
, NULL
);
466 return AVERROR(ENOMEM
);
468 ret
= ff_stream_encode_params_copy(st
, avf
->streams
[i
]);
/* Set up the fifo muxer before the writer thread is started.
 *
 * Steps visible here:
 *  - reject recovery_wait_streamtime without drop_pkts_on_overflow
 *    (AVERROR(EINVAL));
 *  - parse the format_opts string into fifo->format_options;
 *  - choose the target muxer with av_guess_format() from fifo->format and the
 *    output filename (AVERROR_MUXER_NOT_FOUND is the error code used there);
 *  - create the inner context with fifo_mux_init();
 *  - allocate the message queue with fifo->queue_size elements of
 *    sizeof(FifoMessage) and register free_message() as its free function;
 *  - initialize overflow_flag_lock. */
476 static int fifo_init(AVFormatContext
*avf
)
478 FifoContext
*fifo
= avf
->priv_data
;
479 AVOutputFormat
*oformat
;
482 if (fifo
->recovery_wait_streamtime
&& !fifo
->drop_pkts_on_overflow
) {
483 av_log(avf
, AV_LOG_ERROR
, "recovery_wait_streamtime can be turned on"
484 " only when drop_pkts_on_overflow is also turned on\n");
485 return AVERROR(EINVAL
);
488 if (fifo
->format_options_str
) {
489 ret
= av_dict_parse_string(&fifo
->format_options
, fifo
->format_options_str
,
492 av_log(avf
, AV_LOG_ERROR
, "Could not parse format options list '%s'\n",
493 fifo
->format_options_str
);
498 oformat
= av_guess_format(fifo
->format
, avf
->filename
, NULL
);
500 ret
= AVERROR_MUXER_NOT_FOUND
;
504 ret
= fifo_mux_init(avf
, oformat
);
508 ret
= av_thread_message_queue_alloc(&fifo
->queue
, (unsigned) fifo
->queue_size
,
509 sizeof(FifoMessage
));
513 av_thread_message_queue_set_free_func(fifo
->queue
, free_message
);
515 ret
= pthread_mutex_init(&fifo
->overflow_flag_lock
, NULL
);
/* write_header callback of the fifo muxer.
 *
 * Does not write anything itself: it starts the writer thread
 * (fifo_consumer_thread() with avf as argument), which writes the header of
 * the underlying muxer as its first action. The value returned by
 * pthread_create() is wrapped in AVERROR() for the error message. */
522 static int fifo_write_header(AVFormatContext
*avf
)
524 FifoContext
* fifo
= avf
->priv_data
;
527 ret
= pthread_create(&fifo
->writer_thread
, NULL
, fifo_consumer_thread
, avf
);
529 av_log(avf
, AV_LOG_ERROR
, "Failed to start thread: %s\n",
530 av_err2str(AVERROR(ret
)));
/* write_packet callback of the fifo muxer (caller's thread).
 *
 * Wraps the request into a FifoMessage -- FIFO_WRITE_PACKET with a new
 * reference to pkt, or FIFO_FLUSH_OUTPUT when pkt is NULL -- and sends it to
 * the queue. With drop_pkts_on_overflow the send is non-blocking; when it
 * reports AVERROR(EAGAIN) the queue is full, so overflow_flag is raised under
 * overflow_flag_lock for the writer thread to act on. overflow_set records
 * whether this call was the one that raised the flag. The code also logs
 * "FIFO queue full" and releases the message's packet reference with
 * av_packet_unref().
 * NOTE(review): the "Queue is full" comment further down has lost its closing
 * delimiter in this text, so the code after it is lexically part of that
 * comment -- restore the terminator. */
537 static int fifo_write_packet(AVFormatContext
*avf
, AVPacket
*pkt
)
539 FifoContext
*fifo
= avf
->priv_data
;
540 FifoMessage msg
= {.type
= pkt
? FIFO_WRITE_PACKET
: FIFO_FLUSH_OUTPUT
};
544 av_init_packet(&msg
.pkt
);
545 ret
= av_packet_ref(&msg
.pkt
,pkt
);
/* Block on a full queue unless the user asked for packets to be dropped. */
550 ret
= av_thread_message_queue_send(fifo
->queue
, &msg
,
551 fifo
->drop_pkts_on_overflow
?
552 AV_THREAD_MESSAGE_NONBLOCK
: 0);
553 if (ret
== AVERROR(EAGAIN
)) {
554 uint8_t overflow_set
= 0;
556 /* Queue is full, set fifo->overflow_flag to 1
557 * to let consumer thread know the queue should
559 pthread_mutex_lock(&fifo
->overflow_flag_lock
);
560 if (!fifo
->overflow_flag
)
561 fifo
->overflow_flag
= overflow_set
= 1;
562 pthread_mutex_unlock(&fifo
->overflow_flag_lock
);
565 av_log(avf
, AV_LOG_WARNING
, "FIFO queue full\n");
568 } else if (ret
< 0) {
575 av_packet_unref(&msg
.pkt
);
// write_trailer callback of the fifo muxer (caller's thread).
//
// Sets AVERROR_EOF as the receive error of the queue, which is how the writer
// thread learns that no more messages will arrive, then joins the writer
// thread. A pthread_join() failure is logged with its value wrapped in
// AVERROR(). The result of the underlying muxer's trailer, stored by the
// writer thread in fifo->write_trailer_ret, is picked up as the result.
579 static int fifo_write_trailer(AVFormatContext
*avf
)
581 FifoContext
*fifo
= avf
->priv_data
;
584 av_thread_message_queue_set_err_recv(fifo
->queue
, AVERROR_EOF
);
586 ret
= pthread_join(fifo
->writer_thread
, NULL
);
588 av_log(avf
, AV_LOG_ERROR
, "pthread join error: %s\n",
589 av_err2str(AVERROR(ret
)));
593 ret
= fifo
->write_trailer_ret
;
597 static void fifo_deinit(AVFormatContext
*avf
)
599 FifoContext
*fifo
= avf
->priv_data
;
601 av_dict_free(&fifo
->format_options
);
602 avformat_free_context(fifo
->avf
);
603 av_thread_message_queue_free(&fifo
->queue
);
604 pthread_mutex_destroy(&fifo
->overflow_flag_lock
);
// AVOption table of the fifo muxer. OFFSET(x) yields the offset of field x
// inside FifoContext, so each entry writes the user's value directly into the
// private context. All entries are flagged AV_OPT_FLAG_ENCODING_PARAM.
// Defaults: queue_size 60 (range 1..INT_MAX), max_recovery_attempts 0,
// recovery_wait_time 5000000 (an AV_OPT_TYPE_DURATION; the default macro name
// and its comment give it as microseconds / 5 seconds); booleans default to 0.
// NOTE(review): the terminating entry and closing brace of the table are not
// present in this text -- confirm they exist.
607 #define OFFSET(x) offsetof(FifoContext, x)
608 static const AVOption options
[] = {
609 {"fifo_format", "Target muxer", OFFSET(format
),
610 AV_OPT_TYPE_STRING
, {.str
= NULL
}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM
},
612 {"queue_size", "Size of fifo queue", OFFSET(queue_size
),
613 AV_OPT_TYPE_INT
, {.i64
= FIFO_DEFAULT_QUEUE_SIZE
}, 1, INT_MAX
, AV_OPT_FLAG_ENCODING_PARAM
},
615 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str
),
616 AV_OPT_TYPE_STRING
, {.str
= NULL
}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM
},
618 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow
),
619 AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM
},
621 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe
),
622 AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM
},
624 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery
),
625 AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM
},
627 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts
),
628 AV_OPT_TYPE_INT
, {.i64
= FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
}, 0, INT_MAX
, AV_OPT_FLAG_ENCODING_PARAM
},
630 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time
),
631 AV_OPT_TYPE_DURATION
, {.i64
= FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
}, 0, INT64_MAX
, AV_OPT_FLAG_ENCODING_PARAM
},
633 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
634 OFFSET(recovery_wait_streamtime
), AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM
},
636 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error
),
637 AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM
},
// AVClass describing the fifo muxer's private context; it is referenced by
// ff_fifo_muxer.priv_class.
// NOTE(review): no initializer connecting this class to the options table is
// present in this text -- confirm that it exists.
642 static const AVClass fifo_muxer_class
= {
643 .class_name
= "Fifo muxer",
644 .item_name
= av_default_item_name
,
646 .version
= LIBAVUTIL_VERSION_INT
,
// Muxer definition of the fifo pseudo-muxer.
// Flags: AVFMT_NOFILE is set for the outer context, while the output of the
// inner muxer is opened by fifo_thread_write_header() through
// ff_format_output_open(); AVFMT_ALLOW_FLUSH matches fifo_write_packet()
// turning a NULL packet into a FIFO_FLUSH_OUTPUT message.
// NOTE(review): initializers for the muxer name and for fifo_init are not
// present in this text -- confirm they exist.
649 AVOutputFormat ff_fifo_muxer
= {
651 .long_name
= NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
652 .priv_data_size
= sizeof(FifoContext
),
654 .write_header
= fifo_write_header
,
655 .write_packet
= fifo_write_packet
,
656 .write_trailer
= fifo_write_trailer
,
657 .deinit
= fifo_deinit
,
658 .priv_class
= &fifo_muxer_class
,
659 .flags
= AVFMT_NOFILE
| AVFMT_ALLOW_FLUSH
| AVFMT_TS_NEGATIVE
,