From b8bcf5fdbf44e7ede2995c1f4d7512e99bd2d8ab Mon Sep 17 00:00:00 2001
From: Ge Wang
Date: Tue, 14 May 2024 04:40:31 -0700
Subject: [PATCH] further code update in util_buffer

---
 src/core/util_buffers.cpp | 79 +++++++++------------------------------
 src/core/util_buffers.h   | 43 +++++++++------------
 2 files changed, 35 insertions(+), 87 deletions(-)

diff --git a/src/core/util_buffers.cpp b/src/core/util_buffers.cpp
index 6536f744b..9fd9e0e2a 100644
--- a/src/core/util_buffers.cpp
+++ b/src/core/util_buffers.cpp
@@ -120,10 +120,8 @@ void CBufferAdvance::cleanup()
 //-----------------------------------------------------------------------------
 UINT__ CBufferAdvance::join( Chuck_Event * event )
 {
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-    #endif
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
 
     // index of new pointer that will be pushed back
     UINT__ read_offset_index;
@@ -144,42 +142,32 @@ UINT__ CBufferAdvance::join( Chuck_Event * event )
         m_read_offsets.push_back( ReadOffset( m_write_offset, event ) );
     }
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.release();
-    #endif
-
     // return index
     return read_offset_index;
 }
+
+
 //-----------------------------------------------------------------------------
 // name: resign
 // desc: shred quits buffer; frees its index
 //-----------------------------------------------------------------------------
 void CBufferAdvance::resign( UINT__ read_offset_index )
 {
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
+
     // make sure read_offset_index passed in is valid
     if( read_offset_index >= m_read_offsets.size() )
         return;
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-    #endif
-
     // add this index to free queue
     m_free.push( read_offset_index );
 
     // "invalidate" the pointer at that index
     m_read_offsets[read_offset_index].read_offset = -1;
     m_read_offsets[read_offset_index].event = NULL;
-
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.release();
-    #endif
 }
 
 
@@ -212,14 +200,12 @@ void CBufferAdvance::resign( UINT__ read_offset_index )
 void CBufferAdvance::put( void * data, UINT__ num_elem )
 {
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
+
     UINT__ i, j;
     BYTE__ * d = (BYTE__ *)data;
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-    #endif
-
     // copy
     for( i = 0; i < num_elem; i++ )
     {
@@ -248,11 +234,6 @@ void CBufferAdvance::put( void * data, UINT__ num_elem )
                 m_read_offsets[j].event->queue_broadcast( m_event_buffer );
         }
     }
-
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.release();
-    #endif
 }
 
 
@@ -319,24 +300,16 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
     UINT__ i, j;
     BYTE__ * d = (BYTE__ *)data;
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-    #endif
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
 
     // make sure index is valid
     if( read_offset_index >= m_read_offsets.size() )
     {
-        #ifndef __DISABLE_THREADS__
-        m_mutex.release();
-        #endif
         return 0;
     }
 
     if( m_read_offsets[read_offset_index].read_offset < 0 )
     {
-        #ifndef __DISABLE_THREADS__
-        m_mutex.release();
-        #endif
         return 0;
     }
 
@@ -345,9 +318,6 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
     // read catch up with write
     if( m_read_offset == m_write_offset )
     {
-        #ifndef __DISABLE_THREADS__
-        m_mutex.release();
-        #endif
         return 0;
     }
 
@@ -376,11 +346,6 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind
     // update read offset at given index
     m_read_offsets[read_offset_index].read_offset = m_read_offset;
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    m_mutex.release();
-    #endif
-
     // return number of elems
     return i;
 }
@@ -465,9 +430,8 @@ void CBufferSimple::put( void * data, UINT__ num_elem )
     UINT__ i, j;
     BYTE__ * d = (BYTE__ *)data;
 
-#ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-#endif
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
 
     // copy
     for( i = 0; i < num_elem; i++ )
@@ -482,10 +446,6 @@ void CBufferSimple::put( void * data, UINT__ num_elem )
         // change to fully "atomic" increment+wrap
         m_write_offset = (m_write_offset + 1) % m_max_elem;
     }
-
-#ifndef __DISABLE_THREADS__
-    m_mutex.release();
-#endif
 }
 
 
@@ -500,14 +460,13 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem )
     UINT__ i, j;
     BYTE__ * d = (BYTE__ *)data;
 
+    // lock | 1.5.2.5 (ge) added
+    std::lock_guard<std::mutex> lock(m_mutex);
+
     // read catch up with write
     if( m_read_offset == m_write_offset )
         return 0;
 
-#ifndef __DISABLE_THREADS__
-    m_mutex.acquire();
-#endif
-
     // copy
     for( i = 0; i < num_elem; i++ )
     {
@@ -529,10 +488,6 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem )
         }
     }
 
-#ifndef __DISABLE_THREADS__
-    m_mutex.release();
-#endif
-
     // return number of elems
     return 1; // shouldn't it return i?
 }
diff --git a/src/core/util_buffers.h b/src/core/util_buffers.h
index b32704fab..444bd5ad7 100644
--- a/src/core/util_buffers.h
+++ b/src/core/util_buffers.h
@@ -37,14 +37,11 @@
 #include "chuck_oo.h"
 #include "chuck_errmsg.h"
-#ifndef __DISABLE_THREADS__
-#include "util_thread.h"
-#endif
 #include <vector>
 #include <queue>
 #include <iostream>
 #include <atomic> // added 1.5.2.5 (ge) for "lock-free" circle buffer
-#include <mutex> // added 1.5.2.5 (ge) hmm so much for "lock-free"
+#include <mutex> // added 1.5.2.5 (ge) hmm so much for "lock-free"...
 
 #define DWORD__ t_CKUINT
 #define SINT__ t_CKINT
@@ -85,7 +82,7 @@ class CBufferAdvance
 protected:
     BYTE__ * m_data;
     UINT__ m_data_width;
-    //UINT__ m_read_offset;
+    // UINT__ m_read_offset;
 
     // this holds the offset allocated by join(), paired with an optional
     // Chuck_Event to notify when things are put in the buffer
@@ -98,15 +95,12 @@ class CBufferAdvance
     };
     std::vector<ReadOffset> m_read_offsets;
    std::queue<UINT__> m_free;
-
     SINT__ m_write_offset;
     SINT__ m_max_elem;
 
-    // TODO: necessary?
-    #ifndef __DISABLE_THREADS__
-    XMutex m_mutex;
-    #endif
-
+    // updated | 1.5.2.5 (ge) from XMutex to std::mutex
+    std::mutex m_mutex; // TODO necessary?
+
     // buffer
     CBufferSimple * m_event_buffer;
 };
@@ -133,15 +127,14 @@ class CBufferSimple
 protected:
     BYTE__ * m_data;
-    UINT__ m_data_width;
-    UINT__ m_read_offset;
-    UINT__ m_write_offset;
-    UINT__ m_max_elem;
+    std::atomic_ulong m_data_width;
+    std::atomic_ulong m_read_offset;
+    std::atomic_ulong m_write_offset;
+    std::atomic_ulong m_max_elem;
 
-#ifndef __DISABLE_THREADS__
     // added | 1.5.1.5 (ge & andrew) twilight zone
-    XMutex m_mutex;
-#endif
+    // updated | 1.5.2.5 (ge) to std::mutex
+    std::mutex m_mutex;
 };
 
 
@@ -392,7 +385,7 @@ class XCircleBuffer
     // num elements
     std::atomic_ulong m_numElements;
     // mutex | 1.5.2.5 (ge)
-    std::mutex m_mutex;
+    // std::mutex m_mutex;
 };
 
 
@@ -438,7 +431,7 @@ template <typename T>
 void XCircleBuffer<T>::init( long length )
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // clean up is necessary
     if( m_buffer )
@@ -504,7 +497,7 @@ template <typename T>
 void XCircleBuffer<T>::clear()
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // zero out
     m_readIndex = m_writeIndex = m_numElements = 0;
@@ -570,7 +563,7 @@ template <typename T>
 void XCircleBuffer<T>::put( const T & item )
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // sanity check
     if( m_buffer == NULL ) return;
@@ -629,7 +622,7 @@ template <typename T>
 long XCircleBuffer<T>::peek( T * array, long numItems, unsigned long stride )
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // sanity check
     if( m_buffer == NULL ) return 0;
@@ -683,7 +676,7 @@ template <typename T>
 long XCircleBuffer<T>::pop( long numItems )
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // sanity check
     if( m_buffer == NULL ) return 0;
@@ -714,7 +707,7 @@ template <typename T>
 bool XCircleBuffer<T>::get( T * result )
 {
     // 1.5.2.5 (ge) added
-    std::lock_guard<std::mutex> lock(m_mutex);
+    // std::lock_guard<std::mutex> lock(m_mutex);
 
     // sanity check
     if( m_buffer == NULL || m_readIndex == m_writeIndex ) return false;
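
Note on the locking pattern above: the patch replaces manual m_mutex.acquire()/m_mutex.release() pairs
with a scope-bound std::lock_guard, so the mutex is released automatically on every return path,
including the early "return 0" exits in get() that previously each needed their own release() call.
Below is a minimal standalone sketch of that before/after idea; it is not ChucK code, and TinyBuffer
and its members are hypothetical, for illustration only:

    // sketch: manual locking vs. RAII locking (hypothetical TinyBuffer, not from util_buffers)
    #include <mutex>
    #include <vector>
    #include <cstddef>

    class TinyBuffer
    {
    public:
        // manual locking: every early return must remember to unlock
        std::size_t get_manual( std::size_t index )
        {
            m_mutex.lock();
            if( index >= m_items.size() ) { m_mutex.unlock(); return 0; }
            std::size_t value = m_items[index];
            m_mutex.unlock();
            return value;
        }

        // RAII locking: the guard's destructor unlocks on every path out of the scope
        std::size_t get_raii( std::size_t index )
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            if( index >= m_items.size() ) return 0;
            return m_items[index];
        }

    private:
        std::mutex m_mutex;
        std::vector<std::size_t> m_items;
    };

Spelling out std::lock_guard<std::mutex>, rather than relying on C++17 class template argument
deduction, keeps the pattern usable with pre-C++17 toolchains.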