diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..5e90407 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,16 @@ +# Normalize as LF in the repository, OS native locally +* text=auto +*.java text + +# Binary files that should not be modified +*.dat binary +*.db binary +*.icns binary +*.ico binary +*.key binary +*.jks binary +*.jpg binary +*.png binary +*.ttf binary +*.wav binary +JavaApplicationStub binary diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e895fab --- /dev/null +++ b/.gitignore @@ -0,0 +1,70 @@ +# Build # +######### +MANIFEST.MF +dependency-reduced-pom.xml + +# Compiled # +############ +bin +build +dist +lib +out +target +*.com +*.class +*.dll +*.exe +*.o +*.so + +# Databases # +############# +*.db +*.sql +*.sqlite + +# Packages # +############ +*.7z +*.dmg +*.gz +*.iso +*.rar +*.tar +*.zip + +# Repository # +############## +.git + +# Logging # +########### +/logs +*.log + +# Misc # +######## +*.bak + +# System # +########## +.DS_Store +ehthumbs.db +Thumbs.db + +# Project # +########### +.classpath +.externalToolBuilders +.gradle +.idea +.project +.settings +nbproject +atlassian-ide-plugin.xml +build.xml +nb-configuration.xml +*.iml +*.ipr +*.iws diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..2a79e1d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,32 @@ +# Source language and JDK version to use +language: java +jdk: oraclejdk7 + +# Compile and package JAR and set build properties +install: mvn package -DbuildNumber=$TRAVIS_BUILD_NUMBER -Dci=travis -Dcommit=${TRAVIS_COMMIT:0:7} + +# Perform steps after compilation/test success +after_success: + # Generate Javadocs and Cobertura report for Coveralls.io + - "mvn javadoc:javadoc cobertura:cobertura coveralls:cobertura -DserviceJobId=$TRAVIS_JOB_ID" + # Get files for use with build, namely the custom Maven settings.xml + - "git clone https://github.com/flow/travis-ci-resources.git target/travis" + # Check if commit is not a pull request, if git repo is official, and if branch is master, then deploy artifacts to Maven repository + - "[[ $TRAVIS_PULL_REQUEST == false ]] && [[ $TRAVIS_REPO_SLUG == flow/flow-networking ]] && [[ $TRAVIS_BRANCH == master ]] && mvn deploy --settings target/travis/settings.xml" + +# Notification services +notifications: + # Disable build status email notifications, until the issue with forks is fixed + email: false + webhooks: + # Send build information and status to Notifico + - http://n.tkte.ch/h/1814/1bcifMkR2PGDQ3fjIHToxyNh + +# Environmental system variables +env: + global: + # Make the log output cleaner + - TERM=dumb + # Super secure, encrypted variables! Ssssh! + - secure: "W+3c+zyZvJAGxnlzn1KDwO0+sAGilX88Bkm4TFdk1a3foXtF7+kj3GFyRnxH7ga2y+gEivxWwatS3MZYpXeU2ruI91N3GlYSbQNeySUQPA8If+qjXwQQpZlDt8/R+LnlbAfaG+CovTIELb7E4obX8RXKLRGdVoh3JZVjRYh+5tM=" + - secure: "fGFdb9H61pzNQbeBrQCwTr9MlmbsidGHD9disxkzZ1N95Jy4p7KiW+FFMk4/ZabjEdSfC+jSYN96KHpOdNXmPt6pDjyJlSQk8JS2hyIwI9jxGECNul8etFM6sS65Pp1Ay6LCvUnbAjnIIQY0wBgQjwveFx7xp6oUQuURi8JhNo8=" diff --git a/HEADER.txt b/HEADER.txt new file mode 100644 index 0000000..6218055 --- /dev/null +++ b/HEADER.txt @@ -0,0 +1,21 @@ +This file is part of ${project}, licensed under the MIT License (MIT). + +Copyright (c) ${year} ${name} <${url}/> + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..1a63677 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..53334d3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,285 @@ + + 4.0.0 + + + Flow Networking + com.flowpowered + flow-networkinging + 0.1.0-SNAPSHOT + jar + 2013 + http://flowpowered.com + Networking library for the Flow collection. + + + + org.sonatype.oss + oss-parent + 7 + + + + + UTF-8 + 0 + unknown + unknown + + + + + + MIT License + http://www.tldrlegal.com/license/mit-license + repo + + + + + + Spout LLC + http://www.spout.org + + + + + + kitskub + Jack Huey + kitskub@gmail.com + + + Wolf480pl + Wolf480pl + wolf480@interia.pl + + + Wulfspider + Luke Spragg + the@wulf.im + + + + + + scm:git:git://github.com/flow/flow-networking.git + scm:git:ssh://git@github.com:flow/flow-networking.git + https://github.com/flow/flow-networking + + + + + travis + https://travis-ci.org/flow/flow-networking + + + + + github + https://github.com/flow/flow-networking/issues + + + + + + sonatype-nexus-releases + https://oss.sonatype.org/content/repositories/releases + + + sonatype-nexus-snapshots + https://oss.sonatype.org/content/repositories/snapshots + + true + always + + + + + + + + sonatype-nexus-releases + https://oss.sonatype.org/content/repositories/releases + + + sonatype-nexus-snapshots + https://oss.sonatype.org/content/repositories/snapshots + + true + always + + + + + + + + + com.flowpowered + flow-commons + 0.1.0-SNAPSHOT + provided + + + io.netty + netty-all + 4.0.9.Final + provided + + + org.apache.commons + commons-lang3 + 3.1 + provided + + + org.apache.logging.log4j + log4j-api + 2.0-beta9 + provided + + + org.apache.logging.log4j + log4j-core + 2.0-beta9 + provided + + + + junit + junit + 4.11 + test + + + hamcrest-core + org.hamcrest + + + + + org.hamcrest + hamcrest-library + 1.3 + test + + + org.powermock + powermock-api-mockito + 1.5.1 + test + + + org.powermock + powermock-module-junit4 + 1.5.1 + test + + + + + + clean install + + + + + + false + . + . + + LICENSE.txt + + + + + + + + + com.mycila.maven-license-plugin + maven-license-plugin + 1.10.b1 + + + + + ${project.name} + ${project.inceptionYear} + ${project.organization.name} + ${project.organization.url} + + true + UTF-8 + true +
HEADER.txt
+ + SLASHSTAR_STYLE + + + ${project.name} + license + + + src/main/java/** + src/test/java/** + +
+ clean + + format + +
+
+
+ + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + -Xlint:all + -Xlint:-path + + true + true + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + ${project.name} + ${project.version}+${ciSystem}-b${buildNumber}.git-${commit} + ${project.organization.name} + + + + + + + org.eluder.coveralls + coveralls-maven-plugin + 2.0.1 + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.6 + + xml + 256m + + +
+
+
diff --git a/src/main/java/com/flowpowered/networking/BasicChannelInitializer.java b/src/main/java/com/flowpowered/networking/BasicChannelInitializer.java new file mode 100644 index 0000000..781f1a2 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/BasicChannelInitializer.java @@ -0,0 +1,55 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; + +import com.flowpowered.networking.pipeline.MessageDecoder; +import com.flowpowered.networking.pipeline.MessageEncoder; +import com.flowpowered.networking.pipeline.MessageHandler; + +/** + * Used to initialize the channels. + */ +public class BasicChannelInitializer extends ChannelInitializer { + private final ConnectionManager connectionManager; + + public BasicChannelInitializer(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + + @Override + protected final void initChannel(SocketChannel c) { + if (c.parent() != null) { + c.attr(ConnectionManager.PROTOCOL_ATTRIBUTE).set(c.parent().attr(ConnectionManager.PROTOCOL_ATTRIBUTE).get()); + } + // Up for encoding/sending/outbound; Down for decoding/receiving/inbound + MessageEncoder encoder = new MessageEncoder(); + MessageDecoder decoder = new MessageDecoder(); + MessageHandler handler = new MessageHandler(connectionManager); + + c.pipeline().addLast(decoder, encoder, handler); + } +} diff --git a/src/main/java/com/flowpowered/networking/Codec.java b/src/main/java/com/flowpowered/networking/Codec.java new file mode 100644 index 0000000..80407ec --- /dev/null +++ b/src/main/java/com/flowpowered/networking/Codec.java @@ -0,0 +1,72 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; + +/** + * {@code Codec}s are used to encode and decode a {@link Message} into a {@link ByteBuf}. + */ +public abstract class Codec { + private final Class clazz; + private int opcode; + + public Codec(Class clazz, int opcode) { + this.clazz = clazz; + this.opcode = opcode; + } + + public final Class getMessage() { + return clazz; + } + + public final int getOpcode() { + return opcode; + } + + void setOpcode(int opcode) { + this.opcode = opcode; + } + + /** + * Decodes a {@link ByteBuf} into a {@link Message}. + * + * @param buffer the buffer to read from + * @return the message fully encoded. + * @throws IOException If any decoding fails on the buffer + */ + public abstract T decode(ByteBuf buffer) throws IOException; + + /** + * Encodes a {@link Message} into a {@link ByteBuffer}. + * + * @param buf the buffer to encode into. Should be empty. + * @param message The message to encode + * @return A buffer ready to be sent + * @throws IOException If any data on the message fails to encode + */ + public abstract ByteBuf encode(ByteBuf buf, T message) throws IOException; +} diff --git a/src/main/java/com/flowpowered/networking/ConnectionManager.java b/src/main/java/com/flowpowered/networking/ConnectionManager.java new file mode 100644 index 0000000..9b6f523 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/ConnectionManager.java @@ -0,0 +1,62 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; + +import com.flowpowered.networking.protocol.Protocol; +import com.flowpowered.networking.session.Session; + +/** + * This class defines a basic structure for any object which manages connections. + *
+ * NOTE: it is imperative that any {@link Channel}s being bound or connected have the {@code PROTOCOL_ATTRIBUTE} applied to them + * by {@code Channel.attr(PROTOCOL_ATTRIBUTE).set();}. + */ +public abstract class ConnectionManager { + /** + * Creates a new Session for a {@code Channel}. This session will be used for all api-facing actions. + * Therefore, this session will most likely be saved by the {@code ConnectionManager} in order to intereact with the + * {@code Session}. + * + * @param c the Channel the Session will be using + * @param protocol the Protocol the Session will be using + * @return the new Session + */ + public abstract Session newSession(Channel c, Protocol protocol); + + /** + * Called when a session becomes inactive because the underlying channel has been closed. + * All references to the Session should be removed, as it will no longer be valid. + * + * @param session the Session which will become inactive + */ + public abstract void sessionInactivated(Session session); + + /** + * This attribute is used internally to store the protocol of a channel. + */ + public static final AttributeKey PROTOCOL_ATTRIBUTE = new AttributeKey<>("PROTCOL"); +} diff --git a/src/main/java/com/flowpowered/networking/Message.java b/src/main/java/com/flowpowered/networking/Message.java new file mode 100644 index 0000000..87865ba --- /dev/null +++ b/src/main/java/com/flowpowered/networking/Message.java @@ -0,0 +1,50 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +/** + * Implementers of this class represent the data of a message to be sent. + * There are a few rules that messages should follow: + *
    + *
  • All message fields should be immutable. This ensures thread-safety and makes it so Message objects can be safely stored
  • + *
  • Message subclasses should override {@link #toString()}, {@link #equals(Object)} , and {@link #hashCode()}.
  • + *
  • All fields in a Message should be protocol-primitive (can be written directly via ByteBuf methods or via a *single* ByteBufUtils method)
  • + *
+ */ +public interface Message { + @Override + public String toString(); + + @Override + public boolean equals(Object other); + + @Override + public int hashCode(); + + /** + * This method may optionally be used to define if the message should be handled asynchronously or synchronously, when the option is available. + * It is completely up to the implementation how this is used. + */ + public boolean isAsync(); +} diff --git a/src/main/java/com/flowpowered/networking/MessageHandler.java b/src/main/java/com/flowpowered/networking/MessageHandler.java new file mode 100644 index 0000000..ba84041 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/MessageHandler.java @@ -0,0 +1,41 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import com.flowpowered.networking.session.Session; + +/** + * Defines a class which handles a message of type {@code T}. + * + * @param the type of message to handle + */ +public interface MessageHandler { + /** + * Handles a message that was received. + * + * @param session the session that received the message + * @param message the message that was received + */ + public void handle(Session session, T message); +} diff --git a/src/main/java/com/flowpowered/networking/NetworkClient.java b/src/main/java/com/flowpowered/networking/NetworkClient.java new file mode 100644 index 0000000..e952fc7 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/NetworkClient.java @@ -0,0 +1,54 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.net.SocketAddress; + +import static com.flowpowered.networking.ConnectionManager.PROTOCOL_ATTRIBUTE; + +import com.flowpowered.networking.protocol.Protocol; + +/** + * This class defines an easy, general way to start a client. It is recommended that any clients use or extend this class. + */ +public abstract class NetworkClient extends ConnectionManager { + private final Bootstrap bootstrap = new Bootstrap(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + public NetworkClient(final SocketAddress remoteAdress, final Protocol toConnect) { + bootstrap + .group(workerGroup) + .channel(NioSocketChannel.class) + .handler(new BasicChannelInitializer(this)); + + Channel channel = bootstrap.connect(remoteAdress).awaitUninterruptibly().channel(); + channel.attr(PROTOCOL_ATTRIBUTE).set(toConnect); + } +} diff --git a/src/main/java/com/flowpowered/networking/NetworkServer.java b/src/main/java/com/flowpowered/networking/NetworkServer.java new file mode 100644 index 0000000..118c107 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/NetworkServer.java @@ -0,0 +1,61 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +import java.net.SocketAddress; + +import com.flowpowered.networking.protocol.Protocol; + +/** + * This class defines an easy, general way to start a server. It is recommended that any server use or extend this class. + */ +public abstract class NetworkServer extends ConnectionManager { + /** + * The {@link ServerBootstrap} used to initialize Netty. + */ + private final ServerBootstrap bootstrap = new ServerBootstrap(); + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + public NetworkServer() { + bootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new BasicChannelInitializer(this)) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true); + } + + public void bind(SocketAddress address, Protocol protocol) { + Channel channel = bootstrap.bind(address).awaitUninterruptibly().channel(); + channel.attr(PROTOCOL_ATTRIBUTE).set(protocol); + } +} diff --git a/src/main/java/com/flowpowered/networking/exception/UnknownPacketException.java b/src/main/java/com/flowpowered/networking/exception/UnknownPacketException.java new file mode 100644 index 0000000..17bd769 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/exception/UnknownPacketException.java @@ -0,0 +1,57 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.exception; + +import java.io.IOException; + +/** + * Thrown when a {@link com.flowpowered.networking.Codec} cannot be found for a given opcode + */ +public class UnknownPacketException extends IOException { + private static final long serialVersionUID = 2479966238464122702L; + private final int opcode; + /** + * If the length is -1, the length is unknown. + */ + private final int length; + + /** + * + * @param opcode the opcode of the unknown packet + * @param length the length of the unknown packet, or -1 if unknown + */ + public UnknownPacketException(int opcode, int length) { + super("Unknown opcode: " + opcode); + this.opcode = opcode; + this.length = length; + } + + public int getOpcode() { + return opcode; + } + + public int getLength() { + return length; + } +} diff --git a/src/main/java/com/flowpowered/networking/pipeline/MessageDecoder.java b/src/main/java/com/flowpowered/networking/pipeline/MessageDecoder.java new file mode 100644 index 0000000..6f608f9 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/pipeline/MessageDecoder.java @@ -0,0 +1,83 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.pipeline; + +import java.io.IOException; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +import com.flowpowered.networking.Codec; +import com.flowpowered.networking.ConnectionManager; +import com.flowpowered.networking.Message; +import com.flowpowered.networking.process.PreprocessReplayingDecoder; +import com.flowpowered.networking.protocol.Protocol; +import com.flowpowered.networking.exception.UnknownPacketException; + +/** + * A {@link PreprocessReplayingDecoder} which decodes {@link ByteBuf}s into Common {@link Message}s. + */ +public class MessageDecoder extends PreprocessReplayingDecoder { + private static final int PREVIOUS_MASK = 0x1F; + private int[] previousOpcodes = new int[PREVIOUS_MASK + 1]; + private int opcodeCounter = 0; + + public MessageDecoder() { + super(512); + } + + @Override + protected Object decodeProcessed(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { + Protocol protocol = ctx.channel().parent().attr(ConnectionManager.PROTOCOL_ATTRIBUTE).get(); + Codec codec = null; + try { + codec = protocol.readHeader(buf); + } catch (UnknownPacketException e) { + int length = e.getLength(); + if (length != -1 && length != 0) { + buf.readBytes(length); + } + if (length == -1) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < PREVIOUS_MASK; i++) { + if (i > 0) { + sb.append(", "); + } + sb.append(Integer.toHexString(previousOpcodes[(opcodeCounter + i) & PREVIOUS_MASK])); + } + + throw new IOException("Unknown operation code: " + e.getOpcode() + " (previous opcodes: " + sb.toString() + ").", e); + } + } + + if (codec == null) { + return buf; + } + + previousOpcodes[(opcodeCounter++) & PREVIOUS_MASK] = codec.getOpcode(); + Object decoded = codec.decode(buf); + buf.release(); + return decoded; + } +} diff --git a/src/main/java/com/flowpowered/networking/pipeline/MessageEncoder.java b/src/main/java/com/flowpowered/networking/pipeline/MessageEncoder.java new file mode 100644 index 0000000..d33c22d --- /dev/null +++ b/src/main/java/com/flowpowered/networking/pipeline/MessageEncoder.java @@ -0,0 +1,60 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.pipeline; + +import com.flowpowered.networking.protocol.Protocol; +import com.flowpowered.networking.Message; +import com.flowpowered.networking.Codec; +import java.io.IOException; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import com.flowpowered.networking.ConnectionManager; + +import com.flowpowered.networking.process.ProcessingEncoder; + +/** + * A {@link MessageToMessageEncoder} which encodes into {@link ByteBuf}s. + */ +public class MessageEncoder extends ProcessingEncoder { + + @SuppressWarnings("unchecked") + @Override + protected void encodePreProcess(ChannelHandlerContext ctx, final Object msg, List out) throws IOException { + if (msg instanceof Message) { + final Protocol protocol = ctx.channel().attr(ConnectionManager.PROTOCOL_ATTRIBUTE).get(); + final Message message = (Message) msg; + final Class clazz = message.getClass(); + final Codec codec = (Codec) protocol.getCodecLookupService().find(clazz); + if (codec == null) { + throw new IOException("Unknown message type: " + clazz + "."); + } + final ByteBuf messageBuf = codec.encode(ctx.alloc().buffer(), message); + final ByteBuf headerBuf = protocol.writeHeader(codec, messageBuf, ctx.alloc().buffer()); + out.add(Unpooled.wrappedBuffer(headerBuf, messageBuf)); + } + } +} diff --git a/src/main/java/com/flowpowered/networking/pipeline/MessageHandler.java b/src/main/java/com/flowpowered/networking/pipeline/MessageHandler.java new file mode 100644 index 0000000..eb34ba3 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/pipeline/MessageHandler.java @@ -0,0 +1,108 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.pipeline; + +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.flowpowered.networking.ConnectionManager; +import com.flowpowered.networking.Message; +import com.flowpowered.networking.session.Session; + +/** + * A {@link SimpleChannelUpstreamHandler} which processes incoming network events. + */ +public class MessageHandler extends SimpleChannelInboundHandler { + /** + * The associated session + */ + private final AtomicReference session = new AtomicReference<>(null); + private final ConnectionManager connectionManager; + + /** + * Creates a new network event handler. + * + */ + public MessageHandler(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + final Channel c = ctx.channel(); + setSession(connectionManager.newSession(c, c.attr(ConnectionManager.PROTOCOL_ATTRIBUTE).get())); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + Channel c = ctx.channel(); + Session session = this.session.get(); + // TODO needed? + session.validate(c); + session.onDisconnect(); + connectionManager.sessionInactivated(session); + getLogger().info("Channel disconnected: " + c + "."); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Message i) { + Session session = this.session.get(); + session.validate(ctx.channel()); + session.messageReceived(i); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel c = ctx.channel(); + getLogger().log(Level.WARN, "Exception caught, closing channel: " + c + "...", cause); + c.close(); + } + + public Session getSession() { + return session.get(); + } + + public void setSession(Session session) { + if (!this.session.compareAndSet(null, session)) { + throw new IllegalStateException("Session may not be set more than once"); + } + } + + protected Logger getLogger() { + String loggerName = ""; + if (session.get() != null) { + Logger protocolLogger = session.get().getProtocol().getLogger(); + // TODO: Maybe we should just use the protocolLogger if present? + loggerName = protocolLogger != null ? protocolLogger.getName() + "." : ""; + } + return LogManager.getLogger(loggerName + getClass().getSimpleName()); + } +} diff --git a/src/main/java/com/flowpowered/networking/process/ByteBufferChannelProcessor.java b/src/main/java/com/flowpowered/networking/process/ByteBufferChannelProcessor.java new file mode 100644 index 0000000..6dcb861 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ByteBufferChannelProcessor.java @@ -0,0 +1,117 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +/** + * Represents a processor that acts as a pass-through, backed by a byte array + */ +public class ByteBufferChannelProcessor extends CommonChannelProcessor { + private byte[] internalBuffer; + private int writePointer; + private int readPointer; + private boolean full; + + public ByteBufferChannelProcessor(int capacity) { + super(capacity); + } + + @Override + protected void write(byte[] buf, int length) { + if (length > buf.length) { + throw new ArrayIndexOutOfBoundsException(length + " exceeds the size of the byte array " + buf.length); + } + + int toCopy = length; + + if (internalBuffer == null) { + internalBuffer = new byte[capacity << 1]; + readPointer = 0; + writePointer = 0; + full = false; + } + if (freeSpace() < length) { + throw new IllegalStateException("Internal buffer ran out of memory"); + } + int toTransfer = Math.min(length, internalBuffer.length - writePointer); + System.arraycopy(buf, 0, internalBuffer, writePointer, toTransfer); + writePointer = (writePointer + toTransfer) % internalBuffer.length; + + length -= toTransfer; + + if (length > 0) { + System.arraycopy(buf, toTransfer, internalBuffer, writePointer, length); + writePointer = (writePointer + length) % internalBuffer.length; + } + + if (writePointer == readPointer && toCopy > 0) { + full = true; + } + } + + @Override + protected int read(byte[] buf) { + final int toCopy = Math.min(stored(), buf.length); + final int toTransfer = Math.min(toCopy, internalBuffer.length - readPointer); + int length = toCopy; + + System.arraycopy(internalBuffer, readPointer, buf, 0, toTransfer); + readPointer = (readPointer + toTransfer) % internalBuffer.length; + + length -= toTransfer; + + if (length > 0) { + System.arraycopy(internalBuffer, 0, buf, toTransfer, length); + readPointer = (readPointer + length) % internalBuffer.length; + } + + if (toCopy > 0) { + full = false; + } + return toCopy; + } + + private int stored() { + if (full) { + return internalBuffer.length; + } + + if (writePointer >= readPointer) { + return writePointer - readPointer; + } + + return internalBuffer.length - (readPointer - writePointer); + } + + private int freeSpace() { + if (full) { + return 0; + } + + if (writePointer >= readPointer) { + return internalBuffer.length - (writePointer - readPointer); + } + + return readPointer - writePointer; + } +} diff --git a/src/main/java/com/flowpowered/networking/process/ChannelProcessor.java b/src/main/java/com/flowpowered/networking/process/ChannelProcessor.java new file mode 100644 index 0000000..c0115da --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ChannelProcessor.java @@ -0,0 +1,45 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * {@code ChannelProcessor} can be used in a {@link PreprocessReplayingDecoder} or {@link ProcessingEncoder} to define + * how a {@code ByteBuf} should be process prior to decode or encode. + */ +public interface ChannelProcessor { + /** + * Adds the data contained in the given channel buffer to the processor and returns the output channel buffer. The method may be called from multiple threads. + * {@code input.release} should NOT be called; it is done externally. + * {@code buffer.release} should NOT be called; it is done externally. + * + * @param ctx the channel handler context + * @param input the buffer containing the input data + * @param buffer the buffer to add the data to; will be dynamically-sized + * @return + */ + public ByteBuf process(ChannelHandlerContext ctx, ByteBuf input, ByteBuf buffer); +} diff --git a/src/main/java/com/flowpowered/networking/process/CommonChannelProcessor.java b/src/main/java/com/flowpowered/networking/process/CommonChannelProcessor.java new file mode 100644 index 0000000..ed7ffd2 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/CommonChannelProcessor.java @@ -0,0 +1,75 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * Bridge class for passing ByteBufs through byte array read/write processing + */ +public abstract class CommonChannelProcessor implements ChannelProcessor { + protected final int capacity; + private final byte[] byteBuffer; + + public CommonChannelProcessor(int capacity) { + this.capacity = capacity; + this.byteBuffer = new byte[capacity]; + } + + @Override + public final synchronized ByteBuf process(ChannelHandlerContext ctx, final ByteBuf input, ByteBuf buffer) { + if (buffer == null) { + throw new IllegalArgumentException("buffer cannot be null!"); + } + int remaining; + while ((remaining = input.readableBytes()) > 0) { + int clamped = Math.min(remaining, capacity); + input.readBytes(byteBuffer, 0, clamped); + write(byteBuffer, clamped); + int read; + while ((read = read(byteBuffer)) > 0) { + buffer.writeBytes(byteBuffer, 0, read); + } + } + + return buffer; + } + + /** + * Writes data to the processor

This method does not need to be thread safe + * + * @param buf a buffer containing the data + * @param length the length of the data to process + */ + protected abstract void write(byte[] buf, int length); + + /** + * Reads the data from the processor into the given array

This method does not need to be thread safe + * + * @param buf the byte array to process the data to + * @return the number of bytes written + */ + protected abstract int read(byte[] buf); +} diff --git a/src/main/java/com/flowpowered/networking/process/PreprocessReplayingDecoder.java b/src/main/java/com/flowpowered/networking/process/PreprocessReplayingDecoder.java new file mode 100644 index 0000000..66ca57d --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/PreprocessReplayingDecoder.java @@ -0,0 +1,143 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +/** + * This class is both a {@link ByteToMessageDecoder} but also allows processing pre-decode via {@code decodeProcessed}. + * + */ +public abstract class PreprocessReplayingDecoder extends ByteToMessageDecoder implements ProcessorHandler { + private final AtomicReference processor = new AtomicReference<>(null); + private final ReplayableByteBuf replayableBuffer = new ReplayableByteBuf(); + private final AtomicBoolean locked = new AtomicBoolean(false); + private final int capacity; + private ByteBuf processedBuffer = null; + + /** + * Constructs a new replaying decoder.

The internal buffer is dynamically sized, but if it grows larger than the given capacity, it will be resized downwards when possible. This allows + * handling of larger packets without requiring the buffers to be set larger than the size of the largest packet. + * + * @param capacity the default capacity of the internal buffer. + */ + public PreprocessReplayingDecoder(int capacity) { + this.capacity = capacity; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List frames) throws Exception { + if (!buf.isReadable()) { + return; + } + + if (locked.get()) { + throw new IllegalStateException("Decode attempted when channel was locked"); + } + + Object frame; + + ChannelProcessor processor = this.processor.get(); + ByteBuf liveBuffer; + do { + if (processor == null) { + liveBuffer = buf; + } else { + if (processedBuffer == null) { + processedBuffer = ctx.alloc().buffer(); + } + processedBuffer = processor.process(ctx, buf, processedBuffer); + liveBuffer = processedBuffer; + } + int readPointer = liveBuffer.readerIndex(); + try { + frame = decodeProcessed(ctx, replayableBuffer.setBuffer(liveBuffer)); + } catch (ReplayableException e) { + // roll back liveBuffer read to state prior to calling decodeProcessed + liveBuffer.readerIndex(readPointer); + // No frame returned + frame = null; + } + + if (frame != null) { + frames.add(frame); + if (frame instanceof ProcessorSetupMessage) { + ProcessorSetupMessage setupMessage = (ProcessorSetupMessage) frame; + ChannelProcessor newProcessor = setupMessage.getProcessor(); + if (newProcessor != null) { + setProcessor(newProcessor); + } + if (setupMessage.isChannelLocking()) { + locked.set(true); + } else { + locked.set(false); + } + setupMessage.setProcessorHandler(this); + } + processor = this.processor.get(); + } + } while (frame != null && !locked.get()); + + if (processedBuffer != null) { + if (processedBuffer instanceof CompositeByteBuf || (processedBuffer.capacity() > capacity && processedBuffer.isWritable())) { + ByteBuf newBuffer = ctx.alloc().buffer(Math.max(capacity, processedBuffer.readableBytes())); + if (processedBuffer.isReadable()) { + // This method transfers the data in processedBuffer to the newBuffer. + // However, for some reason, if processedBuffer is zero length, it causes an exception. + newBuffer.writeBytes(processedBuffer); + } + ByteBuf old = processedBuffer; + processedBuffer = newBuffer; + old.release(); + } + processedBuffer.discardReadBytes(); + } + } + + @Override + public void setProcessor(ChannelProcessor processor) { + if (processor == null) { + throw new IllegalArgumentException("Processor may not be set to null"); + } else if (!this.processor.compareAndSet(null, processor)) { + throw new IllegalArgumentException("Processor may only be set once"); + } + locked.set(false); + } + + /** + * This method is the equivalent of the decode method for the standard ReplayingDecoder
The method call is repeated if decoding causes the ByteBuf to run out of bytes
+ * + * @param ctx the channel handler context + * @param buffer the channel buffer + * @return the message to pass to the next stage + */ + protected abstract Object decodeProcessed(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception; +} diff --git a/src/main/java/com/flowpowered/networking/process/ProcessingEncoder.java b/src/main/java/com/flowpowered/networking/process/ProcessingEncoder.java new file mode 100644 index 0000000..0940587 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ProcessingEncoder.java @@ -0,0 +1,98 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.MessageToMessageEncoder; +import java.util.ArrayList; +import java.util.List; + +/** + * This class provides a layer of processing after encode but before the message is passed outbound. + * + */ +public abstract class ProcessingEncoder extends MessageToMessageEncoder implements ProcessorHandler { + private final AtomicReference processor = new AtomicReference<>(); + private final AtomicBoolean locked = new AtomicBoolean(false); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (locked.get()) { + throw new IllegalStateException("Encode attempted when channel was locked"); + } + super.write(ctx, msg, promise); + } + + private void checkForSetupMessage(Object e) { + if (e instanceof ProcessorSetupMessage) { + ProcessorSetupMessage setupMessage = (ProcessorSetupMessage) e; + ChannelProcessor newProcessor = setupMessage.getProcessor(); + if (newProcessor != null) { + setProcessor(newProcessor); + } + if (setupMessage.isChannelLocking()) { + locked.set(true); + } else { + locked.set(false); + } + setupMessage.setProcessorHandler(this); + } + } + + @Override + public void setProcessor(ChannelProcessor processor) { + if (processor == null) { + throw new IllegalArgumentException("Processor may not be set to null"); + } else if (!this.processor.compareAndSet(null, processor)) { + throw new IllegalArgumentException("Processor may only be set once"); + } + locked.set(false); + } + + @Override + protected void encode(ChannelHandlerContext ctx, final Object msg, List out) throws Exception { + List newOut = new ArrayList<>(); + encodePreProcess(ctx, msg, newOut); + final ChannelProcessor processor = this.processor.get(); + for (final Object encoded : newOut) { + Object toAdd = encoded; + if (processor != null && encoded instanceof ByteBuf) { + synchronized (this) { + // Gotta release the old + toAdd = processor.process(ctx, (ByteBuf) encoded, ctx.alloc().buffer()); + ((ByteBuf) encoded).release(); + } + } + out.add(toAdd); + } + checkForSetupMessage(msg); + } + + protected abstract void encodePreProcess(ChannelHandlerContext ctx, Object msg, List out) throws Exception; +} diff --git a/src/main/java/com/flowpowered/networking/process/ProcessorHandler.java b/src/main/java/com/flowpowered/networking/process/ProcessorHandler.java new file mode 100644 index 0000000..ab303ec --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ProcessorHandler.java @@ -0,0 +1,34 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +/** + * Represents a processor handler. + */ +public interface ProcessorHandler { + /** + * Sets the processor to be used to process the packets
+ */ + public void setProcessor(ChannelProcessor processor); +} diff --git a/src/main/java/com/flowpowered/networking/process/ProcessorSetupMessage.java b/src/main/java/com/flowpowered/networking/process/ProcessorSetupMessage.java new file mode 100644 index 0000000..836b58b --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ProcessorSetupMessage.java @@ -0,0 +1,49 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import com.flowpowered.networking.Message; + +public interface ProcessorSetupMessage extends Message { + /** + * Gets the processor to use to process messages subsequent to this one + * + * @return the new ChannelProcessor or null for none + */ + public ChannelProcessor getProcessor(); + + /** + * Gets if the channel should process any more packets pending processor setup + * + * @return true if the channel should be locked after receiving this message + */ + public boolean isChannelLocking(); + + /** + * Sets the processor handler associated with this packet + * + * @param handler the handler + */ + public void setProcessorHandler(ProcessorHandler handler); +} diff --git a/src/main/java/com/flowpowered/networking/process/ReplayableByteBuf.java b/src/main/java/com/flowpowered/networking/process/ReplayableByteBuf.java new file mode 100644 index 0000000..3a709d3 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ReplayableByteBuf.java @@ -0,0 +1,853 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufProcessor; + +public class ReplayableByteBuf extends ByteBuf { + private static final ReplayableException ERROR_INSTANCE = new ReplayableException(""); + private ByteBuf buffer; + + public ByteBuf setBuffer(ByteBuf buffer) { + this.buffer = buffer; + return this; + } + + @Override + public int capacity() { + return Integer.MAX_VALUE; + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public boolean isDirect() { + return buffer.isDirect(); + } + + @Override + public int readerIndex() { + return buffer.readerIndex(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + indexRangeCheck(readerIndex); + buffer.readerIndex(readerIndex); + return this; + } + + @Override + public int writerIndex() { + return buffer.writerIndex(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + indexRangeCheck(writerIndex); + return this; + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + indexRangeCheck(readerIndex); + indexRangeCheck(writerIndex); + buffer.setIndex(readerIndex, writerIndex); + return this; + } + + @Override + public int readableBytes() { + return Integer.MAX_VALUE - buffer.readableBytes(); + } + + @Override + public int writableBytes() { + return buffer.writableBytes(); + } + + @Override + public boolean isReadable() { + return true; + } + + @Override + public boolean isWritable() { + return buffer.isWritable(); + } + + @Override + public ByteBuf clear() { + return unsupported(); + } + + @Override + public ByteBuf markReaderIndex() { + buffer.markReaderIndex(); + return this; + } + + @Override + public ByteBuf resetReaderIndex() { + buffer.resetReaderIndex(); + return this; + } + + @Override + public ByteBuf markWriterIndex() { + buffer.markWriterIndex(); + return this; + } + + @Override + public ByteBuf resetWriterIndex() { + buffer.resetWriterIndex(); + return this; + } + + @Override + public ByteBuf discardReadBytes() { + return unsupported(); + } + + @Override + public byte getByte(int index) { + checkAvail(index, 1); + return buffer.getByte(index); + } + + @Override + public short getUnsignedByte(int index) { + checkAvail(index, 1); + return buffer.getUnsignedByte(index); + } + + @Override + public short getShort(int index) { + checkAvail(index, 2); + return buffer.getShort(index); + } + + @Override + public int getUnsignedShort(int index) { + checkAvail(index, 2); + return buffer.getUnsignedShort(index); + } + + @Override + public int getMedium(int index) { + checkAvail(index, 3); + return buffer.getMedium(index); + } + + @Override + public int getUnsignedMedium(int index) { + checkAvail(index, 3); + return buffer.getUnsignedMedium(index); + } + + @Override + public int getInt(int index) { + checkAvail(index, 4); + return buffer.getInt(index); + } + + @Override + public long getUnsignedInt(int index) { + checkAvail(index, 4); + return buffer.getUnsignedInt(index); + } + + @Override + public long getLong(int index) { + checkAvail(index, 8); + return buffer.getLong(index); + } + + @Override + public char getChar(int index) { + checkAvail(index, 2); + return buffer.getChar(index); + } + + @Override + public float getFloat(int index) { + checkAvail(index, 4); + return buffer.getFloat(index); + } + + @Override + public double getDouble(int index) { + checkAvail(index, 4); + return buffer.getFloat(index); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + return unsupported(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + return unsupported(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + return unsupported(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + checkAvail(index, dst.length); + buffer.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + checkAvail(index, length); + buffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + return unsupported(); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + return unsupported(); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return unsupported(); + } + + @Override + public ByteBuf setByte(int index, int value) { + return unsupported(); + } + + @Override + public ByteBuf setShort(int index, int value) { + return unsupported(); + } + + @Override + public ByteBuf setMedium(int index, int value) { + return unsupported(); + } + + @Override + public ByteBuf setInt(int index, int value) { + return unsupported(); + } + + @Override + public ByteBuf setLong(int index, long value) { + return unsupported(); + } + + @Override + public ByteBuf setChar(int index, int value) { + return unsupported(); + } + + @Override + public ByteBuf setFloat(int index, float value) { + return unsupported(); + } + + @Override + public ByteBuf setDouble(int index, double value) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + return unsupported(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + return unsupported(); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + return unsupported(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + return unsupported(); + } + + @Override + public ByteBuf setZero(int index, int length) { + return unsupported(); + } + + @Override + public byte readByte() { + checkAvail(1); + return buffer.readByte(); + } + + @Override + public short readUnsignedByte() { + checkAvail(1); + return buffer.readUnsignedByte(); + } + + @Override + public short readShort() { + checkAvail(2); + return buffer.readShort(); + } + + @Override + public int readUnsignedShort() { + checkAvail(2); + return buffer.readUnsignedShort(); + } + + @Override + public int readMedium() { + checkAvail(3); + return buffer.readMedium(); + } + + @Override + public int readUnsignedMedium() { + checkAvail(3); + return buffer.readUnsignedMedium(); + } + + @Override + public int readInt() { + checkAvail(4); + return buffer.readInt(); + } + + @Override + public long readUnsignedInt() { + checkAvail(4); + return buffer.readInt(); + } + + @Override + public long readLong() { + checkAvail(8); + return buffer.readLong(); + } + + @Override + public char readChar() { + checkAvail(2); + return buffer.readChar(); + } + + @Override + public float readFloat() { + checkAvail(4); + return buffer.readFloat(); + } + + @Override + public double readDouble() { + checkAvail(8); + return buffer.readDouble(); + } + + @Override + public ByteBuf readBytes(int length) { + checkAvail(length); + int readable = buffer.readableBytes(); + try { + return buffer.readBytes(length); + } catch (ReplayableException e) { + throw new ReplayableException("Error: readable = " + readable + " length = " + length, e); + } + } + + @Override + public ByteBuf readSlice(int length) { + checkAvail(length); + return buffer.readSlice(length); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + return unsupported(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + return unsupported(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + checkAvail(length); + buffer.readBytes(dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst) { + checkAvail(dst.length); + buffer.readBytes(dst); + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + checkAvail(length); + buffer.readBytes(dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + return unsupported(); + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + return unsupported(); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + return unsupported(); + } + + @Override + public ByteBuf skipBytes(int length) { + checkAvail(length); + buffer.skipBytes(length); + return this; + } + + @Override + public ByteBuf writeByte(int value) { + return unsupported(); + } + + @Override + public ByteBuf writeShort(int value) { + return unsupported(); + } + + @Override + public ByteBuf writeMedium(int value) { + return unsupported(); + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + return unsupported(); + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + return unsupported(); + } + + @Override + public ByteBuf writeBoolean(boolean value) { + return unsupported(); + } + + @Override + public ByteBuf writeInt(int value) { + return unsupported(); + } + + @Override + public ByteBuf writeLong(long value) { + return unsupported(); + } + + @Override + public ByteBuf writeChar(int value) { + return unsupported(); + } + + @Override + public ByteBuf writeFloat(float value) { + return unsupported(); + } + + @Override + public ByteBuf writeDouble(double value) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(byte[] src) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + return unsupported(); + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + return unsupported(); + } + + @Override + public ByteBuf writeZero(int length) { + return unsupported(); + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + indexRangeCheck(toIndex); + indexRangeCheck(fromIndex); + return buffer.indexOf(fromIndex, toIndex, value); + } + + @Override + public int bytesBefore(byte value) { + return unsupported(); + } + + @Override + public int bytesBefore(int length, byte value) { + return unsupported(); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + return unsupported(); + } + + @Override + public ByteBuf copy() { + return unsupported(); + } + + @Override + public ByteBuf copy(int index, int length) { + checkAvail(index, length); + return buffer.copy(index, length); + } + + @Override + public ByteBuf slice() { + return buffer.slice(); + } + + @Override + public ByteBuf slice(int index, int length) { + checkAvail(index, length); + return buffer.slice(index, length); + } + + @Override + public ByteBuf duplicate() { + return buffer.duplicate(); + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + return unsupported(); + } + + @Override + public int arrayOffset() { + return unsupported(); + } + + @Override + public String toString(Charset charset) { + return buffer.toString(); + } + + @Override + public String toString(int index, int length, Charset charset) { + checkAvail(index, length); + return buffer.toString(index, length, charset); + } + + @Override + public int compareTo(ByteBuf buffer) { + return unsupported(); + } + + private R unsupported() { + throw new UnsupportedOperationException("This method is not supported for a replayable channel buffer"); + } + + private void indexRangeCheck(int index) { + checkAvail(index, 0); + } + + private void checkAvail(int length) { + if (buffer.readableBytes() < length) { + throw ERROR_INSTANCE; + } + } + + private void checkAvail(int index, int length) { + if (index + length > buffer.writerIndex()) { + throw ERROR_INSTANCE; + } + } + + @Override + public ByteBuf capacity(int newCapacity) { + buffer.capacity(newCapacity); + return this; + } + + @Override + public int maxCapacity() { + return buffer.maxCapacity(); + } + + @Override + public ByteBufAllocator alloc() { + return unsupported(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + return unsupported(); + } + + @Override + public ByteBuf unwrap() { + return unsupported(); + } + + @Override + public int maxWritableBytes() { + return unsupported(); + } + + @Override + public boolean isReadable(int size) { + return false; + } + + @Override + public boolean isWritable(int size) { + return buffer.isWritable(size); + } + + @Override + public ByteBuf discardSomeReadBytes() { + return unsupported(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + buffer.ensureWritable(minWritableBytes); + return this; + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + return buffer.ensureWritable(minWritableBytes, force); + } + + @Override + public boolean getBoolean(int index) { + return unsupported(); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + buffer.setBoolean(index, value); + return this; + } + + @Override + public boolean readBoolean() { + return unsupported(); + } + + @Override + public int forEachByte(ByteBufProcessor processor) { + return unsupported(); + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + return unsupported(); + } + + @Override + public int forEachByteDesc(ByteBufProcessor processor) { + return unsupported(); + } + + @Override + public int forEachByteDesc(int index, int length, ByteBufProcessor processor) { + return unsupported(); + } + + @Override + public int nioBufferCount() { + return unsupported(); + } + + @Override + public ByteBuffer nioBuffer() { + return unsupported(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return unsupported(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return unsupported(); + } + + @Override + public ByteBuffer[] nioBuffers() { + return unsupported(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return unsupported(); + } + + @Override + public boolean hasMemoryAddress() { + return false; + } + + @Override + public long memoryAddress() { + return unsupported(); + } + + @Override + public int hashCode() { + return 123 + buffer.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof ReplayableByteBuf && buffer.equals(((ReplayableByteBuf) obj).buffer); + } + + @Override + public String toString() { + return "Replayable" + buffer.toString(); + } + + @Override + public ByteBuf retain(int increment) { + return this; + } + + @Override + public ByteBuf retain() { + return this; + } + + @Override + public int refCnt() { + return 1; + } + + @Override + public boolean release() { + return true; + } + + @Override + public boolean release(int decrement) { + return true; + } +} diff --git a/src/main/java/com/flowpowered/networking/process/ReplayableException.java b/src/main/java/com/flowpowered/networking/process/ReplayableException.java new file mode 100644 index 0000000..5a1776b --- /dev/null +++ b/src/main/java/com/flowpowered/networking/process/ReplayableException.java @@ -0,0 +1,36 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.process; + +public class ReplayableException extends RuntimeException { + private static final long serialVersionUID = 13424275234247532L; + + public ReplayableException(String message) { + super(message); + } + + public ReplayableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/flowpowered/networking/protocol/CodecLookupService.java b/src/main/java/com/flowpowered/networking/protocol/CodecLookupService.java new file mode 100644 index 0000000..8115fcf --- /dev/null +++ b/src/main/java/com/flowpowered/networking/protocol/CodecLookupService.java @@ -0,0 +1,152 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.protocol; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import com.flowpowered.commons.StringToUniqueIntegerMap; +import com.flowpowered.networking.Codec; +import com.flowpowered.networking.Message; + +/** + * A class used to lookup message codecs. + */ +public class CodecLookupService { + /** + * A lookup table for the Message classes mapped to their Codec. + */ + private final ConcurrentMap, Codec> classTable; + /** + * A synced map for the dynamic packets. + */ + private final StringToUniqueIntegerMap dynamicPacketMap; + /** + * Lookup table for opcodes mapped to their codecs. + */ + private final Codec[] opcodeTable; + /** + * Stores the next opcode available. + */ + private final AtomicInteger nextId; + + /** + * The {@link CodecLookupService} stores the codecs available in the protocol. Codecs can be found using either the class of the message they represent or their message's opcode. + * + * @param dynamicPacketMap - The dynamic opcode map + * @param size The maximum number of message types + */ + protected CodecLookupService(StringToUniqueIntegerMap dynamicPacketMap, int size) { + classTable = new ConcurrentHashMap<>(size, 1.0f); + opcodeTable = new Codec[size]; + this.dynamicPacketMap = dynamicPacketMap; + nextId = new AtomicInteger(0); + } + + /** + * Binds a codec by adding entries for it to the tables. TODO: if a dynamic opcode is registered then a static opcode tries to register, reassign dynamic. TODO: if a static opcode is registered then + * a static opcode tries to register, throw exception + * + * @param clazz The codec's class. + * @param The type of message + * @param The type of codec. + * @throws InstantiationException if the codec could not be instantiated. + * @throws IllegalAccessException if the codec could not be instantiated due to an access violation. + */ + @SuppressWarnings("unchecked") + protected > C bind(Class clazz) throws InstantiationException, IllegalAccessException, InvocationTargetException { + if (dynamicPacketMap.getKeys().contains(clazz.getName())) { + // Already bound, return codec + return (C) opcodeTable[dynamicPacketMap.register(clazz.getName())]; + } + C codec; + try { + codec = clazz.getConstructor().newInstance(); + final Codec prevCodec = opcodeTable[codec.getOpcode()]; + if (prevCodec != null) { + throw new IllegalStateException("Trying to bind a static opcode where one already exists. Static: " + clazz.getSimpleName() + " Other: " + prevCodec.getClass().getSimpleName()); + } + } catch (NoSuchMethodException e) { + try { + Constructor constructor = clazz.getConstructor(int.class); + int id; + try { + do { + id = nextId.getAndIncrement(); + } while (opcodeTable[id] != null); + } catch (IndexOutOfBoundsException ioobe) { + throw new IllegalStateException("Ran out of Ids!", ioobe); + } + codec = constructor.newInstance(id); + } catch (NoSuchMethodException e1) { + IllegalArgumentException iae = new IllegalArgumentException("Codec must either have a zero arg or single int arg constructor!", e1); + iae.addSuppressed(e); + throw iae; + } + } + opcodeTable[codec.getOpcode()] = codec; + classTable.put(codec.getMessage(), codec); + dynamicPacketMap.register(clazz.getName(), codec.getOpcode()); + return codec; + } + + /** + * Retrieves the {@link Codec} from the lookup table + * + * @param opcode The opcode which the codec uses + * @return The codec, null if not found. + */ + public Codec find(int opcode) { + if (opcode < 0 || opcode >= opcodeTable.length) { + throw new IllegalArgumentException("Opcode " + opcode + " is out of bounds"); + } + return opcodeTable[opcode]; + } + + /** + * Finds a codec by message class. + * + * @param clazz The message class. + * @param The type of message. + * @return The codec, or {@code null} if it could not be found. + */ + @SuppressWarnings("unchecked") + public Codec find(Class clazz) { + return (Codec) classTable.get(clazz); + } + + /** + * Returns A collection of all the codecs which have been registered so far. + * + * @return Collection of codecs + */ + public Collection> getCodecs() { + return Collections.unmodifiableCollection(classTable.values()); + } +} diff --git a/src/main/java/com/flowpowered/networking/protocol/HandlerLookupService.java b/src/main/java/com/flowpowered/networking/protocol/HandlerLookupService.java new file mode 100644 index 0000000..818e339 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/protocol/HandlerLookupService.java @@ -0,0 +1,46 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.protocol; + +import com.flowpowered.networking.MessageHandler; +import com.flowpowered.networking.Message; +import java.util.HashMap; +import java.util.Map; + +public class HandlerLookupService { + private final Map, MessageHandler> handlers = new HashMap<>(); + + protected > void bind(Class clazz, Class handlerClass) throws InstantiationException, IllegalAccessException { + MessageHandler handler = handlerClass.newInstance(); + handlers.put(clazz, handler); + } + + @SuppressWarnings("unchecked") + public MessageHandler find(Class clazz) { + return (MessageHandler) handlers.get(clazz); + } + + protected HandlerLookupService() { + } +} diff --git a/src/main/java/com/flowpowered/networking/protocol/Protocol.java b/src/main/java/com/flowpowered/networking/protocol/Protocol.java new file mode 100644 index 0000000..be4bb33 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/protocol/Protocol.java @@ -0,0 +1,153 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.protocol; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +import io.netty.buffer.ByteBuf; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.flowpowered.commons.Named; +import com.flowpowered.commons.StringToUniqueIntegerMap; +import com.flowpowered.commons.store.MemoryStore; +import com.flowpowered.networking.Codec; +import com.flowpowered.networking.Message; +import com.flowpowered.networking.MessageHandler; +import com.flowpowered.networking.exception.UnknownPacketException; + +/** + * A {@code Protocol} stores {@link Message}s and their respective {@link Codec}s and {@link MessageHandler}s. + * It also stores to what port the protocol should be bound to. + */ +public abstract class Protocol implements Named { + private final CodecLookupService codecLookup; + private final HandlerLookupService handlerLookup; + private final String name; + private final int defaultPort; + private final Logger logger; + + public Protocol(String name, int defaultPort, int maxPackets) { + this(name, defaultPort, maxPackets, LogManager.getLogger("Protocol." + name)); + } + + public Protocol(String name, int defaultPort, int maxPackets, Logger logger) { + this.name = name; + StringToUniqueIntegerMap dynamicPacketLookup = new StringToUniqueIntegerMap(null, new MemoryStore(), maxPackets, maxPackets, this.name + "ProtocolDynamicPackets"); + codecLookup = new CodecLookupService(dynamicPacketLookup, maxPackets); + handlerLookup = new HandlerLookupService(); + this.defaultPort = defaultPort; + this.logger = logger; + } + + /** + * Gets the handler lookup service associated with this Protocol + * + * @return the handler lookup service + */ + public HandlerLookupService getHandlerLookupService() { + return handlerLookup; + } + + /** + * Gets the codec lookup service associated with this Protocol + * + * @return the codec lookup service + */ + public CodecLookupService getCodecLookupService() { + return codecLookup; + } + + public , H extends MessageHandler> C registerMessage(Class codec, Class handler) { + try { + C bind = codecLookup.bind(codec); + if (bind != null) { + handlerLookup.bind(bind.getMessage(), handler); + } + return bind; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + logger.log(Level.ERROR, "Error registering codec " + codec + ": ", e); + return null; + } + } + + /** + * Gets the name of the Protocol + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * The default port is the port used when autogenerating default bindings for this protocol and in the client when no port is given. + * + * @return The default port + */ + public int getDefaultPort() { + return defaultPort; + } + + /** + * Returns the logger for this protocol. + * + * @return the logger + */ + public Logger getLogger() { + return logger; + } + + /** + * Allows applying a wrapper to messages with dynamically allocated id's, in case this protocol needs to provide special treatment for them. + * + * @param dynamicMessage The message with a dynamically-allocated codec + * @return The new message + */ + public Message getWrappedMessage(T dynamicMessage) throws IOException { + return dynamicMessage; + } + + /** + * Read a packet header from the buffer. If a codec is not known, throw a {@link UnknownPacketException} + * + * @param buf The buffer to read from + * @return The correct codec + * @throws UnknownPacketException when the opcode does not have an associated codec + */ + public abstract Codec readHeader(ByteBuf buf) throws UnknownPacketException; + + /** + * Writes a packet header to a new buffer. + * + * @param codec The codec the message was written with + * @param data The data from the encoded message + * @param header the buffer which to write the header to + * @return The buffer with the packet header + */ + public abstract ByteBuf writeHeader(Codec codec, ByteBuf data, ByteBuf header); +} diff --git a/src/main/java/com/flowpowered/networking/protocol/ProtocolRegistry.java b/src/main/java/com/flowpowered/networking/protocol/ProtocolRegistry.java new file mode 100644 index 0000000..bf7a3ff --- /dev/null +++ b/src/main/java/com/flowpowered/networking/protocol/ProtocolRegistry.java @@ -0,0 +1,80 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.protocol; + +import java.net.SocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +import com.flowpowered.networking.protocol.Protocol; + +/** + * This class provides a way to store Protocols by name and {@link SocketAddress}. + * + */ +public class ProtocolRegistry { + private final ConcurrentHashMap names = new ConcurrentHashMap<>(); + private final ConcurrentHashMap sockets = new ConcurrentHashMap<>(); + + /** + * Registers a Protocol under its name + * + * @param protocol the Protocol + */ + public void registerProtocol(SocketAddress adress, Protocol protocol) { + this.names.put(protocol.getName(), protocol); + this.sockets.put(adress, protocol); + } + + /** + * Gets the Protocol associated with a particular id + * + * @param name the id + * @return the Protocol + */ + public Protocol getProtocol(String name) { + return this.names.get(name); + } + + /** + * Gets the Protocol associated with a particular address + * + * @param address the address + * @return the Protocol + */ + public Protocol getProtocol(SocketAddress address) { + return this.sockets.get(address); + } + + /** + * Returns all protocols currently registered. The returned collection is unmodifiable. + * + * @return All registered protocols + */ + public Collection getProtocols() { + return Collections.unmodifiableCollection(this.names.values()); + } + +} diff --git a/src/main/java/com/flowpowered/networking/session/BasicSession.java b/src/main/java/com/flowpowered/networking/session/BasicSession.java new file mode 100644 index 0000000..588807d --- /dev/null +++ b/src/main/java/com/flowpowered/networking/session/BasicSession.java @@ -0,0 +1,183 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.session; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.Channel; + +import com.flowpowered.networking.Message; +import com.flowpowered.networking.MessageHandler; +import com.flowpowered.networking.protocol.Protocol; + +/** + * A basic implementation of a {@link Session} which handles and sends messages instantly. + */ +public class BasicSession implements Session { + /** + * The Random used for sessionIds. + */ + private static final Random random = new Random(); + /** + * The channel associated with this session. + */ + private final Channel channel; + /** + * The random long used for client-server handshake + */ + private final String sessionId = Long.toString(random.nextLong(), 16).trim(); + /** + * The protocol for this session + */ + private final Protocol protocol; + /** + * Default uncaught exception handler + */ + private final AtomicReference exceptionHandler; + + /** + * Creates a new session. + * + * @param channel The channel associated with this session. + */ + public BasicSession(Channel channel, Protocol bootstrapProtocol) { + this.channel = channel; + this.protocol = bootstrapProtocol; + this.exceptionHandler = new AtomicReference(new DefaultUncaughtExceptionHandler(this)); + } + + @SuppressWarnings("unchecked") + private void handleMessage(Message message) { + MessageHandler handler = (MessageHandler) protocol.getHandlerLookupService().find(message.getClass()); + if (handler != null) { + try { + handler.handle(this, message); + } catch (Exception e) { + exceptionHandler.get().uncaughtException(message, handler, e); + } + } + } + + @Override + public void send(Message message) { + if (!channel.isActive()) { + throw new IllegalStateException("Trying to send a message when a session is inactive!"); + } + try { + channel.writeAndFlush(message); + } catch (Exception e) { + protocol.getLogger().error("Exception when trying to send message, disconnecting.", e); + disconnect(); + } + } + + @Override + public void sendAll(Message... messages) { + for (Message msg : messages) { + send(msg); + } + } + + /** + * Returns the address of this session. + * + * @return The remote address. + */ + @Override + public InetSocketAddress getAddress() { + SocketAddress addr = channel.remoteAddress(); + if (!(addr instanceof InetSocketAddress)) { + return null; + } + + return (InetSocketAddress) addr; + } + + @Override + public String toString() { + return BasicSession.class.getName() + " [address=" + channel.remoteAddress() + "]"; + } + + /** + * Adds a message to the unprocessed queue. + * + * @param message The message. + */ + @Override + public void messageReceived(Message message) { + handleMessage(message); + } + + @Override + public String getSessionId() { + return sessionId; + } + + @Override + public Protocol getProtocol() { + return this.protocol; + } + + @Override + public boolean isActive() { + return channel.isActive(); + } + + @Override + public void validate(Channel c) throws IllegalStateException { + if (c != this.channel) { + throw new IllegalStateException("Unknown channel for session!"); + } + } + + @Override + public UncaughtExceptionHandler getUncaughtExceptionHandler() { + return exceptionHandler.get(); + } + + @Override + public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler) { + if (handler != null) { + exceptionHandler.set(handler); + } else { + throw new IllegalArgumentException("Null uncaught exception handlers are not permitted"); + } + } + + public Channel getChannel() { + return channel; + } + + @Override + public void disconnect() { + channel.close(); + } + + @Override + public void onDisconnect() { + } +} diff --git a/src/main/java/com/flowpowered/networking/session/PulsingSession.java b/src/main/java/com/flowpowered/networking/session/PulsingSession.java new file mode 100644 index 0000000..4c2f1d4 --- /dev/null +++ b/src/main/java/com/flowpowered/networking/session/PulsingSession.java @@ -0,0 +1,172 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.session; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import io.netty.channel.Channel; + +import com.flowpowered.networking.Message; +import com.flowpowered.networking.protocol.Protocol; + +/** + * Represents a {@link BasicSession} which has both a {@link State} and {@link SendType}. It can queue messages if needed. + */ +public abstract class PulsingSession extends BasicSession { + /** + * A queue of incoming and unprocessed messages + */ + private final Queue messageQueue = new ArrayDeque<>(); + /** + * A queue of outgoing messages that will be sent after the client finishes identification + */ + private final Queue sendQueue = new ConcurrentLinkedQueue<>(); + /** + * The current state. + */ + private State state = State.EXCHANGE_HANDSHAKE; + + /** + * Creates a new pulsing session. + * + * @param channel The channel associated with this session. + */ + public PulsingSession(Channel channel, Protocol bootstrapProtocol) { + super(channel, bootstrapProtocol); + } + + /** + * Gets the state of this session. + * + * @return The session's state. + */ + public State getState() { + return state; + } + + /** + * Sets the state of this session. + * + * @param state The new state. + */ + public void setState(State state) { + this.state = state; + } + + public void pulse() { + Message message; + + if (state == State.OPEN) { + while ((message = sendQueue.poll()) != null) { + super.send(message); + } + } + + while ((message = messageQueue.poll()) != null) { + super.messageReceived(message); + } + } + + @Override + public void send(Message message) { + send(SendType.QUEUE, message); + } + + public void send(SendType type, Message message) { + if (message == null) { + return; + } + if (type == SendType.FORCE || this.state == State.OPEN) { + super.send(message); + } else if (type == SendType.QUEUE) { + sendQueue.add(message); + } + } + + @Override + public void sendAll(Message... messages) { + sendAll(SendType.QUEUE, messages); + } + + public void sendAll(SendType type, Message... messages) { + for (Message msg : messages) { + send(type, msg); + } + } + + /** + * Adds a message to the unprocessed queue. + * + * @param message The message. + */ + @Override + public void messageReceived(Message message) { + if (message.isAsync()) { + super.messageReceived(message); + } else { + messageQueue.add(message); + } + } + + /** + * Specifies send behavior + */ + public static enum SendType { + /** + * Messages sent with a SendType of OPEN_ONLY will only send if State is OPEN. Messages will not be + * queued. + */ + OPEN_ONLY, /** + * Messages sent with a SendType of QUEUE will wait until State is OPEN to send. Messages may be queued. + */ + QUEUE, /** + * Messages sent with a SendType of FORCE will send as soon as possible regardless of State. + */ + FORCE + } + + public static enum State { + /** + * In the exchange handshake state, the server is waiting for the client to send its initial handshake + * packet. + */ + EXCHANGE_HANDSHAKE, /** + * In the exchange identification state, the server is waiting for the client to send its identification + * packet. + */ + EXCHANGE_IDENTIFICATION, /** + * In the exchange encryption state, the server is waiting for the client to send its encryption + * response packet. + */ + EXCHANGE_ENCRYPTION, /** + * This state is when a critical message has been sent that must be waited for. + */ + WAITING, /** + * Allows messages to be sent. + */ + OPEN + } +} diff --git a/src/main/java/com/flowpowered/networking/session/Session.java b/src/main/java/com/flowpowered/networking/session/Session.java new file mode 100644 index 0000000..f283bfb --- /dev/null +++ b/src/main/java/com/flowpowered/networking/session/Session.java @@ -0,0 +1,152 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.session; + +import java.net.InetSocketAddress; + +import io.netty.channel.Channel; + +import org.apache.logging.log4j.Level; + +import com.flowpowered.networking.Message; +import com.flowpowered.networking.MessageHandler; +import com.flowpowered.networking.protocol.Protocol; + +/** + * Represents a connection to another engine. + *
+ * Controls the state, protocol and channels of a connection to another engine. + */ +public interface Session { + /** + * Passes a message to a session for processing. + * + * @param message message to be processed + */ + public void messageReceived(T message); + + /** + * Gets the protocol associated with this session. + * + * @return the protocol + */ + public Protocol getProtocol(); + + /** + * Sends a message across the network. + * + * @param message The message. + */ + public void send(Message message); + + /** + * Sends any amount of messages to the client. + * + * @param messages the messages to send to the client + */ + public void sendAll(Message... messages); + + /** + * Closes the session. + * + */ + public void disconnect(); + + /** + * Called after the Session has been disconnected, right before the Session is invalidated. + * + */ + public void onDisconnect(); + + /** + * Returns the address of this session. + * + * @return The remote address. + */ + public InetSocketAddress getAddress(); + + /** + * Gets the id for this session + * + * @return session id + */ + public String getSessionId(); + + /** + * Validates that {@code c} is the channel of the session. The channel of a session never changes. + * + * @param c the channel to check + * @throws IllegalStateException if {@code c} is not the channel of the session + */ + public void validate(Channel c) throws IllegalStateException; + + /** + * True if this session is open and connected. If the session is closed, errors will be thrown if messages are attempted to be sent. + * + * @return is active + */ + public boolean isActive(); + + public interface UncaughtExceptionHandler { + /** + * Called when an exception occurs during session handling + * + * @param message the message handler threw an exception on + * @param handle handler that threw the an exception handling the message + * @param ex the exception + */ + public void uncaughtException(Message message, MessageHandler handle, Exception ex); + } + + /** + * Gets the uncaught exception handler. + * + *

Note: the default exception handler is the {@link DefaultUncaughtExceptionHandler}.

+ * + * @return exception handler + */ + public UncaughtExceptionHandler getUncaughtExceptionHandler(); + + /** + * Sets the uncaught exception handler to be used for this session. Null values are not permitted. + * + *

Note: to reset the default exception handler, use the{@link DefaultUncaughtExceptionHandler}.

+ */ + public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler); + + public static final class DefaultUncaughtExceptionHandler implements UncaughtExceptionHandler { + private final Session session; + + public DefaultUncaughtExceptionHandler(Session session) { + this.session = session; + } + + @Override + public void uncaughtException(Message message, MessageHandler handle, Exception ex) { + session.getProtocol().getLogger().log(Level.ERROR, "Message handler for " + message.getClass().getSimpleName() + " threw exception", ex); + //session.disconnect("Message handler exception for " + message.getClass().getSimpleName()); + session.disconnect(); + } + } +} diff --git a/src/test/java/com/flowpowered/networking/ByteBufferChannelProcessorTest.java b/src/test/java/com/flowpowered/networking/ByteBufferChannelProcessorTest.java new file mode 100644 index 0000000..0e4941e --- /dev/null +++ b/src/test/java/com/flowpowered/networking/ByteBufferChannelProcessorTest.java @@ -0,0 +1,109 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import java.util.Random; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +import com.flowpowered.networking.fake.ChannelHandlerContextFaker; +import com.flowpowered.networking.process.ByteBufferChannelProcessor; + +public class ByteBufferChannelProcessorTest { + private final int LENGTH = 65536; + Thread mainThread; + + @Test + public void randomPassthrough() { + + mainThread = Thread.currentThread(); + + ByteBuf buffer = Unpooled.buffer(2048); + + ChannelHandlerContext ctx = ChannelHandlerContextFaker.setup(); + + ByteBufferChannelProcessor processor = new ByteBufferChannelProcessor(256); + + byte[] input = new byte[LENGTH]; + byte[] output = new byte[LENGTH]; + + Random r = new Random(); + + for (int i = 0; i < input.length; i++) { + input[i] = (byte) (r.nextInt()); + } + + int writePointer = 0; + int readPointer = 0; + + int pass = 0; + + while (writePointer < LENGTH && (pass++) < 512) { + + int toWrite = r.nextInt(512); + + if (r.nextInt(10) == 0) { + // simulate "large" packets + toWrite *= 10; + } + + if (toWrite > buffer.writableBytes()) { + toWrite = buffer.writableBytes(); + } + if (toWrite > LENGTH - writePointer) { + toWrite = LENGTH - writePointer; + } + + //System.out.println("Writing block of size " + toWrite); + + buffer.writeBytes(input, writePointer, toWrite); + writePointer += toWrite; + + ByteBuf buf = Unpooled.buffer(); + ByteBuf outputBuffer = processor.process(ctx, buffer, buf); + + buffer.discardReadBytes(); + + while (outputBuffer.isReadable()) { + int toRead = r.nextInt(768); + if (toRead > outputBuffer.readableBytes()) { + toRead = outputBuffer.readableBytes(); + } + //System.out.println("ToRead: " + toRead + " of " + outputBuffer.readableBytes()); + outputBuffer.readBytes(output, readPointer, toRead); + readPointer += toRead; + outputBuffer.discardReadBytes(); + } + } + + for (int i = 0; i < input.length; i++) { + assertTrue("Mismatch at position " + i, input[i] == output[i]); + } + } +} diff --git a/src/test/java/com/flowpowered/networking/PreprocessReplayingDecoderTest.java b/src/test/java/com/flowpowered/networking/PreprocessReplayingDecoderTest.java new file mode 100644 index 0000000..dc47b42 --- /dev/null +++ b/src/test/java/com/flowpowered/networking/PreprocessReplayingDecoderTest.java @@ -0,0 +1,184 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import java.util.Arrays; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +import com.flowpowered.networking.fake.ChannelHandlerContextFaker; +import com.flowpowered.networking.fake.FakeChannelHandlerContext; +import com.flowpowered.networking.process.CommonChannelProcessor; +import com.flowpowered.networking.process.PreprocessReplayingDecoder; + +public class PreprocessReplayingDecoderTest { + private final int LENGTH = 65536; + private final int BREAK = 17652; + + @Test + public void test() throws Exception { + // Preprocessor basically is split into two parts + // Part 1 is just a direct copy + // Part 2 negates all bytes before copying + Preprocessor p = new Preprocessor(512, BREAK, LENGTH); + + // Set up a fake ChannelHandlerContext + FakeChannelHandlerContext fake = ChannelHandlerContextFaker.setup(); + fake.setList(new LinkedList()); + + Random r = new Random(); + + // Get some random bytes for data + byte[] input = new byte[LENGTH]; + r.nextBytes(input); + + for (int i = 0; i < input.length;) { + // Simulate real data read + int burstSize = r.nextInt(512); + // With a 1/10 chance of having an extra-large burst + if (r.nextInt(10) == 0) { + burstSize *= 10; + } + + // Final burst needs to be clamped + if (i + burstSize > input.length) { + burstSize = input.length - i; + } + + // Write info to a new ByteBuf + final ByteBuf buf = Unpooled.buffer(burstSize); + buf.writeBytes(input, i, burstSize); + i += burstSize; + + // Fake a read + p.channelRead(fake, buf); + } + + // Get the output data and combine into one array + List outputList = fake.getList(); + byte[] output = new byte[LENGTH]; + int i = 0; + for (byte[] array : outputList) { + for (int j = 0; j < array.length; j++) { + output[i++] = array[j]; + } + } + + for (i = 0; i < input.length; i++) { + byte expected = i < BREAK ? input[i] : (byte) ~input[i]; + if (output[i] != expected) { + for (int j = i - 10; j <= i + 10; j++) { + //System.out.println(j + ") " + Integer.toBinaryString(input[j] & 0xFF) + " " + Integer.toBinaryString(output[j] & 0xFF)); + } + } + + if (i < BREAK) { + assertTrue("Input/Output mismatch at position " + i, output[i] == input[i]); + } else { + assertTrue("Input/Output mismatch at position " + i + ", after the processor change", output[i] == (byte) ~input[i]); + } + } + } + + private static class Preprocessor extends PreprocessReplayingDecoder { + private final int breakPoint; + private final int length; + private int position = 0; + private boolean breakOccured; + private Random r = new Random(); + + public Preprocessor(int capacity, int breakPoint, int length) { + super(capacity); + this.breakPoint = breakPoint; + this.length = length; + } + + @Override + public Object decodeProcessed(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + int packetSize = r.nextInt(128) + 1; + if (r.nextInt(10) == 0) { + packetSize *= 20; + } + + if (position + packetSize > breakPoint && !breakOccured) { + packetSize = breakPoint - position; + } + if (position + packetSize > length) { + packetSize = length - position; + } + + if (packetSize == 0) { + return null; + } + + byte[] buf = new byte[packetSize]; + + buffer.readBytes(buf); + + position += packetSize; + + if (position == breakPoint) { + this.setProcessor(new NegatingProcessor(512)); + breakOccured = true; + } + + return buf; + } + } + + private static class NegatingProcessor extends CommonChannelProcessor { + byte[] buffer = new byte[65536]; + int readPointer = 0; + int writePointer = 0; + int mask = 0xFFFF; + + public NegatingProcessor(int capacity) { + super(capacity); + } + + @Override + protected void write(byte[] buf, int length) { + for (int i = 0; i < length; i++) { + buffer[(writePointer++) & mask] = (byte) ~buf[i]; + } + } + + @Override + protected int read(byte[] buf) { + int i; + for (i = 0; i < buf.length && readPointer < writePointer; i++) { + buf[i] = buffer[(readPointer++) & mask]; + } + return i; + } + } +} diff --git a/src/test/java/com/flowpowered/networking/fake/ChannelHandlerContextFaker.java b/src/test/java/com/flowpowered/networking/fake/ChannelHandlerContextFaker.java new file mode 100644 index 0000000..9025ec0 --- /dev/null +++ b/src/test/java/com/flowpowered/networking/fake/ChannelHandlerContextFaker.java @@ -0,0 +1,71 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.fake; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class ChannelHandlerContextFaker { + private static FakeChannelHandlerContext context = null; + private static Channel channel = null; + private static ChannelConfig config = null; + private static ByteBufAllocator alloc = null; + + public static FakeChannelHandlerContext setup() { + if (context == null) { + alloc(); + context = Mockito.mock(FakeChannelHandlerContext.class, Mockito.CALLS_REAL_METHODS); + channel = Mockito.mock(Channel.class); + config = Mockito.mock(ChannelConfig.class); + Mockito.doReturn(channel).when(context).channel(); + Mockito.when(channel.config()).thenReturn(config); + Mockito.when(config.getAllocator()).thenReturn(alloc); + Answer answer = new Answer() { + @Override + public ByteBuf answer(InvocationOnMock invocation) throws Throwable { + ByteBuf buffer = Unpooled.buffer(); + buffer.retain(); + return buffer; + } + }; + Mockito.when(alloc.buffer()).thenAnswer(answer); + Mockito.when(alloc.buffer(Mockito.anyInt())).thenAnswer(answer); + } + return context; + } + + public static ByteBufAllocator alloc() { + if (alloc == null) { + alloc = Mockito.mock(ByteBufAllocator.class); + } + return alloc; + } +} diff --git a/src/test/java/com/flowpowered/networking/fake/FakeChannelHandlerContext.java b/src/test/java/com/flowpowered/networking/fake/FakeChannelHandlerContext.java new file mode 100644 index 0000000..6db892e --- /dev/null +++ b/src/test/java/com/flowpowered/networking/fake/FakeChannelHandlerContext.java @@ -0,0 +1,65 @@ +/* + * This file is part of Flow Networking, licensed under the MIT License (MIT). + * + * Copyright (c) 2013 Spout LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.flowpowered.networking.fake; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; + +import java.util.List; + +public abstract class FakeChannelHandlerContext implements ChannelHandlerContext { + private List list; + boolean first; + + public void setList(List list) { + this.list = list; + } + + public List getList() { + return list; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + if (list != null && msg instanceof byte[]) { + list.add((byte[]) msg); + } + return this; + } + + @Override + public abstract Channel channel(); + + @Override + public boolean isRemoved() { + return false; + } + + @Override + public ByteBufAllocator alloc() { + return ChannelHandlerContextFaker.alloc(); + } +} +