2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/error.h"
25 #include "libavutil/fifo.h"
26 #include "libavutil/frame.h"
27 #include "libavutil/intreadwrite.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/thread.h"
31 #include "libavcodec/packet.h"
33 #include "thread_queue.h"
/* Bit flags kept per stream in tq->finished[]; they are combined with
 * bitwise OR, so a stream may be send-finished, recv-finished, or both. */
/* producer side is done with the stream: set by tq_send_finish(), and by
 * tq_send() once it sees that the stream is already recv-finished */
36 FINISHED_SEND
= (1 << 0),
/* consumer side is done with the stream: set by tq_receive_finish(), and
 * by receive_locked() in its per-stream scan */
37 FINISHED_RECV
= (1 << 1),
/* NOTE(review): only part of the ThreadQueue members is visible in this
 * listing; finished, lock, cond and choked are used by the functions below
 * but their declarations are not shown here. */
/* number of streams; also the element count of the finished[] array */
43 unsigned int nb_streams
;
/* payload kind: THREAD_QUEUE_FRAMES selects AVFrame handling, any other
 * value AVPacket handling (see tq_alloc() and receive_locked()) */
45 enum ThreadQueueType type
;
/* container FIFO holding the queued payloads */
47 AVContainerFifo
*fifo
;
/* FIFO of unsigned stream indices; tq_send() writes one entry per payload
 * and receive_locked() reads one entry per payload */
48 AVFifo
*fifo_stream_index
;
/*
 * Tear down the queue referenced by *ptq: frees the payload FIFO, the
 * stream-index FIFO and the finished[] array, then destroys the condition
 * variable and the mutex.
 *
 * NOTE(review): this listing shows neither a NULL check on *ptq nor the
 * release of the ThreadQueue allocation itself; confirm both against the
 * complete file.
 */
54 void tq_free(ThreadQueue
**ptq
)
56 ThreadQueue
*tq
= *ptq
;
/* free the payload FIFO and its companion stream-index FIFO */
61 av_container_fifo_free(&tq
->fifo
);
62 av_fifo_freep2(&tq
->fifo_stream_index
);
/* per-stream finished flags */
64 av_freep(&tq
->finished
);
/* synchronisation primitives created in tq_alloc() */
66 pthread_cond_destroy(&tq
->cond
);
67 pthread_mutex_destroy(&tq
->lock
);
/*
 * Allocate and set up a queue for nb_streams streams.
 *
 * nb_streams  number of streams; sizes the finished[] array
 * queue_size  element count passed to av_fifo_alloc2() for the
 *             stream-index FIFO
 * type        THREAD_QUEUE_FRAMES selects an AVFrame container FIFO,
 *             any other value an AVPacket one
 *
 * NOTE(review): the return statements and most of the error handling are
 * not visible in this listing; confirm cleanup-on-failure against the
 * complete file.
 */
72 ThreadQueue
*tq_alloc(unsigned int nb_streams
, size_t queue_size
,
73 enum ThreadQueueType type
)
/* zeroing allocation of the queue object itself */
78 tq
= av_mallocz(sizeof(*tq
));
/* the condition variable is created first, the mutex second */
82 ret
= pthread_cond_init(&tq
->cond
, NULL
);
88 ret
= pthread_mutex_init(&tq
->lock
, NULL
);
/* undoes the condition-variable init above; presumably the failure path
 * of pthread_mutex_init() -- the surrounding condition is not visible */
90 pthread_cond_destroy(&tq
->cond
);
/* one flag word per stream, zero-initialised by av_calloc() */
95 tq
->finished
= av_calloc(nb_streams
, sizeof(*tq
->finished
));
98 tq
->nb_streams
= nb_streams
;
/* payload FIFO matching the requested payload type */
102 tq
->fifo
= (type
== THREAD_QUEUE_FRAMES
) ?
103 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
/* parallel FIFO of unsigned stream indices, created with queue_size
 * elements */
107 tq
->fifo_stream_index
= av_fifo_alloc2(queue_size
, sizeof(unsigned), 0);
/* allocation-failure check; the action taken is not visible here */
108 if (!tq
->fifo_stream_index
)
/*
 * Queue one payload for stream stream_idx. Blocks on tq->cond while the
 * stream-index FIFO has no free slot and the stream is not recv-finished.
 *
 * stream_idx  must be below tq->nb_streams (asserted)
 * data        payload handed to av_container_fifo_write()
 *
 * ret is set to AVERROR(EINVAL) when the stream is already send-finished;
 * on the normal path it is assigned from av_fifo_write() and then from
 * av_container_fifo_write().
 * NOTE(review): local declarations, jumps between the branches and the
 * final return are not visible in this listing; confirm the exact return
 * values (in particular for the recv-finished branch) against the
 * complete file.
 */
117 int tq_send(ThreadQueue
*tq
, unsigned int stream_idx
, void *data
)
122 av_assert0(stream_idx
< tq
->nb_streams
);
/* only the address is taken here; the flags themselves are read after the
 * lock has been acquired */
123 finished
= &tq
->finished
[stream_idx
];
125 pthread_mutex_lock(&tq
->lock
);
/* sending on a stream that is already send-finished is rejected */
127 if (*finished
& FINISHED_SEND
) {
128 ret
= AVERROR(EINVAL
);
/* wait for a free slot; stop waiting as soon as the stream becomes
 * recv-finished. pthread_cond_wait() releases tq->lock while blocked. */
132 while (!(*finished
& FINISHED_RECV
) && !av_fifo_can_write(tq
->fifo_stream_index
))
133 pthread_cond_wait(&tq
->cond
, &tq
->lock
);
/* the receiving side no longer wants this stream: mark sending as over */
135 if (*finished
& FINISHED_RECV
) {
137 *finished
|= FINISHED_SEND
;
/* record which stream the payload written below belongs to */
139 ret
= av_fifo_write(tq
->fifo_stream_index
, &stream_idx
, 1);
/* enqueue the payload itself */
143 ret
= av_container_fifo_write(tq
->fifo
, data
, 0);
/* wake every thread blocked on tq->cond */
147 pthread_cond_broadcast(&tq
->cond
);
151 pthread_mutex_unlock(&tq
->lock
);
/*
 * Try to dequeue one payload together with its stream index. Per its name
 * it is meant to run with tq->lock held; tq_receive() calls it between
 * pthread_mutex_lock() and pthread_mutex_unlock().
 *
 * The last return statement yields AVERROR_EOF when nb_finished equals
 * tq->nb_streams and AVERROR(EAGAIN) otherwise.
 * NOTE(review): the remaining parameter(s), the success return, the update
 * of nb_finished and several branch bodies are not visible in this listing;
 * confirm them against the complete file.
 */
156 static int receive_locked(ThreadQueue
*tq
, int *stream_idx
,
/* compared against tq->nb_streams at the end; NOTE(review): where it is
 * incremented is not visible in this listing */
159 unsigned int nb_finished
= 0;
/* early "try again" exit; NOTE(review): its guarding condition is not
 * visible in this listing */
162 return AVERROR(EAGAIN
);
/* keep going for as long as the container FIFO yields a payload */
164 while (av_container_fifo_read(tq
->fifo
, data
, 0) >= 0) {
/* tq_send() writes one stream index per payload, so a matching entry is
 * expected here; a failed read is treated as a fatal inconsistency */
168 ret
= av_fifo_read(tq
->fifo_stream_index
, &idx
, 1);
169 av_assert0(ret
>= 0);
/* payload for a stream that is already recv-finished: release its
 * contents with the unref function matching the payload type */
170 if (tq
->finished
[idx
] & FINISHED_RECV
) {
171 (tq
->type
== THREAD_QUEUE_FRAMES
) ?
172 av_frame_unref(data
) : av_packet_unref(data
);
/* scan the per-stream finished flags */
180 for (unsigned int i
= 0; i
< tq
->nb_streams
; i
++) {
/* stream with no flag set at all; NOTE(review): the action taken for it
 * is not visible in this listing */
181 if (!tq
->finished
[i
])
184 /* return EOF to the consumer at most once for each stream */
185 if (!(tq
->finished
[i
] & FINISHED_RECV
)) {
186 tq
->finished
[i
] |= FINISHED_RECV
;
/* EOF only when every stream has been counted as finished */
194 return nb_finished
== tq
->nb_streams
? AVERROR_EOF
: AVERROR(EAGAIN
);
/*
 * Receive one payload from the queue, doing the work in receive_locked()
 * while holding tq->lock. When receive_locked() reports AVERROR(EAGAIN)
 * the caller blocks on tq->cond.
 *
 * stream_idx  forwarded to receive_locked()
 * data        forwarded to receive_locked()
 *
 * NOTE(review): the enclosing loop (the wait is presumably followed by a
 * retry) and the return statement are not visible in this listing.
 */
197 int tq_receive(ThreadQueue
*tq
, int *stream_idx
, void *data
)
203 pthread_mutex_lock(&tq
->lock
);
/* snapshot of the readable-payload count before the attempt */
206 size_t can_read
= av_container_fifo_can_read(tq
->fifo
);
208 ret
= receive_locked(tq
, stream_idx
, data
);
210 // signal other threads if the fifo state changed
211 if (can_read
!= av_container_fifo_can_read(tq
->fifo
))
212 pthread_cond_broadcast(&tq
->cond
);
/* nothing available right now: sleep until another thread broadcasts on
 * tq->cond; the lock is released while waiting */
214 if (ret
== AVERROR(EAGAIN
)) {
215 pthread_cond_wait(&tq
->cond
, &tq
->lock
);
222 pthread_mutex_unlock(&tq
->lock
);
/*
 * Mark stream stream_idx as send-finished and wake every thread blocked on
 * tq->cond. The flag update happens under tq->lock.
 *
 * stream_idx  must be below tq->nb_streams (asserted)
 */
227 void tq_send_finish(ThreadQueue
*tq
, unsigned int stream_idx
)
229 av_assert0(stream_idx
< tq
->nb_streams
);
231 pthread_mutex_lock(&tq
->lock
);
233 /* mark the stream as send-finished;
234 * next time the consumer thread tries to read this stream it will get
235 * an EOF and recv-finished flag will be set */
236 tq
->finished
[stream_idx
] |= FINISHED_SEND
;
/* a receiver waiting in tq_receive() must re-check the stream state */
238 pthread_cond_broadcast(&tq
->cond
);
240 pthread_mutex_unlock(&tq
->lock
);
/*
 * Mark stream stream_idx as recv-finished and wake every thread blocked on
 * tq->cond. The flag update happens under tq->lock.
 *
 * stream_idx  must be below tq->nb_streams (asserted)
 */
243 void tq_receive_finish(ThreadQueue
*tq
, unsigned int stream_idx
)
245 av_assert0(stream_idx
< tq
->nb_streams
);
247 pthread_mutex_lock(&tq
->lock
);
249 /* mark the stream as recv-finished;
250 * next time the producer thread tries to send for this stream, it will
251 * get an EOF and send-finished flag will be set */
252 tq
->finished
[stream_idx
] |= FINISHED_RECV
;
/* a sender waiting for space in tq_send() re-checks FINISHED_RECV in its
 * wait loop, so it has to be woken up */
253 pthread_cond_broadcast(&tq
->cond
);
255 pthread_mutex_unlock(&tq
->lock
);
/*
 * Compare the requested choked value with the one currently stored in
 * tq->choked, under tq->lock, and wake every thread blocked on tq->cond
 * when the two differ.
 *
 * choked  requested choke state
 *
 * NOTE(review): the store of choked into tq->choked is not visible in this
 * listing; confirm it is present in the complete file.
 */
258 void tq_choke(ThreadQueue
*tq
, int choked
)
260 pthread_mutex_lock(&tq
->lock
);
/* previous state, used to broadcast only on an actual change */
262 int prev_choked
= tq
->choked
;
264 if (choked
!= prev_choked
)
265 pthread_cond_broadcast(&tq
->cond
);
267 pthread_mutex_unlock(&tq
->lock
);