diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java index 501f2f2..37570aa 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceBuilder.java @@ -7,6 +7,7 @@ import java.util.Properties; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Builder for DataSourcePool. @@ -639,6 +640,26 @@ default DataSourceBuilder customProperties(Map customProperties) */ DataSourceBuilder addProperty(String key, int value); + /** + * sets the affinity-size (internal hashmap of distinct affinity keys). Should be a prime number. Default: 257 + */ + DataSourceBuilder affinitySize(int affinitySize); + + /** + * Returns the affinity size. + */ + int getAffinitySize(); + + /** + * Sets the affinity provider. e.g. Thread::currentThread. + */ + DataSourceBuilder affinityProvider(Supplier affinityProvider); + + /** + * Returns the affinity provider. + */ + Supplier getAffinityProvider(); + /** * Set the database owner username (used to create connection for use with InitDatabase). */ diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java index 5008fcc..fd464ac 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConfig.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Properties; import java.util.function.Consumer; +import java.util.function.Supplier; /** * Configuration information for a DataSource. @@ -84,6 +85,8 @@ public class DataSourceConfig implements DataSourceBuilder.Settings { private boolean shutdownOnJvmExit; private boolean validateOnHeartbeat = !System.getenv().containsKey("LAMBDA_TASK_ROOT"); private boolean enforceCleanClose; + private int affinitySize = 257; + private Supplier affinityProvider; @Override public Settings settings() { @@ -147,6 +150,8 @@ public DataSourceConfig copy() { copy.alert = alert; copy.listener = listener; copy.enforceCleanClose = enforceCleanClose; + copy.affinitySize = affinitySize; + copy.affinityProvider = affinityProvider; return copy; } @@ -658,6 +663,28 @@ public DataSourceConfig addProperty(String key, int value) { return addProperty(key, Integer.toString(value)); } + @Override + public DataSourceBuilder affinitySize(int affinitySize) { + this.affinitySize = affinitySize; + return this; + } + + @Override + public int getAffinitySize() { + return affinitySize; + } + + @Override + public DataSourceBuilder affinityProvider(Supplier affinityProvider) { + this.affinityProvider = affinityProvider; + return this; + } + + @Override + public Supplier getAffinityProvider() { + return affinityProvider; + } + @Override public String getOwnerUsername() { return ownerUsername; @@ -805,6 +832,7 @@ private void loadSettings(ConfigPropertiesHelper properties) { shutdownOnJvmExit = properties.getBoolean("shutdownOnJvmExit", shutdownOnJvmExit); validateOnHeartbeat = properties.getBoolean("validateOnHeartbeat", validateOnHeartbeat); enforceCleanClose = properties.getBoolean("enforceCleanClose", enforceCleanClose); + affinitySize = properties.getInt("affinityCacheSize", affinitySize); String isoLevel = properties.get("isolationLevel", _isolationLevel(isolationLevel)); diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java new file mode 100644 index 0000000..a419709 --- /dev/null +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourceConnection.java @@ -0,0 +1,17 @@ +package io.ebean.datasource; + +import java.sql.Connection; + +/** + * Interface for connection objects returned from the ebean-datasource connection pool + * + * @author Roland Praml, Foconis Analytics GmbH + */ +public interface DataSourceConnection extends Connection { + + /** + * Returns the affinity-ID, this connection was assigned to. + */ + Object affinityId(); + +} diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java index 97186c8..68260cd 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java @@ -81,6 +81,12 @@ static DataSourceBuilder builder() { */ void offline(); + /** + * Returns a connection for given affinity ID. It is guaranteed, that connection.affinityId in listener etc. + * is the same object. + */ + DataSourceConnection getConnection(Object affinityId) throws SQLException; + /** * Shutdown the pool. *

diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java deleted file mode 100644 index 2fe702d..0000000 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java +++ /dev/null @@ -1,159 +0,0 @@ -package io.ebean.datasource.pool; - -import java.util.Arrays; - -/** - * A buffer especially designed for Busy PooledConnections. - *

- * All thread safety controlled externally (by PooledConnectionQueue). - *

- * It has a set of 'slots' and PooledConnections know which slot they went into - * and this allows for fast addition and removal (by slotId without looping). - * The capacity will increase on demand by the 'growBy' amount. - */ -final class BusyConnectionBuffer { - - private PooledConnection[] slots; - private final int growBy; - private int size; - private int pos = -1; - - /** - * Create the buffer with an initial capacity and fixed growBy. - * We generally do not want the buffer to grow very often. - * - * @param capacity the initial capacity - * @param growBy the fixed amount to grow the buffer by. - */ - BusyConnectionBuffer(int capacity, int growBy) { - this.slots = new PooledConnection[capacity]; - this.growBy = growBy; - } - - /** - * We can only grow (not shrink) the capacity. - */ - void setCapacity(int newCapacity) { - if (newCapacity > slots.length) { - PooledConnection[] current = this.slots; - this.slots = new PooledConnection[newCapacity]; - System.arraycopy(current, 0, this.slots, 0, current.length); - } - } - - @Override - public String toString() { - return Arrays.toString(slots); - } - - int capacity() { - return slots.length; - } - - int size() { - return size; - } - - boolean isEmpty() { - return size == 0; - } - - int add(PooledConnection pc) { - if (size == slots.length) { - // grow the capacity - setCapacity(slots.length + growBy); - } - int slot = nextEmptySlot(); - pc.setSlotId(slot); - slots[slot] = pc; - return ++size; - } - - boolean remove(PooledConnection pc) { - int slotId = pc.slotId(); - if (slots[slotId] != pc) { - PooledConnection heldBy = slots[slotId]; - Log.warn("Failed to remove from slot[{0}] PooledConnection[{1}] - HeldBy[{2}]", pc.slotId(), pc, heldBy); - return false; - } - slots[slotId] = null; - --size; - return true; - } - - /** - * Close connections that should be considered leaked. - */ - void closeBusyConnections(long leakTimeMinutes) { - long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); - Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); - for (int i = 0; i < slots.length; i++) { - if (slots[i] != null) { - //tmp.add(slots[i]); - PooledConnection pc = slots[i]; - //noinspection StatementWithEmptyBody - if (pc.lastUsedTime() > olderThanTime) { - // PooledConnection has been used recently or - // expected to be longRunning so not closing... - } else { - slots[i] = null; - --size; - closeBusyConnection(pc); - } - } - } - } - - private void closeBusyConnection(PooledConnection pc) { - try { - Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); - System.out.println("CLOSING busy connection: " + pc.fullDescription()); - pc.closeConnectionFully(false); - } catch (Exception ex) { - Log.error("Error when closing potentially leaked connection " + pc.description(), ex); - } - } - - /** - * Returns information describing connections that are currently being used. - */ - String busyConnectionInformation(boolean toLogger) { - if (toLogger) { - Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", size()); - } - StringBuilder sb = new StringBuilder(); - for (PooledConnection pc : slots) { - if (pc != null) { - if (toLogger) { - Log.info("Busy Connection - {0}", pc.fullDescription()); - } else { - sb.append(pc.fullDescription()).append("\r\n"); - } - } - } - return sb.toString(); - } - - - /** - * Return the position of the next empty slot. - */ - private int nextEmptySlot() { - // search forward - while (++pos < slots.length) { - if (slots[pos] == null) { - return pos; - } - } - // search from beginning - pos = -1; - while (++pos < slots.length) { - if (slots[pos] == null) { - return pos; - } - } - // not expecting this - throw new RuntimeException("No Empty Slot Found?"); - } - -} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java new file mode 100644 index 0000000..8d83a10 --- /dev/null +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionBuffer.java @@ -0,0 +1,319 @@ +package io.ebean.datasource.pool; + +import java.util.ArrayList; +import java.util.List; + +/** + * A buffer designed especially to hold pooled connections (free and busy ones) + *

+ * The buffer contains two linkedLists (free and busy connection nodes) and optional + * affinityLists. + *

+ * The PooledConnections holds references to list-Nodes, which can be reused. + * This avoids object creation/gc during add/unlink operations. + *

+ * The connectionBuffer itself has one freeList, that contains all free connections + * ordered by their last-used time. (the oldest connection is at the end) + * In parallel, connections in the freeList can also be in an affinityList. + * A hashing algoritm is used to distribute the connections to the affinityLists. + *

+ * If hashSize == 0 affinity support is disabled, and the connectionBuffer + * handles only free and busyList. + *

+ * Otherwise, there are hashSize+1 affinityLists. + *

+ * The last one is used for affinity-less connections and for the others, the + * object.hashCode() mod hashSize is computed. + *

+ * (When affinity is enabled, and no affinityID is used, freeList + * and affinityLists[hashSize] have the same content.) + *

+ * When we call removeFree(someObjKey), + *

    + *
  • try to return a matching connection from the according affinity-list slot
  • + *
  • try to return a connection with affinityId == null
  • + *
  • return null (and let the caller decide to create a new connection or query + * again with GET_OLDEST to return the last free connecion)
  • + *
+ *

+ * A free node is member in two lists: + *

    + *
  1. it is member in the freeList
  2. + *
  3. it is EITHER member in one of the affinity-hash-slots + * OR it is member of the null (=last) affiinity-slot
  4. + *
+ * The remove / transition from free to busy will remove the node from both lists. + *

+ * Graphical exammple + *

+ *     By default, the busy list is empty
+ *     busyList (empty)
+ *     freeList --> c1 --> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty - first hash slot)
+ *     al[1]    (empty)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty - last hash slot)
+ *     al[N]    --> c1 --> c2 --> c3 --> c4 --> (end - null slot)
+ *
+ *     if a removeFree(1) is called, we lookup in al[1] and found no usable node.
+ *     in this case, we return the first node of al[N] that has no affinity yet.
+ *     When we remove the node from the affinityList, it is automatically removed
+ *     from the freeList
+ *
+ *     busyList --> c1 --> (end)
+ *     freeList ---------> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    (empty)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    ---------> c2 --> c3 --> c4 --> (end)
+ *
+ *     When we put that node back in the freelist, it becomes the first node
+ *     and it will be also linked in al[1]
+ *
+ *     busyList (empty)
+ *     freeList --> c1 --> c2 --> c3 --> c4 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    ---------> c2 --> c3 --> c4 --> (end)
+ *
+ *     now we call removeFree(2) tree times. This will move c2 to c4 to busy list
+ *     busyList --> c4 --> c3 --> c2 --> (end)
+ *     freeList --> c1 ----------------> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    (empty)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    (empty)
+ *
+ *     when we return the connections (c4 to c2), we have this picture
+ *     and there are no more affinity nodes left.
+ *
+ *     busyList (empty)
+ *     freeList --> c2 --> c3 --> c4 --> c1 --> (end)
+ *     al[0]    (empty)
+ *     al[1]    --> c1 --> (end)
+ *     al[2]    --> c2 --> c3 --> c4 --> (end)
+ *     ...
+ *     al[N-1]  (empty)
+ *     al[N]    (empty)
+ *
+ *     subsequent queries to affinityId=1 / 2 will return c1, respectively c2..c4
+ *
+ *     querying for a connection with affinityId=3 will return null,
+ *     because there is neither a matching one nor a null one.
+ *
+ *     The caller can now decide to create a new connection "c5" or
+ *     query with GET_OLDEST for "c1"
+ * 
+ *

+ * All thread safety controlled externally (by PooledConnectionQueue). + *

+ */ +final class ConnectionBuffer { + + // special key to return the oldest connection from freeList. + static final Object GET_LAST = new Object(); + static final Object GET_FIRST = new Object(); + + private final ConnectionList[] affinityLists; + private final ConnectionList freeList = new ConnectionList(); + private final ConnectionList busyList = new ConnectionList(); + + private final int hashSize; + + ConnectionBuffer(int hashSize) { + assert hashSize >= 0; + this.hashSize = hashSize; + if (hashSize == 0) { + affinityLists = null; + } else { + // we instantiate hashSize+1 slots. The last slot is reserved for connections + // with `null` as affinityId + affinityLists = new ConnectionList[hashSize + 1]; + for (int i = 0; i < affinityLists.length; i++) { + affinityLists[i] = new ConnectionList(); + } + } + } + + /** + * affinity is supported by the buffer. + */ + boolean isAffinitySupported() { + return affinityLists != null; + } + + /** + * Return the number of free connections. + */ + int freeSize() { + return freeList.size(); + } + + /** + * Return the number of busy connections. + */ + int busySize() { + return busyList.size(); + } + + /** + * Add the connection to the beginning of the free list. + *

+ * Note, the connection must be either new or unlinked from the busy list. + */ + void addFree(PooledConnection c) { + c.unlink(); + freeList.addFirst(c.busyFree()); + if (affinityLists != null) { + if (c.affinityId() != null) { + affinityLists[c.affinityId().hashCode() % hashSize].addFirst(c.affinity()); + } else { + affinityLists[hashSize].addFirst(c.affinity()); + } + } + } + + /** + * Adds the connection to the busy list. + *

+ * Note, the connection must be either new or unlinked from the free list. + */ + int addBusy(PooledConnection c) { + busyList.addFirst(c.busyFree()); + return busyList.size(); + } + + /** + * Removes the connection from the busy list. + * Returns true, if this connection was part of the busy list or false, if not (or removed twice) + */ + boolean removeBusy(PooledConnection c) { + if (busyList.isLinkedTo(c.busyFree())) { + c.unlink(); + return true; + } + return false; + } + + /** + * Remove a connection from the free list. Returns null if there is not any. + *

+ * Connections that are returend from this method must be either added to busyList with + * addBusy or closed fully. + * + * @param affinityId the preferred affinity-id. + * If null is provided, the first element in the list is + * returned. + * If the affinity-id is not present in the list, null + * is returned. The caller can decide to create a new connection or + * ask again with POP_LAST, which returns the last + * (=oldest) connection if affinity is enabled. + */ + PooledConnection removeFree(Object affinityId) { + PooledConnection pc; + if (affinityLists == null) { + // affinity disabled. Always use first in list + pc = freeList.peekFirst(); + } else if (affinityId == null) { + // null affinity passed + pc = affinityLists[hashSize].peekFirst(); + } else if (affinityId == GET_FIRST) { + // explicitly first one was requested (for heartbeat) + pc = freeList.peekFirst(); + } else if (affinityId == GET_LAST) { + // explicitly last one was requested (for changing affinityId) + pc = freeList.peekLast(); + } else { + // we have an affinity id request + pc = affinityLists[affinityId.hashCode() % hashSize].find(affinityId); + if (pc == null) { + // no pc with this affinity-id in the pool. Query "null"-affinityList + pc = affinityLists[hashSize].peekFirst(); + } + } + if (pc == null) { + return null; + } + pc.unlink(); + return pc; + } + + + /** + * Clears the freelist and return the connections. + */ + List clearFreeList() { + List tempList = new ArrayList<>(); + + freeList.forEach(pc -> { + pc.unlink(); + tempList.add(pc); + }); + return tempList; + } + + + /** + * Clears the busy list and return the connections that should be considered leaked. + */ + List cleanupBusyList(long leakTimeMinutes) { + long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); + List tempList = new ArrayList<>(); + + busyList.forEach(pc -> { + if (pc.lastUsedTime() > olderThanTime) { + // PooledConnection has been used recently or + // expected to be longRunning so not closing... + } else { + pc.unlink(); + tempList.add(pc); + } + }); + return tempList; + } + + /** + * Trim any inactive connections that have not been used since usedSince. + */ + List trim(int minSize, long usedSince, long createdSince) { + int toTrim = freeSize() - minSize; + + List ret = new ArrayList<>(toTrim); + for (PooledConnection pc : freeList.reverse()) { + if (ret.size() >= toTrim) { + break; + } + if (pc.shouldTrim(usedSince, createdSince)) { + pc.unlink(); + ret.add(pc); + } + } + return ret; + } + + /** + * Returns information describing connections that are currently being used. + */ + String busyConnectionInformation(boolean toLogger) { + if (toLogger) { + Log.info("Dumping [{0}] busy connections: (Use datasource.xxx.capturestacktrace=true ... to get stackTraces)", busySize()); + } + StringBuilder sb = new StringBuilder(); + busyList.forEach(pc -> { + if (toLogger) { + Log.info("Busy Connection - {0}", pc.fullDescription()); + } else { + sb.append(pc.fullDescription()).append("\r\n"); + } + }); + return sb.toString(); + } +} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java new file mode 100644 index 0000000..210ca82 --- /dev/null +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionList.java @@ -0,0 +1,199 @@ +package io.ebean.datasource.pool; + +import java.util.Iterator; +import java.util.Objects; + +/** + * A linked list implementation designed for pooledConnections. + *

+ * The linkedList supports adding and removing connections in constant time. + * In contrast to the java.util.LinkedList, this linkedList provides access + * to the list nodes. The nodes can be unlinked from one list and can be + * added to another. This saves a bit overhead of creating new node objects + * on each transition from free to busy list. + *

+ * the list implements Iterable and reverse() for reverse traversion. + * + * @author Roland Praml, Foconis Analytics GmbH + */ +final class ConnectionList implements Iterable { + + private final Node head; + private final Node tail; + private int size; + + /** + * Construct new list with two boundary nodes. + */ + ConnectionList() { + // initialize the two boundary nodes; + head = new Node(null); + tail = new Node(null); + head.next = tail; + tail.prev = head; + } + + /** + * Adds the node in front of the list. This works in constant time + */ + void addFirst(Node n) { + assert !n.isBoundaryNode() : "this is a boundary node"; + assert !n.isLinked() : "Node already member of a list"; + n.next = head.next; + n.prev = head; + head.next.prev = n; + head.next = n; + n.list = this; + size++; + } + + /** + * returns the first element in the list or null if list is empty + */ + PooledConnection peekFirst() { + Node ret = head.next; + return ret.isBoundaryNode() ? null : ret.pc; + } + + /** + * returns last first element in the list or null if list is empty + */ + PooledConnection peekLast() { + Node ret = tail.prev; + return ret.isBoundaryNode() ? null : ret.pc; + } + + /** + * Iterates the list starting with first element. + */ + public Iterator iterator() { + return new Iterator<>() { + private Node n = head.next; + private PooledConnection pc; + + @Override + public boolean hasNext() { + return !n.isBoundaryNode(); + } + + @Override + public PooledConnection next() { + pc = n.pc; + n = n.next; + return pc; + } + + @Override + public void remove() { + pc.unlink(); + } + }; + } + + /** + * Iterates the reverse way over the list + */ + Iterable reverse() { + return () -> new Iterator<>() { + private Node n = tail.prev; + private PooledConnection pc; + + @Override + public boolean hasNext() { + return !n.isBoundaryNode(); + } + + @Override + public PooledConnection next() { + pc = n.pc; + n = n.prev; + return pc; + } + + @Override + public void remove() { + pc.unlink(); + } + }; + } + + /** + * Finds the node with this affinity id. + */ + PooledConnection find(Object affinityId) { + Node n = head.next; + while (!n.isBoundaryNode()) { + if (Objects.equals(affinityId, n.pc.affinityId())) { + return n.pc; + } + n = n.next; + } + return null; + } + + int size() { + return size; + } + + /** + * Returns true, if this node is linked to that list. + */ + boolean isLinkedTo(Node node) { + // The implementation relies on node.list == this + // which is guaranteed by the add/unlink mehtod. + return node != null && node.list == this; + } + + + /** + * Node of a linkedlist. The linkedLists always have two empty nodes + * at the start and end. (boundary nodes) + *

+ * the first usable node is startNode.next (which could be the end boundary) + */ + static final class Node { + + private Node next; + private Node prev; + private ConnectionList list; + private final PooledConnection pc; + + Node(PooledConnection pc) { + this.pc = pc; + } + + /** + * Retruns true, if this is a boundary node. (start or end node of list) + */ + private boolean isBoundaryNode() { + return pc == null; + } + + /** + * Removes the node from the list. The node can be re-added to an other list. + *

+ * Note: As PooledConnections are often in two lists, always use + * PooledConnection.detach() instead of calling this method directly. + */ + boolean unlink() { + assert !isBoundaryNode() : "called remove on a boundary node"; + if (!isLinked()) { + return false; + } + list.size--; + next.prev = prev; + prev.next = next; + prev = null; + next = null; + list = null; + return true; + } + + /** + * Returns true, if this node is linked in a list. + */ + boolean isLinked() { + return list != null; + } + } +} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index 3fdc440..ff660a7 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import static io.ebean.datasource.pool.TransactionIsolation.description; @@ -67,6 +68,8 @@ final class ConnectionPool implements DataSourcePool { private final String applicationName; private final DataSource source; private final boolean validateOnHeartbeat; + private final int affinitySize; + private final Supplier affinityProvider; private long nextTrimTime; /** @@ -123,6 +126,13 @@ final class ConnectionPool implements DataSourcePool { this.validateStaleMillis = params.validateStaleMillis(); this.applicationName = params.getApplicationName(); this.clientInfo = params.getClientInfo(); + if (params.getAffinityProvider() == null) { + this.affinitySize = 0; + this.affinityProvider = () -> null; // dummy + } else { + this.affinityProvider = params.getAffinityProvider(); + this.affinitySize = params.getAffinitySize(); + } this.queue = new PooledConnectionQueue(this); this.schema = params.getSchema(); this.catalog = params.catalog(); @@ -372,7 +382,7 @@ private void testConnection() { PooledConnection conn = null; try { // Get a connection from the pool and test it - conn = getPooledConnection(); + conn = getPooledConnection(ConnectionBuffer.GET_FIRST); heartbeatPoolExhaustedCount = 0; if (testConnection(conn)) { notifyDataSourceIsUp(); @@ -619,7 +629,15 @@ public Connection getConnection(String username, String password) throws SQLExce */ @Override public Connection getConnection() throws SQLException { - return getPooledConnection(); + return getPooledConnection(affinityProvider.get()); + } + + /** + * Return a pooled connection with given affinity id. + */ + @Override + public DataSourceConnection getConnection(Object affinityId) throws SQLException { + return getPooledConnection(affinityId); } /** @@ -628,8 +646,8 @@ public Connection getConnection() throws SQLException { * This will grow the pool if all the current connections are busy. This * will go into a wait if the pool has hit its maximum size. */ - private PooledConnection getPooledConnection() throws SQLException { - PooledConnection c = queue.obtainConnection(); + private PooledConnection getPooledConnection(Object affinitiyId) throws SQLException { + PooledConnection c = queue.obtainConnection(affinitiyId); if (captureStackTrace) { c.setStackTrace(Thread.currentThread().getStackTrace()); } @@ -753,6 +771,10 @@ int pstmtCacheSize() { return pstmtCacheSize; } + int affinitySize() { + return affinitySize; + } + /** * Not implemented and shouldn't be used. */ diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java deleted file mode 100644 index 73ba836..0000000 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java +++ /dev/null @@ -1,76 +0,0 @@ -package io.ebean.datasource.pool; - -import java.util.*; - -/** - * A buffer designed especially to hold free pooled connections. - *

- * All thread safety controlled externally (by PooledConnectionQueue). - *

- */ -final class FreeConnectionBuffer { - - /** - * Buffer oriented for add and remove. - */ - private final LinkedList freeBuffer = new LinkedList<>(); - - /** - * Return the number of entries in the buffer. - */ - int size() { - return freeBuffer.size(); - } - - /** - * Return true if the buffer is empty. - */ - boolean isEmpty() { - return freeBuffer.isEmpty(); - } - - /** - * Add connection to the free list. - */ - void add(PooledConnection pc) { - freeBuffer.addFirst(pc); - } - - /** - * Remove a connection from the free list. - */ - PooledConnection remove() { - return freeBuffer.removeFirst(); - } - - /** - * Close all connections in this buffer. - */ - void closeAll(boolean logErrors) { - List tempList = new ArrayList<>(freeBuffer); - freeBuffer.clear(); - if (Log.isLoggable(System.Logger.Level.TRACE)) { - Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); - } - for (PooledConnection connection : tempList) { - connection.closeConnectionFully(logErrors); - } - } - - /** - * Trim any inactive connections that have not been used since usedSince. - */ - int trim(int minSize, long usedSince, long createdSince) { - int trimCount = 0; - ListIterator iterator = freeBuffer.listIterator(minSize); - while (iterator.hasNext()) { - PooledConnection pooledConnection = iterator.next(); - if (pooledConnection.shouldTrim(usedSince, createdSince)) { - iterator.remove(); - pooledConnection.closeConnectionFully(true); - trimCount++; - } - } - return trimCount; - } -} diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 4d2b2bd..e9b1288 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -1,5 +1,7 @@ package io.ebean.datasource.pool; +import io.ebean.datasource.DataSourceConnection; + import java.sql.*; import java.util.ArrayList; import java.util.Map; @@ -17,7 +19,7 @@ * It has caching of Statements and PreparedStatements. Remembers the last * statement that was executed. Keeps statistics on how long it is in use. */ -final class PooledConnection extends ConnectionDelegator { +final class PooledConnection extends ConnectionDelegator implements DataSourceConnection { private static final String IDLE_CONNECTION_ACCESSED_ERROR = "Pooled Connection has been accessed whilst idle in the pool, via method: "; @@ -132,10 +134,12 @@ final class PooledConnection extends ConnectionDelegator { private String createdByMethod; private StackTraceElement[] stackTrace; private final int maxStackTrace; - /** - * Slot position in the BusyConnectionBuffer. - */ - private int slotId; + + // node in busyFree or affinity list + private final ConnectionList.Node busyFree = new ConnectionList.Node(this); + private final ConnectionList.Node affinity = new ConnectionList.Node(this); + + private Object affinityId; /** @@ -184,17 +188,34 @@ final class PooledConnection extends ConnectionDelegator { } /** - * Return the slot position in the busy buffer. + * Return the node in the busy list. If this is empty, the connection is free + */ + ConnectionList.Node busyFree() { + return busyFree; + } + + ConnectionList.Node affinity() { + return affinity; + } + + /** + * Unlinks the pooledConnection from the busyFree and affinity-list. */ - int slotId() { - return slotId; + void unlink() { + busyFree.unlink(); + affinity.unlink(); } /** - * Set the slot position in the busy buffer. + * Return the affinity-id (only for busy connections!) */ - void setSlotId(int slotId) { - this.slotId = slotId; + @Override + public Object affinityId() { + return affinityId; + } + + void setAffinityId(Object affinityId) { + this.affinityId = affinityId; } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 7a66c98..85d8167 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -5,6 +5,8 @@ import io.ebean.datasource.pool.ConnectionPool.Status; import java.sql.SQLException; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -18,14 +20,9 @@ final class PooledConnectionQueue { private final String name; private final ConnectionPool pool; /** - * A 'circular' buffer designed specifically for free connections. + * A buffer designed specifically for free and busy connections. */ - private final FreeConnectionBuffer freeList; - /** - * A 'slots' buffer designed specifically for busy connections. - * Fast add remove based on slot id. - */ - private final BusyConnectionBuffer busyList; + private final ConnectionBuffer buffer; /** * Main lock guarding all access */ @@ -74,14 +71,13 @@ final class PooledConnectionQueue { this.waitTimeoutMillis = pool.waitTimeoutMillis(); this.maxAgeMillis = pool.maxAgeMillis(); this.validateStaleMillis = pool.validateStaleMillis(); - this.busyList = new BusyConnectionBuffer(maxSize, 20); - this.freeList = new FreeConnectionBuffer(); + this.buffer = new ConnectionBuffer(pool.affinitySize()); this.lock = new ReentrantLock(false); this.notEmpty = lock.newCondition(); } private PoolStatus createStatus() { - return new Status(minSize, maxSize, freeList.size(), busyList.size(), waitingThreads, highWaterMark, + return new Status(minSize, maxSize, buffer.freeSize(), buffer.busySize(), waitingThreads, highWaterMark, waitCount, hitCount, totalAcquireNanos, maxAcquireNanos); } @@ -100,7 +96,7 @@ PoolStatus status(boolean reset) { try { PoolStatus s = createStatus(); if (reset) { - highWaterMark = busyList.size(); + highWaterMark = buffer.busySize(); hitCount = 0; waitCount = 0; maxAcquireNanos = 0; @@ -113,20 +109,14 @@ PoolStatus status(boolean reset) { } void setMaxSize(int maxSize) { - lock.lock(); - try { - if (maxSize < this.minSize) { - throw new IllegalArgumentException("maxSize " + maxSize + " < minSize " + this.minSize); - } - this.busyList.setCapacity(maxSize); - this.maxSize = maxSize; - } finally { - lock.unlock(); + if (maxSize < this.minSize) { + throw new IllegalArgumentException("maxSize " + maxSize + " < minSize " + this.minSize); } + this.maxSize = maxSize; } private int totalConnections() { - return freeList.size() + busyList.size(); + return buffer.freeSize() + buffer.busySize(); } void ensureMinimumConnections() throws SQLException { @@ -135,7 +125,7 @@ void ensureMinimumConnections() throws SQLException { int add = minSize - totalConnections(); if (add > 0) { for (int i = 0; i < add; i++) { - freeList.add(pool.createConnectionForQueue(connectionId++)); + buffer.addFree(pool.createConnectionForQueue(connectionId++)); } notEmpty.signal(); } @@ -148,27 +138,48 @@ void ensureMinimumConnections() throws SQLException { * Return a PooledConnection. */ void returnPooledConnection(PooledConnection c, boolean forceClose) { + boolean closeConnection = false; lock.lock(); try { - if (!busyList.remove(c)) { + if (!buffer.removeBusy(c)) { Log.error("Connection [{0}] not found in BusyList?", c); } if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { - c.closeConnectionFully(false); + closeConnection = true; } else { - freeList.add(c); + buffer.addFree(c); notEmpty.signal(); } } finally { lock.unlock(); } + if (closeConnection) { + c.closeConnectionFully(false); + } } - private PooledConnection extractFromFreeList() { - if (freeList.isEmpty()) { + /** + * Tries to extract the best matching connection from the freeList. + * If there is none, it tries to create one or steal one from other affinity slot. + */ + private PooledConnection extractFromFreeList(Object affinitiyId, boolean create) throws SQLException { + PooledConnection c = extractFromFreeList(affinitiyId); + if (c == null && create) { + c = createConnection(); + } + if (c == null && buffer.isAffinitySupported()) { + // TODO: This "stealing" stragegy could be optimized. + // we might build a heurestic, which one would be the best candidate + c = extractFromFreeList(ConnectionBuffer.GET_LAST); + } + return c; + } + + private PooledConnection extractFromFreeList(Object affinitiyId) { + PooledConnection c = buffer.removeFree(affinitiyId); + if (c == null) { return null; } - final PooledConnection c = freeList.remove(); if (validateStaleMillis > 0 && staleEviction(c)) { c.closeConnectionFully(false); return null; @@ -187,14 +198,32 @@ private boolean staleEviction(PooledConnection c) { return pool.invalidConnection(c); } + + private PooledConnection createConnection() throws SQLException { + if (totalConnections() < maxSize) { + // grow the connection pool + PooledConnection c = pool.createConnectionForQueue(connectionId++); + int busySize = registerBusyConnection(c); + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] grow; id[{1}] busy[{2}] max[{3}]", name, c.name(), busySize, maxSize); + } + return c; + } else { + return null; + } + } + private boolean stale(PooledConnection c) { return c.lastUsedTime() < System.currentTimeMillis() - validateStaleMillis; } - PooledConnection obtainConnection() throws SQLException { + PooledConnection obtainConnection(Object affinitiyId) throws SQLException { try { - PooledConnection pc = _obtainConnection(); + PooledConnection pc = _obtainConnection(affinitiyId); pc.resetForUse(); + if (affinitiyId != ConnectionBuffer.GET_FIRST && affinitiyId != ConnectionBuffer.GET_LAST) { + pc.setAffinityId(affinitiyId); + } return pc; } catch (InterruptedException e) { @@ -208,14 +237,14 @@ PooledConnection obtainConnection() throws SQLException { * Register the PooledConnection with the busyList. */ private int registerBusyConnection(PooledConnection connection) { - int busySize = busyList.add(connection); + int busySize = buffer.addBusy(connection); if (busySize > highWaterMark) { highWaterMark = busySize; } return busySize; } - private PooledConnection _obtainConnection() throws InterruptedException, SQLException { + private PooledConnection _obtainConnection(Object affinitiyId) throws InterruptedException, SQLException { var start = System.nanoTime(); lock.lockInterruptibly(); try { @@ -227,13 +256,9 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc hitCount++; // are other threads already waiting? (they get priority) if (waitingThreads == 0) { - PooledConnection connection = extractFromFreeList(); - if (connection != null) { - return connection; - } - connection = createConnection(); - if (connection != null) { - return connection; + PooledConnection c = this.extractFromFreeList(affinitiyId, true); + if (c != null) { + return c; } } try { @@ -241,7 +266,7 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc // a wait loop until connections are returned into the pool. waitCount++; waitingThreads++; - return _obtainConnectionWaitLoop(); + return _obtainConnectionWaitLoop(null); } finally { waitingThreads--; } @@ -253,24 +278,10 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc } } - private PooledConnection createConnection() throws SQLException { - if (busyList.size() < maxSize) { - // grow the connection pool - PooledConnection c = pool.createConnectionForQueue(connectionId++); - int busySize = registerBusyConnection(c); - if (Log.isLoggable(DEBUG)) { - Log.debug("DataSource [{0}] grow; id[{1}] busy[{2}] max[{3}]", name, c.name(), busySize, maxSize); - } - return c; - } else { - return null; - } - } - /** * Got into a loop waiting for connections to be returned to the pool. */ - private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException { + private PooledConnection _obtainConnectionWaitLoop(Object affinitiyId) throws SQLException, InterruptedException { long nanos = MILLIS_TIME_UNIT.toNanos(waitTimeoutMillis); for (; ; ) { if (nanos <= 0) { @@ -291,9 +302,10 @@ private PooledConnection _obtainConnectionWaitLoop() throws SQLException, Interr try { nanos = notEmpty.awaitNanos(nanos); - if (!freeList.isEmpty()) { + PooledConnection c = this.extractFromFreeList(affinitiyId, false); + if (c != null) { // successfully waited - return extractFromFreeList(); + return c; } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread @@ -313,8 +325,8 @@ PoolStatus shutdown(boolean closeBusyConnections) { // connections close on return to pool lastResetTime = System.currentTimeMillis() - 100; } else { - if (!busyList.isEmpty()) { - Log.warn("Closing busy connections on shutdown size: {0}", busyList.size()); + if (buffer.busySize() > 0) { + Log.warn("Closing busy connections on shutdown size: {0}", buffer.busySize()); dumpBusyConnectionInformation(); closeBusyConnections(0); } @@ -353,17 +365,12 @@ void reset(long leakTimeMinutes) { } void trim(long maxInactiveMillis, long maxAgeMillis) { - lock.lock(); - try { - if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { - try { - ensureMinimumConnections(); - } catch (SQLException e) { - Log.error("Error trying to ensure minimum connections", e); - } + if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { + try { + ensureMinimumConnections(); + } catch (SQLException e) { + Log.error("Error trying to ensure minimum connections", e); } - } finally { - lock.unlock(); } } @@ -372,33 +379,50 @@ void trim(long maxInactiveMillis, long maxAgeMillis) { */ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMillis) { final long createdSince = (maxAgeMillis == 0) ? 0 : System.currentTimeMillis() - maxAgeMillis; - final int trimmedCount; - if (freeList.size() > minSize) { - // trim on maxInactive and maxAge - long usedSince = System.currentTimeMillis() - maxInactiveMillis; - trimmedCount = freeList.trim(minSize, usedSince, createdSince); - } else if (createdSince > 0) { - // trim only on maxAge - trimmedCount = freeList.trim(0, createdSince, createdSince); - } else { - trimmedCount = 0; + final List toTrim; + lock.lock(); + try { + if (buffer.freeSize() > minSize) { + // trim on maxInactive and maxAge + long usedSince = System.currentTimeMillis() - maxInactiveMillis; + toTrim = buffer.trim(minSize, usedSince, createdSince); + } else if (createdSince > 0) { + // trim only on maxAge + toTrim = buffer.trim(0, createdSince, createdSince); + } else { + toTrim = Collections.emptyList(); + } + } finally { + lock.unlock(); } - if (trimmedCount > 0 && Log.isLoggable(DEBUG)) { - Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmedCount, totalConnections()); + if (toTrim.isEmpty()) { + return false; + } + toTrim.forEach(pc -> pc.closeConnectionFully(true)); + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, toTrim.size(), totalConnections()); } - return trimmedCount > 0 && freeList.size() < minSize; + return buffer.freeSize() < minSize; } /** * Close all the connections that are in the free list. */ private void closeFreeConnections(boolean logErrors) { + List tempList; lock.lock(); try { - freeList.closeAll(logErrors); + tempList = buffer.clearFreeList(); } finally { lock.unlock(); } + // closing the connections is done outside lock + if (Log.isLoggable(System.Logger.Level.TRACE)) { + Log.trace("... closing all {0} connections from the free list with logErrors: {1}", tempList.size(), logErrors); + } + for (PooledConnection connection : tempList) { + connection.closeConnectionFully(logErrors); + } } /** @@ -412,12 +436,22 @@ private void closeFreeConnections(boolean logErrors) { * closed and put back into the pool. */ void closeBusyConnections(long leakTimeMinutes) { + List tempList; lock.lock(); try { - busyList.closeBusyConnections(leakTimeMinutes); + tempList = buffer.cleanupBusyList(leakTimeMinutes); } finally { lock.unlock(); } + for (PooledConnection pc : tempList) { + try { + Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); + System.out.println("CLOSING busy connection: " + pc.fullDescription()); + pc.closeConnectionFully(false); + } catch (Exception ex) { + Log.error("Error when closing potentially leaked connection " + pc.description(), ex); + } + } } String getBusyConnectionInformation() { @@ -434,7 +468,7 @@ void dumpBusyConnectionInformation() { private String getBusyConnectionInformation(boolean toLogger) { lock.lock(); try { - return busyList.busyConnectionInformation(toLogger); + return buffer.busyConnectionInformation(toLogger); } finally { lock.unlock(); } diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java deleted file mode 100644 index 67e258b..0000000 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/BusyConnectionBufferTest.java +++ /dev/null @@ -1,99 +0,0 @@ -package io.ebean.datasource.pool; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class BusyConnectionBufferTest { - - @Test - public void test() { - - BusyConnectionBuffer b = new BusyConnectionBuffer(2, 4); - - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - PooledConnection p3 = new PooledConnection("3"); - - assertEquals(2, b.capacity()); - b.add(p0); - b.add(p1); - assertEquals(2, b.capacity()); - b.add(p2); - assertEquals(6, b.capacity()); - b.add(p3); - - assertEquals(0, p0.slotId()); - assertEquals(1, p1.slotId()); - assertEquals(2, p2.slotId()); - assertEquals(3, p3.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(4, p2.slotId()); - - b.remove(p0); - b.add(p0); - assertEquals(5, p0.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(0, p2.slotId()); - - } - - @Test - public void test_rotate() { - - BusyConnectionBuffer b = new BusyConnectionBuffer(2, 2); - - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - PooledConnection p3 = new PooledConnection("3"); - - assertEquals(2, b.capacity()); - assertEquals(0, b.size()); - - b.add(p0); - b.add(p1); - assertEquals(2, b.size()); - assertEquals(2, b.capacity()); - b.add(p2); - assertEquals(3, b.size()); - assertEquals(4, b.capacity()); - b.add(p3); - assertEquals(4, b.size()); - assertEquals(4, b.capacity()); - - assertEquals(0, p0.slotId()); - assertEquals(1, p1.slotId()); - assertEquals(2, p2.slotId()); - assertEquals(3, p3.slotId()); - - b.remove(p2); - assertEquals(3, b.size()); - b.remove(p0); - assertEquals(2, b.size()); - b.remove(p3); - assertEquals(1, b.size()); - b.add(p2); - assertEquals(2, b.size()); - assertEquals(0, p2.slotId()); - - b.remove(p0); - assertEquals(2, b.size()); - b.add(p0); - assertEquals(3, b.size()); - - // p1 is still in it's slot - assertEquals(2, p0.slotId()); - - b.remove(p2); - b.add(p2); - assertEquals(3, p2.slotId()); - - } - -} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java new file mode 100644 index 0000000..88db122 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionBufferTest.java @@ -0,0 +1,197 @@ +package io.ebean.datasource.pool; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.*; + +class ConnectionBufferTest { + + @Test + void test() { + + ConnectionBuffer b = new ConnectionBuffer(257); + + PooledConnection p0 = new PooledConnection("0"); + PooledConnection p1 = new PooledConnection("1"); + PooledConnection p2 = new PooledConnection("2"); + + + assertEquals(0, b.freeSize()); + + b.addFree(p0); + + assertEquals(1, b.freeSize()); + + PooledConnection r0 = b.removeFree(null); + b.addBusy(p0); + assertThat(p0).isSameAs(r0); + + assertEquals(0, b.freeSize()); + + assertEquals(0, b.freeSize()); + assertEquals(1, b.busySize()); + b.addFree(p0); + assertEquals(1, b.freeSize()); + assertEquals(0, b.busySize()); + + b.addFree(p1); + b.addFree(p2); + + assertEquals(3, b.freeSize()); + + PooledConnection r1 = b.removeFree(null); + b.addBusy(r1); + assertSame(p2, r1); + PooledConnection r2 = b.removeFree(null); + b.addBusy(r2); + assertSame(p1, r2); + + assertEquals(1, b.freeSize()); + b.addFree(r1); + + assertEquals(2, b.freeSize()); + PooledConnection r3 = b.removeFree(null); + b.addBusy(r3); + assertSame(p2, r3); + assertEquals(1, b.freeSize()); + PooledConnection r4 = b.removeFree(null); + b.addBusy(r4); + assertSame(p0, r4); + assertEquals(0, b.freeSize()); + + b.addFree(r3);// = p2 + b.addFree(r2);// = p1 + b.addFree(r4);// = p0 + + assertEquals(3, b.freeSize()); + + PooledConnection r5 = b.removeFree(null); + b.addBusy(r5); + assertSame(p0, r5); + assertEquals(2, b.freeSize()); + + PooledConnection r6 = b.removeFree(null); + b.addBusy(r6); + assertSame(p1, r6); + assertEquals(1, b.freeSize()); + + PooledConnection r7 = b.removeFree(null); + b.addBusy(r7); + assertSame(p2, r7); + assertEquals(0, b.freeSize()); + + } + + + @Test + public void test_busy_free() { + + ConnectionBuffer b = new ConnectionBuffer(257); + + PooledConnection p0 = new PooledConnection("0"); + PooledConnection p1 = new PooledConnection("1"); + PooledConnection p2 = new PooledConnection("2"); + PooledConnection p3 = new PooledConnection("3"); + + assertEquals(0, b.busySize()); + assertEquals(0, b.freeSize()); + b.addBusy(p0); + b.addBusy(p1); + + assertEquals(2, b.busySize()); + assertEquals(0, b.freeSize()); + + b.addFree(p2); + b.addFree(p3); + + assertEquals(2, b.busySize()); + assertEquals(2, b.freeSize()); + + PooledConnection c3 = b.removeFree(null); + assertSame(p3, c3); + assertEquals(2, b.busySize()); + assertEquals(1, b.freeSize()); + + b.addBusy(c3); + assertEquals(3, b.busySize()); + assertEquals(1, b.freeSize()); + assertThatThrownBy(() -> b.addBusy(p3)).hasMessageContaining("Node already member of a list"); + assertEquals(3, b.busySize()); + + PooledConnection c2 = b.removeFree(null); + b.addBusy(c2); + assertSame(p2, c2); + + assertEquals(4, b.busySize()); + assertEquals(0, b.freeSize()); + + assertNull(b.removeFree(null)); // no free connections left + + // all are busy now + assertTrue(p0.busyFree().isLinked()); + assertTrue(p1.busyFree().isLinked()); + assertTrue(p2.busyFree().isLinked()); + assertTrue(p3.busyFree().isLinked()); + + b.removeBusy(p0); + assertEquals(3, b.busySize()); + assertEquals(0, b.freeSize()); + assertFalse(p0.busyFree().isLinked()); + + assertFalse(b.removeBusy(p0)); + assertTrue(b.removeBusy(p1)); + b.addFree(p1); + assertFalse(b.removeBusy(p1)); + + assertEquals(2, b.busySize()); + assertEquals(1, b.freeSize()); + + b.addFree(p2); + b.addFree(p3); + b.addFree(p3); + + assertEquals(0, b.busySize()); + assertEquals(3, b.freeSize()); + } + + @Test + public void test_Affinity() { + + ConnectionBuffer b = new ConnectionBuffer(257); + + PooledConnection p0 = new PooledConnection("0"); + PooledConnection p1 = new PooledConnection("1"); + PooledConnection p2 = new PooledConnection("2"); + PooledConnection p3 = new PooledConnection("3"); + + b.addFree(p0); + b.addFree(p1); + b.addFree(p2); + b.addFree(p3); + + PooledConnection c1 = getConnection(b, 42); + PooledConnection c2 = getConnection(b, 17); + b.addFree(c1); + b.addFree(c2); + + PooledConnection c3 = getConnection(b,43); + assertNotSame(c3, c1); + assertNotSame(c2, c1); + + PooledConnection c4 = getConnection(b,42); + assertSame(c4, c1); + } + + private static PooledConnection getConnection(ConnectionBuffer b, Object affinity) { + PooledConnection c1 = b.removeFree(affinity); + if (c1 == null) { + c1 = b.removeFree(ConnectionBuffer.GET_LAST); + } + c1.setAffinityId(affinity); + b.addBusy(c1); + return c1; + } + +} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java new file mode 100644 index 0000000..4140ab7 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionListTest.java @@ -0,0 +1,83 @@ +package io.ebean.datasource.pool; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class ConnectionListTest { + + /** + * Test adding and unlinking from connectionList. + */ + @Test + void test() { + + ConnectionList l1 = new ConnectionList(); + ConnectionList l2 = new ConnectionList(); + + PooledConnection p0 = new PooledConnection("0"); + p0.setAffinityId(43); + PooledConnection p1 = new PooledConnection("1"); + p1.setAffinityId(44); + PooledConnection p2 = new PooledConnection("2"); + p2.setAffinityId(45); + + assertThat(l1.size()).isEqualTo(0); + assertThat(l1.peekFirst()).isNull(); + assertThat(l1.peekLast()).isNull(); + assertThat(l1.find(42)).isNull(); + + + l1.addFirst(p0.busyFree()); + l2.addFirst(p1.affinity()); + + assertThat(l1.size()).isEqualTo(1); + assertThat(l1.peekFirst()).isSameAs(p0); + assertThat(l1.peekLast()).isSameAs(p0); + assertThat(l1.find(42)).isNull(); + assertThat(l1.find(43)).isSameAs(p0); + + assertThat(l2.size()).isEqualTo(1); + + l1.addFirst(p1.busyFree()); + l1.addFirst(p2.busyFree()); + + assertThat(l1.size()).isEqualTo(3); + assertThat(l2.size()).isEqualTo(1); + + assertThat(l1).containsExactly(p2, p1, p0); + assertThat(l1.reverse()).containsExactly(p0, p1, p2); + + p0.unlink(); + assertThat(l1.size()).isEqualTo(2); + assertThat(l2.size()).isEqualTo(1); + + p1.unlink(); // p1 is member of both lists + assertThat(l1.size()).isEqualTo(1); + assertThat(l2.size()).isEqualTo(0); + + ConnectionList l3 = new ConnectionList(); + + // adding an already linked conncection will throw an error + assertThatThrownBy(() -> l3.addFirst(p2.busyFree())) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Node already member of a list"); + + assertThat(l3.size()).isEqualTo(0); + + p2.unlink(); + l3.addFirst(p2.busyFree()); + assertThat(l3.size()).isEqualTo(1); + + p2.unlink(); + p2.unlink(); // subsequent unlink must not throw error + l2.addFirst(p2.busyFree()); + assertThat(l2.size()).isEqualTo(1); + assertThat(l3.size()).isEqualTo(0); + } + +} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java index 2b3df7e..1155a0e 100644 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolSpeedTest.java @@ -7,7 +7,10 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -36,6 +39,7 @@ private ConnectionPool createPool() { config.setMinConnections(2); config.setMaxConnections(100); config.setAutoCommit(true); + config.affinityProvider(Thread::currentThread); return new ConnectionPool("testspeed", config); } @@ -64,6 +68,60 @@ public void test() throws SQLException { assertThat(avgNanos).isLessThan(300); } + /** + * Shows the benefit of affinity support. + *

+ * This test starts 10 threads, where each thread has its own set of statements. + * The problem is, if each thread takes the first free connection, which is most + * likely the connection from an other thread, the cached pstmts are useless. + *

+ * When we return the last used connection, we can increase the pstmt hit ratio: + *

+ * With affinity support: psc[hit:19.800 miss:200 put:20.000 rem:0] + *

+ * Without affinity support: psc[hit:7.231 miss:12.769 put:20.000 rem:12.279] + */ + @Test + public void testMultiThread() throws Exception { + warm(); + + total = 0; + List threads = new ArrayList<>(); + for (int threadCount = 0; threadCount < 10; threadCount++) { + threads.add(createThread()); + } + + long startNano = System.nanoTime(); + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + long exeNanos = System.nanoTime() - startNano; + + logger.info("exeNanos[{}]", exeNanos); + } + + private Thread createThread() { + return new Thread(() -> { + try { + for (int j = 0; j < 100; j++) { + for (int k = 0; k < 20; k++) { + try (Connection conn = pool.getConnection()) { + try (PreparedStatement stmt = conn.prepareStatement("select '" + Thread.currentThread().getName() + "', " + k)) { + stmt.execute(); + } + conn.rollback(); + } + } + } + } catch (SQLException e) { + e.printStackTrace(); + } + }); + } + private void perform() throws SQLException { for (int i = 0; i < 1_000_000; i++) { getAndCloseSome(); diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java deleted file mode 100644 index c4ddcb7..0000000 --- a/ebean-datasource/src/test/java/io/ebean/datasource/pool/FreeConnectionBufferTest.java +++ /dev/null @@ -1,110 +0,0 @@ -package io.ebean.datasource.pool; - -import org.junit.jupiter.api.Test; - -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.ListIterator; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; - -class FreeConnectionBufferTest { - - @Test - void test() { - - FreeConnectionBuffer b = new FreeConnectionBuffer(); - - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - // PooledConnection p3 = new PooledConnection("3"); - - assertEquals(0, b.size()); - assertTrue(b.isEmpty()); - - b.add(p0); - - assertEquals(1, b.size()); - assertFalse(b.isEmpty()); - - PooledConnection r0 = b.remove(); - assertThat(p0).isSameAs(r0); - - assertEquals(0, b.size()); - assertTrue(b.isEmpty()); - - b.add(p0); - b.add(p1); - b.add(p2); - - assertEquals(3, b.size()); - - PooledConnection r1 = b.remove(); - assertSame(p2, r1); - PooledConnection r2 = b.remove(); - assertSame(p1, r2); - - assertEquals(1, b.size()); - b.add(p2); - assertEquals(2, b.size()); - PooledConnection r3 = b.remove(); - assertSame(p2, r3); - assertEquals(1, b.size()); - PooledConnection r4 = b.remove(); - assertSame(p0, r4); - assertEquals(0, b.size()); - - b.add(p2); - b.add(p1); - b.add(p0); - - assertEquals(3, b.size()); - - PooledConnection r5 = b.remove(); - assertSame(p0, r5); - assertEquals(2, b.size()); - - PooledConnection r6 = b.remove(); - assertSame(p1, r6); - assertEquals(1, b.size()); - - PooledConnection r7 = b.remove(); - assertSame(p2, r7); - assertEquals(0, b.size()); - - } - - @Test - void listIterator() { - PooledConnection p0 = new PooledConnection("0"); - PooledConnection p1 = new PooledConnection("1"); - PooledConnection p2 = new PooledConnection("2"); - PooledConnection p3 = new PooledConnection("3"); - - var list = new LinkedList(); - list.add(p0); - list.add(p1); - list.add(p2); - list.add(p3); - - var set1 = listIterate(list, 1); - assertThat(set1).hasSize(3); - assertThat(set1).contains(p1, p2, p3); - - var set3 = listIterate(list, 3); - assertThat(set3).hasSize(1); - assertThat(set3).contains(p3); - } - - private LinkedHashSet listIterate(LinkedList list, int position) { - ListIterator it = list.listIterator(position); - var set = new LinkedHashSet(); - while (it.hasNext()) { - set.add(it.next()); - } - return set; - } - -}