3535import java .util .concurrent .ConcurrentLinkedQueue ;
3636import java .util .concurrent .ConcurrentMap ;
3737import java .util .concurrent .atomic .AtomicLong ;
38+ import java .util .concurrent .atomic .AtomicLongArray ;
3839import java .util .concurrent .atomic .AtomicReference ;
40+ import java .util .concurrent .atomic .AtomicReferenceArray ;
3941import java .util .concurrent .locks .Lock ;
4042import java .util .concurrent .locks .ReentrantLock ;
4143
@@ -132,14 +134,20 @@ public final class PrivateMaxEntriesMap<K, V> extends AbstractMap<K, V>
132134 /** The maximum capacity of the map. */
133135 static final long MAXIMUM_CAPACITY = Long .MAX_VALUE - Integer .MAX_VALUE ;
134136
135- /** The number of read buffers to use. */
136- static final int NUMBER_OF_READ_BUFFERS = ceilingNextPowerOfTwo (NCPU );
137+ /**
138+ * The number of read buffers to use.
139+ * The max of 4 was introduced due to https://github.com/FasterXML/jackson-databind/issues/3665.
140+ */
141+ static final int NUMBER_OF_READ_BUFFERS = Math .min (4 , ceilingNextPowerOfTwo (NCPU ));
137142
138143 /** Mask value for indexing into the read buffers. */
139144 static final int READ_BUFFERS_MASK = NUMBER_OF_READ_BUFFERS - 1 ;
140145
141- /** The number of pending read operations before attempting to drain. */
142- static final int READ_BUFFER_THRESHOLD = 32 ;
146+ /**
147+ * The number of pending read operations before attempting to drain.
148+ * The threshold of 4 was introduced due to https://github.com/FasterXML/jackson-databind/issues/3665.
149+ */
150+ static final int READ_BUFFER_THRESHOLD = 4 ;
143151
144152 /** The maximum number of read operations to perform per amortized drain. */
145153 static final int READ_BUFFER_DRAIN_THRESHOLD = 2 * READ_BUFFER_THRESHOLD ;
@@ -175,16 +183,20 @@ static int ceilingNextPowerOfTwo(int x) {
175183
176184 final Lock evictionLock ;
177185 final Queue <Runnable > writeBuffer ;
178- final AtomicLong [] readBufferWriteCount ;
179- final AtomicLong [] readBufferDrainAtWriteCount ;
180- final AtomicReference <Node <K , V >>[][] readBuffers ;
186+ final AtomicLongArray readBufferWriteCount ;
187+ final AtomicLongArray readBufferDrainAtWriteCount ;
188+ final AtomicReferenceArray <Node <K , V >> readBuffers ;
181189
182190 final AtomicReference <DrainStatus > drainStatus ;
183191
184192 transient Set <K > keySet ;
185193 transient Collection <V > values ;
186194 transient Set <Entry <K , V >> entrySet ;
187195
196+ private static int readBufferIndex (int bufferIndex , int entryIndex ) {
197+ return READ_BUFFER_SIZE * bufferIndex + entryIndex ;
198+ }
199+
188200 /**
189201 * Creates an instance based on the builder's configuration.
190202 */
@@ -203,17 +215,9 @@ private PrivateMaxEntriesMap(Builder<K, V> builder) {
203215 drainStatus = new AtomicReference <DrainStatus >(IDLE );
204216
205217 readBufferReadCount = new long [NUMBER_OF_READ_BUFFERS ];
206- readBufferWriteCount = new AtomicLong [NUMBER_OF_READ_BUFFERS ];
207- readBufferDrainAtWriteCount = new AtomicLong [NUMBER_OF_READ_BUFFERS ];
208- readBuffers = new AtomicReference [NUMBER_OF_READ_BUFFERS ][READ_BUFFER_SIZE ];
209- for (int i = 0 ; i < NUMBER_OF_READ_BUFFERS ; i ++) {
210- readBufferWriteCount [i ] = new AtomicLong ();
211- readBufferDrainAtWriteCount [i ] = new AtomicLong ();
212- readBuffers [i ] = new AtomicReference [READ_BUFFER_SIZE ];
213- for (int j = 0 ; j < READ_BUFFER_SIZE ; j ++) {
214- readBuffers [i ][j ] = new AtomicReference <Node <K , V >>();
215- }
216- }
218+ readBufferWriteCount = new AtomicLongArray (NUMBER_OF_READ_BUFFERS );
219+ readBufferDrainAtWriteCount = new AtomicLongArray (NUMBER_OF_READ_BUFFERS );
220+ readBuffers = new AtomicReferenceArray <>(NUMBER_OF_READ_BUFFERS * READ_BUFFER_SIZE );
217221 }
218222
219223 /** Ensures that the object is not null. */
@@ -330,12 +334,11 @@ long recordRead(int bufferIndex, Node<K, V> node) {
330334 // The location in the buffer is chosen in a racy fashion as the increment
331335 // is not atomic with the insertion. This means that concurrent reads can
332336 // overlap and overwrite one another, resulting in a lossy buffer.
333- final AtomicLong counter = readBufferWriteCount [bufferIndex ];
334- final long writeCount = counter .get ();
335- counter .lazySet (writeCount + 1 );
337+ final long writeCount = readBufferWriteCount .get (bufferIndex );
338+ readBufferWriteCount .lazySet (bufferIndex , writeCount + 1 );
336339
337340 final int index = (int ) (writeCount & READ_BUFFER_INDEX_MASK );
338- readBuffers [ bufferIndex ][ index ] .lazySet (node );
341+ readBuffers .lazySet (readBufferIndex ( bufferIndex , index ), node );
339342
340343 return writeCount ;
341344 }
@@ -348,7 +351,7 @@ long recordRead(int bufferIndex, Node<K, V> node) {
348351 * @param writeCount the number of writes on the chosen read buffer
349352 */
350353 void drainOnReadIfNeeded (int bufferIndex , long writeCount ) {
351- final long pending = (writeCount - readBufferDrainAtWriteCount [ bufferIndex ] .get ());
354+ final long pending = (writeCount - readBufferDrainAtWriteCount .get (bufferIndex ));
352355 final boolean delayable = (pending < READ_BUFFER_THRESHOLD );
353356 final DrainStatus status = drainStatus .get ();
354357 if (status .shouldDrainBuffers (delayable )) {
@@ -403,20 +406,20 @@ void drainReadBuffers() {
403406 /** Drains the read buffer up to an amortized threshold. */
404407 //@GuardedBy("evictionLock")
405408 void drainReadBuffer (int bufferIndex ) {
406- final long writeCount = readBufferWriteCount [ bufferIndex ] .get ();
409+ final long writeCount = readBufferWriteCount .get (bufferIndex );
407410 for (int i = 0 ; i < READ_BUFFER_DRAIN_THRESHOLD ; i ++) {
408411 final int index = (int ) (readBufferReadCount [bufferIndex ] & READ_BUFFER_INDEX_MASK );
409- final AtomicReference < Node < K , V >> slot = readBuffers [ bufferIndex ][ index ] ;
410- final Node <K , V > node = slot .get ();
412+ final int arrayIndex = readBufferIndex ( bufferIndex , index ) ;
413+ final Node <K , V > node = readBuffers .get (arrayIndex );
411414 if (node == null ) {
412415 break ;
413416 }
414417
415- slot .lazySet (null );
418+ readBuffers .lazySet (arrayIndex , null );
416419 applyRead (node );
417420 readBufferReadCount [bufferIndex ]++;
418421 }
419- readBufferDrainAtWriteCount [ bufferIndex ] .lazySet (writeCount );
422+ readBufferDrainAtWriteCount .lazySet (bufferIndex , writeCount );
420423 }
421424
422425 /** Updates the node's location in the page replacement policy. */
@@ -579,10 +582,8 @@ public void clear() {
579582 }
580583
581584 // Discard all pending reads
582- for (AtomicReference <Node <K , V >>[] buffer : readBuffers ) {
583- for (AtomicReference <Node <K , V >> slot : buffer ) {
584- slot .lazySet (null );
585- }
585+ for (int i = 0 ; i < readBuffers .length (); i ++) {
586+ readBuffers .lazySet (i , null );
586587 }
587588
588589 // Apply all pending writes
0 commit comments