Skip to content
This repository has been archived by the owner on Dec 25, 2023. It is now read-only.

Commit

Permalink
Task lists for each of the DataUploader threads to improve cpu efficiency.
Browse files Browse the repository at this point in the history
  • Loading branch information
allenhux-intel committed Nov 15, 2022
1 parent 1b753fe commit 72265aa
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 66 deletions.
146 changes: 84 additions & 62 deletions TileUpdateManager/DataUploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Streaming::DataUploader::DataUploader(
, m_gpuTimer(in_pDevice, in_maxCopyBatches, D3D12GpuTimer::TimerType::Copy)
, m_mappingUpdater(in_maxTileMappingUpdatesPerApiCall)
, m_threadPriority(in_threadPriority)
, m_submitTaskAlloc(in_maxCopyBatches), m_submitTasks(in_maxCopyBatches)
, m_monitorTaskAlloc(in_maxCopyBatches), m_monitorTasks(in_maxCopyBatches)
{
// copy queue just for UpdateTileMappings() on reserved resources
{
Expand Down Expand Up @@ -273,7 +275,11 @@ Streaming::UpdateList* Streaming::DataUploader::AllocateUpdateList(Streaming::St
pUpdateList->m_executionState = UpdateList::State::STATE_ALLOCATED;

// start fence polling thread now
m_fenceMonitorFlag.Set();
{
m_monitorTasks[m_monitorTaskAlloc.GetWriteIndex()] = index;
m_monitorTaskAlloc.Allocate();
m_fenceMonitorFlag.Set();
}
}

return pUpdateList;
Expand Down Expand Up @@ -308,7 +314,12 @@ void Streaming::DataUploader::SubmitUpdateList(Streaming::UpdateList& in_updateL
m_pFileStreamer->StreamTexture(in_updateList);
}

m_submitFlag.Set();
// add to submit task queue
{
m_submitTasks[m_submitTaskAlloc.GetWriteIndex()] = UINT(&in_updateList - m_updateLists.data());
m_submitTaskAlloc.Allocate();
m_submitFlag.Set();
}
}

//-----------------------------------------------------------------------------
Expand All @@ -323,8 +334,21 @@ void Streaming::DataUploader::SubmitUpdateList(Streaming::UpdateList& in_updateL
void Streaming::DataUploader::FenceMonitorThread()
{
bool loadPackedMips = false;
for (auto& updateList : m_updateLists)

const UINT numTasks = m_monitorTaskAlloc.GetReadyToRead();
if (0 == numTasks)
{
return;
}

const UINT startIndex = m_monitorTaskAlloc.GetReadIndex();
for (UINT i = startIndex; i < (startIndex + numTasks); i++)
{
ASSERT(numTasks != 0);
auto& updateList = m_updateLists[m_monitorTasks[i % m_monitorTasks.size()]];

bool freeUpdateList = false;

// assign a start time to every in-flight update list. this will give us an upper bound on latency.
// latency is only measured for tile uploads
if ((UpdateList::State::STATE_FREE != updateList.m_executionState) && (0 == updateList.m_copyLatencyTimer))
Expand Down Expand Up @@ -356,7 +380,7 @@ void Streaming::DataUploader::FenceMonitorThread()
if (m_memoryFence->GetCompletedValue() >= updateList.m_copyFenceValue)
{
updateList.m_pStreamingResource->NotifyPackedMips();
FreeUpdateList(updateList);
freeUpdateList = true;
}
break;

Expand All @@ -375,41 +399,43 @@ void Streaming::DataUploader::FenceMonitorThread()
[[fallthrough]];

case UpdateList::State::STATE_MAP_PENDING:
{
// standard updates or mapping only? check if mapping complete
if (updateList.m_mappingFenceValue > m_mappingFence->GetCompletedValue())
if (updateList.m_mappingFenceValue <= m_mappingFence->GetCompletedValue())
{
break;
}
// notify evictions
if (updateList.GetNumEvictions())
{
updateList.m_pStreamingResource->NotifyEvicted(updateList.m_evictCoords);

// notify evictions
if (updateList.GetNumEvictions())
{
updateList.m_pStreamingResource->NotifyEvicted(updateList.m_evictCoords);
m_numTotalEvictions.fetch_add(updateList.GetNumEvictions(), std::memory_order_relaxed);
}

m_numTotalEvictions.fetch_add(updateList.GetNumEvictions(), std::memory_order_relaxed);
}
// notify regular tiles
if (updateList.GetNumStandardUpdates())
{
updateList.m_pStreamingResource->NotifyCopyComplete(updateList.m_coords);

// notify regular tiles
if (updateList.GetNumStandardUpdates())
{
updateList.m_pStreamingResource->NotifyCopyComplete(updateList.m_coords);
auto updateLatency = m_pFenceThreadTimer->GetTime() - updateList.m_copyLatencyTimer;
m_totalTileCopyLatency.fetch_add(updateLatency * updateList.GetNumStandardUpdates(), std::memory_order_relaxed);

auto updateLatency = m_pFenceThreadTimer->GetTime() - updateList.m_copyLatencyTimer;
m_totalTileCopyLatency.fetch_add(updateLatency * updateList.GetNumStandardUpdates(), std::memory_order_relaxed);
m_numTotalUploads.fetch_add(updateList.GetNumStandardUpdates(), std::memory_order_relaxed);

m_numTotalUploads.fetch_add(updateList.GetNumStandardUpdates(), std::memory_order_relaxed);
}

freeUpdateList = true;
}

// UpdateList complete
FreeUpdateList(updateList);
}
break;

default:
break;
}

if (freeUpdateList)
{
// O(1) array compaction: move the first element to the position of the element to be freed, then reduce size by 1.
m_monitorTasks[i % m_monitorTasks.size()] = m_monitorTasks[m_monitorTaskAlloc.GetReadIndex()];
m_monitorTaskAlloc.Free();
FreeUpdateList(updateList);
}
} // end loop over updatelists

if (loadPackedMips)
Expand All @@ -424,58 +450,54 @@ void Streaming::DataUploader::FenceMonitorThread()
// set next state depending on the task
// Note: QueryPerformanceCounter() needs to be called from the same CPU for values to be compared,
// but this thread starts work while a different thread handles completion
// NOTE: if UpdateTileMappings is slow, throughput will be impacted
//-----------------------------------------------------------------------------
void Streaming::DataUploader::SubmitThread()
{
bool signalMap = false;

for (auto& updateList : m_updateLists)
// look through tasks
while (m_submitTaskAlloc.GetReadyToRead())
{
switch (updateList.m_executionState)
{
case UpdateList::State::STATE_SUBMITTED:
{
// all UpdateLists require mapping
signalMap = true;
updateList.m_mappingFenceValue = m_mappingFenceValue;
signalMap = true;

// WARNING: UpdateTileMappings performance is an issue on some hardware
// throughput will degrade if UpdateTileMappings isn't ~free
UINT index = m_submitTasks[m_submitTaskAlloc.GetReadIndex()]; // get the next task
m_submitTaskAlloc.Free(); // consume this task

// unmap tiles that are being evicted
if (updateList.GetNumEvictions())
{
m_mappingUpdater.UnMap(GetMappingQueue(), updateList.m_pStreamingResource->GetTiledResource(), updateList.m_evictCoords);
auto& updateList = m_updateLists[index];

// this will skip the uploading state unless there are uploads
updateList.m_executionState = UpdateList::State::STATE_MAP_PENDING;
}
ASSERT(UpdateList::State::STATE_SUBMITTED == updateList.m_executionState);

// map standard tiles
// can upload and evict in a single UpdateList
if (updateList.GetNumStandardUpdates())
{
m_mappingUpdater.Map(GetMappingQueue(),
updateList.m_pStreamingResource->GetTiledResource(),
updateList.m_pStreamingResource->GetHeap()->GetHeap(),
updateList.m_coords, updateList.m_heapIndices);
// set to the fence value to be signaled next
updateList.m_mappingFenceValue = m_mappingFenceValue;

updateList.m_executionState = UpdateList::State::STATE_UPLOADING;
}
// unmap tiles that are being evicted
if (updateList.GetNumEvictions())
{
m_mappingUpdater.UnMap(GetMappingQueue(), updateList.m_pStreamingResource->GetTiledResource(), updateList.m_evictCoords);

// no uploads or evictions? must be mapping packed mips
else if (0 == updateList.GetNumEvictions())
{
updateList.m_pStreamingResource->MapPackedMips(GetMappingQueue());
// this will skip the uploading state unless there are uploads
updateList.m_executionState = UpdateList::State::STATE_MAP_PENDING;
}

updateList.m_executionState = UpdateList::State::STATE_PACKED_MAPPING;
}
// map standard tiles
// can upload and evict in a single UpdateList
if (updateList.GetNumStandardUpdates())
{
m_mappingUpdater.Map(GetMappingQueue(),
updateList.m_pStreamingResource->GetTiledResource(),
updateList.m_pStreamingResource->GetHeap()->GetHeap(),
updateList.m_coords, updateList.m_heapIndices);

updateList.m_executionState = UpdateList::State::STATE_UPLOADING;
}
break; // end STATE_SUBMITTED

default:
break;
// no uploads or evictions? must be mapping packed mips
else if (0 == updateList.GetNumEvictions())
{
updateList.m_pStreamingResource->MapPackedMips(GetMappingQueue());

updateList.m_executionState = UpdateList::State::STATE_PACKED_MAPPING;
}
}

Expand Down
6 changes: 5 additions & 1 deletion TileUpdateManager/DataUploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ namespace Streaming
void SubmitThread();
std::thread m_submitThread;
Streaming::SynchronizationFlag m_submitFlag;
std::vector<UINT> m_submitTasks;
RingBuffer m_submitTaskAlloc;

// thread to poll copy and mapping fences
// this thread could have been designed using WaitForMultipleObjects, but it was found that SetEventOnCompletion() was expensive in a tight thread loop
Expand All @@ -133,13 +135,15 @@ namespace Streaming
std::thread m_fenceMonitorThread;
Streaming::SynchronizationFlag m_fenceMonitorFlag;
RawCpuTimer* m_pFenceThreadTimer{ nullptr }; // init timer on the thread that uses it. can't really worry about thread migration.
std::vector<UINT> m_monitorTasks;
RingBuffer m_monitorTaskAlloc;

void StartThreads();
void StopThreads();
std::atomic<bool> m_threadsRunning{ false };
const int m_threadPriority{ 0 };

// DS memory queue used just for loading packed mips when the file doesn't include padding
// DS memory queue used just for loading packed mips
// separate memory queue means needing a second fence - can't wait across DS queues
void InitDirectStorage(ID3D12Device* in_pDevice);
ComPtr<IDStorageFactory> m_dsFactory;
Expand Down
12 changes: 10 additions & 2 deletions TileUpdateManager/SimpleAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ namespace Streaming
// writer methods
//-------------------------
UINT GetAvailableToWrite() const { return m_size - m_counter; } // how many could be written
UINT GetWriteIndex() const { return (m_writerIndex % m_size); } // can start writing here
// Returns the ring-buffer slot that is in_offset entries past the current
// write position (wrapped to the buffer size). With the default offset of 0,
// this is the next slot the writer may fill.
UINT GetWriteIndex(UINT in_offset = 0) const
{
    // An offset at or beyond the free space has no valid slot to write into.
    ASSERT(in_offset < GetAvailableToWrite());
    const UINT slot = m_writerIndex + in_offset;
    return slot % m_size;
}
void Allocate(UINT in_n = 1) // notify reader there's data ready
{
ASSERT((m_counter + in_n) <= m_size); // can't have more in-flight than m_size
Expand All @@ -84,7 +88,11 @@ namespace Streaming
// reader methods
//-------------------------
UINT GetReadyToRead() const { return m_counter; } // how many can be read
UINT GetReadIndex() const { return (m_readerIndex % m_size); } // can start reading here
// Returns the ring-buffer slot that is in_offset entries past the current
// read position (wrapped to the buffer size). With the default offset of 0,
// this is the next slot the reader may consume.
UINT GetReadIndex(UINT in_offset = 0) const
{
    // An offset at or beyond the ready count has no valid slot to read from.
    ASSERT(in_offset < GetReadyToRead());
    const UINT slot = m_readerIndex + in_offset;
    return slot % m_size;
}
void Free(UINT in_n = 1) // return entries to pool
{
ASSERT(m_counter >= in_n);
Expand Down
2 changes: 1 addition & 1 deletion include/ArgParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ template<> inline void ArgParser::AddArg(std::wstring s, ArgFunction f, bool def
{
std::wstringstream w;
std::string b = default_value ? "True" : "False";
w << ": " << d << " (default: " << b.c_str() << ") ";
w << d << " (default: " << b.c_str() << ") ";
AddArg(s, f, w.str());
}

Expand Down

0 comments on commit 72265aa

Please sign in to comment.