55import org .reactivestreams .Publisher ;
66import org .reactivestreams .Subscriber ;
77import org .reactivestreams .Subscription ;
8- import reactor .core .publisher .Flux ;
98import reactor .core .publisher .Mono ;
109
1110import java .util .concurrent .ConcurrentLinkedDeque ;
1514
1615/**
1716 * <p>
18- * A lock that is acquired asynchronously. Acquiring threads invoke {@link
19- * #lock(Runnable)} with a {@code Runnable} that will access a guarded resource.
20- * The {@code Runnable} <i>MUST</i> ensure that a single invocation of {@link
21- * #unlock()} will occur after its {@code run()} method has been invoked. The
22- * call to {@code unlock} may occur asynchronously on a thread other than the
23- * one invoking {@code run}.
17+ * A lock that is acquired and unlocked asynchronously. An instance of this lock
18+ * is used to guard access to the Oracle JDBC Connection, without blocking
19+ * threads that contend for it.
2420 * </p><p>
25- * An instance of this lock is used to guard access to the Oracle JDBC
26- * Connection, without blocking threads that contend for it.
21+ * Any time Oracle R2DBC invokes a synchronous API of Oracle JDBC, it will
22+ * acquire an instance of this lock before doing so. Synchronous method calls
23+ * will block a thread if JDBC has a database call in progress, and this can
24+ * lead a starvation of the thread pool. If no pooled threads are available,
25+ * then JDBC can not execute callbacks that complete the database call, and
26+ * so JDBC will never release it's blocking lock.
2727 * </p><p>
2828 * Any time Oracle R2DBC creates a publisher that is implemented by the
2929 * Oracle JDBC driver, it will acquire an instance of this lock before
30- * subscribing to that publisher, and before signalling demand to the publisher.
31- * The lock is released at any point in time when it is known that neither
32- * onNext nor onSubscribe signals are pending. If these signals are not
33- * pending, then the driver should not be executing any database call; As
34- * long as no database call is in progress, JDBC should not block threads
35- * that call any method of its API.
30+ * subscribing to that publisher. The lock is only released if it is known
31+ * that neither onNext nor onSubscribe signals are pending. If these signals
32+ * are not pending, then the driver should not be executing any database
33+ * call; As long as no database call is in progress, JDBC should not block
34+ * threads that call any method of its API.
35+ * </p>
36+ * <h3>Locking for Asynchronous JDBC Calls</h3>
37+ * <p>
38+ * Wrapping a JDBC Publisher with {@link #lock(Publisher)} will have signals
39+ * from a downstream subscriber proxied, such that the lock is held whenever
40+ * {@code onSubscribe} or {@code onNext} signals are pending.
3641 * </p><p>
37- * Any time Oracle R2DBC invokes a synchronous API of Oracle JDBC, it will
38- * acquire an instance of this lock before doing so. Synchronous method calls
39- * will block a thread if JDBC has a database call in progress.
42+ * Invoking {@link #lock(Runnable)} will have a {@code Runnable} executed
43+ * after the lock becomes available. The {@code Runnable} is executed
44+ * immediately, before {@code lock(Runnable)} returns if the lock is
45+ * available. Otherwise, the {@code Runnable} is executed asynchronously
46+ * after the lock becomes available.
4047 * </p><p>
41- * The {@link #get(JdbcSupplier)} and {@link #run(JdbcRunnable)} methods of
42- * this class return a publisher that completes a synchronous method call has
43- * completed. Because the lock may not be available when these methods are
44- * called, the completion must be signalled asynchronously.
48+ * The {@code Runnable} provided to {@link #lock(Runnable)} <i>MUST</i> ensure
49+ * that a single invocation of {@link #unlock()} will occur after its
50+ * {@code run()} method is invoked. The call to {@code unlock} may occur
51+ * within the scope of the {@code Runnable.run()} method. It may also occur
52+ * asynchronously, after the {@code run()} method has returned.
53+ * </p>
54+ * <h3>Locking for Synchronous JDBC Calls</h3>
55+ * <p>
56+ * If the lock simply needs to be acquired before making a synchronous call,
57+ * and then released after that call returns, then methods
58+ * {@link #run(JdbcRunnable)}, {@link #get(JdbcSupplier)}, or
59+ * {@link #flatMap(JdbcSupplier)} may be used. These methods return a
60+ * {@code Publisher} that completes after the lock is acquired and the provided
61+ * task has been run. These methods will automatically release the lock after
62+ * the provided task has run.
4563 * </p><p>
4664 * Rather than invoke the {@code get/run} methods for each and every JDBC
4765 * method call, it is preferable to invoke them with a single task that
@@ -151,6 +169,18 @@ <T> Publisher<T> get(JdbcSupplier<T> jdbcSupplier) {
151169 }));
152170 }
153171
172+ /**
173+ * Returns a {@code Publisher} that acquires this lock and executes a
174+ * {@code publisherSupplier} when a subscriber subscribes. The
175+ * {@code Publisher} output by the {@code publisherSupplier} is flat mapped
176+ * into the {@code Publisher} returned by this method. If the supplier outputs
177+ * {@code null}, the returned publisher just emits {@code onComplete}. If the
178+ * supplier throws an error, the returned publisher emits that as
179+ * {@code onError}.
180+ * @param publisherSupplier Supplier to execute. Not null.
181+ * @return A flat-mapping of the publisher output by the {
182+ * @code publisherSupplier}.
183+ */
154184 <T > Publisher <T > flatMap (JdbcSupplier <Publisher <T >> publisherSupplier ) {
155185 return Mono .from (get (publisherSupplier ))
156186 .flatMapMany (Function .identity ());
@@ -175,7 +205,7 @@ <T> Publisher<T> lock(Publisher<T> publisher) {
175205
176206 /**
177207 * <p>
178- * A {@code Subscriber} that uses the {@link #asyncLock } to ensure that
208+ * A {@code Subscriber} that uses this {@link AsyncLock } to ensure that
179209 * threads do not become blocked when contending for this adapter's JDBC
180210 * {@code Connection}. Any time a {@code Subscriber} subscribes to a
181211 * {@code Publisher} that uses the JDBC {@code Connection}, an instance of
@@ -261,7 +291,7 @@ <T> Publisher<T> lock(Publisher<T> publisher) {
261291 * attempt to use it when an asynchronous database call is in-flight. The
262292 * potential for an in-flight call exists whenever there is a pending signal
263293 * from the upstream {@code Publisher}. Instances of
264- * {@code UsingConnectionSubscriber} acquire the {@link #asyncLock }
294+ * {@code UsingConnectionSubscriber} acquire this {@link AsyncLock }
265295 * before requesting a signal from the publisher, and release the
266296 * {@code asyncLock} once that signal is received. This ensures that no other
267297 * thread will be able to acquire the {@code asyncLock} when a pending signal
@@ -270,7 +300,7 @@ <T> Publisher<T> lock(Publisher<T> publisher) {
270300 * An {@code onSubscribe} signal is pending between an invocation of
271301 * {@link Publisher#subscribe(Subscriber)} and an invocation of
272302 * {@link Subscriber#onSubscribe(Subscription)}. Accordingly, the
273- * {@link #asyncLock } <i>MUST</i> be acquired before invoking
303+ * {@link AsyncLock } <i>MUST</i> be acquired before invoking
274304 * {@code subscribe} with an instance of {@code UsingConnectionSubscriber}.
275305 * When that instance receives an {@code onSubscribe} signal, it will release
276306 * the {@code asyncLock}.
@@ -279,14 +309,14 @@ <T> Publisher<T> lock(Publisher<T> publisher) {
279309 * {@link Subscription#request(long)} and a number of invocations of
280310 * {@link Subscriber#onNext(Object)} equal to the number of
281311 * values requested. Accordingly, instances of
282- * {@code UsingConnectionSubscriber} acquire the {@link #asyncLock } before
312+ * {@code UsingConnectionSubscriber} acquire the {@link AsyncLock } before
283313 * emitting a {@code request} signal, and release the {@code asyncLock} when
284314 * a corresponding number of {@code onNext} signals have been received.
285315 * </p><p>
286316 * When a {@code cancel} signal is emitted to the upstream {@code Publisher},
287317 * that publisher will not emit any further signals to the downstream
288318 * {@code Subscriber}. If an instance {@code UsingConnectionSubscriber}
289- * has acquired the {@link #asyncLock } for a pending {@code onNext} signal,
319+ * has acquired the {@link AsyncLock } for a pending {@code onNext} signal,
290320 * then it will defer sending a {@code cancel} signal until the pending
291321 * {@code onNext} signal has been received. Deferring cancellation until the
292322 * the publisher invokes {@code onNext} ensures that the cancellation happens
@@ -388,38 +418,6 @@ public void request(long n) {
388418 else //if (currentDemand == TERMINATED)
389419 unlock ();
390420 });
391-
392-
393-
394- /* No guarantee that lock is already held in the else/non-zero case.
395- Only guarantee is that request call has been enqueued for lock ownership
396- Above this is fixed. All requests enqueued then, so that they may only
397- happen when lock is owned.
398- The update of demand may occur before terminate() is called, and
399- terminate will then unlock the lock. But there is no guarantee that the
400- lock has actually been acquired after demand is updated. The only
401- guarantee is that the request call has been enqueued for lock ownership.
402- Above this is fixed. The demand is updated only when the lock has been
403- acquired:
404- If terminate reads a demand greater than, then it knows the
405- lock must be released, because no onNext signal will follow.
406- If the update of demand reads negative number, then it unlocks the
407- lock, because no onNext signal will follow.
408- long currentDemand = demand.getAndUpdate(current ->
409- current < 0L
410- ? current // Leave negative values as is
411- : (Long.MAX_VALUE - current) < n // Check for overflow
412- ? Long.MAX_VALUE
413- : current + n);
414- If multiple request calls are made to this subscription, the first is
415- sent and then onNext is received until the first demand is met, and then
416- the lock is unlocked, and then the next request is sent, and so.
417- if (currentDemand == 0)
418- lock(() -> upstream.request(n));
419- else if (currentDemand > 0)
420- upstream.request(n);
421- */
422- // else: Do nothing if terminated or cancelled
423421 }
424422
425423 /**
0 commit comments