Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.aries.rsa.itests.felix.RsaTestBase;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.EndpointDescriptionParser;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -67,10 +68,11 @@ public static Option[] configure() throws Exception {
@Test
public void testDiscoveryExport() throws Exception {
EndpointDescription epd = getEndpoint();
EchoService service = (EchoService)tcpProvider
.importEndpoint(EchoService.class.getClassLoader(),
bundleContext, new Class[]{EchoService.class}, epd);
ImportedService importedService = tcpProvider.importEndpoint(EchoService.class.getClassLoader(),
bundleContext, new Class[]{EchoService.class}, epd);
EchoService service = (EchoService)importedService.getService();
Assert.assertEquals("test", service.echo("test"));
importedService.close();
}

private EndpointDescription getEndpoint() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
Expand Down Expand Up @@ -172,15 +173,16 @@ public void close() throws IOException {
}

@Override
public Object importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
public ImportedService importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
throws IntentUnsatisfiedException {

String address = (String) endpoint.getProperties().get(FASTBIN_ADDRESS);
InvocationHandler handler = client.getProxy(address, endpoint.getId(), cl);
return Proxy.newProxyInstance(cl, interfaces, handler);
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
return () -> service;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
*/
package org.apache.aries.rsa.provider.tcp;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
Expand All @@ -46,24 +49,117 @@
* which sends the details of the method invocations
* over a TCP connection, to be executed by the remote service.
*/
public class TcpInvocationHandler implements InvocationHandler {
public class TcpInvocationHandler implements InvocationHandler, Closeable {

private static class Connection {
Socket socket;
BasicObjectOutputStream out;
BasicObjectInputStream in;

public Connection(Socket socket) throws IOException {
this.socket = socket;
out = new BasicObjectOutputStream(socket.getOutputStream());
in = new BasicObjectInputStream(socket.getInputStream());
}
}

private String host;
private int port;
private String endpointId;
private ClassLoader cl;
private int timeoutMillis;

private final Deque<Connection> pool = new ArrayDeque<>();
private int acquired; // counts connections currently in use (not in pool)
private boolean closed;

public TcpInvocationHandler(ClassLoader cl, String host, int port, String endpointId, int timeoutMillis)
throws UnknownHostException, IOException {
throws UnknownHostException, IOException {
this.cl = cl;
this.host = host;
this.port = port;
this.endpointId = endpointId;
this.timeoutMillis = timeoutMillis;
}

private Connection acquireConnection() throws IOException {
Connection conn;
synchronized (pool) {
acquired++; // must be first
if (closed) {
throw new IOException("Connection pool is closed");
}
conn = pool.pollFirst(); // reuse most recently used connection
}
// if the pool is empty, create a new connection
if (conn == null) {
conn = new Connection(openSocket());
conn.socket.setSoTimeout(timeoutMillis);
conn.socket.setTcpNoDelay(true);
conn.in.addClassLoader(cl);
conn.out.writeUTF(endpointId); // select endpoint for this connection
}
return conn;
}

// must be called exactly once for each call to acquireConnection,
// regardless of the outcome - if there was an error, pass null
private void releaseConnection(Connection conn) {
synchronized (pool) {
acquired--; // must be first
if (conn != null) {
pool.offerFirst(conn); // add to front of queue so old idle ones can expire
}
pool.notifyAll();
}
}

private void closeConnection(Connection conn) throws IOException {
if (conn != null) {
conn.socket.close();
}
}

private void closeConnections() throws IOException {
synchronized (pool) {
closed = true; // first prevent acquiring new connections
while (true) {
// close all idle connections
for (Iterator<Connection> it = pool.iterator(); it.hasNext(); ) {
closeConnection(it.next());
it.remove();
}
if (acquired == 0) {
break; // all closed
}
// wait for additional active connections to be released
try {
pool.wait();
} catch (InterruptedException ie) {
throw new IOException("interrupted while closing connections", ie);
}
}
}
}

private int getPoolSize() {
synchronized (pool) {
return pool.size() + acquired; // both idle and active
}
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// handle Object methods locally so we can use equals, HashMap, etc. normally
if (method.getDeclaringClass() == Object.class) {
switch (method.getName()) {
case "equals": return proxy == args[0];
case "hashCode": return System.identityHashCode(proxy);
case "toString": return proxy.getClass().getName() + "@"
+ Integer.toHexString(System.identityHashCode(proxy));
}
}
// handle remote invocation
if (Future.class.isAssignableFrom(method.getReturnType()) ||
CompletionStage.class.isAssignableFrom(method.getReturnType())) {
return createFutureResult(method, args);
Expand Down Expand Up @@ -105,33 +201,50 @@ public void run() {
}

private Object handleSyncCall(Method method, Object[] args) throws Throwable {
Throwable error;
Object result;
try (
Socket socket = openSocket();
ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream())
) {
socket.setSoTimeout(timeoutMillis);
out.writeUTF(endpointId);
out.writeObject(method.getName());
out.writeObject(args);
out.flush();

try (BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream())) {
in.addClassLoader(cl);
error = (Throwable) in.readObject();
result = readReplaceVersion(in.readObject());
Connection conn = null;
Throwable error = null;
Object result = null;

try {
// try at most all existing connections (which may be stale) plus one new
for (int attempts = getPoolSize() + 1; attempts > 0; attempts--) {
conn = acquireConnection(); // get or create pool connection
try {
// write invocation data
conn.out.writeObject(method.getName());
conn.out.writeObject(args);
conn.out.flush();
conn.out.reset();
// read result data
error = (Throwable) conn.in.readObject();
result = readReplaceVersion(conn.in.readObject());
break; // transaction completed
} catch (SocketException se) { // catch only read/write exceptions here - only stale connections
if (attempts == 1) {
throw se; // failed last attempt - propagate the error
}
// the server socket was previously open, but now failed -
// communication error or server socket was closed (e.g. idle timeout)
// so we retry with another connection
releaseConnection(null); // dispose of it before next attempt
}
}

if (error == null)
return result;
else if (error instanceof InvocationTargetException)
error = error.getCause(); // exception thrown from remotely invoked method (not our problem)
else
throw error; // exception thrown by provider itself
} catch (SocketTimeoutException e) {
throw new ServiceException("Timeout calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
} catch (Throwable e) {
throw new ServiceException("Error calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
// this can be an unexpected error from remote (not from the invoked method itself
// but somewhere in the provider processing), or a communications error (e.g. timeout) -
// in either case we don't know what was written or not, so we must abort the connection
closeConnection(conn);
conn = null; // don't return it to the pool
throw new ServiceException("Error invoking " + method.getName() + " on " + endpointId, ServiceException.REMOTE, e);
} finally {
releaseConnection(conn);
}
throw error;
}
Expand All @@ -158,4 +271,8 @@ private Object readReplaceVersion(Object readObject) {
}
}

@Override
public void close() throws IOException {
closeConnections();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.aries.rsa.provider.tcp;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.Arrays;
Expand All @@ -32,6 +31,7 @@
import org.apache.aries.rsa.annotations.RSADistributionProvider;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.apache.aries.rsa.spi.Endpoint;
import org.apache.aries.rsa.spi.ImportedService;
import org.apache.aries.rsa.spi.IntentUnsatisfiedException;
import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -126,17 +126,28 @@ private synchronized void removeServer(TcpEndpoint endpoint) {
}

@Override
public Object importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
public ImportedService importEndpoint(ClassLoader cl,
BundleContext consumerContext,
Class[] interfaces,
EndpointDescription endpoint)
throws IntentUnsatisfiedException {
try {
String endpointId = endpoint.getId();
URI address = new URI(endpointId);
int timeout = new EndpointPropertiesParser(endpoint).getTimeoutMillis();
InvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout);
return Proxy.newProxyInstance(cl, interfaces, handler);
TcpInvocationHandler handler = new TcpInvocationHandler(cl, address.getHost(), address.getPort(), endpointId, timeout);
Object service = Proxy.newProxyInstance(cl, interfaces, handler);
return new ImportedService() {
@Override
public Object getService() {
return service;
}

@Override
public void close() throws IOException {
handler.close();
}
};
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.aries.rsa.provider.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand All @@ -27,6 +28,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -113,20 +115,24 @@ public void run() {
}

private void handleConnection(Socket socket) {
try (Socket sock = socket;
BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream());
ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream())) {
try (Socket sock = socket; // socket will be closed when done
ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream());
BasicObjectInputStream in = new BasicObjectInputStream(socket.getInputStream())) {
socket.setTcpNoDelay(true);
String endpointId = in.readUTF();
MethodInvoker invoker = invokers.get(endpointId);
if (invoker == null)
throw new IllegalArgumentException("invalid endpoint: " + endpointId);
in.addClassLoader(invoker.getService().getClass().getClassLoader());
handleCall(invoker, in, out);
} catch (SocketException se) {
return; // e.g. connection closed by client
} catch (Exception e) {
LOG.warn("Error processing service call", e);
while (running) {
handleCall(invoker, in, out);
}
} catch (SocketException | SocketTimeoutException | EOFException se) {
return; // e.g. connection closed by client or read timeout due to inactivity
} catch (Throwable t) {
LOG.warn("Error processing service call", t);
}
// connection is now closed and thread is done
}

private void handleCall(MethodInvoker invoker, ObjectInputStream in, ObjectOutputStream out) throws Exception {
Expand All @@ -141,6 +147,8 @@ private void handleCall(MethodInvoker invoker, ObjectInputStream in, ObjectOutpu
}
out.writeObject(error);
out.writeObject(result);
out.flush();
out.reset();
}

@SuppressWarnings("unchecked")
Expand Down
Loading