/*
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
/**
 * Thread message API test
 */
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/threadmessage.h"
30 #include "libavutil/thread.h" // not public
36 AVThreadMessageQueue
*queue
;
39 /* same as sender_data but shuffled for testing purpose */
40 struct receiver_data
{
44 AVThreadMessageQueue
*queue
;
49 // we add some junk in the message to make sure the message size is >
54 #define MAGIC 0xdeadc0de
56 static void free_frame(void *arg
)
58 struct message
*msg
= arg
;
59 av_assert0(msg
->magic
== MAGIC
);
60 av_frame_free(&msg
->frame
);
63 static void *sender_thread(void *arg
)
66 struct sender_data
*wd
= arg
;
68 av_log(NULL
, AV_LOG_INFO
, "sender #%d: workload=%d\n", wd
->id
, wd
->workload
);
69 for (i
= 0; i
< wd
->workload
; i
++) {
70 if (rand() % wd
->workload
< wd
->workload
/ 10) {
71 av_log(NULL
, AV_LOG_INFO
, "sender #%d: flushing the queue\n", wd
->id
);
72 av_thread_message_flush(wd
->queue
);
75 AVDictionary
*meta
= NULL
;
76 struct message msg
= {
78 .frame
= av_frame_alloc(),
82 ret
= AVERROR(ENOMEM
);
86 /* we add some metadata to identify the frames */
87 val
= av_asprintf("frame %d/%d from sender %d",
88 i
+ 1, wd
->workload
, wd
->id
);
90 av_frame_free(&msg
.frame
);
91 ret
= AVERROR(ENOMEM
);
94 ret
= av_dict_set(&meta
, "sig", val
, AV_DICT_DONT_STRDUP_VAL
);
96 av_frame_free(&msg
.frame
);
99 msg
.frame
->metadata
= meta
;
101 /* allocate a real frame in order to simulate "real" work */
102 msg
.frame
->format
= AV_PIX_FMT_RGBA
;
103 msg
.frame
->width
= 320;
104 msg
.frame
->height
= 240;
105 ret
= av_frame_get_buffer(msg
.frame
, 0);
107 av_frame_free(&msg
.frame
);
111 /* push the frame in the common queue */
112 av_log(NULL
, AV_LOG_INFO
, "sender #%d: sending my work (%d/%d frame:%p)\n",
113 wd
->id
, i
+ 1, wd
->workload
, msg
.frame
);
114 ret
= av_thread_message_queue_send(wd
->queue
, &msg
, 0);
116 av_frame_free(&msg
.frame
);
121 av_log(NULL
, AV_LOG_INFO
, "sender #%d: my work is done here (%s)\n",
122 wd
->id
, av_err2str(ret
));
123 av_thread_message_queue_set_err_recv(wd
->queue
, ret
< 0 ? ret
: AVERROR_EOF
);
127 static void *receiver_thread(void *arg
)
130 struct receiver_data
*rd
= arg
;
132 for (i
= 0; i
< rd
->workload
; i
++) {
133 if (rand() % rd
->workload
< rd
->workload
/ 10) {
134 av_log(NULL
, AV_LOG_INFO
, "receiver #%d: flushing the queue, "
135 "discarding %d message(s)\n", rd
->id
,
136 av_thread_message_queue_nb_elems(rd
->queue
));
137 av_thread_message_flush(rd
->queue
);
141 AVDictionaryEntry
*e
;
143 ret
= av_thread_message_queue_recv(rd
->queue
, &msg
, 0);
146 av_assert0(msg
.magic
== MAGIC
);
147 meta
= msg
.frame
->metadata
;
148 e
= av_dict_get(meta
, "sig", NULL
, 0);
149 av_log(NULL
, AV_LOG_INFO
, "got \"%s\" (%p)\n", e
->value
, msg
.frame
);
150 av_frame_free(&msg
.frame
);
154 av_log(NULL
, AV_LOG_INFO
, "consumed enough (%d), stop\n", i
);
155 av_thread_message_queue_set_err_send(rd
->queue
, ret
< 0 ? ret
: AVERROR_EOF
);
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * @param minv one bound of the range
 * @param maxv the other bound of the range
 * @return the common value when both bounds are equal, otherwise a value
 *         in [low, high) drawn with rand(), where low/high are the smaller
 *         and larger of the two bounds
 */
static int get_workload(int minv, int maxv)
{
    /* an inverted range would make rand() % (maxv - minv) use a negative
     * modulus and yield values above the larger bound, so normalize first */
    if (maxv < minv) {
        const int tmp = minv;
        minv = maxv;
        maxv = tmp;
    }
    if (maxv == minv)
        return maxv;
    return rand() % (maxv - minv) + minv;
}
165 int main(int ac
, char **av
)
169 int nb_senders
, sender_min_load
, sender_max_load
;
170 int nb_receivers
, receiver_min_load
, receiver_max_load
;
171 struct sender_data
*senders
;
172 struct receiver_data
*receivers
;
173 AVThreadMessageQueue
*queue
= NULL
;
176 av_log(NULL
, AV_LOG_ERROR
, "%s <max_queue_size> "
177 "<nb_senders> <sender_min_send> <sender_max_send> "
178 "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av
[0]);
182 max_queue_size
= atoi(av
[1]);
183 nb_senders
= atoi(av
[2]);
184 sender_min_load
= atoi(av
[3]);
185 sender_max_load
= atoi(av
[4]);
186 nb_receivers
= atoi(av
[5]);
187 receiver_min_load
= atoi(av
[6]);
188 receiver_max_load
= atoi(av
[7]);
190 if (max_queue_size
<= 0 ||
191 nb_senders
<= 0 || sender_min_load
<= 0 || sender_max_load
<= 0 ||
192 nb_receivers
<= 0 || receiver_min_load
<= 0 || receiver_max_load
<= 0) {
193 av_log(NULL
, AV_LOG_ERROR
, "negative values not allowed\n");
197 av_log(NULL
, AV_LOG_INFO
, "qsize:%d / %d senders sending [%d-%d] / "
198 "%d receivers receiving [%d-%d]\n", max_queue_size
,
199 nb_senders
, sender_min_load
, sender_max_load
,
200 nb_receivers
, receiver_min_load
, receiver_max_load
);
202 senders
= av_calloc(nb_senders
, sizeof(*senders
));
203 receivers
= av_calloc(nb_receivers
, sizeof(*receivers
));
204 if (!senders
|| !receivers
) {
205 ret
= AVERROR(ENOMEM
);
209 ret
= av_thread_message_queue_alloc(&queue
, max_queue_size
, sizeof(struct message
));
213 av_thread_message_queue_set_free_func(queue
, free_frame
);
215 #define SPAWN_THREADS(type) do { \
216 for (i = 0; i < nb_##type##s; i++) { \
217 struct type##_data *td = &type##s[i]; \
221 td->workload = get_workload(type##_min_load, type##_max_load); \
223 ret = pthread_create(&td->tid, NULL, type##_thread, td); \
225 const int err = AVERROR(ret); \
226 av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
227 " thread: %s\n", av_err2str(err)); \
233 #define WAIT_THREADS(type) do { \
234 for (i = 0; i < nb_##type##s; i++) { \
235 struct type##_data *td = &type##s[i]; \
237 ret = pthread_join(td->tid, NULL); \
239 const int err = AVERROR(ret); \
240 av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
241 " thread: %s\n", av_err2str(err)); \
247 SPAWN_THREADS(receiver
);
248 SPAWN_THREADS(sender
);
250 WAIT_THREADS(sender
);
251 WAIT_THREADS(receiver
);
254 av_thread_message_queue_free(&queue
);
256 av_freep(&receivers
);
258 if (ret
< 0 && ret
!= AVERROR_EOF
) {
259 av_log(NULL
, AV_LOG_ERROR
, "Error: %s\n", av_err2str(ret
));