2 * Copyright (C) 2023 Nuo Mi
4 * This file is part of FFmpeg.
6 * FFmpeg is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * FFmpeg is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with FFmpeg; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
/*
 * Thread primitives used by the executor, provided in two variants.
 * NOTE(review): the #if/#else directives selecting between the variants are
 * not visible in this listing; the trailing #endif comment suggests the
 * first group is the !HAVE_THREADS branch -- confirm.
 *
 * Stub variant: the thread handle is a placeholder char, "create" expands to
 * 0 (the caller in av_executor_alloc() treats a non-zero result as failure,
 * so 0 reads as success) and "join" is an empty statement.
 */
32 #define ExecutorThread char
34 #define executor_thread_create(t, a, s, ar) 0
35 #define executor_thread_join(t, r) do {} while(0)
/* pthread variant: the same three names forward directly to pthreads. */
39 #define ExecutorThread pthread_t
41 #define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
42 #define executor_thread_join(t, r) pthread_join(t, r)
44 #endif //!HAVE_THREADS
/*
 * Per-worker record. A pointer to one of these is passed as the start
 * argument to executor_worker_task(), which casts it back to ThreadInfo *.
 * NOTE(review): only the `thread` member is visible in this listing and the
 * closing brace is missing; executor_worker_task() also reads ti->e, so at
 * least one more member is declared outside this view.
 */
46 typedef struct ThreadInfo
{
/* Handle written by executor_thread_create() and joined in executor_free(). */
48 ExecutorThread thread
;
/*
 * Byte buffer holding the per-thread local contexts. av_executor_alloc()
 * allocates FFMAX(thread_count, 1) elements of cb.local_context_size bytes
 * each; executor_worker_task() uses the slice at
 * (thread index) * cb.local_context_size, and the single-threaded path in
 * av_executor_execute() uses the start of the buffer.
 * NOTE(review): the enclosing struct header is not visible in this listing;
 * the field is accessed through AVExecutor pointers elsewhere -- confirm.
 */
57 uint8_t *local_contexts
;
/*
 * Detach a task from the task list.
 *
 * prev: address of the link that currently points at the task.
 * t:    the task being removed.
 *
 * run_one_task() calls this as remove_task(prev, *prev) and runs the
 * returned task.
 * NOTE(review): the function body is not visible in this listing; presumably
 * it rewrites *prev to skip t and returns t -- confirm.
 */
66 static AVTask
* remove_task(AVTask
**prev
, AVTask
*t
)
/*
 * Link a task into the task list.
 *
 * prev: address of the link at which the task is to be inserted.
 * t:    the task to insert.
 *
 * NOTE(review): neither the body nor any call site is visible in this
 * listing; presumably this is the counterpart of remove_task() used by
 * av_executor_execute() once it has found the insertion point -- confirm.
 */
73 static void add_task(AVTask
**prev
, AVTask
*t
)
/*
 * Pick the first ready task from e->tasks and run it.
 *
 * e:  the executor whose task list is scanned.
 * lc: local context passed through unchanged to the run callback.
 *
 * When e->thread_count > 0, e->lock is released around the run callback and
 * re-acquired afterwards, so the caller is expected to hold e->lock on entry
 * (executor_worker_task() does). When e->thread_count is 0 the lock is not
 * touched at all.
 *
 * NOTE(review): the declaration of `prev`, the guard around the removal and
 * the return statements are not visible in this listing. Callers treat a
 * non-zero result as "a task was run" and zero as "nothing was ready" --
 * confirm.
 */
79 static int run_one_task(AVExecutor
*e
, void *lc
)
81 AVTaskCallbacks
*cb
= &e
->cb
;
/*
 * Walk the list by link address: stop at the first task for which
 * cb->ready() returns non-zero, or at the terminating NULL link.
 */
84 for (prev
= &e
->tasks
; *prev
&& !cb
->ready(*prev
, cb
->user_data
); prev
= &(*prev
)->next
)
/* Take the selected task off the list before running it. */
87 AVTask
*t
= remove_task(prev
, *prev
);
/* Drop the lock so the callback does not run with e->lock held. */
88 if (e
->thread_count
> 0)
89 ff_mutex_unlock(&e
->lock
);
90 cb
->run(t
, lc
, cb
->user_data
);
/* Re-take the lock so the caller gets it back in the state it had. */
91 if (e
->thread_count
> 0)
92 ff_mutex_lock(&e
->lock
);
/*
 * Thread entry point for one executor worker.
 *
 * data: the ThreadInfo of this worker (passed by av_executor_alloc()).
 *
 * NOTE(review): the loop construct, its exit condition and the return
 * statement are not visible in this listing; only the lock/run/wait/unlock
 * skeleton below can be read -- confirm the shutdown protocol against the
 * full source.
 */
99 static void *executor_worker_task(void *data
)
101 ThreadInfo
*ti
= (ThreadInfo
*)data
;
102 AVExecutor
*e
= ti
->e
;
/*
 * This worker's local context: its index in e->threads (ti - e->threads)
 * times the per-context size, as an offset into e->local_contexts.
 */
103 void *lc
= e
->local_contexts
+ (ti
- e
->threads
) * e
->cb
.local_context_size
;
/* run_one_task() is called with e->lock held. */
105 ff_mutex_lock(&e
->lock
);
109 if (!run_one_task(e
, lc
)) {
110 //no task in one loop
/*
 * Nothing was ready: block on e->cond, which av_executor_execute() signals
 * and executor_free() broadcasts. Presumably, like pthread_cond_wait, this
 * releases e->lock while blocked -- confirm against ff_cond_wait.
 */
111 ff_cond_wait(&e
->cond
, &e
->lock
);
114 ff_mutex_unlock(&e
->lock
);
/*
 * Tear down an executor: wake and join the workers, destroy the
 * synchronisation objects and release memory.
 *
 * e:        the executor to destroy.
 * has_lock: flag supplied by the caller; av_executor_alloc() passes whether
 *           ff_mutex_init() succeeded.
 * has_cond: flag supplied by the caller; av_executor_alloc() passes whether
 *           ff_cond_init() succeeded.
 *
 * NOTE(review): several lines are missing from this listing -- how workers
 * are told to exit, the guards (presumably on has_cond / has_lock) around
 * the destroy calls, and any further releases such as e->threads or e
 * itself. Confirm against the full source.
 */
119 static void executor_free(AVExecutor
*e
, const int has_lock
, const int has_cond
)
/* Only executors that started at least one worker have threads to stop. */
121 if (e
->thread_count
) {
/* Wake every worker blocked in ff_cond_wait(), under the lock. */
123 ff_mutex_lock(&e
->lock
);
125 ff_cond_broadcast(&e
->cond
);
126 ff_mutex_unlock(&e
->lock
);
/*
 * Join exactly e->thread_count threads, i.e. the ones av_executor_alloc()
 * counted as successfully created.
 */
128 for (int i
= 0; i
< e
->thread_count
; i
++)
129 executor_thread_join(e
->threads
[i
].thread
, NULL
);
132 ff_cond_destroy(&e
->cond
);
134 ff_mutex_destroy(&e
->lock
);
137 av_free(e
->local_contexts
);
/*
 * Allocate an executor and start its worker threads.
 *
 * cb:           task callbacks. The pointer itself and its user_data, ready,
 *               run and priority_higher members are all checked for
 *               non-NULL before anything is allocated.
 * thread_count: number of worker threads to create. Storage is sized with
 *               FFMAX(thread_count, 1), so one local context exists even
 *               when no threads are requested.
 *
 * NOTE(review): the declaration of `e`, the copy of *cb into e->cb, the
 * error exits after each check, the assignment of ti->e and the return
 * statements are not visible in this listing; failed checks presumably
 * return NULL or reach the executor_free() call at the end -- confirm.
 */
142 AVExecutor
* av_executor_alloc(const AVTaskCallbacks
*cb
, int thread_count
)
/* Track which synchronisation objects were set up, for cleanup. */
145 int has_lock
= 0, has_cond
= 0;
146 if (!cb
|| !cb
->user_data
|| !cb
->ready
|| !cb
->run
|| !cb
->priority_higher
)
149 e
= av_mallocz(sizeof(*e
));
/*
 * One local context per worker, minimum one. This reads
 * e->cb.local_context_size, so e->cb must already be populated here.
 */
154 e
->local_contexts
= av_calloc(FFMAX(thread_count
, 1), e
->cb
.local_context_size
);
155 if (!e
->local_contexts
)
158 e
->threads
= av_calloc(FFMAX(thread_count
, 1), sizeof(*e
->threads
));
/* The init calls' results are negated, so each flag is non-zero when its init returned 0. */
165 has_lock
= !ff_mutex_init(&e
->lock
, NULL
);
166 has_cond
= !ff_cond_init(&e
->cond
, NULL
);
168 if (!has_lock
|| !has_cond
)
/*
 * e->thread_count is the loop counter, so it always equals the number of
 * workers created so far; executor_free() joins exactly that many.
 */
171 for (/* nothing */; e
->thread_count
< thread_count
; e
->thread_count
++) {
172 ThreadInfo
*ti
= e
->threads
+ e
->thread_count
;
/* A non-zero result from thread creation is treated as failure. */
174 if (executor_thread_create(&ti
->thread
, NULL
, executor_worker_task
, ti
))
/* Cleanup path: release whatever was set up, using the recorded flags. */
180 executor_free(e
, has_lock
, has_cond
);
/*
 * Public destructor for an executor.
 *
 * executor: address of the executor pointer; a NULL address or a NULL
 *           executor is checked for before anything is dereferenced.
 *
 * NOTE(review): the declaration of `thread_count`, the statement taken when
 * the NULL check fires and anything after the executor_free() call (e.g.
 * resetting *executor) are not visible in this listing -- confirm.
 */
184 void av_executor_free(AVExecutor
**executor
)
188 if (!executor
|| !*executor
)
/*
 * thread_count doubles as the has_lock / has_cond flags: a non-zero worker
 * count is taken to mean the mutex and condition variable exist.
 */
190 thread_count
= (*executor
)->thread_count
;
191 executor_free(*executor
, thread_count
, thread_count
);
/*
 * Submit a task to the executor and make sure something will run it.
 *
 * e: the executor.
 * t: the task to queue.
 *
 * With worker threads, one waiter is signalled on e->cond. Without them (or
 * when built without thread support), the calling thread drains the ready
 * tasks itself using the first local context.
 *
 * NOTE(review): the declaration of `prev`, the condition guarding the
 * initial lock, the call that actually links t into the list (presumably
 * add_task()), and the test/set of e->recursive are not visible in this
 * listing -- confirm.
 */
195 void av_executor_execute(AVExecutor
*e
, AVTask
*t
)
197 AVTaskCallbacks
*cb
= &e
->cb
;
201 ff_mutex_lock(&e
->lock
);
/*
 * Find the insertion point: skip every queued task that priority_higher()
 * ranks above t, stopping at the first that does not or at the list end.
 */
203 for (prev
= &e
->tasks
; *prev
&& cb
->priority_higher(*prev
, t
); prev
= &(*prev
)->next
)
/* Wake one worker, then release the lock. */
207 if (e
->thread_count
) {
208 ff_cond_signal(&e
->cond
);
209 ff_mutex_unlock(&e
->lock
);
212 if (!e
->thread_count
|| !HAVE_THREADS
) {
216 // We are running in a single-threaded environment, so we must handle all tasks ourselves
/* Keep running tasks until run_one_task() reports none was ready. */
217 while (run_one_task(e
, e
->local_contexts
))
/* Clear the flag once the inline drain has finished. */
219 e
->recursive
= false;