2 * Copyright (c) 2014 Nicolas George
4 * This file is part of FFmpeg.
6 * FFmpeg is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public License
8 * as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * FFmpeg is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "threadmessage.h"
27 struct AVThreadMessageQueue
{
31 pthread_cond_t cond_recv
;
32 pthread_cond_t cond_send
;
36 void (*free_func
)(void *msg
);
42 int av_thread_message_queue_alloc(AVThreadMessageQueue
**mq
,
47 AVThreadMessageQueue
*rmq
;
50 if (nelem
> INT_MAX
/ elsize
)
51 return AVERROR(EINVAL
);
52 if (!(rmq
= av_mallocz(sizeof(*rmq
))))
53 return AVERROR(ENOMEM
);
54 if ((ret
= pthread_mutex_init(&rmq
->lock
, NULL
))) {
58 if ((ret
= pthread_cond_init(&rmq
->cond_recv
, NULL
))) {
59 pthread_mutex_destroy(&rmq
->lock
);
63 if ((ret
= pthread_cond_init(&rmq
->cond_send
, NULL
))) {
64 pthread_cond_destroy(&rmq
->cond_recv
);
65 pthread_mutex_destroy(&rmq
->lock
);
69 if (!(rmq
->fifo
= av_fifo_alloc2(nelem
, elsize
, 0))) {
70 pthread_cond_destroy(&rmq
->cond_send
);
71 pthread_cond_destroy(&rmq
->cond_recv
);
72 pthread_mutex_destroy(&rmq
->lock
);
74 return AVERROR(ENOMEM
);
81 return AVERROR(ENOSYS
);
82 #endif /* HAVE_THREADS */
85 void av_thread_message_queue_set_free_func(AVThreadMessageQueue
*mq
,
86 void (*free_func
)(void *msg
))
89 mq
->free_func
= free_func
;
93 void av_thread_message_queue_free(AVThreadMessageQueue
**mq
)
97 av_thread_message_flush(*mq
);
98 av_fifo_freep2(&(*mq
)->fifo
);
99 pthread_cond_destroy(&(*mq
)->cond_send
);
100 pthread_cond_destroy(&(*mq
)->cond_recv
);
101 pthread_mutex_destroy(&(*mq
)->lock
);
107 int av_thread_message_queue_nb_elems(AVThreadMessageQueue
*mq
)
111 pthread_mutex_lock(&mq
->lock
);
112 ret
= av_fifo_can_read(mq
->fifo
);
113 pthread_mutex_unlock(&mq
->lock
);
116 return AVERROR(ENOSYS
);
122 static int av_thread_message_queue_send_locked(AVThreadMessageQueue
*mq
,
126 while (!mq
->err_send
&& !av_fifo_can_write(mq
->fifo
)) {
127 if ((flags
& AV_THREAD_MESSAGE_NONBLOCK
))
128 return AVERROR(EAGAIN
);
129 pthread_cond_wait(&mq
->cond_send
, &mq
->lock
);
133 av_fifo_write(mq
->fifo
, msg
, 1);
134 /* one message is sent, signal one receiver */
135 pthread_cond_signal(&mq
->cond_recv
);
139 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue
*mq
,
143 while (!mq
->err_recv
&& !av_fifo_can_read(mq
->fifo
)) {
144 if ((flags
& AV_THREAD_MESSAGE_NONBLOCK
))
145 return AVERROR(EAGAIN
);
146 pthread_cond_wait(&mq
->cond_recv
, &mq
->lock
);
148 if (!av_fifo_can_read(mq
->fifo
))
150 av_fifo_read(mq
->fifo
, msg
, 1);
151 /* one message space appeared, signal one sender */
152 pthread_cond_signal(&mq
->cond_send
);
156 #endif /* HAVE_THREADS */
158 int av_thread_message_queue_send(AVThreadMessageQueue
*mq
,
165 pthread_mutex_lock(&mq
->lock
);
166 ret
= av_thread_message_queue_send_locked(mq
, msg
, flags
);
167 pthread_mutex_unlock(&mq
->lock
);
170 return AVERROR(ENOSYS
);
171 #endif /* HAVE_THREADS */
174 int av_thread_message_queue_recv(AVThreadMessageQueue
*mq
,
181 pthread_mutex_lock(&mq
->lock
);
182 ret
= av_thread_message_queue_recv_locked(mq
, msg
, flags
);
183 pthread_mutex_unlock(&mq
->lock
);
186 return AVERROR(ENOSYS
);
187 #endif /* HAVE_THREADS */
190 void av_thread_message_queue_set_err_send(AVThreadMessageQueue
*mq
,
194 pthread_mutex_lock(&mq
->lock
);
196 pthread_cond_broadcast(&mq
->cond_send
);
197 pthread_mutex_unlock(&mq
->lock
);
198 #endif /* HAVE_THREADS */
201 void av_thread_message_queue_set_err_recv(AVThreadMessageQueue
*mq
,
205 pthread_mutex_lock(&mq
->lock
);
207 pthread_cond_broadcast(&mq
->cond_recv
);
208 pthread_mutex_unlock(&mq
->lock
);
209 #endif /* HAVE_THREADS */
213 static int free_func_wrap(void *arg
, void *buf
, size_t *nb_elems
)
215 AVThreadMessageQueue
*mq
= arg
;
217 for (size_t i
= 0; i
< *nb_elems
; i
++)
218 mq
->free_func(msg
+ i
* mq
->elsize
);
223 void av_thread_message_flush(AVThreadMessageQueue
*mq
)
228 pthread_mutex_lock(&mq
->lock
);
229 used
= av_fifo_can_read(mq
->fifo
);
231 av_fifo_read_to_cb(mq
->fifo
, free_func_wrap
, mq
, &used
);
232 /* only the senders need to be notified since the queue is empty and there
233 * is nothing to read */
234 pthread_cond_broadcast(&mq
->cond_send
);
235 pthread_mutex_unlock(&mq
->lock
);
236 #endif /* HAVE_THREADS */