diff --git a/convex-peer/src/main/java/convex/api/ConvexRemote.java b/convex-peer/src/main/java/convex/api/ConvexRemote.java index b7188fbab..0c106f0fb 100644 --- a/convex-peer/src/main/java/convex/api/ConvexRemote.java +++ b/convex-peer/src/main/java/convex/api/ConvexRemote.java @@ -119,8 +119,8 @@ public synchronized CompletableFuture transact(SignedData CompletableFuture cf; long id = -1; long wait=1; - // loop until request is queued + // loop until request is queued. We need this for backpressure while (true) { if (connection.isClosed()) throw new IOException("Connection closed"); synchronized (awaiting) { @@ -137,21 +137,20 @@ public synchronized CompletableFuture transact(SignedData Thread.sleep(wait); wait+=1; // linear backoff } catch (InterruptedException e) { - throw Utils.sneakyThrow(e); + Thread.currentThread().interrupt(); + throw new IOException("Transaction sending interrupted",e); } } log.trace("Sent transaction with message ID: {} awaiting count = {}", id, awaiting.size()); return cf; } - - @Override public CompletableFuture query(ACell query, Address address) throws IOException { long wait=1; - // loop until request is queued + // loop until request is queued. We need this for backpressure while (true) { synchronized (awaiting) { long id = connection.sendQuery(query, address); @@ -166,7 +165,8 @@ public CompletableFuture query(ACell query, Address address) throws IOEx Thread.sleep(wait); wait+=wait; // exponential backoff } catch (InterruptedException e) { - throw new IOException("Transaction sending interrupted",e); + Thread.currentThread().interrupt(); + throw new IOException("Query sending interrupted",e); } } } diff --git a/convex-peer/src/main/java/convex/net/Connection.java b/convex-peer/src/main/java/convex/net/Connection.java index e6a006c93..df2872211 100644 --- a/convex-peer/src/main/java/convex/net/Connection.java +++ b/convex-peer/src/main/java/convex/net/Connection.java @@ -395,8 +395,7 @@ public boolean sendMessage(Message msg) { } /** - * Sends a full payload for the given message type. Should be called on the thread - * that responds to missing data messages from the destination. + * Sends a message with full payload for the given message type. * * @param type Type of message * @param payload Payload value for message diff --git a/convex-peer/src/main/java/convex/net/Message.java b/convex-peer/src/main/java/convex/net/Message.java index 2bb9da711..309f004b8 100644 --- a/convex-peer/src/main/java/convex/net/Message.java +++ b/convex-peer/src/main/java/convex/net/Message.java @@ -111,9 +111,9 @@ public static Message createGoodBye() { @SuppressWarnings("unchecked") public T getPayload() throws BadFormatException { if (payload!=null) return (T) payload; - if (messageData==null) return null; + if (messageData==null) return null; // no message data, so must actually be null - // actual null payload :-) + // detect actual message data for null payload :-) if ((messageData.count()==1)&&(messageData.byteAt(0)==Tag.NULL)) return null; switch(type) { diff --git a/convex-peer/src/main/java/convex/peer/QueryHandler.java b/convex-peer/src/main/java/convex/peer/QueryHandler.java index b0e0a3371..651cb9145 100644 --- a/convex-peer/src/main/java/convex/peer/QueryHandler.java +++ b/convex-peer/src/main/java/convex/peer/QueryHandler.java @@ -12,6 +12,7 @@ import convex.core.data.AVector; import convex.core.data.Address; import convex.core.data.prim.CVMLong; +import convex.core.exceptions.BadFormatException; import convex.core.lang.RT; import convex.core.util.LoadMonitor; import convex.net.Message; @@ -64,13 +65,13 @@ protected void loop() throws InterruptedException { private void handleQuery(Message m) { try { // query is a vector [id , form, address?] - AVector v = m.getPayload(); + AVector v= m.getPayload(); CVMLong id = (CVMLong) v.get(0); ACell form = v.get(1); - + // extract the Address, might be null Address address = RT.ensureAddress(v.get(2)); - + log.debug( "Processing query: {} with address: {}" , form, address); // log.log(LEVEL_MESSAGE, "Processing query: " + form + " with address: " + // address); @@ -78,14 +79,15 @@ private void handleQuery(Message m) { // Report result back to message sender boolean resultReturned= m.returnResult(Result.fromContext(id, resultContext)); - + if (!resultReturned) { log.warn("Failed to send query result back to client with ID: {}", id); } - - } catch (Throwable t) { - log.warn("Query Error: {}", t); + } catch (BadFormatException e) { + log.debug("Terminated client due to bad query format"); + m.closeConnection(); } + } @Override diff --git a/convex-peer/src/test/java/convex/api/ConvexRemoteTest.java b/convex-peer/src/test/java/convex/api/ConvexRemoteTest.java index 56f8789ea..a4cafed09 100644 --- a/convex-peer/src/test/java/convex/api/ConvexRemoteTest.java +++ b/convex-peer/src/test/java/convex/api/ConvexRemoteTest.java @@ -21,6 +21,7 @@ import convex.core.crypto.AKeyPair; import convex.core.crypto.Ed25519Signature; import convex.core.data.Address; +import convex.core.data.Blobs; import convex.core.data.Ref; import convex.core.data.SignedData; import convex.core.data.prim.CVMLong; @@ -31,6 +32,8 @@ import convex.core.transactions.Invoke; import convex.core.util.Utils; import convex.net.Connection; +import convex.net.Message; +import convex.net.MessageType; import convex.peer.TestNetwork; /** @@ -70,6 +73,13 @@ public void testConnection() throws IOException, TimeoutException { assertTrue(convex.isConnected()); } } + + @Test + public void testBadQueryMessage() throws IOException, TimeoutException { + ConvexRemote convex = Convex.connect(network.SERVER.getHostAddress()); + Connection conn=convex.connection; + conn.sendMessage(Message.create(MessageType.QUERY, Blobs.empty())); + } @Test public void testBadConnect() throws IOException, TimeoutException {