-
Notifications
You must be signed in to change notification settings - Fork 3
Add Job management system #34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,213 @@ | ||||||
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
|
|
||||||
|
|
||||||
| /** | ||||||
| * A threaded task/job system. | ||||||
| * Can run multiple things in parallel, with automatic dependency ordering. | ||||||
| * Create a TaskContext and call the static execute or dispatch methods to run stuff off-thread. | ||||||
| */ | ||||||
| namespace SH::Jobs { | ||||||
| /** | ||||||
| * The data that a task needs in order to handle itself and its relation with other tasks that may be running simultaneously. | ||||||
| * Most data relates to task groups, but the idx means this is easier set per task. | ||||||
| */ | ||||||
| struct TaskArguments { | ||||||
| uint32_t idx; // The index of the current task. When using dispatch, this is the iterator index, denoting the nth task that was started. | ||||||
| uint32_t group; // The index of the current task group. When using dispatch, this is the group index, denoting the nth dispatch that was started. | ||||||
| uint32_t groupIdx; // The index of the current task in the group. | ||||||
| bool first; // Whether this is the first task in its' group. | ||||||
| bool last; // Whether this is the last task in its' group. | ||||||
| void* data; // Data shared between a whole group. | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * A task priority. | ||||||
| * Higher priority tasks will pause lower priority tasks when all threads are being utilized. | ||||||
| */ | ||||||
| enum class Priority { | ||||||
| HIGHEST, // Default priority. | ||||||
| LOW, // Low priority tasks. Do not pause regular tasks. | ||||||
| STREAM, // Resource streaming, background tasks that never take priority over others | ||||||
| SIZE | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * The context of a thread. Stores priority and how many tasks are waiting for this to finish. | ||||||
| */ | ||||||
| struct ExecutionContext { | ||||||
| std::atomic<uint32_t> count { 0 }; | ||||||
| Priority priority; | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * A task, a member of a task queue, running on any thread, with ordering data. | ||||||
| * A single function (task) that can be executed one or more times. | ||||||
| * If executed more than once, it will be passed the index (amount of times) of the run through TaskArguments. | ||||||
| * Optionally, multiple executions of the same task may share memory between them, for reconstructing or processing buffers. | ||||||
| * If executed once, groupIdx == groupEnd == 0, and shared memory == 0. | ||||||
| */ | ||||||
| struct TaskGroup { | ||||||
| std::function<void(TaskArguments)> task; // The actual code the task executes | ||||||
| ExecutionContext* context; // A context, with priority | ||||||
| uint32_t group; // The ID of the group that the thread is a part of | ||||||
| uint32_t groupIdx; // The index of the current task into the group it is a part of | ||||||
| uint32_t groupEnd; // The index of the last task in the current group - may be the same as the group index for groups of 1, or the last task in a group. | ||||||
| uint32_t shared; // The size of the shared memory buffer for the current task group. | ||||||
|
|
||||||
| /** | ||||||
| * Execute all tasks in the group between the current idx and the last task in the group. | ||||||
| * func will be passed the indexes via the TaskArguments. | ||||||
| * Shared memory buffers will be allocated and passed if requested. | ||||||
| */ | ||||||
| inline void Execute() { | ||||||
| TaskArguments args; | ||||||
| args.group = group; | ||||||
| if (shared > 0) { | ||||||
| thread_local static std::vector<uint8_t> sharedData(shared); | ||||||
| args.data = sharedData.data(); | ||||||
| } else | ||||||
| args.data = nullptr; | ||||||
| for (size_t i = groupIdx; i < groupEnd; i++) { | ||||||
| args.idx = i; | ||||||
| args.groupIdx = i - groupIdx; | ||||||
| args.first = i == groupIdx; | ||||||
| args.last = i == groupEnd - 1; | ||||||
|
|
||||||
| task(args); | ||||||
| } | ||||||
|
|
||||||
| context->count.fetch_sub(1); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * A list of Task Groups, representing the groups assigned to a single thread. | ||||||
| * A thread may be assigned between zero to infinite Groups depending on how many threads are available, and how many tasks have been requested to run. | ||||||
| * The amount of TaskQueues that can be processed in parallel depends on the hardware concurrency property. | ||||||
| * The Queue is first-in-first-out. | ||||||
| */ | ||||||
| struct TaskQueue { | ||||||
| std::deque<TaskGroup> groups; | ||||||
| std::mutex lock; | ||||||
|
|
||||||
| /** | ||||||
| * Add a task group to be processed by this thread. | ||||||
| * Pushes may happen while the thread is being processed, but it must not coincide with the parallel execution of the thread itself. | ||||||
| * @parameter tsk the function / task to push to the list | ||||||
| */ | ||||||
| inline void Push(const TaskGroup& tsk) { | ||||||
| std::scoped_lock locker(lock); | ||||||
| groups.push_back(tsk); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Remove the oldest task group from the queue. | ||||||
| * @parameter a reference for the task output | ||||||
| * @return whether the operation succeded (false if the list was already empty) | ||||||
| */ | ||||||
| inline bool Pop(TaskGroup& tsk) { | ||||||
| std::scoped_lock locker(lock); | ||||||
| if (groups.empty()) return false; | ||||||
| tsk = std::move(groups.front()); | ||||||
| groups.pop_front(); | ||||||
| return true; | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * A list of Task Queues, representing all the tasks set for a given priority. | ||||||
| * Each Priority is given a number of Threads to process, between 1 and infinity. | ||||||
| * If a thread assigned to a given Priority Queue is requested by a Queue of a higher priority, the given Queue will be paused to allow it to run instead. | ||||||
| * By default: | ||||||
| * STREAM gets 1 thread | ||||||
| * LOW gets 2 threads | ||||||
| * HIGHEST gets all hardware threads | ||||||
| * An active std::thread may or may not be assigned to any hardware threads, allowing for a single Priority Queue to have more threads waiting than exists on hardware. | ||||||
| * In this case, the priority will always take precedence, and a LOW or STREAM will never run instead of a HIGHEST priority thread. | ||||||
| */ | ||||||
| struct PriorityQueue { | ||||||
| size_t nThreads; // The number of threads allocated to this priority. | ||||||
| std::vector<std::thread> threads; // The actual threads executing code for this priority. | ||||||
| std::unique_ptr<TaskQueue[]> queues; // A list of TaskQueues, one for every thread. | ||||||
| std::atomic<uint32_t> nextQueue { 0 }; // The next TaskQueue to process. | ||||||
| std::condition_variable wake; // The condition that, when passed, will wake any queues depending on the current. | ||||||
| std::mutex locker; | ||||||
|
|
||||||
| /** | ||||||
| * Process all waiting threads. | ||||||
| * @param firstQueue the first queue to perform work on, enabling skipping queues already processed. | ||||||
| */ | ||||||
| inline void Execute(size_t firstQueue) { | ||||||
| TaskGroup task; | ||||||
| for (size_t i = 0; i < nThreads; i++) { | ||||||
| TaskQueue& tasks = queues[firstQueue & nThreads]; | ||||||
| while (tasks.Pop(task)) | ||||||
| task.Execute(); | ||||||
| firstQueue++; | ||||||
| } | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * Initialize the Task management system. | ||||||
| * @param maxThreads the maximum number of threads that can be dedicated entirely to running Tasks. | ||||||
| */ | ||||||
| void Init(uint32_t maxThreads = 0xFFFFFFFF); | ||||||
|
|
||||||
| /** | ||||||
| * Prepare the Task management system for engine shutdown. | ||||||
| */ | ||||||
| void Destroy(); | ||||||
|
|
||||||
| /** | ||||||
| * Fetch the number of Threads currently running, for the given priority. | ||||||
| * @param p the priority of threads to check | ||||||
| * @return The number of threads currently running a task with the given priority. | ||||||
| */ | ||||||
| size_t GetThreadsOfPriority(Priority p = Priority::HIGHEST); | ||||||
|
|
||||||
| /** | ||||||
| * Run the given task off-thread, with the given priority. | ||||||
| * The task's function will be executed until return. The task cannot be restarted, unless you call Run again. | ||||||
| * @param context a task context, with priority | ||||||
| * @param task the function to run off-thread | ||||||
| */ | ||||||
| void Run(ExecutionContext& context, const std::function<void(TaskArguments)>& task); | ||||||
|
|
||||||
| /** | ||||||
| * Dispatch a set of jobs, running the same code with different arguments. | ||||||
| * The arguments that change are the task index and group index. | ||||||
| * Usually use this when iterating over a static list and performing an expensive task on every object within. | ||||||
| * @param context a task context, with priority | ||||||
| * @param jobs the number of jobs to create, total. Usually, max index of the list being processed | ||||||
| * @param groups the number of jobs to run per thread. Usually, jobs / max_threads if jobs > max_threads, or 5 otherwise. | ||||||
| * @param task the function to run per index | ||||||
| * @param sharedMem the size of a shared memory buffer to allocate for every group. | ||||||
| */ | ||||||
| void Dispatch(ExecutionContext& context, size_t jobs, size_t groups, const std::function<void(TaskArguments)>& task, size_t sharedMem = 0); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would rename
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In most cases, it'll be one. I can move it to the end of the function as a default parameter if it'll help.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that is just more confusing cause you say in most cases it will be Also what about the
Suggested change
|
||||||
|
|
||||||
| /** | ||||||
| * Fetch the number of groups to be created for a given number of jobs, for groups of a given size. | ||||||
| * @param jobs the total number of jobs to create | ||||||
| * @param groups the number of jobs to put in each group | ||||||
| * @return The number of groups to create to fit all jobs | ||||||
| */ | ||||||
| size_t DispatchGroups(size_t jobs, size_t groups); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait is this a helper for doing
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not quite. It's basically a weighted average of the number of times to iterate the task and the number of threads to spread the iteration over, with heavy weighting towards the jobs. This is an implementation detail and should not be documented except in the source. |
||||||
|
|
||||||
| /** | ||||||
| * Check whether any threads are working for the given context. | ||||||
| * @param context a task context, with priority | ||||||
| * @return Whether any threads are working for this context | ||||||
| */ | ||||||
| bool IsWorking(const ExecutionContext& context); | ||||||
|
|
||||||
| /** | ||||||
| * Wait for the given context to stop running. | ||||||
| * The current thread will be added to the thread pool for jobs. | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It means that if the main thread waits for a different thread to run, it can be paused and another thread can take its place until the dependency thread finishes. The main thread does not take priority over anything else.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was mainly interested in the wording of the last parts |
||||||
| * @param context a task context, with priority | ||||||
| */ | ||||||
| void WaitFor(const ExecutionContext& context); | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| #include <cassert> | ||
| #include <condition_variable> | ||
| #include <deque> | ||
| #include <mutex> | ||
| #include <thread> | ||
| #include <shadow/core/Time.h> | ||
| #include <shadow/core/jobs/Job.h> | ||
|
|
||
| #include "spdlog/spdlog.h" | ||
|
|
||
| namespace SH::Jobs { | ||
| struct State { | ||
| size_t nCores = 0; | ||
| PriorityQueue priorities[Priority::SIZE]; | ||
| std::atomic_bool alive { true }; | ||
|
|
||
| void Destroy() { | ||
| alive.store(false); | ||
| bool wake = true; | ||
| // Notify all threads to finish their work | ||
| std::thread waker([&] { while (wake) for (auto& x : priorities) x.wake.notify_all(); }); | ||
| // Wait for all threads to stop | ||
| for (auto& x : priorities) for (auto& thread : x.threads) thread.join(); | ||
| wake = false; | ||
| // Wait for all threads' dependencies to finish | ||
| waker.join(); | ||
|
|
||
| // Cleanup | ||
| for (auto& x : priorities) { | ||
| x.queues.reset(); | ||
| x.threads.clear(); | ||
| x.nThreads = 0; | ||
| } | ||
| nCores = 0; | ||
| } | ||
|
|
||
| ~State() { Destroy(); } | ||
| }; | ||
|
|
||
| static State internalState; | ||
|
|
||
| void Init(size_t maxThreads) { | ||
| // Don't reinit if we're already doing something, that's BAD | ||
| if (internalState.nCores > 0) return; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean technically.... Feels a bit weird.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you remove the list of running threads while a thread is running, it becomes impossible to join all those threads back, and the process CANNOT stop cleanly any more. It feels weird because C++ threading is a fucking disaster. We need to just deal with it in this case.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is on the fact that you are using the |
||
| maxThreads = std::max(1ull, maxThreads); | ||
|
|
||
| SH::Timer timer; | ||
|
|
||
| internalState.nCores = std::thread::hardware_concurrency(); | ||
|
|
||
| for (size_t pri = 0; pri < static_cast<size_t>(Priority::SIZE); pri++) { | ||
| const Priority p = (Priority) pri; | ||
| PriorityQueue& queue = internalState.priorities[pri]; | ||
|
|
||
| queue.nThreads = | ||
| p == Priority::HIGHEST ? internalState.nCores - 1 : | ||
| p == Priority::LOW ? internalState.nCores - 2 : | ||
| /* p == Priority::STREAM ? */ 1; | ||
| queue.nThreads = std::clamp(queue.nThreads, 1ull, maxThreads); | ||
| queue.queues.reset(new TaskQueue[queue.nThreads]); | ||
| queue.threads.reserve(queue.nThreads); | ||
|
|
||
| for (size_t thread = 0; thread < queue.nThreads; thread++) { | ||
| std::thread& worker = queue.threads.emplace_back([thread, &queue] { | ||
| while (internalState.alive.load()) { | ||
| queue.Execute(thread); | ||
| std::unique_lock lock(queue.locker); | ||
| queue.wake.wait(lock); | ||
| } | ||
| }); | ||
|
|
||
| auto handle = worker.native_handle(); | ||
| int core = p == Priority::STREAM ? internalState.nCores - 1 - thread : thread + 1; | ||
|
|
||
| // TODO: What the fuck? | ||
| // MINGW doesn't support either std::thread, pthread, or WIN32 threads properly. | ||
| // std::thread returns a pthread_t, with no way to get a WIN32 handle from it. | ||
| // To get a WIN32 handle, we'd have to have the thread store its own handle, and wait for it to be available from the parent thread. | ||
| // pthread doesn't support pthread_setaffinity_np, but it does support pthread_setname_np | ||
| // WIN32 doesn't support SetThreadDescription, but it does support SetThreadPriority. | ||
| // That's a very disgusting system for setting thread name & priority in one go. | ||
| // The only real solutions are: | ||
| // - MINGW does not get any thread management for jobs | ||
| // As a consequence, debugging expensive / crashing jobs becomes literally impossible on that platform | ||
| // - MINGW is banned in developing Umbra. | ||
| // As a consequence, developing the engine is significantly harder for those of us stuck to Windows, thanks to MSVC being extremely broken | ||
| // - Creating jobs is significantly more expensive due to the 4-way wait, lock and join required to fetch all the context we require to properly set job priorities | ||
| // As a consequence, the runtime of the engine suffers, as opposed to just the development. | ||
| // I think this one needs to be mulled over. | ||
| // For now, I'll leave mingw without thread management, and implement it for linux when i switch to that platform later on. | ||
| } | ||
| } | ||
|
|
||
| spdlog::debug(std::string("SH::Jobs initialized in ") + std::to_string(timer.elapsedMillis()) + "ms, utilizing " + std::to_string(internalState.nCores) + " cores, with " + std::to_string(GetThreadsOfPriority(Priority::HIGHEST)) + " high-priority threads, " + std::to_string(GetThreadsOfPriority(Priority::LOW)) + " low-priority threads, and " + std::to_string(GetThreadsOfPriority(Priority::STREAM)) + "asset-streaming threads."); | ||
| } | ||
|
|
||
| void Destroy() { | ||
| internalState.Destroy(); | ||
| } | ||
|
|
||
| size_t GetThreadsOfPriority(Priority p) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GetThreadCountOfPriority ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GetNumberOfThreadsThatCanRunForPriorityQueue? |
||
| return internalState.priorities[static_cast<size_t>(p)].nThreads; | ||
| } | ||
|
|
||
| void Run(ExecutionContext &context, const std::function<void(TaskArguments)> &task) { | ||
| PriorityQueue& queue = internalState.priorities[static_cast<size_t>(context.priority)]; | ||
| context.count.fetch_add(1); | ||
|
|
||
| TaskGroup group { task, &context, 0, 0, 1, 0 }; | ||
|
|
||
| if (queue.nThreads <= 1) { | ||
| group.Execute(); | ||
| return; | ||
| } | ||
|
|
||
| queue.queues[queue.nextQueue.fetch_add(1) & queue.nThreads].Push(group); | ||
| queue.wake.notify_one(); | ||
| } | ||
|
|
||
| void Dispatch(ExecutionContext &context, size_t jobs, size_t groups, const std::function<void(TaskArguments)> &task, size_t sharedMem) { | ||
| if (jobs == 0 || groups == 0) return; | ||
|
|
||
| PriorityQueue& queue = internalState.priorities[static_cast<size_t>(context.priority)]; | ||
| const uint32_t nGroups = DispatchGroups(jobs, groups); | ||
|
|
||
| context.count.fetch_add(nGroups); | ||
|
|
||
| TaskGroup group; | ||
| group.context = &context; | ||
| group.func = task; | ||
| group.shared = (uint32_t) sharedMem; | ||
|
|
||
| for (uint32_t grp = 0; grp < nGroups; grp++) { | ||
| group.group = grp; | ||
| group.groupIdx = grp * groups; | ||
| group.groupEnd = std::min(group.groupIdx + groups, jobs); | ||
|
|
||
| if (queue.nThreads <= 1) | ||
| group.Execute(); | ||
| else | ||
| queue.queues[queue.nextQueue.fetch_add(1) % queue.nThreads].Push(group); | ||
| } | ||
|
|
||
| if (queue.nThreads > 1) | ||
| queue.wake.notify_all(); | ||
| } | ||
|
|
||
| size_t DispatchGroups(size_t jobs, size_t groups) { | ||
| return (jobs + groups - 1) / groups; | ||
| } | ||
|
|
||
|
|
||
| bool IsWorking(const ExecutionContext &context) { | ||
| return context.count.load() > 0; | ||
| } | ||
|
|
||
| void WaitFor(const ExecutionContext &context) { | ||
| if (IsWorking(context)) { | ||
| PriorityQueue& queue = internalState.priorities[static_cast<size_t>(context.priority)]; | ||
|
|
||
| queue.wake.notify_all(); | ||
|
|
||
| queue.Execute(queue.nextQueue.fetch_add(1) % queue.nThreads); | ||
|
|
||
| while (IsWorking(context)) | ||
| std::this_thread::yield(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.