diff --git a/src/main/java/org/hobbit/core/Constants.java b/src/main/java/org/hobbit/core/Constants.java index 82bbc7a..b520580 100644 --- a/src/main/java/org/hobbit/core/Constants.java +++ b/src/main/java/org/hobbit/core/Constants.java @@ -161,4 +161,6 @@ private Constants() { public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone("GMT"); + public static final String RABBIT_CONTAINER_SERVICE = "RABBIT_CONTAINER_SERVICE"; + } diff --git a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java index 5f53737..adb5551 100644 --- a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java +++ b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java @@ -34,6 +34,9 @@ import org.apache.jena.vocabulary.RDF; import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.containerservice.ContainerCreation; +import org.hobbit.core.containerservice.ContainerCreationFactory; +import org.hobbit.core.containerservice.RabbitMQContainerCreator; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.vocab.HOBBIT; import org.hobbit.vocab.HobbitErrors; @@ -134,6 +137,10 @@ public abstract class AbstractBenchmarkController extends AbstractPlatformConnec * The URI of the experiment. */ protected String experimentUri; + /** + * The instance to create container + */ + protected ContainerCreation containerCreation; /** * Constructor. @@ -145,6 +152,7 @@ public AbstractBenchmarkController() { @Override public void init() throws Exception { super.init(); + containerCreation = ContainerCreationFactory.getContainerCreationObject(EnvVariables.getString(Constants.RABBIT_CONTAINER_SERVICE, LOGGER), this); // benchmark controllers should be able to accept broadcasts addCommandHeaderId(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS); @@ -175,9 +183,9 @@ public void run() throws Exception { * @param envVariables * environment variables for the data generators */ - protected void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + protected Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables) { - createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, dataGenContainerIds); + return createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); } /** @@ -191,9 +199,9 @@ protected void createDataGenerators(String dataGeneratorImageName, int numberOfD * @param envVariables * environment variables for the task generators */ - protected void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + protected Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables) { - createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, taskGenContainerIds); + return createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); } /** @@ -208,8 +216,8 @@ protected void createTaskGenerators(String taskGeneratorImageName, int numberOfT * @param generatorIds * set of generator container names */ - private void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, - Set generatorIds) { + public Set createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables) { + Set generatorIds = new HashSet<>(); String containerId; String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) : new String[2]; @@ -228,6 +236,7 @@ private void createGenerator(String generatorImageName, int numberOfGenerators, throw new IllegalStateException(errorMsg); } } + return generatorIds; } /** @@ -611,4 +620,23 @@ protected void containerCrashed(String containerName) { sendResultModel(resultModel); System.exit(1); } + + public Set getDataGenContainerIds() { + return dataGenContainerIds; + } + + public Set getTaskGenContainerIds() { + return taskGenContainerIds; + } + + public String getEvalStoreContainerId() { + return evalStoreContainerId; + } + + public void setEvalStoreContainerId(String evalStoreContainerId) { + this.evalStoreContainerId = evalStoreContainerId; + } + + + } diff --git a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java index 5b081c6..4cc9774 100644 --- a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java @@ -67,12 +67,12 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen * Name of the queue that is used to receive responses for messages that are * sent via the command queue and for which an answer is expected. */ - private String responseQueueName = null; + protected String responseQueueName = null; /** * Mapping of RabbitMQ's correlationIDs to Future objects corresponding * to that RPC call. */ - private Map> responseFutures = Collections.synchronizedMap(new LinkedHashMap<>()); + protected Map> responseFutures = Collections.synchronizedMap(new LinkedHashMap<>()); /** * Consumer of the queue that is used to receive responses for messages that * are sent via the command queue and for which an answer is expected. @@ -304,7 +304,7 @@ protected String createContainer(String imageName, String[] envVariables) { * user-provided array of environment variables * @return the extended array of environment variables */ - protected String[] extendContainerEnvVariables(String[] envVariables) { + public String[] extendContainerEnvVariables(String[] envVariables) { if (envVariables == null) { envVariables = new String[0]; } @@ -347,7 +347,7 @@ protected String[] extendContainerEnvVariables(String[] envVariables) { * container * @return the name of the container instance or null if an error occurred */ - protected String createContainer(String imageName, String containerType, String[] envVariables) { + public String createContainer(String imageName, String containerType, String[] envVariables) { return createContainer(imageName, containerType, envVariables, null); } @@ -474,7 +474,15 @@ protected Future createContainerAsync(String imageName, String container return null; } - /** + public Map> getResponseFutures() { + return responseFutures; + } + + public String getResponseQueueName() { + return responseQueueName; + } + + /** * This method sends a {@link Commands#DOCKER_CONTAINER_STOP} command to * stop the container with the given id. * @@ -497,7 +505,7 @@ protected void stopContainer(String containerName) { * @throws IOException * if a communication problem occurs */ - private void initResponseQueue() throws IOException { + public void initResponseQueue() throws IOException { if (responseQueueName == null) { responseQueueName = cmdChannel.queueDeclare().getQueue(); } @@ -567,5 +575,24 @@ public void close() throws IOException { } super.close(); } + + public String getContainerName() { + return containerName; + } + + public void setContainerName(String containerName) { + this.containerName = containerName; + } + + public Gson getGson() { + return gson; + } + + public void setGson(Gson gson) { + this.gson = gson; + } + + + } diff --git a/src/main/java/org/hobbit/core/components/AbstractPlatformController.java b/src/main/java/org/hobbit/core/components/AbstractPlatformController.java new file mode 100644 index 0000000..548dfa2 --- /dev/null +++ b/src/main/java/org/hobbit/core/components/AbstractPlatformController.java @@ -0,0 +1,17 @@ +package org.hobbit.core.components; + +import com.rabbitmq.client.AMQP; + +public abstract class AbstractPlatformController extends AbstractCommandReceivingComponent{ + + public AbstractPlatformController() { + super(); + } + + public AbstractPlatformController(boolean execCommandsInParallel) { + super(execCommandsInParallel); + } + + public abstract void createComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props); + +} diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index e614401..6d6876f 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.concurrent.Semaphore; - import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java new file mode 100644 index 0000000..2a9437a --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java @@ -0,0 +1,58 @@ +package org.hobbit.core.containerservice; + +import java.util.Set; +import org.hobbit.core.components.AbstractPlatformController; + +/** + * This interface provides methods that provides functionalities + * to create {@link DirectContainerCreator} or {@link RabbitMQContainerCreator} + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public interface ContainerCreation { + + /** + * Creates the given number of data generators using the given image name + * and environment variables. + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the data generators + */ + Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractPlatformController dummyComponent); + + /** + * Creates the given number of task generators using the given image name + * and environment variables. + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + */ + Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractPlatformController dummyComponent); + + /** + * Creates the evaluate storage using the given image name and environment + * variables. + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables that should be given to the component + */ + String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent); + +} diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java new file mode 100644 index 0000000..5b30e32 --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java @@ -0,0 +1,34 @@ +package org.hobbit.core.containerservice; + +import org.hobbit.core.Constants; +import org.hobbit.core.components.AbstractBenchmarkController; + +/** + * This class provides functionality to obtain an instance of {@link ContainerCreation} + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class ContainerCreationFactory { + + /** + * This static method returns the instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} + * based on the value of the environment variable {@link Constants#RABBIT_CONTAINER_SERVICE} + * + * @param isRabbitContainerService + * the environment variable that decides which instance of {@link ContainerCreation} will be returned. + * @param abstractBenchmarkController + * instance of {@link AbstractBenchmarkController} + * @return an instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} + */ + public static ContainerCreation getContainerCreationObject(String isRabbitContainerService, AbstractBenchmarkController abstractBenchmarkController) { + if(isRabbitContainerService.equals("true")) { + return new RabbitMQContainerCreator(abstractBenchmarkController); + } + return new DirectContainerCreator(abstractBenchmarkController); + } + +} diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java new file mode 100644 index 0000000..7342869 --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -0,0 +1,237 @@ +package org.hobbit.core.containerservice; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.stream.Stream; +import org.apache.commons.io.Charsets; +import org.hobbit.core.Commands; +import org.hobbit.core.Constants; +import org.hobbit.core.components.AbstractBenchmarkController; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.hobbit.core.components.AbstractPlatformController; +import org.hobbit.core.data.StartCommandData; +import org.hobbit.core.rabbit.RabbitMQUtils; +import org.hobbit.utils.EnvVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.Gson; +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * This class provides the implementation to create container functionality. + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class DirectContainerCreator implements ContainerCreation { + + private static final Logger LOGGER = LoggerFactory.getLogger(DirectContainerCreator.class); + + private AbstractPlatformController platformController = null; + + private AbstractCommandReceivingComponent abstractCommandReceivingComponent; + + /** + * The name of the container instance + */ + private String containerName; + + private String hobbitSessionId; + + public DirectContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { + this.abstractCommandReceivingComponent = abstractBenchmarkController; + hobbitSessionId = EnvVariables.getString(Constants.HOBBIT_SESSION_ID_KEY, + Constants.HOBBIT_SESSION_ID_FOR_PLATFORM_COMPONENTS); + containerName = EnvVariables.getString(Constants.CONTAINER_NAME_KEY, containerName); + } + + /** + * This method creates and starts an instance of the given image using the given + * environment variables. + *

+ * Note that the containerType parameter should have one of the following + * values. + *

    + *
  • {@link Constants#CONTAINER_TYPE_DATABASE} if this container is part + * of a benchmark but should be located on a storage node.
  • + *
+ * + * @param imageName + * the name of the image of the docker container + * @param containerType + * the type of the container + * @param envVariables + * environment variables that should be added to the created + * container + * @param netAliases + * network aliases that should be added to the created container + * @return the Future object with the name of the container instance or null if an error occurred + */ + protected Future createContainer(String imageName, String containerType, String[] envVariables, String[] netAliases) { + try { + Gson gson = new Gson(); + envVariables = extendContainerEnvVariables(envVariables); + abstractCommandReceivingComponent.initResponseQueue(); + String correlationId = UUID.randomUUID().toString(); + SettableFuture containerFuture = SettableFuture.create(); + synchronized (abstractCommandReceivingComponent.getResponseFutures()) { + abstractCommandReceivingComponent.getResponseFutures().put(correlationId, containerFuture); + } + byte data[] = RabbitMQUtils.writeString( + gson.toJson(new StartCommandData(imageName, containerType, containerName, envVariables, netAliases))); + BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); + propsBuilder.deliveryMode(2); + propsBuilder.replyTo(abstractCommandReceivingComponent.getResponseQueueName()); + propsBuilder.correlationId(correlationId); + BasicProperties props = propsBuilder.build(); + byte sessionIdBytes[] = hobbitSessionId.getBytes(Charsets.UTF_8); + int dataLength = sessionIdBytes.length + 5; + boolean attachData = (data != null) && (data.length > 0); + if (attachData) { + dataLength += data.length; + } + ByteBuffer buffer = ByteBuffer.allocate(dataLength); + buffer.putInt(sessionIdBytes.length); + buffer.put(sessionIdBytes); + buffer.put(Commands.DOCKER_CONTAINER_START); + if (attachData) { + buffer.put(data); + } + byte sessionIdBytes1[] = new byte[sessionIdBytes.length]; + String sessionId = new String(sessionIdBytes1, Charsets.UTF_8); + byte command = Commands.DOCKER_CONTAINER_START; + platformController.createComponent(command, data, sessionId, props); + return containerFuture; + } catch (Exception e) { + LOGGER.error("Got exception while trying to request the creation of an instance of the \"" + imageName + + "\" image.", e); + } + return null; + } + + /** + * Creates the given number of data generators using the given image name + * and environment variables. + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the data generators + */ + @Override + public Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + return createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); + } + + /** + * Creates the given number of task generators using the given image name + * and environment variables. + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + */ + @Override + public Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + return createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); + } + + /** + * Creates the evaluate storage using the given image name and environment + * variables. + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables that should be given to the component + */ + @Override + public String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + String evaluationStoreContainerId = null; + evaluationStoreContainerId = abstractCommandReceivingComponent.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables); + if (evaluationStoreContainerId == null) { + String errorMsg = "Couldn't create evaluation storage. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return evaluationStoreContainerId; + } + + /** + * Internal method for creating generator components. + * + * @param generatorImageName + * name of the generator Docker image + * @param numberOfGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + * @param generatorIds + * set of generator container names + */ + public Set createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables) { + Set generatorIds = new HashSet<>(); + try { + String containerId; + String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) + : new String[2]; + // NOTE: Count only includes generators created within this method call. + variables[variables.length - 2] = Constants.GENERATOR_COUNT_KEY + "=" + numberOfGenerators; + for (int i = 0; i < numberOfGenerators; ++i) { + // At the start generatorIds is empty, and new generators are added to it immediately. + // Current size of that set is used to make IDs for new generators. + variables[variables.length - 1] = Constants.GENERATOR_ID_KEY + "=" + generatorIds.size(); + containerId = createContainer(generatorImageName, null, envVariables, null).get();// createContainer(generatorImageName, variables); + if (containerId != null) { + generatorIds.add(containerId); + } else { + String errorMsg = "Couldn't create generator component. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + } + }catch(Exception e) { + LOGGER.error("Exception occurred.",e); + } + return generatorIds; + } + + public String[] extendContainerEnvVariables(String[] envVariables) { + String rabbitMQHostName = EnvVariables.getString(Constants.RABBIT_MQ_HOST_NAME_KEY, LOGGER); + if (envVariables == null) { + envVariables = new String[0]; + } + + // Only add RabbitMQ host env if there isn't any. + if (Stream.of(envVariables).noneMatch(kv -> kv.startsWith(Constants.RABBIT_MQ_HOST_NAME_KEY + "="))) { + envVariables = Arrays.copyOf(envVariables, envVariables.length + 2); + envVariables[envVariables.length - 2] = Constants.RABBIT_MQ_HOST_NAME_KEY + "=" + rabbitMQHostName; + } else { + envVariables = Arrays.copyOf(envVariables, envVariables.length + 1); + } + + envVariables[envVariables.length - 1] = Constants.HOBBIT_SESSION_ID_KEY + "=" + hobbitSessionId; + return envVariables; + } + + +} diff --git a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java new file mode 100644 index 0000000..ec99933 --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java @@ -0,0 +1,91 @@ +package org.hobbit.core.containerservice; + +import java.util.Set; + +import org.hobbit.core.Constants; +import org.hobbit.core.components.AbstractBenchmarkController; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.hobbit.core.components.AbstractPlatformController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements the functionality for RabbitMQ container creation. + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das + * + */ +public class RabbitMQContainerCreator implements ContainerCreation { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQContainerCreator.class); + + private AbstractBenchmarkController abstractBenchmarkController; + + public RabbitMQContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { + this.abstractBenchmarkController = abstractBenchmarkController; + } + + /** + * This method calls the createGenerator method of {@link AbstractBenchmarkController} + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables required for the creation of the data generators + * @param dummyComponent + * + */ + @Override + public Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, + AbstractPlatformController dummyComponent) { + return abstractBenchmarkController.createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); + + } + + /** + * This method calls the createTaskGenerators method of {@link AbstractBenchmarkController} + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables required for the creation of the task generators + * @param dummyComponent + * + */ + @Override + public Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, + AbstractPlatformController dummyComponent) { + return abstractBenchmarkController.createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); + + } + + /** + * This method calls the createEvaluationStorage method of {@link AbstractBenchmarkController} + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables required for the creation of evaluation storage + * @param dummyComponent + * @return the container id of the evaluation storage. + */ + @Override + public String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent) { + abstractBenchmarkController.setEvalStoreContainerId(abstractBenchmarkController.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables)); + if (abstractBenchmarkController.getEvalStoreContainerId() == null) { + String errorMsg = "Couldn't create evaluation storage. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return abstractBenchmarkController.getEvalStoreContainerId(); + } +} + diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index e649a50..d8d8d54 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -31,6 +31,7 @@ import org.hobbit.core.TestConstants; import org.hobbit.core.components.dummy.AbstractDummyPlatformController; import org.hobbit.core.components.dummy.DummyComponentExecutor; +import org.hobbit.core.containerservice.DirectContainerCreator; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.config.HobbitConfiguration; import org.hobbit.vocab.HobbitExperiments; @@ -45,9 +46,11 @@ import com.rabbitmq.client.AMQP; @RunWith(Parameterized.class) -public class BenchmarkControllerTest extends AbstractBenchmarkController { +public class BenchmarkControllerTest extends AbstractBenchmarkController { private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkControllerTest.class); + + DummyPlatformController dummyPlatformController; private static final String HOBBIT_SESSION_ID = "123"; private static final String SYSTEM_CONTAINER_ID = "systemContainerId"; @@ -88,6 +91,7 @@ public void test() throws Exception { // Needed for the generators configurationVar.setProperty(Constants.GENERATOR_ID_KEY, "0"); configurationVar.setProperty(Constants.GENERATOR_COUNT_KEY, "1"); + configurationVar.setProperty(Constants.RABBIT_CONTAINER_SERVICE, "true"); configuration = new HobbitConfiguration(); configuration.addConfiguration(configurationVar); @@ -156,13 +160,13 @@ public void init() throws Exception { super.init(); // create data generators - createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null); + dataGenContainerIds = containerCreation.createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null, dummyPlatformController); // Create task generators - createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null); + taskGenContainerIds = containerCreation.createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); // Create evaluation storage - createEvaluationStorage(EVAL_IMAGE, null); + evalStoreContainerId = containerCreation.createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); // Wait for all components to finish their initialization waitForComponentsToInitialize(); @@ -331,5 +335,121 @@ public void run() { } } } + + public void createComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + String replyTo = null; + if (props != null) { + replyTo = props.getReplyTo(); + } + + LOGGER.info("received command: session={}, command={}, data={}", sessionId, Commands.toString(command), + data != null ? RabbitMQUtils.readString(data) : "null"); + if (command == Commands.BENCHMARK_READY_SIGNAL) { + System.out.println("Benchmark Ready!"); + try { + sendToCmdQueue(sessionId, Commands.START_BENCHMARK_SIGNAL, + RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID), null); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getLocalizedMessage()); + } + } else if (command == Commands.DOCKER_CONTAINER_START) { + try { + String startCommandJson = RabbitMQUtils.readString(data); + final String containerId = Integer.toString(random.nextInt()); + + AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); + propsBuilder.deliveryMode(2); + propsBuilder.correlationId(props.getCorrelationId()); + AMQP.BasicProperties replyProps = propsBuilder.build(); + + if (startCommandJson.contains(DATA_GEN_IMAGE)) { + // Create data generators that are waiting for a random + // amount of time and terminate after that + DummyComponentExecutor dataGenExecutor = new DummyComponentExecutor( + new AbstractDataGenerator() { + @Override + protected void generateData() throws Exception { + LOGGER.debug("Data Generator started..."); + Thread.sleep(1000 + random.nextInt(1000)); + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + dataGenExecutors.add(dataGenExecutor); + Thread t = new Thread(dataGenExecutor); + dataGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { + // Create task generators that are waiting for a random + // amount of + // time and terminate after that + DummyComponentExecutor taskGenExecutor = new DummyComponentExecutor( + new AbstractTaskGenerator() { + @Override + public void run() throws Exception { + LOGGER.debug("Task Generator started..."); + super.run(); + } + + @Override + protected void generateTask(byte[] data) throws Exception { + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + taskGenExecutors.add(taskGenExecutor); + Thread t = new Thread(taskGenExecutor); + taskGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(EVAL_IMAGE)) { + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); + } else { + LOGGER.error("Got unknown start command. Ignoring it."); + } + } catch (IOException e) { + LOGGER.error("Exception while trying to respond to a container creation command.", e); + } + } + + } + + } } diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java index 1d3076d..c582808 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java @@ -22,6 +22,7 @@ import java.util.Random; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.MessageProperties; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.config.HobbitConfiguration; @@ -105,6 +106,12 @@ public DummyPlatformController(String sessionId, HobbitConfiguration configVar) } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { + // TODO Auto-generated method stub if (command == Commands.DOCKER_CONTAINER_START) { String[] envVars = gson.fromJson(RabbitMQUtils.readString(data), StartCommandData.class).getEnvironmentVariables(); String containerId = Stream.of(envVars).filter(kv -> kv.startsWith("ID=")).findAny().get().split("=", 2)[1]; diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java index 981ff03..09038e7 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java @@ -21,6 +21,9 @@ import org.hobbit.core.components.dummy.DummyComponentExecutor; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.MessageProperties; + import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.config.HobbitConfiguration; @@ -115,6 +118,12 @@ public DummyPlatformController(String sessionId, HobbitConfiguration configVar) } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { + // TODO Auto-generated method stub if (command == Commands.DOCKER_CONTAINER_START) { String[] envVars = gson.fromJson(RabbitMQUtils.readString(data), StartCommandData.class).getEnvironmentVariables(); String containerId = Stream.of(envVars).filter(kv -> kv.startsWith("ID=")).findAny().get().split("=", 2)[1]; diff --git a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java index 239ace7..f722226 100644 --- a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java +++ b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java @@ -19,18 +19,16 @@ import org.hobbit.core.Constants; import org.hobbit.core.rabbit.RabbitMQUtils; import java.io.IOException; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import org.apache.commons.io.Charsets; import java.nio.ByteBuffer; import java.util.concurrent.Semaphore; -import org.hobbit.core.components.AbstractCommandReceivingComponent; - +import org.hobbit.core.components.AbstractPlatformController; import org.junit.Ignore; @Ignore -public abstract class AbstractDummyPlatformController extends AbstractCommandReceivingComponent { +public abstract class AbstractDummyPlatformController extends AbstractPlatformController { private boolean readyFlag = false; private Semaphore terminationMutex = new Semaphore(0); @@ -98,6 +96,7 @@ public void receiveCommand(byte command, byte[] data) { public abstract void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props); + public abstract void createComponent(byte command, byte[] data, String sessionId, BasicProperties props); public void terminate() { terminationMutex.release(); } diff --git a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java index b850ae3..999ed35 100644 --- a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java +++ b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.MessageProperties; /** * This is a test that simulates the workflow of the @@ -122,6 +124,11 @@ public DummyPlatformController(String sessionId, HobbitConfiguration c) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { String replyTo = null; if (props != null) { replyTo = props.getReplyTo();