From 7d392deed6dbacdc2f323c910d953a042c99c966 Mon Sep 17 00:00:00 2001 From: sourabhpoddar404 Date: Mon, 15 Jun 2020 15:45:00 +0200 Subject: [PATCH 1/6] Implementation of Direct Container Creator Co-authored-by: Altafhusen <57062112+altafhusen-mr@users.noreply.github.com> Co-authored-by: Yamini <29372422+Yamini19@users.noreply.github.com> Co-authored-by: Melissa <21376912+melissadas@users.noreply.github.com> --- .../AbstractBenchmarkController.java | 2 +- .../AbstractCommandReceivingComponent.java | 15 +++- .../DirectContainerCreator.java | 86 +++++++++++++++++++ .../components/BenchmarkControllerTest.java | 9 +- 4 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java diff --git a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java index fcd207c..375fb1e 100644 --- a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java +++ b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java @@ -206,7 +206,7 @@ protected void createTaskGenerators(String taskGeneratorImageName, int numberOfT * @param generatorIds * set of generator container names */ - private void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, + protected void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, Set generatorIds) { String containerId; String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) diff --git a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java index 073d141..9b19178 100644 --- a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java @@ -64,17 +64,17 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen /** * Name of this Docker container. */ - private String containerName; + protected String containerName; /** * Name of the queue that is used to receive responses for messages that are * sent via the command queue and for which an answer is expected. */ - private String responseQueueName = null; + protected String responseQueueName = null; /** * Mapping of RabbitMQ's correlationIDs to Future objects corresponding * to that RPC call. */ - private Map> responseFutures = Collections.synchronizedMap(new LinkedHashMap<>()); + protected Map> responseFutures = Collections.synchronizedMap(new LinkedHashMap<>()); /** * Consumer of the queue that is used to receive responses for messages that * are sent via the command queue and for which an answer is expected. @@ -484,7 +484,7 @@ protected void stopContainer(String containerName) { * @throws IOException * if a communication problem occurs */ - private void initResponseQueue() throws IOException { + protected void initResponseQueue() throws IOException { if (responseQueueName == null) { responseQueueName = cmdChannel.queueDeclare().getQueue(); } @@ -555,4 +555,11 @@ public void close() throws IOException { super.close(); } + public void createDummyComponent(byte command, byte[] data, String sessionId, BasicProperties props) { + // TODO Auto-generated method stub + + } + + + } diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java new file mode 100644 index 0000000..4cbb35b --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -0,0 +1,86 @@ +package org.hobbit.core.containerservice; + +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.Future; + +import org.apache.commons.io.Charsets; +import org.hobbit.core.Commands; +import org.hobbit.core.components.AbstractBenchmarkController; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.hobbit.core.data.StartCommandData; +import org.hobbit.core.rabbit.RabbitMQUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.SettableFuture; +import com.rabbitmq.client.AMQP.BasicProperties; + +public abstract class DirectContainerCreator extends AbstractBenchmarkController { + + private static final Logger LOGGER = LoggerFactory.getLogger(DirectContainerCreator.class); + + AbstractCommandReceivingComponent directComponent = null; + + protected Future createContainerAsync(String imageName, String containerType, String[] envVariables, String[] netAliases) { + try { + System.out.println("Inside RabbitMQContainerCreater"); + envVariables = extendContainerEnvVariables(envVariables); + + initResponseQueue(); + String correlationId = UUID.randomUUID().toString(); + SettableFuture containerFuture = SettableFuture.create(); + + synchronized (responseFutures) { + responseFutures.put(correlationId, containerFuture); + } + + byte data[] = RabbitMQUtils.writeString( + gson.toJson(new StartCommandData(imageName, containerType, containerName, envVariables, netAliases))); + BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); + propsBuilder.deliveryMode(2); + propsBuilder.replyTo(responseQueueName); + propsBuilder.correlationId(correlationId); + BasicProperties props = propsBuilder.build(); + + + + byte sessionIdBytes[] = getHobbitSessionId().getBytes(Charsets.UTF_8); + int dataLength = sessionIdBytes.length + 5; + boolean attachData = (data != null) && (data.length > 0); + if (attachData) { + dataLength += data.length; + } + ByteBuffer buffer = ByteBuffer.allocate(dataLength); + buffer.putInt(sessionIdBytes.length); + buffer.put(sessionIdBytes); + buffer.put(Commands.DOCKER_CONTAINER_START); + if (attachData) { + buffer.put(data); + } + + byte sessionIdBytes1[] = new byte[sessionIdBytes.length]; + String sessionId = new String(sessionIdBytes1, Charsets.UTF_8); + byte command = Commands.DOCKER_CONTAINER_START; + + + + + directComponent.createDummyComponent(command, data, sessionId, props); + + return containerFuture; + } catch (Exception e) { + LOGGER.error("Got exception while trying to request the creation of an instance of the \"" + imageName + + "\" image.", e); + } + return null; + } + + protected void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { + this.directComponent = dummyComponent; + createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, dataGenContainerIds); + } + + +} \ No newline at end of file diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index d631f8f..ea76cc8 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -29,6 +29,7 @@ import org.hobbit.core.TestConstants; import org.hobbit.core.components.dummy.AbstractDummyPlatformController; import org.hobbit.core.components.dummy.DummyComponentExecutor; +import org.hobbit.core.containerservice.DirectContainerCreator; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.vocab.HobbitExperiments; import org.junit.Assert; @@ -45,7 +46,7 @@ import com.rabbitmq.client.MessageProperties; @RunWith(Parameterized.class) -public class BenchmarkControllerTest extends AbstractBenchmarkController { +public class BenchmarkControllerTest extends DirectContainerCreator { private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkControllerTest.class); @@ -210,8 +211,12 @@ public DummyPlatformController(String sessionId) { super(); this.sessionId = sessionId; } - public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + System.out.println("Inside static : "+this); + System.out.println("command : "+ command); + createDummyComponent(command, data, sessionId, props); + } + public void createDummyComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { String replyTo = null; if (props != null) { replyTo = props.getReplyTo(); From 82bec475c9b8970ab3688c04d370b00a30e4166a Mon Sep 17 00:00:00 2001 From: sourabhpoddar404 Date: Mon, 15 Jun 2020 16:04:57 +0200 Subject: [PATCH 2/6] Changes to BenchmarkControllerTest Co-authored-by: Altafhusen <57062112+altafhusen-mr@users.noreply.github.com> Co-authored-by: Yamini <29372422+Yamini19@users.noreply.github.com> Co-authored-by: Melissa <21376912+melissadas@users.noreply.github.com> --- .../org/hobbit/core/components/BenchmarkControllerTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index ea76cc8..c3dc386 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -55,6 +55,7 @@ public class BenchmarkControllerTest extends DirectContainerCreator { private static final String DATA_GEN_IMAGE = "datagenimage"; private static final String TASK_GEN_IMAGE = "taskgenimage"; private static final String EVAL_IMAGE = "evaluationimage"; + private DummyPlatformController dummyPlatformController; @Parameters public static Collection data() { @@ -91,7 +92,7 @@ public void test() throws Exception { environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); - final DummyPlatformController dummyPlatformController = new DummyPlatformController(sessionId); + dummyPlatformController = new DummyPlatformController(sessionId); try { DummyComponentExecutor dummyPlatformExecutor = new DummyComponentExecutor(dummyPlatformController); Thread dummyPlatformThread = new Thread(dummyPlatformExecutor); @@ -156,7 +157,7 @@ public void init() throws Exception { super.init(); // create data generators - createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null); + createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null,dummyPlatformController); // Create task generators createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null); From 90873d221cd23197089334b982d953fc3c7bdf9a Mon Sep 17 00:00:00 2001 From: sourabhpoddar404 Date: Mon, 22 Jun 2020 12:50:17 +0200 Subject: [PATCH 3/6] Added test case for new implementation Co-authored-by: Sourabh <56801226+sourabhpoddar404@users.noreply.github.com> Co-authored-by: Altafhusen <57062112+altafhusen-mr@users.noreply.github.com> Co-authored-by: Yamini <29372422+Yamini19@users.noreply.github.com> Co-authored-by: Melissa <21376912+melissadas@users.noreply.github.com> --- .../components/AbstractTaskGenerator.java | 4 + .../DirectContainerCreator.java | 18 + .../components/BenchmarkControllerTest.java | 233 ++++++------ .../DirectBenchmarkControllerTest.java | 333 ++++++++++++++++++ .../components/dummy/DummyDataCreator.java | 1 + 5 files changed, 476 insertions(+), 113 deletions(-) create mode 100644 src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index 5fb0cf0..a40b302 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -132,6 +132,7 @@ public void init() throws Exception { dataGenReceiver = DataReceiverImpl.builder().dataHandler(new DataHandler() { @Override public void handleData(byte[] data) { + receiveGeneratedData(data); } }).maxParallelProcessedMsgs(maxParallelProcessedMsgs).queue(getFactoryForIncomingDataQueues(), @@ -140,9 +141,11 @@ public void handleData(byte[] data) { @Override public void run() throws Exception { + Thread.sleep(1000); sendToCmdQueue(Commands.TASK_GENERATOR_READY_SIGNAL); // Wait for the start message startTaskGenMutex.acquire(); + System.out.println("inside run"); // Wait for message to terminate terminateMutex.acquire(); dataGenReceiver.closeWhenFinished(); @@ -155,6 +158,7 @@ public void run() throws Exception { @Override public void receiveGeneratedData(byte[] data) { try { + System.out.println("inside receive"); generateTask(data); } catch (Exception e) { LOGGER.error("Exception while generating task.", e); diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java index 4cbb35b..afe1fd2 100644 --- a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -6,6 +6,7 @@ import org.apache.commons.io.Charsets; import org.hobbit.core.Commands; +import org.hobbit.core.Constants; import org.hobbit.core.components.AbstractBenchmarkController; import org.hobbit.core.components.AbstractCommandReceivingComponent; import org.hobbit.core.data.StartCommandData; @@ -82,5 +83,22 @@ protected void createDataGenerators(String dataGeneratorImageName, int numberOfD createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, dataGenContainerIds); } + protected void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { + this.directComponent = dummyComponent; + createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, taskGenContainerIds); + } + + protected void createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractCommandReceivingComponent dummyComponent) { + this.directComponent = dummyComponent; + evalStoreContainerId = createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables); + if (evalStoreContainerId == null) { + String errorMsg = "Couldn't create evaluation storage. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + } + } \ No newline at end of file diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index c3dc386..9908c26 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -46,16 +46,17 @@ import com.rabbitmq.client.MessageProperties; @RunWith(Parameterized.class) -public class BenchmarkControllerTest extends DirectContainerCreator { +public class BenchmarkControllerTest extends AbstractBenchmarkController { private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkControllerTest.class); + + DummyPlatformController dummyPlatformController; private static final String HOBBIT_SESSION_ID = "123"; private static final String SYSTEM_CONTAINER_ID = "systemContainerId"; private static final String DATA_GEN_IMAGE = "datagenimage"; private static final String TASK_GEN_IMAGE = "taskgenimage"; private static final String EVAL_IMAGE = "evaluationimage"; - private DummyPlatformController dummyPlatformController; @Parameters public static Collection data() { @@ -91,6 +92,7 @@ public void test() throws Exception { // Needed for the generators environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); + dummyPlatformController = new DummyPlatformController(sessionId); try { @@ -157,7 +159,7 @@ public void init() throws Exception { super.init(); // create data generators - createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null,dummyPlatformController); + createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null); // Create task generators createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null); @@ -212,122 +214,127 @@ public DummyPlatformController(String sessionId) { super(); this.sessionId = sessionId; } + public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { System.out.println("Inside static : "+this); System.out.println("command : "+ command); createDummyComponent(command, data, sessionId, props); } + public void createDummyComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { - String replyTo = null; - if (props != null) { - replyTo = props.getReplyTo(); - } - - LOGGER.info("received command: session={}, command={}, data={}", sessionId, Commands.toString(command), - data != null ? RabbitMQUtils.readString(data) : "null"); - if (command == Commands.BENCHMARK_READY_SIGNAL) { - System.out.println("Benchmark Ready!"); - try { - sendToCmdQueue(sessionId, Commands.START_BENCHMARK_SIGNAL, - RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID), null); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getLocalizedMessage()); - } - } else if (command == Commands.DOCKER_CONTAINER_START) { - try { - String startCommandJson = RabbitMQUtils.readString(data); - final String containerId = Integer.toString(random.nextInt()); - - AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); - propsBuilder.deliveryMode(2); - propsBuilder.correlationId(props.getCorrelationId()); - AMQP.BasicProperties replyProps = propsBuilder.build(); - - if (startCommandJson.contains(DATA_GEN_IMAGE)) { - // Create data generators that are waiting for a random - // amount of time and terminate after that - DummyComponentExecutor dataGenExecutor = new DummyComponentExecutor( - new AbstractDataGenerator() { - @Override - protected void generateData() throws Exception { - LOGGER.debug("Data Generator started..."); - Thread.sleep(1000 + random.nextInt(1000)); - } - }) { - @Override - public void run() { - super.run(); - try { - sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, - Commands.DOCKER_CONTAINER_TERMINATED, - RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(containerId) }, - new byte[] { (byte) 0 }), - null); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - }; - dataGenExecutors.add(dataGenExecutor); - Thread t = new Thread(dataGenExecutor); - dataGenThreads.add(t); - t.start(); - - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { - // Create task generators that are waiting for a random - // amount of - // time and terminate after that - DummyComponentExecutor taskGenExecutor = new DummyComponentExecutor( - new AbstractTaskGenerator() { - @Override - public void run() throws Exception { - LOGGER.debug("Task Generator started..."); - super.run(); - } - - @Override - protected void generateTask(byte[] data) throws Exception { - } - }) { - @Override - public void run() { - super.run(); - try { - sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, - Commands.DOCKER_CONTAINER_TERMINATED, - RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(containerId) }, - new byte[] { (byte) 0 }), - null); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - }; - taskGenExecutors.add(taskGenExecutor); - Thread t = new Thread(taskGenExecutor); - taskGenThreads.add(t); - t.start(); - - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - } else if (startCommandJson.contains(EVAL_IMAGE)) { - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); - } else { - LOGGER.error("Got unknown start command. Ignoring it."); - } - } catch (IOException e) { - LOGGER.error("Exception while trying to respond to a container creation command.", e); - } - } + String replyTo = null; + if (props != null) { + replyTo = props.getReplyTo(); + } + + LOGGER.info("received command: session={}, command={}, data={}", sessionId, Commands.toString(command), + data != null ? RabbitMQUtils.readString(data) : "null"); + if (command == Commands.BENCHMARK_READY_SIGNAL) { + System.out.println("Benchmark Ready!"); + try { + sendToCmdQueue(sessionId, Commands.START_BENCHMARK_SIGNAL, + RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID), null); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getLocalizedMessage()); + } + } else if (command == Commands.DOCKER_CONTAINER_START) { + try { + String startCommandJson = RabbitMQUtils.readString(data); + final String containerId = Integer.toString(random.nextInt()); + + AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); + propsBuilder.deliveryMode(2); + propsBuilder.correlationId(props.getCorrelationId()); + AMQP.BasicProperties replyProps = propsBuilder.build(); + + if (startCommandJson.contains(DATA_GEN_IMAGE)) { + // Create data generators that are waiting for a random + // amount of time and terminate after that + DummyComponentExecutor dataGenExecutor = new DummyComponentExecutor( + new AbstractDataGenerator() { + @Override + protected void generateData() throws Exception { + LOGGER.debug("Data Generator started..."); + Thread.sleep(1000 + random.nextInt(1000)); + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + dataGenExecutors.add(dataGenExecutor); + Thread t = new Thread(dataGenExecutor); + dataGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { + // Create task generators that are waiting for a random + // amount of + // time and terminate after that + DummyComponentExecutor taskGenExecutor = new DummyComponentExecutor( + new AbstractTaskGenerator() { + @Override + public void run() throws Exception { + LOGGER.debug("Task Generator started..."); + super.run(); + } + + @Override + protected void generateTask(byte[] data) throws Exception { + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + taskGenExecutors.add(taskGenExecutor); + Thread t = new Thread(taskGenExecutor); + taskGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(EVAL_IMAGE)) { + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); + } else { + LOGGER.error("Got unknown start command. Ignoring it."); + } + } catch (IOException e) { + LOGGER.error("Exception while trying to respond to a container creation command.", e); + } + } + } + + } } diff --git a/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java new file mode 100644 index 0000000..fdaccd5 --- /dev/null +++ b/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java @@ -0,0 +1,333 @@ +/** + * This file is part of core. + * + * core is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * core is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with core. If not, see . + */ +package org.hobbit.core.components; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.commons.io.IOUtils; +import org.apache.jena.rdf.model.ModelFactory; +import org.hobbit.core.Commands; +import org.hobbit.core.Constants; +import org.hobbit.core.TestConstants; +import org.hobbit.core.components.dummy.AbstractDummyPlatformController; +import org.hobbit.core.components.dummy.DummyComponentExecutor; +import org.hobbit.core.containerservice.DirectContainerCreator; +import org.hobbit.core.rabbit.RabbitMQUtils; +import org.hobbit.vocab.HobbitExperiments; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.MessageProperties; + +@RunWith(Parameterized.class) +public class DirectBenchmarkControllerTest extends DirectContainerCreator { + + private static final Logger LOGGER = LoggerFactory.getLogger(DirectBenchmarkControllerTest.class); + + private static final String HOBBIT_SESSION_ID = "123"; + private static final String SYSTEM_CONTAINER_ID = "systemContainerId"; + private static final String DATA_GEN_IMAGE = "datagenimage"; + private static final String TASK_GEN_IMAGE = "taskgenimage"; + private static final String EVAL_IMAGE = "evaluationimage"; + private DummyPlatformController dummyPlatformController; + + @Parameters + public static Collection data() { + List testConfigs = new ArrayList(); + testConfigs.add(new Object[] { 1, 1 }); + testConfigs.add(new Object[] { 1, 10 }); + testConfigs.add(new Object[] { 10, 1 }); + testConfigs.add(new Object[] { 10, 10 }); + return testConfigs; + } + + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + private int numberOfDataGenerators; + private int numberOfTaskGenerators; + private String sessionId; + + public DirectBenchmarkControllerTest(int numberOfDataGenerators, int numberOfTaskGenerators) { + this.numberOfDataGenerators = numberOfDataGenerators; + this.numberOfTaskGenerators = numberOfTaskGenerators; + this.sessionId = HOBBIT_SESSION_ID + Integer.toString(numberOfDataGenerators) + + Integer.toString(numberOfTaskGenerators); + } + + @Test + public void test() throws Exception { + environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); + environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, sessionId); + environmentVariables.set(Constants.BENCHMARK_PARAMETERS_MODEL_KEY, + "{ \"@id\" : \"http://w3id.org/hobbit/experiments#New\", \"@type\" : \"http://w3id.org/hobbit/vocab#Experiment\" }"); + environmentVariables.set(Constants.HOBBIT_EXPERIMENT_URI_KEY, HobbitExperiments.getExperimentURI(sessionId)); + // Needed for the generators + environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); + environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); + + dummyPlatformController = new DummyPlatformController(sessionId); + try { + DummyComponentExecutor dummyPlatformExecutor = new DummyComponentExecutor(dummyPlatformController); + Thread dummyPlatformThread = new Thread(dummyPlatformExecutor); + dummyPlatformThread.start(); + dummyPlatformController.waitForControllerBeingReady(); + + AbstractBenchmarkController controller = this; + DummyComponentExecutor controllerExecutor = new DummyComponentExecutor(controller); + Thread controllerThread = new Thread(controllerExecutor); + controllerThread.start(); + // wait for the benchmark controller to start + + Thread.sleep(10000); + dummyPlatformController.sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID) }, new byte[] { (byte) 0 }), + null); + Thread.sleep(10000); + + for (Thread t : dummyPlatformController.dataGenThreads) { + t.join(10000); + Assert.assertFalse(t.isAlive()); + } + for (Thread t : dummyPlatformController.taskGenThreads) { + t.join(10000); + Assert.assertFalse(t.isAlive()); + } + + for (DummyComponentExecutor executor : dummyPlatformController.dataGenExecutors) { + Assert.assertTrue(executor.isSuccess()); + } + for (DummyComponentExecutor executor : dummyPlatformController.taskGenExecutors) { + Assert.assertTrue(executor.isSuccess()); + } + + // Make sure that the benchmark controller terminates during the + // next seconds + controllerThread.join(5000); + Assert.assertFalse(controllerThread.isAlive()); + } finally { + dummyPlatformController.terminate(); + for (DummyComponentExecutor executor : dummyPlatformController.dataGenExecutors) { + try { + IOUtils.closeQuietly(executor.getComponent()); + } catch (Exception e) { + e.printStackTrace(); + } + } + for (DummyComponentExecutor executor : dummyPlatformController.taskGenExecutors) { + try { + IOUtils.closeQuietly(executor.getComponent()); + } catch (Exception e) { + e.printStackTrace(); + } + } + close(); + } + } + + @Override + public void init() throws Exception { + super.init(); + + // create data generators + createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null,dummyPlatformController); + + // Create task generators + createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); + + // Create evaluation storage + createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); + + // Wait for all components to finish their initialization + waitForComponentsToInitialize(); + } + + @Override + protected void executeBenchmark() throws Exception { + // give the start signals + sendToCmdQueue(Commands.TASK_GENERATOR_START_SIGNAL); + sendToCmdQueue(Commands.DATA_GENERATOR_START_SIGNAL); + + // wait for the data generators to finish their work + waitForDataGenToFinish(); + + // wait for the task generators to finish their work + waitForTaskGenToFinish(); + + // wait for the system to terminate + waitForSystemToFinish(); + + // Create the evaluation module + + // wait for the evaluation to finish + // waitForEvalComponentsToFinish(); + + // the evaluation module should have sent an RDF model containing the + // results. We should add the configuration of the benchmark to this + // model. + // this.resultModel.add(...); + + // Send the resultModul to the platform controller and terminate + sendResultModel(ModelFactory.createDefaultModel()); + } + + protected static class DummyPlatformController extends AbstractDummyPlatformController { + + public List dataGenExecutors = new ArrayList(); + public List dataGenThreads = new ArrayList<>(); + public List taskGenExecutors = new ArrayList(); + public List taskGenThreads = new ArrayList<>(); + public Random random = new Random(); + + private String sessionId; + + public DummyPlatformController(String sessionId) { + super(); + this.sessionId = sessionId; + } + public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + System.out.println("Inside static : "+this); + System.out.println("command : "+ command); + createDummyComponent(command, data, sessionId, props); + } + public void createDummyComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + String replyTo = null; + if (props != null) { + replyTo = props.getReplyTo(); + } + + LOGGER.info("received command: session={}, command={}, data={}", sessionId, Commands.toString(command), + data != null ? RabbitMQUtils.readString(data) : "null"); + if (command == Commands.BENCHMARK_READY_SIGNAL) { + System.out.println("Benchmark Ready!"); + try { + sendToCmdQueue(sessionId, Commands.START_BENCHMARK_SIGNAL, + RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID), null); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getLocalizedMessage()); + } + } else if (command == Commands.DOCKER_CONTAINER_START) { + try { + String startCommandJson = RabbitMQUtils.readString(data); + final String containerId = Integer.toString(random.nextInt()); + + AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); + propsBuilder.deliveryMode(2); + propsBuilder.correlationId(props.getCorrelationId()); + AMQP.BasicProperties replyProps = propsBuilder.build(); + + if (startCommandJson.contains(DATA_GEN_IMAGE)) { + // Create data generators that are waiting for a random + // amount of time and terminate after that + DummyComponentExecutor dataGenExecutor = new DummyComponentExecutor( + new AbstractDataGenerator() { + @Override + protected void generateData() throws Exception { + LOGGER.debug("Data Generator started..."); + Thread.sleep(1000 + random.nextInt(1000)); + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + dataGenExecutors.add(dataGenExecutor); + Thread t = new Thread(dataGenExecutor); + dataGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { + // Create task generators that are waiting for a random + // amount of + // time and terminate after that + DummyComponentExecutor taskGenExecutor = new DummyComponentExecutor( + new AbstractTaskGenerator() { + @Override + public void run() throws Exception { + LOGGER.debug("Task Generator started..."); + super.run(); + } + + @Override + protected void generateTask(byte[] data) throws Exception { + } + }) { + @Override + public void run() { + super.run(); + try { + sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, + Commands.DOCKER_CONTAINER_TERMINATED, + RabbitMQUtils.writeByteArrays(null, + new byte[][] { RabbitMQUtils.writeString(containerId) }, + new byte[] { (byte) 0 }), + null); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + }; + taskGenExecutors.add(taskGenExecutor); + Thread t = new Thread(taskGenExecutor); + taskGenThreads.add(t); + t.start(); + + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + } else if (startCommandJson.contains(EVAL_IMAGE)) { + cmdChannel.basicPublish("", replyTo, replyProps, + RabbitMQUtils.writeString(containerId)); + sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); + } else { + LOGGER.error("Got unknown start command. Ignoring it."); + } + } catch (IOException e) { + LOGGER.error("Exception while trying to respond to a container creation command.", e); + } + } + } + } +} diff --git a/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java b/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java index d333906..0d364c4 100644 --- a/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java +++ b/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java @@ -34,6 +34,7 @@ protected void generateData() throws Exception { byte data[]; for (int i = 0; i < dataSize; ++i) { data = RabbitMQUtils.writeString(Integer.toString(i)); + sendDataToSystemAdapter(data); sendDataToTaskGenerator(data); } From e14cc95d5db09b38f5d1da66db0554448abc1d72 Mon Sep 17 00:00:00 2001 From: altafhusen-mr Date: Wed, 24 Jun 2020 13:35:41 +0200 Subject: [PATCH 4/6] Changes for the component creation based on feedback Co-authored-by: Sourabh <56801226+sourabhpoddar404@users.noreply.github.com> Co-authored-by: Altafhusen <57062112+altafhusen-mr@users.noreply.github.com> Co-authored-by: Yamini <29372422+Yamini19@users.noreply.github.com> Co-authored-by: Melissa <21376912+melissadas@users.noreply.github.com> --- src/main/java/org/hobbit/core/Constants.java | 2 + .../AbstractBenchmarkController.java | 29 +- .../AbstractCommandReceivingComponent.java | 32 +- .../components/AbstractTaskGenerator.java | 3 - .../containerservice/ContainerCreation.java | 21 ++ .../ContainerCreationFactory.java | 24 ++ .../DirectContainerCreator.java | 151 ++++++-- .../RabbitMQContainerCreator.java | 62 ++++ .../components/BenchmarkControllerTest.java | 9 +- .../DirectBenchmarkControllerTest.java | 333 ------------------ 10 files changed, 290 insertions(+), 376 deletions(-) create mode 100644 src/main/java/org/hobbit/core/containerservice/ContainerCreation.java create mode 100644 src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java create mode 100644 src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java delete mode 100644 src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java diff --git a/src/main/java/org/hobbit/core/Constants.java b/src/main/java/org/hobbit/core/Constants.java index 82bbc7a..b520580 100644 --- a/src/main/java/org/hobbit/core/Constants.java +++ b/src/main/java/org/hobbit/core/Constants.java @@ -161,4 +161,6 @@ private Constants() { public static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getTimeZone("GMT"); + public static final String RABBIT_CONTAINER_SERVICE = "RABBIT_CONTAINER_SERVICE"; + } diff --git a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java index 375fb1e..08c64d7 100644 --- a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java +++ b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java @@ -34,6 +34,9 @@ import org.apache.jena.vocabulary.RDF; import org.hobbit.core.Commands; import org.hobbit.core.Constants; +import org.hobbit.core.containerservice.ContainerCreation; +import org.hobbit.core.containerservice.ContainerCreationFactory; +import org.hobbit.core.containerservice.RabbitMQContainerCreator; import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.utils.EnvVariables; import org.hobbit.vocab.HOBBIT; @@ -135,6 +138,10 @@ public abstract class AbstractBenchmarkController extends AbstractPlatformConnec * The URI of the experiment. */ protected String experimentUri; + /** + * The instance to create container + */ + protected ContainerCreation containerCreation; public AbstractBenchmarkController() { defaultContainerType = Constants.CONTAINER_TYPE_BENCHMARK; @@ -143,6 +150,7 @@ public AbstractBenchmarkController() { @Override public void init() throws Exception { super.init(); + containerCreation = ContainerCreationFactory.getContainerCreationObject(EnvVariables.getString(Constants.RABBIT_CONTAINER_SERVICE, LOGGER), this); // benchmark controllers should be able to accept broadcasts addCommandHeaderId(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS); @@ -206,7 +214,7 @@ protected void createTaskGenerators(String taskGeneratorImageName, int numberOfT * @param generatorIds * set of generator container names */ - protected void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, + public void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, Set generatorIds) { String containerId; String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) @@ -608,4 +616,23 @@ protected void containerCrashed(String containerName) { sendResultModel(resultModel); System.exit(1); } + + public Set getDataGenContainerIds() { + return dataGenContainerIds; + } + + public Set getTaskGenContainerIds() { + return taskGenContainerIds; + } + + public String getEvalStoreContainerId() { + return evalStoreContainerId; + } + + public void setEvalStoreContainerId(String evalStoreContainerId) { + this.evalStoreContainerId = evalStoreContainerId; + } + + + } diff --git a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java index 9b19178..734ba81 100644 --- a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java @@ -291,7 +291,7 @@ protected String createContainer(String imageName, String[] envVariables) { * user-provided array of environment variables * @return the extended array of environment variables */ - protected String[] extendContainerEnvVariables(String[] envVariables) { + public String[] extendContainerEnvVariables(String[] envVariables) { if (envVariables == null) { envVariables = new String[0]; } @@ -334,7 +334,7 @@ protected String[] extendContainerEnvVariables(String[] envVariables) { * container * @return the name of the container instance or null if an error occurred */ - protected String createContainer(String imageName, String containerType, String[] envVariables) { + public String createContainer(String imageName, String containerType, String[] envVariables) { return createContainer(imageName, containerType, envVariables, null); } @@ -461,7 +461,15 @@ protected Future createContainerAsync(String imageName, String container return null; } - /** + public Map> getResponseFutures() { + return responseFutures; + } + + public String getResponseQueueName() { + return responseQueueName; + } + + /** * This method sends a {@link Commands#DOCKER_CONTAINER_STOP} command to * stop the container with the given id. * @@ -484,7 +492,7 @@ protected void stopContainer(String containerName) { * @throws IOException * if a communication problem occurs */ - protected void initResponseQueue() throws IOException { + public void initResponseQueue() throws IOException { if (responseQueueName == null) { responseQueueName = cmdChannel.queueDeclare().getQueue(); } @@ -554,6 +562,22 @@ public void close() throws IOException { } super.close(); } + + public String getContainerName() { + return containerName; + } + + public void setContainerName(String containerName) { + this.containerName = containerName; + } + + public Gson getGson() { + return gson; + } + + public void setGson(Gson gson) { + this.gson = gson; + } public void createDummyComponent(byte command, byte[] data, String sessionId, BasicProperties props) { // TODO Auto-generated method stub diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index a40b302..c66b0cd 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -141,11 +141,9 @@ public void handleData(byte[] data) { @Override public void run() throws Exception { - Thread.sleep(1000); sendToCmdQueue(Commands.TASK_GENERATOR_READY_SIGNAL); // Wait for the start message startTaskGenMutex.acquire(); - System.out.println("inside run"); // Wait for message to terminate terminateMutex.acquire(); dataGenReceiver.closeWhenFinished(); @@ -158,7 +156,6 @@ public void run() throws Exception { @Override public void receiveGeneratedData(byte[] data) { try { - System.out.println("inside receive"); generateTask(data); } catch (Exception e) { LOGGER.error("Exception while generating task.", e); diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java new file mode 100644 index 0000000..45ec4cb --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java @@ -0,0 +1,21 @@ +package org.hobbit.core.containerservice; + +import org.hobbit.core.components.AbstractCommandReceivingComponent; +/** + * ContainerCreation provides the facility to implement the functionalities + * to create {@link DirectContainerCreator} or {@link RabbitMQContainerCreator} + * @author altaf, sourabh, yamini, melisa + * + */ +public interface ContainerCreation { + + void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractCommandReceivingComponent dummyComponent); + + void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractCommandReceivingComponent dummyComponent); + + void createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractCommandReceivingComponent dummyComponent); + +} diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java new file mode 100644 index 0000000..d6e84f5 --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java @@ -0,0 +1,24 @@ +package org.hobbit.core.containerservice; + +import org.hobbit.core.Constants; +import org.hobbit.core.components.AbstractBenchmarkController; + +/** + * Factory that provides functionality to get an instance of {@link ContainerCreation} + * @author altaf, sourabh, yamini, melisa + * + */ +public class ContainerCreationFactory { + + /** + * This method returns the instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} + * based on the environment variable {@link Constants#RABBIT_CONTAINER_SERVICE} + */ + public static ContainerCreation getContainerCreationObject(String isRabbitContainerService, AbstractBenchmarkController abstractBenchmarkController) { + if(isRabbitContainerService.equals("true")) { + return new RabbitMQContainerCreator(abstractBenchmarkController); + } + return new DirectContainerCreator(abstractBenchmarkController); + } + +} diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java index afe1fd2..e5d7036 100644 --- a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -1,9 +1,10 @@ package org.hobbit.core.containerservice; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; - import org.apache.commons.io.Charsets; import org.hobbit.core.Commands; import org.hobbit.core.Constants; @@ -13,40 +14,67 @@ import org.hobbit.core.rabbit.RabbitMQUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.util.concurrent.SettableFuture; import com.rabbitmq.client.AMQP.BasicProperties; -public abstract class DirectContainerCreator extends AbstractBenchmarkController { +/** + * This class provides the implementation to create container functionality + * @author altaf, sourabh, yamini, melisa + * + */ +public class DirectContainerCreator implements ContainerCreation { private static final Logger LOGGER = LoggerFactory.getLogger(DirectContainerCreator.class); AbstractCommandReceivingComponent directComponent = null; + + private AbstractBenchmarkController abstractBenchmarkController; + + public DirectContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { + this.abstractBenchmarkController = abstractBenchmarkController; + } - protected Future createContainerAsync(String imageName, String containerType, String[] envVariables, String[] netAliases) { + /** + * This method creates and starts an instance of the given image using the given + * environment variables. + *

+ * Note that the containerType parameter should have one of the following + * values. + *

    + *
  • {@link Constants#CONTAINER_TYPE_DATABASE} if this container is part + * of a benchmark but should be located on a storage node.
  • + *
+ * + * @param imageName + * the name of the image of the docker container + * @param containerType + * the type of the container + * @param envVariables + * environment variables that should be added to the created + * container + * @param netAliases + * network aliases that should be added to the created container + * @return the Future object with the name of the container instance or null if an error occurred + */ + protected Future createContainer(String imageName, String containerType, String[] envVariables, String[] netAliases) { try { - System.out.println("Inside RabbitMQContainerCreater"); - envVariables = extendContainerEnvVariables(envVariables); + envVariables = abstractBenchmarkController.extendContainerEnvVariables(envVariables); - initResponseQueue(); + abstractBenchmarkController.initResponseQueue(); String correlationId = UUID.randomUUID().toString(); SettableFuture containerFuture = SettableFuture.create(); - synchronized (responseFutures) { - responseFutures.put(correlationId, containerFuture); + synchronized (abstractBenchmarkController.getResponseFutures()) { + abstractBenchmarkController.getResponseFutures().put(correlationId, containerFuture); } - byte data[] = RabbitMQUtils.writeString( - gson.toJson(new StartCommandData(imageName, containerType, containerName, envVariables, netAliases))); + abstractBenchmarkController.getGson().toJson(new StartCommandData(imageName, containerType, abstractBenchmarkController.getContainerName(), envVariables, netAliases))); BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); propsBuilder.deliveryMode(2); - propsBuilder.replyTo(responseQueueName); + propsBuilder.replyTo(abstractBenchmarkController.getResponseQueueName()); propsBuilder.correlationId(correlationId); BasicProperties props = propsBuilder.build(); - - - - byte sessionIdBytes[] = getHobbitSessionId().getBytes(Charsets.UTF_8); + byte sessionIdBytes[] = abstractBenchmarkController.getHobbitSessionId().getBytes(Charsets.UTF_8); int dataLength = sessionIdBytes.length + 5; boolean attachData = (data != null) && (data.length > 0); if (attachData) { @@ -59,16 +87,10 @@ protected Future createContainerAsync(String imageName, String container if (attachData) { buffer.put(data); } - byte sessionIdBytes1[] = new byte[sessionIdBytes.length]; String sessionId = new String(sessionIdBytes1, Charsets.UTF_8); byte command = Commands.DOCKER_CONTAINER_START; - - - - directComponent.createDummyComponent(command, data, sessionId, props); - return containerFuture; } catch (Exception e) { LOGGER.error("Got exception while trying to request the creation of an instance of the \"" + imageName @@ -77,28 +99,97 @@ protected Future createContainerAsync(String imageName, String container return null; } - protected void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + /** + * Creates the given number of data generators using the given image name + * and environment variables. + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the data generators + */ + public void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { this.directComponent = dummyComponent; - createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, dataGenContainerIds); + createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, abstractBenchmarkController.getDataGenContainerIds()); } - protected void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + /** + * Creates the given number of task generators using the given image name + * and environment variables. + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + */ + public void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { this.directComponent = dummyComponent; - createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, taskGenContainerIds); + createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, abstractBenchmarkController.getTaskGenContainerIds()); } - protected void createEvaluationStorage(String evalStorageImageName, String[] envVariables, + /** + * Creates the evaluate storage using the given image name and environment + * variables. + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables that should be given to the component + */ + + public void createEvaluationStorage(String evalStorageImageName, String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { this.directComponent = dummyComponent; - evalStoreContainerId = createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables); - if (evalStoreContainerId == null) { + abstractBenchmarkController.setEvalStoreContainerId(abstractBenchmarkController.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables)); + if (abstractBenchmarkController.getEvalStoreContainerId() == null) { String errorMsg = "Couldn't create evaluation storage. Aborting."; LOGGER.error(errorMsg); throw new IllegalStateException(errorMsg); } } - + + /** + * Internal method for creating generator components. + * + * @param generatorImageName + * name of the generator Docker image + * @param numberOfGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + * @param generatorIds + * set of generator container names + */ + public void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, + Set generatorIds) { + try { + String containerId; + String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) + : new String[2]; + // NOTE: Count only includes generators created within this method call. + variables[variables.length - 2] = Constants.GENERATOR_COUNT_KEY + "=" + numberOfGenerators; + for (int i = 0; i < numberOfGenerators; ++i) { + // At the start generatorIds is empty, and new generators are added to it immediately. + // Current size of that set is used to make IDs for new generators. + variables[variables.length - 1] = Constants.GENERATOR_ID_KEY + "=" + generatorIds.size(); + containerId = createContainer(generatorImageName, null, envVariables, null).get();// createContainer(generatorImageName, variables); + if (containerId != null) { + generatorIds.add(containerId); + } else { + String errorMsg = "Couldn't create generator component. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + } + }catch(Exception e) { + + } + } } \ No newline at end of file diff --git a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java new file mode 100644 index 0000000..a093523 --- /dev/null +++ b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java @@ -0,0 +1,62 @@ +package org.hobbit.core.containerservice; + +import org.hobbit.core.Constants; +import org.hobbit.core.components.AbstractBenchmarkController; +import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements functionality for RabbitMQ container creation + * @author altaf, sourabh, yamini, melisa + * + */ +public class RabbitMQContainerCreator implements ContainerCreation { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQContainerCreator.class); + + private AbstractBenchmarkController abstractBenchmarkController; + + public RabbitMQContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { + this.abstractBenchmarkController = abstractBenchmarkController; + } + + /** + * This method calls the createGenerator of {@link AbstractBenchmarkController} as the implementation + * is already implemented there + */ + @Override + public void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, + AbstractCommandReceivingComponent dummyComponent) { + abstractBenchmarkController.createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, abstractBenchmarkController.getDataGenContainerIds()); + + } + + /** + * This method calls the createTaskGenerators of {@link AbstractBenchmarkController} as the implementation + * is already implemented there + */ + @Override + public void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, + AbstractCommandReceivingComponent dummyComponent) { + abstractBenchmarkController.createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, abstractBenchmarkController.getTaskGenContainerIds()); + + } + + /** + * This method calls the createEvaluationStorage of {@link AbstractBenchmarkController} as the implementation + * is already implemented there + */ + @Override + public void createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractCommandReceivingComponent dummyComponent) { + abstractBenchmarkController.setEvalStoreContainerId(abstractBenchmarkController.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables)); + if (abstractBenchmarkController.getEvalStoreContainerId() == null) { + String errorMsg = "Couldn't create evaluation storage. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + + } + +} diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index 9908c26..65226cf 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -92,6 +92,7 @@ public void test() throws Exception { // Needed for the generators environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); + environmentVariables.set(Constants.RABBIT_CONTAINER_SERVICE, "true"); dummyPlatformController = new DummyPlatformController(sessionId); @@ -159,13 +160,13 @@ public void init() throws Exception { super.init(); // create data generators - createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null); + containerCreation.createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null, dummyPlatformController); // Create task generators - createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null); + containerCreation.createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); // Create evaluation storage - createEvaluationStorage(EVAL_IMAGE, null); + containerCreation.createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); // Wait for all components to finish their initialization waitForComponentsToInitialize(); @@ -216,8 +217,6 @@ public DummyPlatformController(String sessionId) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { - System.out.println("Inside static : "+this); - System.out.println("command : "+ command); createDummyComponent(command, data, sessionId, props); } diff --git a/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java deleted file mode 100644 index fdaccd5..0000000 --- a/src/test/java/org/hobbit/core/components/DirectBenchmarkControllerTest.java +++ /dev/null @@ -1,333 +0,0 @@ -/** - * This file is part of core. - * - * core is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * core is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with core. If not, see . - */ -package org.hobbit.core.components; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Random; - -import org.apache.commons.io.IOUtils; -import org.apache.jena.rdf.model.ModelFactory; -import org.hobbit.core.Commands; -import org.hobbit.core.Constants; -import org.hobbit.core.TestConstants; -import org.hobbit.core.components.dummy.AbstractDummyPlatformController; -import org.hobbit.core.components.dummy.DummyComponentExecutor; -import org.hobbit.core.containerservice.DirectContainerCreator; -import org.hobbit.core.rabbit.RabbitMQUtils; -import org.hobbit.vocab.HobbitExperiments; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.contrib.java.lang.system.EnvironmentVariables; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.MessageProperties; - -@RunWith(Parameterized.class) -public class DirectBenchmarkControllerTest extends DirectContainerCreator { - - private static final Logger LOGGER = LoggerFactory.getLogger(DirectBenchmarkControllerTest.class); - - private static final String HOBBIT_SESSION_ID = "123"; - private static final String SYSTEM_CONTAINER_ID = "systemContainerId"; - private static final String DATA_GEN_IMAGE = "datagenimage"; - private static final String TASK_GEN_IMAGE = "taskgenimage"; - private static final String EVAL_IMAGE = "evaluationimage"; - private DummyPlatformController dummyPlatformController; - - @Parameters - public static Collection data() { - List testConfigs = new ArrayList(); - testConfigs.add(new Object[] { 1, 1 }); - testConfigs.add(new Object[] { 1, 10 }); - testConfigs.add(new Object[] { 10, 1 }); - testConfigs.add(new Object[] { 10, 10 }); - return testConfigs; - } - - @Rule - public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); - - private int numberOfDataGenerators; - private int numberOfTaskGenerators; - private String sessionId; - - public DirectBenchmarkControllerTest(int numberOfDataGenerators, int numberOfTaskGenerators) { - this.numberOfDataGenerators = numberOfDataGenerators; - this.numberOfTaskGenerators = numberOfTaskGenerators; - this.sessionId = HOBBIT_SESSION_ID + Integer.toString(numberOfDataGenerators) - + Integer.toString(numberOfTaskGenerators); - } - - @Test - public void test() throws Exception { - environmentVariables.set(Constants.RABBIT_MQ_HOST_NAME_KEY, TestConstants.RABBIT_HOST); - environmentVariables.set(Constants.HOBBIT_SESSION_ID_KEY, sessionId); - environmentVariables.set(Constants.BENCHMARK_PARAMETERS_MODEL_KEY, - "{ \"@id\" : \"http://w3id.org/hobbit/experiments#New\", \"@type\" : \"http://w3id.org/hobbit/vocab#Experiment\" }"); - environmentVariables.set(Constants.HOBBIT_EXPERIMENT_URI_KEY, HobbitExperiments.getExperimentURI(sessionId)); - // Needed for the generators - environmentVariables.set(Constants.GENERATOR_ID_KEY, "0"); - environmentVariables.set(Constants.GENERATOR_COUNT_KEY, "1"); - - dummyPlatformController = new DummyPlatformController(sessionId); - try { - DummyComponentExecutor dummyPlatformExecutor = new DummyComponentExecutor(dummyPlatformController); - Thread dummyPlatformThread = new Thread(dummyPlatformExecutor); - dummyPlatformThread.start(); - dummyPlatformController.waitForControllerBeingReady(); - - AbstractBenchmarkController controller = this; - DummyComponentExecutor controllerExecutor = new DummyComponentExecutor(controller); - Thread controllerThread = new Thread(controllerExecutor); - controllerThread.start(); - // wait for the benchmark controller to start - - Thread.sleep(10000); - dummyPlatformController.sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, - Commands.DOCKER_CONTAINER_TERMINATED, RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID) }, new byte[] { (byte) 0 }), - null); - Thread.sleep(10000); - - for (Thread t : dummyPlatformController.dataGenThreads) { - t.join(10000); - Assert.assertFalse(t.isAlive()); - } - for (Thread t : dummyPlatformController.taskGenThreads) { - t.join(10000); - Assert.assertFalse(t.isAlive()); - } - - for (DummyComponentExecutor executor : dummyPlatformController.dataGenExecutors) { - Assert.assertTrue(executor.isSuccess()); - } - for (DummyComponentExecutor executor : dummyPlatformController.taskGenExecutors) { - Assert.assertTrue(executor.isSuccess()); - } - - // Make sure that the benchmark controller terminates during the - // next seconds - controllerThread.join(5000); - Assert.assertFalse(controllerThread.isAlive()); - } finally { - dummyPlatformController.terminate(); - for (DummyComponentExecutor executor : dummyPlatformController.dataGenExecutors) { - try { - IOUtils.closeQuietly(executor.getComponent()); - } catch (Exception e) { - e.printStackTrace(); - } - } - for (DummyComponentExecutor executor : dummyPlatformController.taskGenExecutors) { - try { - IOUtils.closeQuietly(executor.getComponent()); - } catch (Exception e) { - e.printStackTrace(); - } - } - close(); - } - } - - @Override - public void init() throws Exception { - super.init(); - - // create data generators - createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null,dummyPlatformController); - - // Create task generators - createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); - - // Create evaluation storage - createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); - - // Wait for all components to finish their initialization - waitForComponentsToInitialize(); - } - - @Override - protected void executeBenchmark() throws Exception { - // give the start signals - sendToCmdQueue(Commands.TASK_GENERATOR_START_SIGNAL); - sendToCmdQueue(Commands.DATA_GENERATOR_START_SIGNAL); - - // wait for the data generators to finish their work - waitForDataGenToFinish(); - - // wait for the task generators to finish their work - waitForTaskGenToFinish(); - - // wait for the system to terminate - waitForSystemToFinish(); - - // Create the evaluation module - - // wait for the evaluation to finish - // waitForEvalComponentsToFinish(); - - // the evaluation module should have sent an RDF model containing the - // results. We should add the configuration of the benchmark to this - // model. - // this.resultModel.add(...); - - // Send the resultModul to the platform controller and terminate - sendResultModel(ModelFactory.createDefaultModel()); - } - - protected static class DummyPlatformController extends AbstractDummyPlatformController { - - public List dataGenExecutors = new ArrayList(); - public List dataGenThreads = new ArrayList<>(); - public List taskGenExecutors = new ArrayList(); - public List taskGenThreads = new ArrayList<>(); - public Random random = new Random(); - - private String sessionId; - - public DummyPlatformController(String sessionId) { - super(); - this.sessionId = sessionId; - } - public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { - System.out.println("Inside static : "+this); - System.out.println("command : "+ command); - createDummyComponent(command, data, sessionId, props); - } - public void createDummyComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { - String replyTo = null; - if (props != null) { - replyTo = props.getReplyTo(); - } - - LOGGER.info("received command: session={}, command={}, data={}", sessionId, Commands.toString(command), - data != null ? RabbitMQUtils.readString(data) : "null"); - if (command == Commands.BENCHMARK_READY_SIGNAL) { - System.out.println("Benchmark Ready!"); - try { - sendToCmdQueue(sessionId, Commands.START_BENCHMARK_SIGNAL, - RabbitMQUtils.writeString(SYSTEM_CONTAINER_ID), null); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getLocalizedMessage()); - } - } else if (command == Commands.DOCKER_CONTAINER_START) { - try { - String startCommandJson = RabbitMQUtils.readString(data); - final String containerId = Integer.toString(random.nextInt()); - - AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); - propsBuilder.deliveryMode(2); - propsBuilder.correlationId(props.getCorrelationId()); - AMQP.BasicProperties replyProps = propsBuilder.build(); - - if (startCommandJson.contains(DATA_GEN_IMAGE)) { - // Create data generators that are waiting for a random - // amount of time and terminate after that - DummyComponentExecutor dataGenExecutor = new DummyComponentExecutor( - new AbstractDataGenerator() { - @Override - protected void generateData() throws Exception { - LOGGER.debug("Data Generator started..."); - Thread.sleep(1000 + random.nextInt(1000)); - } - }) { - @Override - public void run() { - super.run(); - try { - sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, - Commands.DOCKER_CONTAINER_TERMINATED, - RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(containerId) }, - new byte[] { (byte) 0 }), - null); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - }; - dataGenExecutors.add(dataGenExecutor); - Thread t = new Thread(dataGenExecutor); - dataGenThreads.add(t); - t.start(); - - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - } else if (startCommandJson.contains(TASK_GEN_IMAGE)) { - // Create task generators that are waiting for a random - // amount of - // time and terminate after that - DummyComponentExecutor taskGenExecutor = new DummyComponentExecutor( - new AbstractTaskGenerator() { - @Override - public void run() throws Exception { - LOGGER.debug("Task Generator started..."); - super.run(); - } - - @Override - protected void generateTask(byte[] data) throws Exception { - } - }) { - @Override - public void run() { - super.run(); - try { - sendToCmdQueue(Constants.HOBBIT_SESSION_ID_FOR_BROADCASTS, - Commands.DOCKER_CONTAINER_TERMINATED, - RabbitMQUtils.writeByteArrays(null, - new byte[][] { RabbitMQUtils.writeString(containerId) }, - new byte[] { (byte) 0 }), - null); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - }; - taskGenExecutors.add(taskGenExecutor); - Thread t = new Thread(taskGenExecutor); - taskGenThreads.add(t); - t.start(); - - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - } else if (startCommandJson.contains(EVAL_IMAGE)) { - cmdChannel.basicPublish("", replyTo, replyProps, - RabbitMQUtils.writeString(containerId)); - sendToCmdQueue(this.sessionId, Commands.EVAL_STORAGE_READY_SIGNAL, null, null); - } else { - LOGGER.error("Got unknown start command. Ignoring it."); - } - } catch (IOException e) { - LOGGER.error("Exception while trying to respond to a container creation command.", e); - } - } - } - } -} From a57e2e14d36fb15ae1ab5ca58dedd2d33f1c5b45 Mon Sep 17 00:00:00 2001 From: sourabhpoddar404 Date: Sat, 4 Jul 2020 15:05:46 +0200 Subject: [PATCH 5/6] Feedback changes Co-authored-by: Sourabh <56801226+sourabhpoddar404@users.noreply.github.com> Co-authored-by: Altafhusen <57062112+altafhusen-mr@users.noreply.github.com> Co-authored-by: Yamini <29372422+Yamini19@users.noreply.github.com> Co-authored-by: Melissa <21376912+melissadas@users.noreply.github.com> --- .../AbstractBenchmarkController.java | 13 +- .../AbstractCommandReceivingComponent.java | 6 +- .../AbstractPlatformController.java | 17 ++ .../components/AbstractTaskGenerator.java | 1 - .../containerservice/ContainerCreation.java | 52 ++++- .../DirectContainerCreator.java | 187 +++++++++++------- .../RabbitMQContainerCreator.java | 21 +- .../components/BenchmarkControllerTest.java | 10 +- .../ContainerCreationNoCorrelationTest.java | 7 + .../components/ContainerCreationTest.java | 7 + .../AbstractDummyPlatformController.java | 7 +- .../components/dummy/DummyDataCreator.java | 1 - .../mimic/DockerBasedMimickingAlgTest.java | 6 + 13 files changed, 218 insertions(+), 117 deletions(-) create mode 100644 src/main/java/org/hobbit/core/components/AbstractPlatformController.java diff --git a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java index 08c64d7..f174798 100644 --- a/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java +++ b/src/main/java/org/hobbit/core/components/AbstractBenchmarkController.java @@ -181,9 +181,9 @@ public void run() throws Exception { * @param envVariables * environment variables for the data generators */ - protected void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + protected Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables) { - createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, dataGenContainerIds); + return createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); } /** @@ -197,9 +197,9 @@ protected void createDataGenerators(String dataGeneratorImageName, int numberOfD * @param envVariables * environment variables for the task generators */ - protected void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + protected Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables) { - createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, taskGenContainerIds); + return createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); } /** @@ -214,8 +214,8 @@ protected void createTaskGenerators(String taskGeneratorImageName, int numberOfT * @param generatorIds * set of generator container names */ - public void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, - Set generatorIds) { + public Set createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables) { + Set generatorIds = new HashSet<>(); String containerId; String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) : new String[2]; @@ -234,6 +234,7 @@ public void createGenerator(String generatorImageName, int numberOfGenerators, S throw new IllegalStateException(errorMsg); } } + return generatorIds; } /** diff --git a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java index 734ba81..56ea145 100644 --- a/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java +++ b/src/main/java/org/hobbit/core/components/AbstractCommandReceivingComponent.java @@ -64,7 +64,7 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen /** * Name of this Docker container. */ - protected String containerName; + private String containerName; /** * Name of the queue that is used to receive responses for messages that are * sent via the command queue and for which an answer is expected. @@ -579,10 +579,6 @@ public void setGson(Gson gson) { this.gson = gson; } - public void createDummyComponent(byte command, byte[] data, String sessionId, BasicProperties props) { - // TODO Auto-generated method stub - - } diff --git a/src/main/java/org/hobbit/core/components/AbstractPlatformController.java b/src/main/java/org/hobbit/core/components/AbstractPlatformController.java new file mode 100644 index 0000000..548dfa2 --- /dev/null +++ b/src/main/java/org/hobbit/core/components/AbstractPlatformController.java @@ -0,0 +1,17 @@ +package org.hobbit.core.components; + +import com.rabbitmq.client.AMQP; + +public abstract class AbstractPlatformController extends AbstractCommandReceivingComponent{ + + public AbstractPlatformController() { + super(); + } + + public AbstractPlatformController(boolean execCommandsInParallel) { + super(execCommandsInParallel); + } + + public abstract void createComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props); + +} diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index c66b0cd..5fb0cf0 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -132,7 +132,6 @@ public void init() throws Exception { dataGenReceiver = DataReceiverImpl.builder().dataHandler(new DataHandler() { @Override public void handleData(byte[] data) { - receiveGeneratedData(data); } }).maxParallelProcessedMsgs(maxParallelProcessedMsgs).queue(getFactoryForIncomingDataQueues(), diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java index 45ec4cb..fa7aae4 100644 --- a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java @@ -1,6 +1,7 @@ package org.hobbit.core.containerservice; -import org.hobbit.core.components.AbstractCommandReceivingComponent; +import java.util.Set; +import org.hobbit.core.components.AbstractPlatformController; /** * ContainerCreation provides the facility to implement the functionalities * to create {@link DirectContainerCreator} or {@link RabbitMQContainerCreator} @@ -8,14 +9,45 @@ * */ public interface ContainerCreation { - - void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, - String[] envVariables, AbstractCommandReceivingComponent dummyComponent); - - void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, - String[] envVariables, AbstractCommandReceivingComponent dummyComponent); - - void createEvaluationStorage(String evalStorageImageName, String[] envVariables, - AbstractCommandReceivingComponent dummyComponent); + + /** + * Creates the given number of data generators using the given image name + * and environment variables. + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the data generators + */ + Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractPlatformController dummyComponent); + + /** + * Creates the given number of task generators using the given image name + * and environment variables. + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables for the task generators + */ + Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractPlatformController dummyComponent); + + /** + * Creates the evaluate storage using the given image name and environment + * variables. + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables that should be given to the component + */ + String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent); } diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java index e5d7036..da88957 100644 --- a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -2,19 +2,24 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; +import java.util.stream.Stream; import org.apache.commons.io.Charsets; import org.hobbit.core.Commands; import org.hobbit.core.Constants; import org.hobbit.core.components.AbstractBenchmarkController; import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.hobbit.core.components.AbstractPlatformController; import org.hobbit.core.data.StartCommandData; import org.hobbit.core.rabbit.RabbitMQUtils; +import org.hobbit.utils.EnvVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.Gson; import com.rabbitmq.client.AMQP.BasicProperties; /** @@ -24,17 +29,24 @@ */ public class DirectContainerCreator implements ContainerCreation { - private static final Logger LOGGER = LoggerFactory.getLogger(DirectContainerCreator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DirectContainerCreator.class); - AbstractCommandReceivingComponent directComponent = null; + private AbstractPlatformController platformController = null; - private AbstractBenchmarkController abstractBenchmarkController; + private AbstractCommandReceivingComponent abstractCommandReceivingComponent; - public DirectContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { - this.abstractBenchmarkController = abstractBenchmarkController; - } + private String containerName; + + private String hobbitSessionId; + + public DirectContainerCreator(AbstractBenchmarkController abstractBenchmarkController) { + this.abstractCommandReceivingComponent = abstractBenchmarkController; + hobbitSessionId = EnvVariables.getString(Constants.HOBBIT_SESSION_ID_KEY, + Constants.HOBBIT_SESSION_ID_FOR_PLATFORM_COMPONENTS); + containerName = EnvVariables.getString(Constants.CONTAINER_NAME_KEY, containerName); + } - /** + /** * This method creates and starts an instance of the given image using the given * environment variables. *

@@ -56,41 +68,40 @@ public DirectContainerCreator(AbstractBenchmarkController abstractBenchmarkContr * network aliases that should be added to the created container * @return the Future object with the name of the container instance or null if an error occurred */ - protected Future createContainer(String imageName, String containerType, String[] envVariables, String[] netAliases) { + protected Future createContainer(String imageName, String containerType, String[] envVariables, String[] netAliases) { try { - envVariables = abstractBenchmarkController.extendContainerEnvVariables(envVariables); - - abstractBenchmarkController.initResponseQueue(); - String correlationId = UUID.randomUUID().toString(); + Gson gson = new Gson(); + envVariables = extendContainerEnvVariables(envVariables); + abstractCommandReceivingComponent.initResponseQueue(); + String correlationId = UUID.randomUUID().toString(); SettableFuture containerFuture = SettableFuture.create(); - - synchronized (abstractBenchmarkController.getResponseFutures()) { - abstractBenchmarkController.getResponseFutures().put(correlationId, containerFuture); + synchronized (abstractCommandReceivingComponent.getResponseFutures()) { + abstractCommandReceivingComponent.getResponseFutures().put(correlationId, containerFuture); } byte data[] = RabbitMQUtils.writeString( - abstractBenchmarkController.getGson().toJson(new StartCommandData(imageName, containerType, abstractBenchmarkController.getContainerName(), envVariables, netAliases))); + gson.toJson(new StartCommandData(imageName, containerType, containerName, envVariables, netAliases))); BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); propsBuilder.deliveryMode(2); - propsBuilder.replyTo(abstractBenchmarkController.getResponseQueueName()); + propsBuilder.replyTo(abstractCommandReceivingComponent.getResponseQueueName()); propsBuilder.correlationId(correlationId); BasicProperties props = propsBuilder.build(); - byte sessionIdBytes[] = abstractBenchmarkController.getHobbitSessionId().getBytes(Charsets.UTF_8); - int dataLength = sessionIdBytes.length + 5; - boolean attachData = (data != null) && (data.length > 0); - if (attachData) { - dataLength += data.length; - } - ByteBuffer buffer = ByteBuffer.allocate(dataLength); - buffer.putInt(sessionIdBytes.length); - buffer.put(sessionIdBytes); - buffer.put(Commands.DOCKER_CONTAINER_START); - if (attachData) { - buffer.put(data); - } - byte sessionIdBytes1[] = new byte[sessionIdBytes.length]; - String sessionId = new String(sessionIdBytes1, Charsets.UTF_8); - byte command = Commands.DOCKER_CONTAINER_START; - directComponent.createDummyComponent(command, data, sessionId, props); + byte sessionIdBytes[] = hobbitSessionId.getBytes(Charsets.UTF_8); + int dataLength = sessionIdBytes.length + 5; + boolean attachData = (data != null) && (data.length > 0); + if (attachData) { + dataLength += data.length; + } + ByteBuffer buffer = ByteBuffer.allocate(dataLength); + buffer.putInt(sessionIdBytes.length); + buffer.put(sessionIdBytes); + buffer.put(Commands.DOCKER_CONTAINER_START); + if (attachData) { + buffer.put(data); + } + byte sessionIdBytes1[] = new byte[sessionIdBytes.length]; + String sessionId = new String(sessionIdBytes1, Charsets.UTF_8); + byte command = Commands.DOCKER_CONTAINER_START; + platformController.createComponent(command, data, sessionId, props); return containerFuture; } catch (Exception e) { LOGGER.error("Got exception while trying to request the creation of an instance of the \"" + imageName @@ -99,7 +110,7 @@ protected Future createContainer(String imageName, String containerType, return null; } - /** + /** * Creates the given number of data generators using the given image name * and environment variables. * @@ -110,13 +121,14 @@ protected Future createContainer(String imageName, String containerType, * @param envVariables * environment variables for the data generators */ - public void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, - String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { - this.directComponent = dummyComponent; - createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, abstractBenchmarkController.getDataGenContainerIds()); + @Override + public Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, + String[] envVariables, AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + return createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); } - /** + /** * Creates the given number of task generators using the given image name * and environment variables. * @@ -127,13 +139,14 @@ public void createDataGenerators(String dataGeneratorImageName, int numberOfData * @param envVariables * environment variables for the task generators */ - public void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, - String[] envVariables, AbstractCommandReceivingComponent dummyComponent) { - this.directComponent = dummyComponent; - createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, abstractBenchmarkController.getTaskGenContainerIds()); + @Override + public Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, + String[] envVariables, AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + return createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); } - /** + /** * Creates the evaluate storage using the given image name and environment * variables. * @@ -142,19 +155,21 @@ public void createTaskGenerators(String taskGeneratorImageName, int numberOfTask * @param envVariables * environment variables that should be given to the component */ - - public void createEvaluationStorage(String evalStorageImageName, String[] envVariables, - AbstractCommandReceivingComponent dummyComponent) { - this.directComponent = dummyComponent; - abstractBenchmarkController.setEvalStoreContainerId(abstractBenchmarkController.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables)); - if (abstractBenchmarkController.getEvalStoreContainerId() == null) { + @Override + public String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent) { + this.platformController = dummyComponent; + String evaluationStoreContainerId = null; + evaluationStoreContainerId = abstractCommandReceivingComponent.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables); + if (evaluationStoreContainerId == null) { String errorMsg = "Couldn't create evaluation storage. Aborting."; LOGGER.error(errorMsg); throw new IllegalStateException(errorMsg); } + return evaluationStoreContainerId; } - /** + /** * Internal method for creating generator components. * * @param generatorImageName @@ -166,30 +181,50 @@ public void createEvaluationStorage(String evalStorageImageName, String[] envVar * @param generatorIds * set of generator container names */ - public void createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables, - Set generatorIds) { - try { - String containerId; - String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) - : new String[2]; - // NOTE: Count only includes generators created within this method call. - variables[variables.length - 2] = Constants.GENERATOR_COUNT_KEY + "=" + numberOfGenerators; - for (int i = 0; i < numberOfGenerators; ++i) { - // At the start generatorIds is empty, and new generators are added to it immediately. - // Current size of that set is used to make IDs for new generators. - variables[variables.length - 1] = Constants.GENERATOR_ID_KEY + "=" + generatorIds.size(); - containerId = createContainer(generatorImageName, null, envVariables, null).get();// createContainer(generatorImageName, variables); - if (containerId != null) { - generatorIds.add(containerId); - } else { - String errorMsg = "Couldn't create generator component. Aborting."; - LOGGER.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - } - }catch(Exception e) { - - } + public Set createGenerator(String generatorImageName, int numberOfGenerators, String[] envVariables) { + Set generatorIds = new HashSet<>(); + try { + String containerId; + String variables[] = envVariables != null ? Arrays.copyOf(envVariables, envVariables.length + 2) + : new String[2]; + // NOTE: Count only includes generators created within this method call. + variables[variables.length - 2] = Constants.GENERATOR_COUNT_KEY + "=" + numberOfGenerators; + for (int i = 0; i < numberOfGenerators; ++i) { + // At the start generatorIds is empty, and new generators are added to it immediately. + // Current size of that set is used to make IDs for new generators. + variables[variables.length - 1] = Constants.GENERATOR_ID_KEY + "=" + generatorIds.size(); + containerId = createContainer(generatorImageName, null, envVariables, null).get();// createContainer(generatorImageName, variables); + if (containerId != null) { + generatorIds.add(containerId); + } else { + String errorMsg = "Couldn't create generator component. Aborting."; + LOGGER.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + } + }catch(Exception e) { + LOGGER.error("Exception occured",e); + } + return generatorIds; + } + + public String[] extendContainerEnvVariables(String[] envVariables) { + String rabbitMQHostName = EnvVariables.getString(Constants.RABBIT_MQ_HOST_NAME_KEY, LOGGER); + if (envVariables == null) { + envVariables = new String[0]; + } + + // Only add RabbitMQ host env if there isn't any. + if (Stream.of(envVariables).noneMatch(kv -> kv.startsWith(Constants.RABBIT_MQ_HOST_NAME_KEY + "="))) { + envVariables = Arrays.copyOf(envVariables, envVariables.length + 2); + envVariables[envVariables.length - 2] = Constants.RABBIT_MQ_HOST_NAME_KEY + "=" + rabbitMQHostName; + } else { + envVariables = Arrays.copyOf(envVariables, envVariables.length + 1); + } + + envVariables[envVariables.length - 1] = Constants.HOBBIT_SESSION_ID_KEY + "=" + hobbitSessionId; + return envVariables; } + } \ No newline at end of file diff --git a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java index a093523..f4b7ed9 100644 --- a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java @@ -1,8 +1,11 @@ package org.hobbit.core.containerservice; +import java.util.Set; + import org.hobbit.core.Constants; import org.hobbit.core.components.AbstractBenchmarkController; import org.hobbit.core.components.AbstractCommandReceivingComponent; +import org.hobbit.core.components.AbstractPlatformController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +29,9 @@ public RabbitMQContainerCreator(AbstractBenchmarkController abstractBenchmarkCon * is already implemented there */ @Override - public void createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, - AbstractCommandReceivingComponent dummyComponent) { - abstractBenchmarkController.createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables, abstractBenchmarkController.getDataGenContainerIds()); + public Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, + AbstractPlatformController dummyComponent) { + return abstractBenchmarkController.createGenerator(dataGeneratorImageName, numberOfDataGenerators, envVariables); } @@ -37,9 +40,9 @@ public void createDataGenerators(String dataGeneratorImageName, int numberOfData * is already implemented there */ @Override - public void createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, - AbstractCommandReceivingComponent dummyComponent) { - abstractBenchmarkController.createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables, abstractBenchmarkController.getTaskGenContainerIds()); + public Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, + AbstractPlatformController dummyComponent) { + return abstractBenchmarkController.createGenerator(taskGeneratorImageName, numberOfTaskGenerators, envVariables); } @@ -48,15 +51,15 @@ public void createTaskGenerators(String taskGeneratorImageName, int numberOfTask * is already implemented there */ @Override - public void createEvaluationStorage(String evalStorageImageName, String[] envVariables, - AbstractCommandReceivingComponent dummyComponent) { + public String createEvaluationStorage(String evalStorageImageName, String[] envVariables, + AbstractPlatformController dummyComponent) { abstractBenchmarkController.setEvalStoreContainerId(abstractBenchmarkController.createContainer(evalStorageImageName, Constants.CONTAINER_TYPE_DATABASE, envVariables)); if (abstractBenchmarkController.getEvalStoreContainerId() == null) { String errorMsg = "Couldn't create evaluation storage. Aborting."; LOGGER.error(errorMsg); throw new IllegalStateException(errorMsg); } - + return abstractBenchmarkController.getEvalStoreContainerId(); } } diff --git a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java index 65226cf..f58e5af 100644 --- a/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java +++ b/src/test/java/org/hobbit/core/components/BenchmarkControllerTest.java @@ -160,13 +160,13 @@ public void init() throws Exception { super.init(); // create data generators - containerCreation.createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null, dummyPlatformController); + dataGenContainerIds = containerCreation.createDataGenerators(DATA_GEN_IMAGE, numberOfDataGenerators, null, dummyPlatformController); // Create task generators - containerCreation.createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); + taskGenContainerIds = containerCreation.createTaskGenerators(TASK_GEN_IMAGE, numberOfTaskGenerators, null, dummyPlatformController); // Create evaluation storage - containerCreation.createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); + evalStoreContainerId = containerCreation.createEvaluationStorage(EVAL_IMAGE, null, dummyPlatformController); // Wait for all components to finish their initialization waitForComponentsToInitialize(); @@ -217,10 +217,10 @@ public DummyPlatformController(String sessionId) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { - createDummyComponent(command, data, sessionId, props); + createComponent(command, data, sessionId, props); } - public void createDummyComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + public void createComponent(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { String replyTo = null; if (props != null) { replyTo = props.getReplyTo(); diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java index 2896915..4511e46 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationNoCorrelationTest.java @@ -20,6 +20,7 @@ import java.util.Random; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.MessageProperties; import org.hobbit.core.rabbit.RabbitMQUtils; import java.io.IOException; @@ -95,6 +96,12 @@ public DummyPlatformController(String sessionId) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { + // TODO Auto-generated method stub if (command == Commands.DOCKER_CONTAINER_START) { String[] envVars = gson.fromJson(RabbitMQUtils.readString(data), StartCommandData.class).getEnvironmentVariables(); String containerId = Stream.of(envVars).filter(kv -> kv.startsWith("ID=")).findAny().get().split("=", 2)[1]; diff --git a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java index 3dc49a1..179ee2d 100644 --- a/src/test/java/org/hobbit/core/components/ContainerCreationTest.java +++ b/src/test/java/org/hobbit/core/components/ContainerCreationTest.java @@ -20,6 +20,7 @@ import java.util.Random; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.MessageProperties; import org.hobbit.core.rabbit.RabbitMQUtils; import java.io.IOException; @@ -111,6 +112,12 @@ public DummyPlatformController(String sessionId) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { + // TODO Auto-generated method stub if (command == Commands.DOCKER_CONTAINER_START) { String[] envVars = gson.fromJson(RabbitMQUtils.readString(data), StartCommandData.class).getEnvironmentVariables(); String containerId = Stream.of(envVars).filter(kv -> kv.startsWith("ID=")).findAny().get().split("=", 2)[1]; diff --git a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java index 239ace7..f722226 100644 --- a/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java +++ b/src/test/java/org/hobbit/core/components/dummy/AbstractDummyPlatformController.java @@ -19,18 +19,16 @@ import org.hobbit.core.Constants; import org.hobbit.core.rabbit.RabbitMQUtils; import java.io.IOException; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import org.apache.commons.io.Charsets; import java.nio.ByteBuffer; import java.util.concurrent.Semaphore; -import org.hobbit.core.components.AbstractCommandReceivingComponent; - +import org.hobbit.core.components.AbstractPlatformController; import org.junit.Ignore; @Ignore -public abstract class AbstractDummyPlatformController extends AbstractCommandReceivingComponent { +public abstract class AbstractDummyPlatformController extends AbstractPlatformController { private boolean readyFlag = false; private Semaphore terminationMutex = new Semaphore(0); @@ -98,6 +96,7 @@ public void receiveCommand(byte command, byte[] data) { public abstract void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props); + public abstract void createComponent(byte command, byte[] data, String sessionId, BasicProperties props); public void terminate() { terminationMutex.release(); } diff --git a/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java b/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java index 0d364c4..d333906 100644 --- a/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java +++ b/src/test/java/org/hobbit/core/components/dummy/DummyDataCreator.java @@ -34,7 +34,6 @@ protected void generateData() throws Exception { byte data[]; for (int i = 0; i < dataSize; ++i) { data = RabbitMQUtils.writeString(Integer.toString(i)); - sendDataToSystemAdapter(data); sendDataToTaskGenerator(data); } diff --git a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java index bb62bd7..a1a580e 100644 --- a/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java +++ b/src/test/java/org/hobbit/core/mimic/DockerBasedMimickingAlgTest.java @@ -30,6 +30,7 @@ import com.google.gson.Gson; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.MessageProperties; /** @@ -117,6 +118,11 @@ public DummyPlatformController(String sessionId) { } public void receiveCommand(byte command, byte[] data, String sessionId, AMQP.BasicProperties props) { + createComponent(command, data, sessionId, props); + } + + @Override + public void createComponent(byte command, byte[] data, String sessionId, BasicProperties props) { String replyTo = null; if (props != null) { replyTo = props.getReplyTo(); From 8a26f76bd2c759b0c9607bbe0776a578bca82f52 Mon Sep 17 00:00:00 2001 From: melissadas Date: Sun, 5 Jul 2020 20:24:36 +0200 Subject: [PATCH 6/6] Added Javadocs for the components. --- .../components/AbstractTaskGenerator.java | 1 - .../containerservice/ContainerCreation.java | 9 ++- .../ContainerCreationFactory.java | 24 +++++--- .../DirectContainerCreator.java | 17 ++++-- .../RabbitMQContainerCreator.java | 56 ++++++++++++++----- 5 files changed, 77 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java index 5fb0cf0..b360b27 100644 --- a/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java +++ b/src/main/java/org/hobbit/core/components/AbstractTaskGenerator.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.concurrent.Semaphore; - import org.apache.commons.io.IOUtils; import org.hobbit.core.Commands; import org.hobbit.core.Constants; diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java index fa7aae4..2a9437a 100644 --- a/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreation.java @@ -2,10 +2,15 @@ import java.util.Set; import org.hobbit.core.components.AbstractPlatformController; + /** - * ContainerCreation provides the facility to implement the functionalities + * This interface provides methods that provides functionalities * to create {@link DirectContainerCreator} or {@link RabbitMQContainerCreator} - * @author altaf, sourabh, yamini, melisa + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das * */ public interface ContainerCreation { diff --git a/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java index d6e84f5..5b30e32 100644 --- a/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java +++ b/src/main/java/org/hobbit/core/containerservice/ContainerCreationFactory.java @@ -4,16 +4,26 @@ import org.hobbit.core.components.AbstractBenchmarkController; /** - * Factory that provides functionality to get an instance of {@link ContainerCreation} - * @author altaf, sourabh, yamini, melisa + * This class provides functionality to obtain an instance of {@link ContainerCreation} + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das * */ public class ContainerCreationFactory { - - /** - * This method returns the instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} - * based on the environment variable {@link Constants#RABBIT_CONTAINER_SERVICE} - */ + + /** + * This static method returns the instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} + * based on the value of the environment variable {@link Constants#RABBIT_CONTAINER_SERVICE} + * + * @param isRabbitContainerService + * the environment variable that decides which instance of {@link ContainerCreation} will be returned. + * @param abstractBenchmarkController + * instance of {@link AbstractBenchmarkController} + * @return an instance of {@link RabbitMQContainerCreator} or {@link DirectContainerCreator} + */ public static ContainerCreation getContainerCreationObject(String isRabbitContainerService, AbstractBenchmarkController abstractBenchmarkController) { if(isRabbitContainerService.equals("true")) { return new RabbitMQContainerCreator(abstractBenchmarkController); diff --git a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java index da88957..7342869 100644 --- a/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/DirectContainerCreator.java @@ -23,8 +23,12 @@ import com.rabbitmq.client.AMQP.BasicProperties; /** - * This class provides the implementation to create container functionality - * @author altaf, sourabh, yamini, melisa + * This class provides the implementation to create container functionality. + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das * */ public class DirectContainerCreator implements ContainerCreation { @@ -34,7 +38,10 @@ public class DirectContainerCreator implements ContainerCreation { private AbstractPlatformController platformController = null; private AbstractCommandReceivingComponent abstractCommandReceivingComponent; - + + /** + * The name of the container instance + */ private String containerName; private String hobbitSessionId; @@ -203,7 +210,7 @@ public Set createGenerator(String generatorImageName, int numberOfGenera } } }catch(Exception e) { - LOGGER.error("Exception occured",e); + LOGGER.error("Exception occurred.",e); } return generatorIds; } @@ -227,4 +234,4 @@ public String[] extendContainerEnvVariables(String[] envVariables) { } -} \ No newline at end of file +} diff --git a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java index f4b7ed9..ec99933 100644 --- a/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java +++ b/src/main/java/org/hobbit/core/containerservice/RabbitMQContainerCreator.java @@ -10,8 +10,12 @@ import org.slf4j.LoggerFactory; /** - * Implements functionality for RabbitMQ container creation - * @author altaf, sourabh, yamini, melisa + * This class implements the functionality for RabbitMQ container creation. + * + * @author Altafhusen Makandar + * @author Sourabh Poddar + * @author Yamini Punetha + * @author Melissa Das * */ public class RabbitMQContainerCreator implements ContainerCreation { @@ -24,10 +28,18 @@ public RabbitMQContainerCreator(AbstractBenchmarkController abstractBenchmarkCon this.abstractBenchmarkController = abstractBenchmarkController; } - /** - * This method calls the createGenerator of {@link AbstractBenchmarkController} as the implementation - * is already implemented there - */ + /** + * This method calls the createGenerator method of {@link AbstractBenchmarkController} + * + * @param dataGeneratorImageName + * name of the data generator Docker image + * @param numberOfDataGenerators + * number of generators that should be created + * @param envVariables + * environment variables required for the creation of the data generators + * @param dummyComponent + * + */ @Override public Set createDataGenerators(String dataGeneratorImageName, int numberOfDataGenerators, String[] envVariables, AbstractPlatformController dummyComponent) { @@ -35,10 +47,18 @@ public Set createDataGenerators(String dataGeneratorImageName, int numbe } - /** - * This method calls the createTaskGenerators of {@link AbstractBenchmarkController} as the implementation - * is already implemented there - */ + /** + * This method calls the createTaskGenerators method of {@link AbstractBenchmarkController} + * + * @param taskGeneratorImageName + * name of the task generator Docker image + * @param numberOfTaskGenerators + * number of generators that should be created + * @param envVariables + * environment variables required for the creation of the task generators + * @param dummyComponent + * + */ @Override public Set createTaskGenerators(String taskGeneratorImageName, int numberOfTaskGenerators, String[] envVariables, AbstractPlatformController dummyComponent) { @@ -46,10 +66,16 @@ public Set createTaskGenerators(String taskGeneratorImageName, int numbe } - /** - * This method calls the createEvaluationStorage of {@link AbstractBenchmarkController} as the implementation - * is already implemented there - */ + /** + * This method calls the createEvaluationStorage method of {@link AbstractBenchmarkController} + * + * @param evalStorageImageName + * name of the evaluation storage image + * @param envVariables + * environment variables required for the creation of evaluation storage + * @param dummyComponent + * @return the container id of the evaluation storage. + */ @Override public String createEvaluationStorage(String evalStorageImageName, String[] envVariables, AbstractPlatformController dummyComponent) { @@ -61,5 +87,5 @@ public String createEvaluationStorage(String evalStorageImageName, String[] envV } return abstractBenchmarkController.getEvalStoreContainerId(); } - } +