Skip to content

Commit

Permalink
further code update in util_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
gewang committed May 14, 2024
1 parent 86dcd6e commit b8bcf5f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 87 deletions.
79 changes: 17 additions & 62 deletions src/core/util_buffers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ void CBufferAdvance::cleanup()
//-----------------------------------------------------------------------------
UINT__ CBufferAdvance::join( Chuck_Event * event )
{
// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif
// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

// index of new pointer that will be pushed back
UINT__ read_offset_index;
Expand All @@ -144,42 +142,32 @@ UINT__ CBufferAdvance::join( Chuck_Event * event )
m_read_offsets.push_back( ReadOffset( m_write_offset, event ) );
}

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif

// return index
return read_offset_index;
}




//-----------------------------------------------------------------------------
// name: resign
// desc: shred quits buffer; frees its index
//-----------------------------------------------------------------------------
void CBufferAdvance::resign( UINT__ read_offset_index )
{
// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

// make sure read_offset_index passed in is valid
if( read_offset_index >= m_read_offsets.size() )
return;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif

// add this index to free queue
m_free.push( read_offset_index );

// "invalidate" the pointer at that index
m_read_offsets[read_offset_index].read_offset = -1;
m_read_offsets[read_offset_index].event = NULL;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
}


Expand Down Expand Up @@ -212,14 +200,12 @@ void CBufferAdvance::resign( UINT__ read_offset_index )

void CBufferAdvance::put( void * data, UINT__ num_elem )
{
// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

UINT__ i, j;
BYTE__ * d = (BYTE__ *)data;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif

// copy
for( i = 0; i < num_elem; i++ )
{
Expand Down Expand Up @@ -248,11 +234,6 @@ void CBufferAdvance::put( void * data, UINT__ num_elem )
m_read_offsets[j].event->queue_broadcast( m_event_buffer );
}
}

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
}


Expand Down Expand Up @@ -319,24 +300,16 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
UINT__ i, j;
BYTE__ * d = (BYTE__ *)data;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif
// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

// make sure index is valid
if( read_offset_index >= m_read_offsets.size() )
{
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
return 0;
}
if( m_read_offsets[read_offset_index].read_offset < 0 )
{
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
return 0;
}

Expand All @@ -345,9 +318,6 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
// read catch up with write
if( m_read_offset == m_write_offset )
{
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
return 0;
}

Expand Down Expand Up @@ -376,11 +346,6 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
// update read offset at given index
m_read_offsets[read_offset_index].read_offset = m_read_offset;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif

// return number of elems
return i;
}
Expand Down Expand Up @@ -465,9 +430,8 @@ void CBufferSimple::put( void * data, UINT__ num_elem )
UINT__ i, j;
BYTE__ * d = (BYTE__ *)data;

#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif
// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

// copy
for( i = 0; i < num_elem; i++ )
Expand All @@ -482,10 +446,6 @@ void CBufferSimple::put( void * data, UINT__ num_elem )
// change to fully "atomic" increment+wrap
m_write_offset = (m_write_offset + 1) % m_max_elem;
}

#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif
}


Expand All @@ -500,14 +460,13 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem )
UINT__ i, j;
BYTE__ * d = (BYTE__ *)data;

// lock | 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);

// read catch up with write
if( m_read_offset == m_write_offset )
return 0;

#ifndef __DISABLE_THREADS__
m_mutex.acquire();
#endif

// copy
for( i = 0; i < num_elem; i++ )
{
Expand All @@ -529,10 +488,6 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem )
}
}

#ifndef __DISABLE_THREADS__
m_mutex.release();
#endif

// return number of elems
return 1; // shouldn't it return i?
}
Expand Down
43 changes: 18 additions & 25 deletions src/core/util_buffers.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@

#include "chuck_oo.h"
#include "chuck_errmsg.h"
#ifndef __DISABLE_THREADS__
#include "util_thread.h"
#endif
#include <vector>
#include <queue>
#include <iostream>
#include <atomic> // added 1.5.2.5 (ge) for "lock-free" circle buffer
#include <mutex> // added 1.5.2.5 (ge) hmm so much for "lock-free"
#include <mutex> // added 1.5.2.5 (ge) hmm so much for "lock-free"...

#define DWORD__ t_CKUINT
#define SINT__ t_CKINT
Expand Down Expand Up @@ -85,7 +82,7 @@ class CBufferAdvance
protected:
BYTE__ * m_data;
UINT__ m_data_width;
//UINT__ m_read_offset;
// UINT__ m_read_offset;

// this holds the offset allocated by join(), paired with an optional
// Chuck_Event to notify when things are put in the buffer
Expand All @@ -98,15 +95,12 @@ class CBufferAdvance
};
std::vector<ReadOffset> m_read_offsets;
std::queue<UINT__> m_free;

SINT__ m_write_offset;
SINT__ m_max_elem;

// TODO: necessary?
#ifndef __DISABLE_THREADS__
XMutex m_mutex;
#endif

// updated | 1.5.2.5 (ge) from XMutex to std::mutex
std::mutex m_mutex; // TODO necessary?
// buffer
CBufferSimple * m_event_buffer;
};

Expand All @@ -133,15 +127,14 @@ class CBufferSimple

protected:
BYTE__ * m_data;
UINT__ m_data_width;
UINT__ m_read_offset;
UINT__ m_write_offset;
UINT__ m_max_elem;
std::atomic_ulong m_data_width;
std::atomic_ulong m_read_offset;
std::atomic_ulong m_write_offset;
std::atomic_ulong m_max_elem;

#ifndef __DISABLE_THREADS__
// added | 1.5.1.5 (ge & andrew) twilight zone
XMutex m_mutex;
#endif
// updated | 1.5.2.5 (ge) to std::mutex
std::mutex m_mutex;
};


Expand Down Expand Up @@ -392,7 +385,7 @@ class XCircleBuffer
// num elements
std::atomic_ulong m_numElements;
// mutex | 1.5.2.5 (ge)
std::mutex m_mutex;
// std::mutex m_mutex;
};


Expand Down Expand Up @@ -438,7 +431,7 @@ template <typename T>
void XCircleBuffer<T>::init( long length )
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// clean up is necessary
if( m_buffer )
Expand Down Expand Up @@ -504,7 +497,7 @@ template <typename T>
void XCircleBuffer<T>::clear()
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// zero out
m_readIndex = m_writeIndex = m_numElements = 0;
Expand Down Expand Up @@ -570,7 +563,7 @@ template <typename T>
void XCircleBuffer<T>::put( const T & item )
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// sanity check
if( m_buffer == NULL ) return;
Expand Down Expand Up @@ -629,7 +622,7 @@ template <typename T>
long XCircleBuffer<T>::peek( T * array, long numItems, unsigned long stride )
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// sanity check
if( m_buffer == NULL ) return 0;
Expand Down Expand Up @@ -683,7 +676,7 @@ template <typename T>
long XCircleBuffer<T>::pop( long numItems )
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// sanity check
if( m_buffer == NULL ) return 0;
Expand Down Expand Up @@ -714,7 +707,7 @@ template <typename T>
bool XCircleBuffer<T>::get( T * result )
{
// 1.5.2.5 (ge) added
std::lock_guard<std::mutex> lock(m_mutex);
// std::lock_guard<std::mutex> lock(m_mutex);

// sanity check
if( m_buffer == NULL || m_readIndex == m_writeIndex ) return false;
Expand Down

0 comments on commit b8bcf5f

Please sign in to comment.