2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 #include <stdatomic.h>
22 #include "slicethread.h"
27 #define MAX_AUTO_THREADS 16
29 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
/* Per-worker state. Only the mutex member is visible in this excerpt; the
 * code further down also accesses w->ctx, w->cond and w->thread, whose
 * declarations (and the closing of this struct) are not shown here. */
31 typedef struct WorkerContext
{
/* Held around every signal/wait on this worker's condition variable. */
33 pthread_mutex_t mutex
;
/* Shared state of one slice-thread pool.
 * NOTE(review): only some members are visible in this excerpt; the code
 * below also uses ctx->nb_threads, ctx->nb_jobs and ctx->priv, and the
 * closing brace of the struct is not shown. */
39 struct AVSliceThread
{
/* Array of per-worker contexts, allocated with av_calloc() in
 * avpriv_slicethread_create(). */
40 WorkerContext
*workers
;
/* Set by each avpriv_slicethread_execute() call to
 * FFMIN(nb_jobs, nb_threads). */
42 int nb_active_threads
;
/* Atomically incremented once per run_jobs() call; the fetched value is the
 * caller's first job number and is also passed to worker_func as threadnr.
 * Reset to 0 by avpriv_slicethread_execute(). */
45 atomic_uint first_job
;
/* Next job number to hand out after the initial ones; reset to
 * nb_active_threads by avpriv_slicethread_execute() and atomically
 * incremented by run_jobs(). */
46 atomic_uint current_job
;
/* Mutex/condition pair signalled from thread_worker() and waited on in
 * avpriv_slicethread_execute(). */
47 pthread_mutex_t done_mutex
;
48 pthread_cond_t done_cond
;
/* Per-job callback invoked from run_jobs(). */
53 void (*worker_func
)(void *priv
, int jobnr
, int threadnr
, int nb_jobs
, int nb_threads
);
/* Optional callback; avpriv_slicethread_execute() calls it when it is
 * non-NULL and execute_main is nonzero. */
54 void (*main_func
)(void *priv
);
/* Run jobs on the calling thread until the shared job counter is exhausted.
 *
 * The atomic increment of ctx->first_job yields a value that is used both as
 * this thread's first job number and as the threadnr argument passed to
 * worker_func on every call. Further job numbers are claimed by atomically
 * incrementing ctx->current_job until the fetched value is no longer below
 * nb_jobs.
 *
 * Returns nonzero when the final fetched counter value equals
 * nb_jobs + nb_active_threads - 1; avpriv_slicethread_execute() stores this
 * result as is_last.
 * NOTE(review): the function's braces and the opening of the do-while loop
 * are not visible in this excerpt. */
57 static int run_jobs(AVSliceThread
*ctx
)
/* Snapshot the per-execute parameters into locals. */
59 unsigned nb_jobs
= ctx
->nb_jobs
;
60 unsigned nb_active_threads
= ctx
->nb_active_threads
;
/* Claim a unique starting slot for this thread. */
61 unsigned first_job
= atomic_fetch_add_explicit(&ctx
->first_job
, 1, memory_order_acq_rel
);
62 unsigned current_job
= first_job
;
/* Arguments are (priv, jobnr, threadnr, nb_jobs, nb_threads). */
65 ctx
->worker_func(ctx
->priv
, current_job
, first_job
, nb_jobs
, nb_active_threads
);
/* Claim the next job; stop once the fetched number is out of range. */
66 } while ((current_job
= atomic_fetch_add_explicit(&ctx
->current_job
, 1, memory_order_acq_rel
)) < nb_jobs
);
/* current_job starts at nb_active_threads for each execute call, so this
 * compares against the highest value the counter can hand out if every
 * active thread performs exactly one failing fetch. */
68 return current_job
== nb_jobs
+ nb_active_threads
- 1;
/* Entry point of a worker thread. v is the argument handed to
 * pthread_create() by avpriv_slicethread_create(), which passes the
 * worker's WorkerContext.
 * NOTE(review): the declaration of w, the loop structure, the conditions
 * guarding the calls below and the return statement are not visible in this
 * excerpt. */
71 static void *attribute_align_arg
thread_worker(void *v
)
74 AVSliceThread
*ctx
= w
->ctx
;
/* Take the worker mutex and signal w->cond; avpriv_slicethread_create()
 * holds this mutex and waits on the same condition right after
 * pthread_create(). */
76 pthread_mutex_lock(&w
->mutex
);
77 pthread_cond_signal(&w
->cond
);
/* Sleep on w->cond (w->mutex is released while waiting). The same condition
 * is signalled by avpriv_slicethread_execute() and
 * avpriv_slicethread_free(). */
82 pthread_cond_wait(&w
->cond
, &w
->mutex
);
85 pthread_mutex_unlock(&w
->mutex
);
/* Signal done_cond under done_mutex; avpriv_slicethread_execute() waits on
 * it. Presumably done only when this worker finished last - the guarding
 * condition is not visible here. */
90 pthread_mutex_lock(&ctx
->done_mutex
);
92 pthread_cond_signal(&ctx
->done_cond
);
93 pthread_mutex_unlock(&ctx
->done_mutex
);
/* Create a slice-thread pool and store it in *pctx.
 *
 * pctx        receives the newly allocated context
 * priv        opaque pointer (assigned to the context on a line not visible
 *             here - TODO confirm); run_jobs() passes ctx->priv to worker_func
 * worker_func per-job callback stored in the context
 * main_func   optional callback stored in the context
 *
 * Returns AVERROR(ENOMEM) when either allocation fails.
 * NOTE(review): the remaining parameter(s) (nb_threads is used below), the
 * local declarations, several conditions and the other return statements
 * are not visible in this excerpt. */
99 int avpriv_slicethread_create(AVSliceThread
**pctx
, void *priv
,
100 void (*worker_func
)(void *priv
, int jobnr
, int threadnr
, int nb_jobs
, int nb_threads
),
101 void (*main_func
)(void *priv
),
108 av_assert0(nb_threads
>= 0);
/* Automatic thread count: CPU count plus one, capped at MAX_AUTO_THREADS.
 * The conditions selecting this path are not visible here. */
110 int nb_cpus
= av_cpu_count();
112 nb_threads
= FFMIN(nb_cpus
+ 1, MAX_AUTO_THREADS
);
117 nb_workers
= nb_threads
;
/* Zero-initialized context; also published to the caller through *pctx. */
121 *pctx
= ctx
= av_mallocz(sizeof(*ctx
));
123 return AVERROR(ENOMEM
);
/* Worker array is only allocated when there is at least one worker. */
125 if (nb_workers
&& !(ctx
->workers
= av_calloc(nb_workers
, sizeof(*ctx
->workers
)))) {
127 return AVERROR(ENOMEM
);
131 ctx
->worker_func
= worker_func
;
132 ctx
->main_func
= main_func
;
133 ctx
->nb_threads
= nb_threads
;
134 ctx
->nb_active_threads
= 0;
138 atomic_init(&ctx
->first_job
, 0);
139 atomic_init(&ctx
->current_job
, 0);
140 ret
= pthread_mutex_init(&ctx
->done_mutex
, NULL
);
/* Cleanup on the done_mutex failure path (the enclosing condition is not
 * visible here). */
142 av_freep(&ctx
->workers
);
146 ret
= pthread_cond_init(&ctx
->done_cond
, NULL
);
/* Before calling avpriv_slicethread_free(), nb_threads is overwritten:
 * that function derives its worker count from ctx->nb_threads, so this
 * presumably makes it tear down no workers at this point - TODO confirm
 * against the adjustment in avpriv_slicethread_free(). */
148 ctx
->nb_threads
= main_func
? 0 : 1;
149 avpriv_slicethread_free(pctx
);
/* Initialize and start each worker in turn. */
154 for (i
= 0; i
< nb_workers
; i
++) {
155 WorkerContext
*w
= &ctx
->workers
[i
];
158 ret
= pthread_mutex_init(&w
->mutex
, NULL
);
/* On failure, shrink nb_threads so that avpriv_slicethread_free() only
 * handles the workers set up so far (presumably the first i - TODO
 * confirm). */
160 ctx
->nb_threads
= main_func
? i
: i
+ 1;
161 avpriv_slicethread_free(pctx
);
164 ret
= pthread_cond_init(&w
->cond
, NULL
);
/* The mutex of this partially initialized worker is destroyed here, before
 * the generic free. */
166 pthread_mutex_destroy(&w
->mutex
);
167 ctx
->nb_threads
= main_func
? i
: i
+ 1;
168 avpriv_slicethread_free(pctx
);
/* Hold the worker mutex across pthread_create() so the startup signal sent
 * by thread_worker() cannot be delivered before the wait below begins. */
171 pthread_mutex_lock(&w
->mutex
);
174 if (ret
= pthread_create(&w
->thread
, NULL
, thread_worker
, w
)) {
/* Thread creation failed: release and destroy this worker's primitives
 * locally, then free everything set up earlier. */
175 ctx
->nb_threads
= main_func
? i
: i
+ 1;
176 pthread_mutex_unlock(&w
->mutex
);
177 pthread_cond_destroy(&w
->cond
);
178 pthread_mutex_destroy(&w
->mutex
);
179 avpriv_slicethread_free(pctx
);
/* Wait for the new thread's startup signal on w->cond. */
184 pthread_cond_wait(&w
->cond
, &w
->mutex
);
185 pthread_mutex_unlock(&w
->mutex
);
/* Dispatch nb_jobs jobs to the pool and wait for completion.
 *
 * ctx          pool created by avpriv_slicethread_create()
 * nb_jobs      number of jobs; must be greater than 0 (asserted)
 * execute_main when nonzero and ctx->main_func is set, main_func is called
 *              on the calling thread
 *
 * NOTE(review): the function's braces, the statement controlled by the
 * first `if`, and the conditions around run_jobs() and the final wait are
 * not visible in this excerpt. */
191 void avpriv_slicethread_execute(AVSliceThread
*ctx
, int nb_jobs
, int execute_main
)
193 int nb_workers
, i
, is_last
= 0;
195 av_assert0(nb_jobs
> 0);
196 ctx
->nb_jobs
= nb_jobs
;
/* Never use more threads than there are jobs. */
197 ctx
->nb_active_threads
= FFMIN(nb_jobs
, ctx
->nb_threads
);
/* Reset the counters read by run_jobs(): starting slots begin at 0 and the
 * shared job counter begins just past them. */
198 atomic_store_explicit(&ctx
->first_job
, 0, memory_order_relaxed
);
199 atomic_store_explicit(&ctx
->current_job
, ctx
->nb_active_threads
, memory_order_relaxed
);
200 nb_workers
= ctx
->nb_active_threads
;
/* The statement guarded by this condition is not visible here; presumably
 * it adjusts nb_workers when the calling thread takes part in running
 * jobs - TODO confirm. */
201 if (!ctx
->main_func
|| !execute_main
)
/* Wake the selected workers, each under its own mutex. */
204 for (i
= 0; i
< nb_workers
; i
++) {
205 WorkerContext
*w
= &ctx
->workers
[i
];
206 pthread_mutex_lock(&w
->mutex
);
208 pthread_cond_signal(&w
->cond
);
209 pthread_mutex_unlock(&w
->mutex
);
212 if (ctx
->main_func
&& execute_main
)
213 ctx
->main_func(ctx
->priv
);
/* Run jobs on the calling thread; is_last records run_jobs()'s result. */
215 is_last
= run_jobs(ctx
);
/* Wait on done_cond, which thread_worker() signals under done_mutex. The
 * condition guarding this wait is not visible here. */
218 pthread_mutex_lock(&ctx
->done_mutex
);
220 pthread_cond_wait(&ctx
->done_cond
, &ctx
->done_mutex
);
222 pthread_mutex_unlock(&ctx
->done_mutex
);
/* Stop all workers and release the pool referenced by *pctx.
 *
 * NOTE(review): the NULL check, local declarations, any adjustment of
 * nb_workers, the statement that tells workers to exit, and the final
 * release of the context itself are not visible in this excerpt. */
226 av_cold
void avpriv_slicethread_free(AVSliceThread
**pctx
)
228 AVSliceThread
*ctx
= *pctx
;
/* Worker count is derived from nb_threads, which the error paths of
 * avpriv_slicethread_create() lower before calling this function. */
234 nb_workers
= ctx
->nb_threads
;
/* First pass: wake every worker under its mutex. */
239 for (i
= 0; i
< nb_workers
; i
++) {
240 WorkerContext
*w
= &ctx
->workers
[i
];
241 pthread_mutex_lock(&w
->mutex
);
243 pthread_cond_signal(&w
->cond
);
244 pthread_mutex_unlock(&w
->mutex
);
/* Second pass: join each thread, then destroy its primitives. */
247 for (i
= 0; i
< nb_workers
; i
++) {
248 WorkerContext
*w
= &ctx
->workers
[i
];
249 pthread_join(w
->thread
, NULL
);
250 pthread_cond_destroy(&w
->cond
);
251 pthread_mutex_destroy(&w
->mutex
);
/* Destroy the pool-wide primitives and free the worker array. */
254 pthread_cond_destroy(&ctx
->done_cond
);
255 pthread_mutex_destroy(&ctx
->done_mutex
);
256 av_freep(&ctx
->workers
);
260 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
/* Fallback used when none of the threading backends tested by the enclosing
 * #if is available: creation always fails with AVERROR(ENOSYS).
 * NOTE(review): the rest of the parameter list and any other statements in
 * the body are not visible in this excerpt. */
262 int avpriv_slicethread_create(AVSliceThread
**pctx
, void *priv
,
263 void (*worker_func
)(void *priv
, int jobnr
, int threadnr
, int nb_jobs
, int nb_threads
),
264 void (*main_func
)(void *priv
),
268 return AVERROR(ENOSYS
);
/* Fallback for builds without a threading backend.
 * NOTE(review): the body is not visible in this excerpt; since the fallback
 * avpriv_slicethread_create() always fails, no valid context should reach
 * this function - TODO confirm what the body does. */
271 void avpriv_slicethread_execute(AVSliceThread
*ctx
, int nb_jobs
, int execute_main
)
/* Fallback for builds without a threading backend: there is nothing to
 * free, so this only asserts that no context was passed in (pctx is NULL or
 * points to a NULL context). */
276 void avpriv_slicethread_free(AVSliceThread
**pctx
)
278 av_assert0(!pctx
|| !*pctx
);
281 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */