diff --git a/docs/overview/advanced-examples.rst b/docs/overview/advanced-examples.rst index 8c8407f5..7611a2ee 100644 --- a/docs/overview/advanced-examples.rst +++ b/docs/overview/advanced-examples.rst @@ -162,6 +162,9 @@ Async/Await Example A simple example of asynchronous submission can be found below. +You can also use async submissions with Vulkan semaphores to synchronize +Kompute-generated submits with user-managed queue submits. + First we are able to create the manager as we normally would. .. code-block:: cpp @@ -231,17 +234,27 @@ The parameter provided is the maximum amount of time to wait in nanoseconds. Whe .. code-block:: cpp :linenos: - auto sq = mgr.sequence(); - - // Run Async Kompute operation on the parameters provided - sq->evalAsync<kp::OpAlgoDispatch>(algo); + // Optional: pass submit-level synchronization primitives once when + // creating the sequence so every submit waits/signals alongside + // user-managed queue work + std::vector<vk::Semaphore> waitSemaphores = { externalWaitSemaphore }; + std::vector<vk::PipelineStageFlags> waitDstStageMasks = { + vk::PipelineStageFlagBits::eComputeShader + }; + std::vector<vk::Semaphore> signalSemaphores = { externalSignalSemaphore }; + auto sq = mgr.sequence(0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores); + auto opAlgo = std::make_shared<kp::OpAlgoDispatch>(algo); + sq->evalAsync(opAlgo); // Here we can do other work - // When we're ready we can wait + // When we're ready we can wait // The default wait time is UINT64_MAX sq->evalAwait(); +``evalAwait()`` must be called before invoking ``evalAsync()`` again on the +same ``Sequence``. + Finally, below you can see that we can also run synchronous commands without having to change anything.
diff --git a/docs/overview/custom-operations.rst b/docs/overview/custom-operations.rst index d5ac07b6..ed4bb93f 100644 --- a/docs/overview/custom-operations.rst +++ b/docs/overview/custom-operations.rst @@ -33,7 +33,7 @@ Below you * - preEval() - When the Sequence is Evaluated this preEval is called across all operations before dispatching the batch of recorded commands to the GPU. This is useful for example if you need to copy data from local to host memory. * - postEval() - - After the sequence is Evaluated this postEval is called across all operations. When running asynchronously the postEval is called when you call `evalAwait()`, which is why it's important to always run evalAwait() to ensure the process doesn't go into inconsistent state. + - After the sequence is Evaluated this postEval is called across all operations. In asynchronous flows postEval is called when you run `evalAwait()`, and `evalAwait()` must be called before triggering `evalAsync()` again on the same sequence to avoid inconsistent state. 
Simple Operation Extending OpAlgoBase diff --git a/python/src/main.cpp b/python/src/main.cpp index 8bec2970..92b9b48b 100644 --- a/python/src/main.cpp +++ b/python/src/main.cpp @@ -336,7 +336,9 @@ PYBIND11_MODULE(kp, m) py::arg("desired_extensions") = std::vector<std::string>()) .def("destroy", &kp::Manager::destroy, DOC(kp, Manager, destroy)) .def("sequence", - &kp::Manager::sequence, + [](kp::Manager& self, uint32_t queueIndex, uint32_t totalTimestamps) { + return self.sequence(queueIndex, totalTimestamps, {}, {}, {}); + }, DOC(kp, Manager, sequence), py::arg("queue_index") = 0, py::arg("total_timestamps") = 0) diff --git a/src/Manager.cpp b/src/Manager.cpp index e5a56d0e..720baa9d 100644 --- a/src/Manager.cpp +++ b/src/Manager.cpp @@ -499,10 +499,20 @@ Manager::createDevice(const std::vector<uint32_t>& familyQueueIndices, } std::shared_ptr<Sequence> -Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps) +Manager::sequence(uint32_t queueIndex, + uint32_t totalTimestamps, + const std::vector<vk::Semaphore>& waitSemaphores, + const std::vector<vk::PipelineStageFlags>& waitDstStageMasks, + const std::vector<vk::Semaphore>& signalSemaphores) { KP_LOG_DEBUG("Kompute Manager sequence() with queueIndex: {}", queueIndex); + if (!waitDstStageMasks.empty() && + waitSemaphores.size() != waitDstStageMasks.size()) { + throw std::runtime_error("Kompute Manager sequence() wait semaphore " + "count must match wait dst stage mask count"); + } + std::shared_ptr<std::mutex> submitMutex = nullptr; #ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE submitMutex = this->mSequenceSubmitMutex; @@ -514,6 +524,9 @@ Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps) this->mComputeQueues[queueIndex], this->mComputeQueueFamilyIndices[queueIndex], totalTimestamps, + waitSemaphores, + waitDstStageMasks, + signalSemaphores, submitMutex) }; if (this->mManageResources) { diff --git a/src/Sequence.cpp b/src/Sequence.cpp index 120f9c99..5b9e5eb3 100644 --- a/src/Sequence.cpp +++ b/src/Sequence.cpp @@ -9,6 +9,9 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice,
std::shared_ptr<vk::Queue> computeQueue, uint32_t queueIndex, uint32_t totalTimestamps, + const std::vector<vk::Semaphore>& waitSemaphores, + const std::vector<vk::PipelineStageFlags>& waitDstStageMasks, + const std::vector<vk::Semaphore>& signalSemaphores, std::shared_ptr<std::mutex> submitMutex) noexcept { KP_LOG_DEBUG("Kompute Sequence Constructor with existing device & queue"); @@ -19,6 +22,14 @@ Sequence::Sequence(std::shared_ptr<vk::PhysicalDevice> physicalDevice, this->mQueueIndex = queueIndex; this->mFence = this->mDevice->createFence(vk::FenceCreateInfo()); this->mSubmitMutex = submitMutex; + this->mWaitSemaphores = waitSemaphores; + this->mWaitDstStageMasks = waitDstStageMasks; + this->mSignalSemaphores = signalSemaphores; + + if (this->mWaitDstStageMasks.empty() && !this->mWaitSemaphores.empty()) { + this->mWaitDstStageMasks.resize(this->mWaitSemaphores.size(), + vk::PipelineStageFlagBits::eAllCommands); + } this->createCommandPool(); this->createCommandBuffer(); @@ -127,8 +138,27 @@ Sequence::evalAsync() this->mOperations[i]->preEval(*this->mCommandBuffer); } + if (!this->mWaitDstStageMasks.empty() && + this->mWaitSemaphores.size() != this->mWaitDstStageMasks.size()) { + throw std::runtime_error("Kompute Sequence evalAsync wait semaphore " + "count must match wait dst stage mask count"); + } + + const vk::Semaphore* waitSemaphoresPtr = + this->mWaitSemaphores.empty() ? nullptr : this->mWaitSemaphores.data(); + const vk::PipelineStageFlags* waitDstStageMasksPtr = + this->mWaitDstStageMasks.empty() ? nullptr : this->mWaitDstStageMasks.data(); + const vk::Semaphore* signalSemaphoresPtr = + this->mSignalSemaphores.empty() ?
nullptr : this->mSignalSemaphores.data(); + vk::SubmitInfo submitInfo( - 0, nullptr, nullptr, 1, this->mCommandBuffer.get()); + static_cast<uint32_t>(this->mWaitSemaphores.size()), + waitSemaphoresPtr, + waitDstStageMasksPtr, + 1, + this->mCommandBuffer.get(), + static_cast<uint32_t>(this->mSignalSemaphores.size()), + signalSemaphoresPtr); KP_LOG_DEBUG( "Kompute sequence submitting command buffer into compute queue"); @@ -156,8 +186,7 @@ Sequence::evalAsync(std::shared_ptr<OpBase> op) { this->clear(); this->record(op); - this->evalAsync(); - return shared_from_this(); + return this->evalAsync(); } std::shared_ptr<Sequence> diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp index d76b10d0..49f2e1b4 100644 --- a/src/include/kompute/Manager.hpp +++ b/src/include/kompute/Manager.hpp @@ -77,10 +77,23 @@ class Manager * @param queueIndex The queue to use from the available queues * @param nrOfTimestamps The maximum number of timestamps to allocate. * If zero (default), disables latching of timestamps. + * @param waitSemaphores Semaphores to wait on before each submit from this + * sequence. + * @param waitDstStageMasks Pipeline stages to use for each wait semaphore. + * If empty and waitSemaphores is not empty, defaults to + * vk::PipelineStageFlagBits::eAllCommands for every wait semaphore. + * @param signalSemaphores Semaphores to signal after each submit from this + * sequence.
* @returns Shared pointer with initialised sequence */ std::shared_ptr<Sequence> sequence(uint32_t queueIndex = 0, - uint32_t totalTimestamps = 0); + uint32_t totalTimestamps = 0, + const std::vector<vk::Semaphore>& + waitSemaphores = {}, + const std::vector<vk::PipelineStageFlags>& + waitDstStageMasks = {}, + const std::vector<vk::Semaphore>& + signalSemaphores = {}); /** * Create a managed tensor that will be destroyed by this manager diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index bea23fc2..489030a8 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -31,6 +31,9 @@ class Sequence : public std::enable_shared_from_this<Sequence> std::shared_ptr<vk::Queue> computeQueue, uint32_t queueIndex, uint32_t totalTimestamps = 0, + const std::vector<vk::Semaphore>& waitSemaphores = {}, + const std::vector<vk::PipelineStageFlags>& waitDstStageMasks = {}, + const std::vector<vk::Semaphore>& signalSemaphores = {}, std::shared_ptr<std::mutex> submitMutex = nullptr) noexcept; /** @@ -157,18 +160,24 @@ class Sequence : public std::enable_shared_from_this<Sequence> /** * Eval Async sends all the recorded and stored operations in the vector of - * operations into the gpu as a submit job without a barrier. EvalAwait() - * must ALWAYS be called after to ensure the sequence is terminated - * correctly. + * operations into the gpu as a submit job without a barrier. + * Submit-level wait/signal semaphores are configured when creating the + * Sequence. + * + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @return Shared pointer to this Sequence to allow for method chaining. */ std::shared_ptr<Sequence> evalAsync(); /** * Clears current operations to record provided one in the vector of - * operations into the gpu as a submit job without a barrier. EvalAwait() - * must ALWAYS be called after to ensure the sequence is terminated - * correctly. + * operations into the gpu as a submit job without a barrier. + * Submit-level wait/signal semaphores are configured when creating the + * Sequence.
+ * + * evalAwait() must be called before invoking evalAsync() again on this same + * Sequence to complete the previous async run and reset internal state. * * @return Shared pointer to this Sequence to allow for method chaining. */ @@ -297,6 +306,9 @@ class Sequence : public std::enable_shared_from_this<Sequence> std::vector<std::shared_ptr<OpBase>> mOperations{}; std::shared_ptr<vk::QueryPool> timestampQueryPool = nullptr; std::shared_ptr<std::mutex> mSubmitMutex = nullptr; + std::vector<vk::Semaphore> mWaitSemaphores{}; + std::vector<vk::PipelineStageFlags> mWaitDstStageMasks{}; + std::vector<vk::Semaphore> mSignalSemaphores{}; // State bool mRecording = false; diff --git a/test/TestSequence.cpp b/test/TestSequence.cpp index 3a4cce66..26641bd4 100644 --- a/test/TestSequence.cpp +++ b/test/TestSequence.cpp @@ -243,3 +243,57 @@ TEST(TestSequence, CorrectSequenceRunningError) EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 })); } + +TEST(TestSequence, SequenceSubmitSyncSupportsEmptySyncLists) +{ + kp::Manager mgr; + + std::shared_ptr<kp::Sequence> sq = mgr.sequence(); + + std::shared_ptr<kp::TensorT<float>> tensorA = mgr.tensor({ 1, 2, 3 }); + std::shared_ptr<kp::TensorT<float>> tensorB = mgr.tensor({ 2, 2, 2 }); + std::shared_ptr<kp::TensorT<float>> tensorOut = mgr.tensor({ 0, 0, 0 }); + + sq->eval<kp::OpTensorSyncDevice>({ tensorA, tensorB, tensorOut }); + + std::vector<uint32_t> spirv = compileSource(R"( + #version 450 + + layout (local_size_x = 1) in; + + layout(set = 0, binding = 0) buffer bina { float tina[]; }; + layout(set = 0, binding = 1) buffer binb { float tinb[]; }; + layout(set = 0, binding = 2) buffer bout { float tout[]; }; + + void main() { + uint index = gl_GlobalInvocationID.x; + tout[index] = tina[index] * tinb[index]; + } + )"); + + std::shared_ptr<kp::Algorithm> algo = + mgr.algorithm({ tensorA, tensorB, tensorOut }, spirv); + + sq->record<kp::OpAlgoDispatch>(algo)->record<kp::OpTensorSyncLocal>( + { tensorA, tensorB, tensorOut }); + + EXPECT_NO_THROW(sq->evalAsync()); + EXPECT_NO_THROW(sq->evalAwait()); + + EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 })); +} + +TEST(TestSequence, SequenceSubmitSyncValidatesWaitMaskCount) +{ + kp::Manager mgr; + + std::vector<vk::Semaphore> waitSemaphores = { vk::Semaphore{} }; + std::vector<vk::PipelineStageFlags>
waitDstStageMasks = { + vk::PipelineStageFlagBits::eComputeShader, + vk::PipelineStageFlagBits::eTransfer + }; + std::vector<vk::Semaphore> signalSemaphores = {}; + + EXPECT_ANY_THROW(mgr.sequence( + 0, 0, waitSemaphores, waitDstStageMasks, signalSemaphores)); +}