diff --git a/onebusaway-nyc-admin-webapp/pom.xml b/onebusaway-nyc-admin-webapp/pom.xml
index 08272d134c..223fa0a06a 100644
--- a/onebusaway-nyc-admin-webapp/pom.xml
+++ b/onebusaway-nyc-admin-webapp/pom.xml
@@ -18,10 +18,10 @@
-
+
diff --git a/onebusaway-nyc-gtfsrt-integration-tests/src/integration-test/resources/onebusaway-nyc-gtfsrt-webapp/data-sources.xml b/onebusaway-nyc-gtfsrt-integration-tests/src/integration-test/resources/onebusaway-nyc-gtfsrt-webapp/data-sources.xml
index cb9b9440c2..978947f68e 100644
--- a/onebusaway-nyc-gtfsrt-integration-tests/src/integration-test/resources/onebusaway-nyc-gtfsrt-webapp/data-sources.xml
+++ b/onebusaway-nyc-gtfsrt-integration-tests/src/integration-test/resources/onebusaway-nyc-gtfsrt-webapp/data-sources.xml
@@ -58,7 +58,7 @@
-
diff --git a/onebusaway-nyc-gtfsrt-webapp/src/main/resources/data-sources.xml b/onebusaway-nyc-gtfsrt-webapp/src/main/resources/data-sources.xml
index 29e16b47a4..05c0cc2d2c 100644
--- a/onebusaway-nyc-gtfsrt-webapp/src/main/resources/data-sources.xml
+++ b/onebusaway-nyc-gtfsrt-webapp/src/main/resources/data-sources.xml
@@ -63,14 +63,14 @@
-
-
+
diff --git a/onebusaway-nyc-gtfsrt/src/test/java/org/onebusaway/nyc/gtfsrt/tests/TripUpdateTest.java b/onebusaway-nyc-gtfsrt/src/test/java/org/onebusaway/nyc/gtfsrt/tests/TripUpdateTest.java
index 1803b298e8..8edad19228 100644
--- a/onebusaway-nyc-gtfsrt/src/test/java/org/onebusaway/nyc/gtfsrt/tests/TripUpdateTest.java
+++ b/onebusaway-nyc-gtfsrt/src/test/java/org/onebusaway/nyc/gtfsrt/tests/TripUpdateTest.java
@@ -66,7 +66,6 @@ public TripUpdateTest(String gtfsFile, String defaultAgencyId, String blockTripM
TripUpdateFeedBuilderImpl feedBuilder = new TripUpdateFeedBuilderImpl();
_feedBuilder = feedBuilder;
- _predictionIntegrationService = new QueuePredictionIntegrationServiceImpl();
_predictionIntegrationService.setTransitDataService(_transitDataService);
_predictionIntegrationService.setConfigurationService(new MockConfigurationService());
}
diff --git a/onebusaway-nyc-integration-tests/src/integration-test/resources/onebusaway-nyc-vehicle-tracking-webapp/data-sources.xml b/onebusaway-nyc-integration-tests/src/integration-test/resources/onebusaway-nyc-vehicle-tracking-webapp/data-sources.xml
index 724b9c5184..80ebd3b3f6 100644
--- a/onebusaway-nyc-integration-tests/src/integration-test/resources/onebusaway-nyc-vehicle-tracking-webapp/data-sources.xml
+++ b/onebusaway-nyc-integration-tests/src/integration-test/resources/onebusaway-nyc-vehicle-tracking-webapp/data-sources.xml
@@ -76,7 +76,7 @@
-
+
diff --git a/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTask.java b/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTask.java
index b4d1291970..d619d7db10 100644
--- a/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTask.java
+++ b/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTask.java
@@ -144,27 +144,11 @@ private void discardRecord(String vehicleId, String contents) {
_locationDao.handleException(contents, e, new Date());
}
- @Override
- public String getQueueHost() {
- return _configurationService.getConfigurationValueAsString(
- "tds.inputQueueHost", null);
- }
-
- @Override
- public String getQueueName() {
- return _configurationService.getConfigurationValueAsString(
- "tds.inputQueueName", null);
- }
public String getQueueDisplayName() {
return "archive_inference";
}
- @Override
- public Integer getQueuePort() {
- return _configurationService.getConfigurationValueAsInteger(
- "tds.inputQueuePort", 5567);
- }
public void logStatus() {
_log.info("obanyc_inferredlocation records count : " + _locationDao.getArchiveInferredLocationCount());
@@ -180,7 +164,6 @@ public void run() {
}
}
-
@SuppressWarnings("unchecked")
@PostConstruct
public void setup() {
diff --git a/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInputQueueListenerTask.java b/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInputQueueListenerTask.java
index 42c2a5fc7f..a6d4ca5acb 100644
--- a/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInputQueueListenerTask.java
+++ b/onebusaway-nyc-ops-api-webapp/src/main/java/org/onebusaway/nyc/ops/queue/OpsInputQueueListenerTask.java
@@ -27,16 +27,18 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
import org.onebusaway.nyc.report.impl.CcLocationCache;
import org.onebusaway.nyc.report.model.CcLocationReportRecord;
import org.onebusaway.nyc.report.services.RecordValidationService;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
-public class OpsInputQueueListenerTask extends QueueListenerTask {
+public abstract class OpsInputQueueListenerTask implements IQueueListenerTask {
public static final int DELAY_THRESHOLD = 10 * 1000;
@@ -51,6 +53,10 @@ public void setCcLocationCache(CcLocationCache cache) {
_ccLocationCache = cache;
}
+ @Qualifier("configurationService")
+ @Autowired
+ private ConfigurationService _configurationService;
+
@Autowired
public void setValidationService(RecordValidationService validationService) {
this.validationService = validationService;
@@ -61,6 +67,12 @@ public void setValidationService(RecordValidationService validationService) {
private String _systemTimeZone = null;
private long zoneOffsetWindow = System.currentTimeMillis();
+ protected boolean _initialized = false;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
public OpsInputQueueListenerTask() {
/*
* use Jaxb annotation interceptor so we pick up autogenerated annotations
@@ -183,7 +195,7 @@ record = new CcLocationReportRecord(envelope, contents, getZoneOffset());
@PostConstruct
public void setup() {
- super.setup();
+ _queueListenerTask.setup();
// make parsing lenient
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
@@ -194,7 +206,7 @@ public void setup() {
@PreDestroy
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
}
/**
diff --git a/onebusaway-nyc-ops-api-webapp/src/test/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTaskTest.java b/onebusaway-nyc-ops-api-webapp/src/test/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTaskTest.java
index 363f0a447b..39720775eb 100644
--- a/onebusaway-nyc-ops-api-webapp/src/test/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTaskTest.java
+++ b/onebusaway-nyc-ops-api-webapp/src/test/java/org/onebusaway/nyc/ops/queue/OpsInferenceQueueListenerTaskTest.java
@@ -75,7 +75,6 @@ public void setUp() throws Exception {
when(inferredResult.getInferredLatitude()).thenReturn(1D);
when(inferredResult.getInferredLongitude()).thenReturn(-1D);
- inferenceQueueListenertask = new OpsInferenceQueueListenerTask();
inferenceQueueListenertask.setLocationDao(locationDao);
inferenceQueueListenertask.setLocationService(locationService);
inferenceQueueListenertask.setValidationService(validationService);
diff --git a/onebusaway-nyc-queue-apc/src/main/java/org/onebusaway/nyc/queue/apc/ApcMonitor.java b/onebusaway-nyc-queue-apc/src/main/java/org/onebusaway/nyc/queue/apc/ApcMonitor.java
index 4efef2618c..c9a9e99f5c 100644
--- a/onebusaway-nyc-queue-apc/src/main/java/org/onebusaway/nyc/queue/apc/ApcMonitor.java
+++ b/onebusaway-nyc-queue-apc/src/main/java/org/onebusaway/nyc/queue/apc/ApcMonitor.java
@@ -23,9 +23,11 @@
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import org.apache.commons.lang.StringUtils;
import org.onebusaway.gtfs.model.AgencyAndId;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data.model.NycVehicleLoadBean;
import org.onebusaway.realtime.api.VehicleOccupancyRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Date;
@@ -41,7 +43,7 @@
* -Dapc.value=my_cloudwatch_secret \
* >apc.log 2>&1 &
*/
-public class ApcMonitor extends QueueListenerTask {
+public class ApcMonitor implements IQueueListenerTask {
public static final String DEFAULT_ENV = "Obanyc:qa";
public static final String DEFAULT_KEY = "my_cw_key";
@@ -51,8 +53,16 @@ public class ApcMonitor extends QueueListenerTask {
public static final String DEFAULT_DISPLAY_NAME = DEFAULT_NAME;
public static final int DEFAULT_PORT = 5576;
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+ protected boolean _initialized = false;
+
private AmazonCloudWatchClient cloudWatch = new AmazonCloudWatchClient(new BasicAWSCredentials(getKey(), getValue()));
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {}
+
@Override
public boolean processMessage(String contents, byte[] buff) throws Exception {
if(StringUtils.isBlank(contents)){
@@ -79,7 +89,8 @@ public void startListenerThread() {
System.out.println("connecting to " + queueName + " queue at " + host + ":" + port);
try {
- initializeQueue(host, queueName, port);
+ _queueListenerTask.initializeQueue(host, queueName, port);
+
} catch (InterruptedException ie) {
return;
}
@@ -125,6 +136,15 @@ public Integer getQueuePort() {
return DEFAULT_PORT;
}
+ @Override
+ public void startDNSCheckThread() {}
+
+ @Override
+ public void destroy() {}
+
+ @Override
+ public void setup() {}
+
private void processResult(NycVehicleLoadBean message, String contents) {
VehicleOccupancyRecord vor = toVehicleOccupancyRecord(message);
@@ -134,10 +154,9 @@ private void processResult(NycVehicleLoadBean message, String contents) {
}
- public static void main(String[] args) {
+ public void main(String[] args) {
System.out.println("starting up....");
- ApcMonitor monitor = new ApcMonitor();
- monitor.setup();
+ _queueListenerTask.setup();
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
diff --git a/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/ISimpleBroker.java b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/ISimpleBroker.java
new file mode 100644
index 0000000000..a2832467cc
--- /dev/null
+++ b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/ISimpleBroker.java
@@ -0,0 +1,8 @@
+package org.onebusaway.nyc.queue_broker;
+
+public interface ISimpleBroker {
+ void main(String[] args);
+ void run();
+ void setInPort(int inPort);
+ void setOutPort(int outPort);
+}
diff --git a/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/KafkaSimpleBroker.java b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/KafkaSimpleBroker.java
new file mode 100644
index 0000000000..62c264cbfe
--- /dev/null
+++ b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/KafkaSimpleBroker.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue_broker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Simple Brokering impelmented in ZeroMQ. Listen for anything on one socket,
+ * and send it back out on another socket.
+ */
+public class KafkaSimpleBroker implements ISimpleBroker {
+ Logger logger = LoggerFactory.getLogger(KafkaSimpleBroker.class);
+
+ private static final int DEFAULT_IN_PORT = 5566;
+ private static final int DEFAULT_OUT_PORT = 5567;
+ private static final int HWM_VALUE = 50000; // High Water Mark
+ private ExecutorService _executorService = null;
+ private int inPort;
+ private int outPort;
+
+ public void main(String[] args) {
+ KafkaSimpleBroker broker = new KafkaSimpleBroker();
+ if (args.length > 0) {
+ broker.setInPort(Integer.parseInt(args[0]));
+ } else {
+ broker.setInPort(DEFAULT_IN_PORT);
+ }
+ if (args.length > 1) {
+ broker.setOutPort(Integer.parseInt(args[1]));
+ } else {
+ broker.setOutPort(DEFAULT_OUT_PORT);
+ }
+
+ broker.run();
+ }
+
+ public KafkaSimpleBroker() {
+ logger.info("Starting up SimpleBroker");
+ }
+
+ public void run() {
+ // Prepare our context and subscriber
+ ZMQ.Context context = ZMQ.context(1); // 1 = number of threads
+
+ ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
+ String inBind = "tcp://*:" + inPort;
+
+ logger.info("subscribing to queue at " + inBind);
+ subscriber.bind(inBind);
+ // subscribe to everything
+ subscriber.subscribe(new byte[0]); // was inTopic.getBytes()
+
+ ZMQ.Socket publisher = context.socket(ZMQ.PUB);
+
+ //Set to prevent broker memory from growing indefinitely if client falls behind
+ publisher.setHWM(HWM_VALUE);
+
+ String outBind = "tcp://*:" + outPort;
+
+ logger.info("publishing to queue at " + outBind);
+ publisher.bind(outBind);
+
+ ZMQ.proxy(subscriber, publisher, null);
+
+ }
+
+ public void setInPort(int inPort) {
+ this.inPort = inPort;
+ }
+
+ public void setOutPort(int outPort) {
+ this.outPort = outPort;
+ }
+
+}
diff --git a/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/SimpleBroker.java b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/SimpleBroker.java
index 1b27ec41f6..4c7249f716 100644
--- a/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/SimpleBroker.java
+++ b/onebusaway-nyc-queue-broker/src/main/java/org/onebusaway/nyc/queue_broker/SimpleBroker.java
@@ -28,7 +28,7 @@
* Simple Brokering impelmented in ZeroMQ. Listen for anything on one socket,
* and send it back out on another socket.
*/
-public class SimpleBroker {
+public class SimpleBroker implements ISimpleBroker {
Logger logger = LoggerFactory.getLogger(SimpleBroker.class);
private static final int DEFAULT_IN_PORT = 5566;
@@ -38,7 +38,7 @@ public class SimpleBroker {
private int inPort;
private int outPort;
- public static void main(String[] args) {
+ public void main(String[] args) {
SimpleBroker broker = new SimpleBroker();
if (args.length > 0) {
broker.setInPort(Integer.parseInt(args[0]));
diff --git a/onebusaway-nyc-queue-http-proxy/pom.xml b/onebusaway-nyc-queue-http-proxy/pom.xml
index b280d84361..28030ccd8f 100644
--- a/onebusaway-nyc-queue-http-proxy/pom.xml
+++ b/onebusaway-nyc-queue-http-proxy/pom.xml
@@ -112,7 +112,7 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/onebusaway-nyc-queue-http-proxy/src/main/resources/org/onebusaway/nyc/queue_http_proxy/application-context-webapp.xml b/onebusaway-nyc-queue-http-proxy/src/main/resources/org/onebusaway/nyc/queue_http_proxy/application-context-webapp.xml
index cd6f38b3e3..97502eb759 100644
--- a/onebusaway-nyc-queue-http-proxy/src/main/resources/org/onebusaway/nyc/queue_http_proxy/application-context-webapp.xml
+++ b/onebusaway-nyc-queue-http-proxy/src/main/resources/org/onebusaway/nyc/queue_http_proxy/application-context-webapp.xml
@@ -31,5 +31,22 @@ limitations under the License.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/pom.xml b/onebusaway-nyc-queue-subscriber/pom.xml
index 992e2ce556..c6a506703c 100644
--- a/onebusaway-nyc-queue-subscriber/pom.xml
+++ b/onebusaway-nyc-queue-subscriber/pom.xml
@@ -73,6 +73,11 @@
guava
31.0.1-jre
+
+ org.apache.kafka
+ kafka-clients
+ 3.4.0
+
@@ -91,7 +96,8 @@
true
- org.onebusaway.nyc.queue.Subscriber
+
+ org.onebusaway.nyc.queue.KafkaSubscriber
@@ -104,7 +110,8 @@
- org.onebusaway.nyc.queue.Subscriber
+
+ org.onebusaway.nyc.queue.KafkaSubscriber
jar-with-dependencies
@@ -138,7 +145,8 @@
java
- org.onebusaway.nyc.queue.Subscriber
+
+ org.onebusaway.nyc.queue.KafkaSubscriber
@@ -150,7 +158,8 @@
true
- org.onebusaway.nyc.queue.Subscriber
+
+ org.onebusaway.nyc.queue.KafkaSubscriber
@@ -163,7 +172,8 @@
- org.onebusaway.nyc.queue.Subscriber
+
+ org.onebusaway.nyc.queue.KafkaSubscriber
@@ -192,7 +202,8 @@
true
- org.onebusaway.nyc.queue.TripUpdateSubscriber
+
+ org.onebusaway.nyc.queue.KafkaTripUpdateSubscriber
@@ -205,7 +216,8 @@
- org.onebusaway.nyc.queue.TripUpdateSubscriber
+
+ org.onebusaway.nyc.queue.KafkaTripUpdateSubscriber
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IPublisher.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IPublisher.java
index 4f193754d5..9e96bae82d 100644
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IPublisher.java
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IPublisher.java
@@ -23,6 +23,14 @@
*/
public interface IPublisher {
+ void setTopic(String topic);
+
+ void setProtocol(String protocol);
+
+ void setHost(String host);
+
+ void setPort(int port);
+
void init();
void close();
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IQueueListenerTask.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IQueueListenerTask.java
new file mode 100644
index 0000000000..7e0f07934f
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/IQueueListenerTask.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Represents an interface to simply listen to queue operations. This
+ * is not attempting to be JMS. This is merely allowing us to
+ */
+public interface IQueueListenerTask {
+
+ ExecutorService _executorService = null;
+ ObjectMapper _mapper = new ObjectMapper().registerModule(new JaxbAnnotationModule());
+ Logger _log = LoggerFactory.getLogger(IQueueListenerTask.class);
+
+
+ Properties properties = new Properties();
+
+ void initializeQueue(String host, String queueName,
+ Integer port) throws InterruptedException;
+
+ boolean processMessage(String address, byte[] buff) throws Exception;
+
+ void startListenerThread();
+
+ String getQueueHost();
+
+ String getQueueName();
+
+ /**
+ * Return the name of the queue for display of statistics in logs.
+ */
+ String getQueueDisplayName();
+
+ Integer getQueuePort();
+
+ void startDNSCheckThread();
+
+ void destroy();
+
+ void setup();
+
+}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ISubscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ISubscriber.java
new file mode 100644
index 0000000000..366aaf9850
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ISubscriber.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+/**
+ * Represents an interface to simply message queue publishing operations. This
+ * is not attempting to be JMS. This is merely hiding the details of ZeroMQ for
+ * easier testing.
+ */
+public interface ISubscriber {
+
+ void main(String[] args);
+
+}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ITripUpdateSubscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ITripUpdateSubscriber.java
new file mode 100644
index 0000000000..90ec5c8325
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/ITripUpdateSubscriber.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+/**
+ * Represents an interface to simply listen to queue operations. This
+ * is not attempting to be JMS. This is merely allowing us to
+ */
+public interface ITripUpdateSubscriber {
+
+ void main(String[] args);
+
+
+}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaPublisher.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaPublisher.java
new file mode 100644
index 0000000000..bacfe94477
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaPublisher.java
@@ -0,0 +1,288 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import com.eaio.uuid.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.transaction.annotation.Transactional;
+
+public class KafkaPublisher implements IPublisher,InitializingBean {
+
+ private static Logger _log = LoggerFactory.getLogger(KafkaPublisher.class);
+ private ExecutorService executorService = null;
+ private ArrayBlockingQueue outputBuffer = new ArrayBlockingQueue(
+ 1000);
+ private String topic;
+ private String protocol;
+ private String host;
+ private int port;
+ protected Properties properties = new Properties();
+
+ protected ProducerRecord producerRecord = null;
+ protected KafkaProducer producer = null;
+
+ public KafkaPublisher(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Bind Kafka to the given host and port using the specified protocol.
+ ** @param topic
+ * zeromq topic
+ * @param protocol
+ * "tcp" for example
+ * @param host
+ * localhost, "*", or ip.
+ * @param port
+ * port to bind to. Below 1024 requires elevated privs.
+ */
+ public KafkaPublisher(String topic, String protocol, String host, Integer port) {
+ this.topic = topic;
+ this.protocol = protocol;
+ this.host = host;
+ this.port = port;
+ }
+
+ public KafkaPublisher(){}
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public synchronized void init() {}
+
+ @Override
+ @Transactional
+ public void afterPropertiesSet() {
+ //String bind = protocol + "://" + host + ":" + port;
+ protocol= "http";
+ host= "localhost";
+ port= 9092;
+ String bind = "http://localhost:9092";
+ _log.warn("connecting to " + bind);
+ /*
+ * do not bind to the socket, simply connect to existing socket provided by
+ * broker.
+ */
+ setProperties(bind, topic);
+ producer = new KafkaProducer<>(properties);
+ producerRecord = new ProducerRecord<>(topic, null);
+ executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(new SendThread(producer, topic));
+
+ }
+
+ /**
+ * Ask Kafka to close politely.
+ */
+ @Override
+ public synchronized void close() {
+ _log.warn("shutting down...");
+ try {
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ Thread.sleep(1 * 1000);
+ } catch (InterruptedException ie) {
+ executorService.shutdownNow();
+ }
+ // flush and close producer
+ producer.flush();
+ producer.close();
+ }
+
+ @Override
+ public synchronized void reset(){
+ close();
+ init();
+ }
+
+ /**
+ * Publish a message to a topic. Be aware that fist message may be lost as
+ * subscriber will not connect in time.
+ *
+ * @param message
+ * the content of the message
+ */
+ @Override
+ public void send(byte[] message) {
+ try {
+ outputBuffer.put(wrap(message));
+ } catch (InterruptedException ie) {
+ _log.error(ie.toString());
+ }
+ }
+
+
+ String wrap(byte[] message) {
+ if (message == null || message.length == 0)
+ return null;
+ long timeReceived = getTimeReceived();
+ String realtime = new String(message);
+ // we remove wrapping below, so check for min length acceptable
+ if (realtime.length() < 2)
+ return null;
+
+ StringBuilder prefix = new StringBuilder();
+
+ prefix.append("{\"RealtimeEnvelope\": {\"UUID\":\"")
+ .append(generateUUID()).append("\",\"timeReceived\": ")
+ .append(timeReceived).append(",")
+ .append(removeLastBracket(realtime)).append("}}");
+
+ try{
+ return RmcUtil.replaceInvalidRmcDateTime(prefix, timeReceived);
+ }catch(Throwable t){
+ _log.warn("unable to replace invalid rmc date", t);
+ t.printStackTrace();
+ return prefix.toString();
+ }
+
+ }
+
+ @Override
+ public void send(String message) {
+ try {
+ outputBuffer.put(wrap(message));
+ } catch (InterruptedException ie) {
+ _log.error(ie.toString());
+ }
+ }
+
+ String wrap(String realtime) {
+ if (realtime == null || realtime.length() == 0)
+ return null;
+ long timeReceived = getTimeReceived();
+
+ // we remove wrapping below, so check for min length acceptable
+ if (realtime.length() < 2)
+ return null;
+
+ StringBuilder prefix = new StringBuilder();
+
+ prefix.append("{\"RealtimeEnvelope\": {\"UUID\":\"")
+ .append(generateUUID()).append("\",\"timeReceived\": ")
+ .append(timeReceived).append(",")
+ .append(removeLastBracket(realtime)).append("}}");
+
+ try{
+ return RmcUtil.replaceInvalidRmcDateTime(prefix, timeReceived);
+ }catch(Throwable t){
+ _log.warn("unable to replace invalid rmc date", t);
+ t.printStackTrace();
+ return prefix.toString();
+ }
+
+ }
+
+
+ String removeLastBracket(String s) {
+ String trimmed = s.trim();
+ return trimmed.substring(1, trimmed.length() - 1);
+ }
+
+ String generateUUID() {
+ return new UUID().toString();
+ }
+
+ long getTimeReceived() {
+ return System.currentTimeMillis();
+ }
+
+ private class SendThread implements Runnable {
+
+ int processedCount = 0;
+ Date markTimestamp = new Date();
+
+ private KafkaProducer _producer;
+
+ private String _topicName;
+
+ ProducerRecord _producerRecord;
+
+ public SendThread(KafkaProducer producer, String topicName) {
+ _producer = producer;
+ _topicName = topicName;
+ }
+
+
+ public void run() {
+ int errorCount = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ String r = outputBuffer.take();
+ _producerRecord = new ProducerRecord<>(_topicName, r);;
+ producer.send(_producerRecord);
+ } catch (InterruptedException ie) {
+ return;
+ }
+
+ if (processedCount > 1000) {
+ long timeDiff = TimeUnit.MILLISECONDS.toSeconds (System.currentTimeMillis() - markTimestamp.getTime());
+ _log.info("HTTP Proxy output queue {}: processed 1000 messages in {} seconds; current queue length is {}",
+ this._topicName, timeDiff, outputBuffer.size());
+ markTimestamp = new Date();
+ processedCount = 0;
+ }
+
+ processedCount++;
+ }
+ }
+ }
+
+ private void setProperties(String bind, String queueName){
+
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, queueName);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ }
+}
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaQueueListenerTask.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaQueueListenerTask.java
new file mode 100644
index 0000000000..d74c585536
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaQueueListenerTask.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Properties;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.Arrays;
+
+/**
+ * Base class for listeners that subscribe to ZeroMQ. Provides a simple
+ * re-connection mechanism if the IP changes.
+ */
+public class KafkaQueueListenerTask implements IQueueListenerTask{
+
+ protected static Logger _log = LoggerFactory
+ .getLogger(KafkaQueueListenerTask.class);
+
+ @Autowired
+ protected ConfigurationService _configurationService;
+ @Autowired
+ private ThreadPoolTaskScheduler _taskScheduler;
+ private ExecutorService _executorService = null;
+ protected boolean _initialized = false;
+ protected ObjectMapper _mapper = new ObjectMapper().registerModule(new JaxbAnnotationModule());
+
+ protected DNSResolver _resolver = null;
+ protected int _countInterval = 10000;
+ protected KafkaConsumer consumer;
+
+ protected Properties properties = new Properties();
+
+ public void startDNSCheckThread() {
+ String host = getQueueHost();
+ _resolver = new DNSResolver(host);
+ if (_taskScheduler != null) {
+ DNSCheckThread dnsCheckThread = new DNSCheckThread();
+ // ever 10 seconds
+ _taskScheduler.scheduleWithFixedDelay(dnsCheckThread, 10 * 1000);
+ }
+ }
+
+ private class ReadThread implements Runnable {
+
+ int processedCount = 0;
+ Date markTimestamp = new Date();
+
+ private KafkaConsumer _consumer = null;
+
+ String _topicName;
+
+ public ReadThread(KafkaConsumer consumer, String topicName) {
+ _consumer = consumer;
+ _topicName = topicName;
+ }
+
+ @Override
+ public void run() {
+ _log.warn("ReadThread for queue " + getQueueName() + " starting");
+
+ while (!Thread.currentThread().isInterrupted()) {
+ // prefer a java sleep to a native block
+ ConsumerRecords records = _consumer.poll(0 * 1000); // microseconds for 2.2, milliseconds for 3.0
+ if (!records.isEmpty()) {
+
+ for (ConsumerRecord record : records) {
+ try {
+ byte[] buff = SerializationUtils.serialize((Serializable) record);
+ processMessage(_topicName, buff);
+ processedCount++;
+ } catch(Exception ex) {
+ _log.error("#####>>>>> processMessage() failed", ex);
+ }
+ }
+
+ Thread.yield();
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ _log.warn("exiting (interrupted) " + getQueueName());
+ return;
+ }
+ }
+
+ if (processedCount > _countInterval) {
+ long timeInterval = (new Date().getTime() - markTimestamp.getTime());
+ _log.info(getQueueDisplayName()
+ + " input queue: processed " + _countInterval + " messages in "
+ + (timeInterval/1000)
+ + " seconds. (" + (1000.0 * processedCount/timeInterval)
+ + ") records/second");
+
+ markTimestamp = new Date();
+ processedCount = 0;
+ }
+
+ }
+ _log.error("Thread loop Interrupted, exiting queue " + getQueueName());
+ }
+ }
+
+ @PostConstruct
+ public void setup() {
+ _executorService = Executors.newFixedThreadPool(1);
+ startListenerThread();
+ startDNSCheckThread();
+ _log.warn("threads started for queue " + getQueueName());
+ }
+
+ @PreDestroy
+ public void destroy() {
+ _log.info("destroy " + getQueueName());
+ _executorService.shutdownNow();
+ if (_taskScheduler != null)
+ _taskScheduler.shutdown();
+ }
+
+ protected void reinitializeQueue() {
+ try {
+ initializeQueue(getQueueHost(), getQueueName(), getQueuePort());
+ } catch (InterruptedException ie) {
+ return;
+ }
+ }
+
+ // (re)-initialize Kafka listener with the given args
+ public synchronized void initializeQueue(String host, String queueName,
+ Integer port) throws InterruptedException {
+
+ try {
+ String bind = host + ":" + port;
+ _log.warn("binding to " + bind + " with topic=" + queueName);
+
+ if (properties.isEmpty() ||
+ properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).isBlank() ||
+ properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
+ setProperties(bind, queueName);
+ _executorService = Executors.newFixedThreadPool(1);
+ }
+
+ consumer = new KafkaConsumer<>(properties);
+
+ if (!consumer.subscription().isEmpty()) {
+ _executorService.shutdownNow();
+ Thread.sleep(1 * 1000);
+ _log.debug("_executorService.isTerminated="
+ + _executorService.isTerminated());
+ consumer.close();
+ _executorService = Executors.newFixedThreadPool(1);
+ }
+
+ consumer.subscribe(Arrays.asList(queueName));
+
+ consumer.poll(Duration.ofMillis(100));
+
+ _executorService.execute(new ReadThread(consumer, bind));
+
+ _log.warn("queue " + queueName + " is listening on " + bind);
+ _initialized = true;
+ }catch(Exception e){
+ System.out.println("exception = " + e + "");
+ }
+
+ }
+
+ @Override
+ public boolean processMessage(String address, byte[] buff) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void startListenerThread() {
+
+ }
+
+ private class DNSCheckThread extends TimerTask {
+
+ @Override
+ public void run() {
+ try {
+ if (_resolver.hasAddressChanged()) {
+ _log.warn("Resolver Changed -- re-binding queue connection");
+ reinitializeQueue();
+ }
+ } catch (Exception e) {
+ _log.error(e.toString());
+ _resolver.reset();
+ }
+ }
+ }
+
+ private void setProperties(String bind, String queueName){
+
+ try {
+
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, queueName);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ }catch(Exception e){
+ System.out.println("exception = " + e + "");
+ }
+
+ }
+ @Override
+ public String getQueueHost() {
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueHost", null);
+ }
+
+ @Override
+ public String getQueueName() {
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueName", null);
+ }
+
+ @Override
+ public String getQueueDisplayName() {
+ return null;
+ }
+
+ @Override
+ public Integer getQueuePort() {
+ return _configurationService.getConfigurationValueAsInteger("tds.inputQueuePort", 5564);
+ }
+
+}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaSubscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaSubscriber.java
new file mode 100644
index 0000000000..927afe9b6b
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaSubscriber.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+
+public class KafkaSubscriber implements ISubscriber{
+ public static final String HOST_KEY = "mq.host";
+ public static final String PORT_KEY = "mq.port";
+ public static final String TOPIC_KEY = "mq.topic";
+ public static final String PBDIR_KEY = "pb.dir";
+ public static final String PB_LIMIT_KEY = "pb.limit";
+ private static final String DEFAULT_HOST = "localhost";
+ private static final int DEFAULT_PORT = 9092;
+ private static final String DEFAULT_TOPIC = "bhs_queue";
+ private static final int DEFAULT_PB_LIMIT = -1;
+
+ protected static KafkaConsumer consumer;
+
+ protected static Properties properties = new Properties();
+
+ public void main(String[] args) {
+
+ String host = DEFAULT_HOST;
+ if (System.getProperty(HOST_KEY) != null) {
+ host = System.getProperty(HOST_KEY);
+ }
+ int port = DEFAULT_PORT;
+ if (System.getProperty(PORT_KEY) != null) {
+ try {
+ port = Integer.parseInt(System.getProperty(PORT_KEY));
+ } catch (NumberFormatException nfe) {
+ port = DEFAULT_PORT;
+ }
+ }
+ String topic = DEFAULT_TOPIC;
+ //if (System.getProperty(TOPIC_KEY) != null) {
+ // topic = System.getProperty(TOPIC_KEY);
+ //}
+
+ String pbdir = null;
+ if (System.getProperty(PBDIR_KEY) != null) {
+ pbdir = System.getProperty(PBDIR_KEY);
+ }
+
+ int pblimit = DEFAULT_PB_LIMIT;
+ if (System.getProperty(PB_LIMIT_KEY) != null) {
+ try {
+ pblimit = Integer.parseInt(System.getProperty(PB_LIMIT_KEY));
+ } catch (NumberFormatException nfe) {
+ pblimit = DEFAULT_PB_LIMIT;
+ }
+ }
+
+ String bind = "tcp://" + host + ":" + port;
+
+ if (properties.isEmpty() ||
+ properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).isBlank() ||
+ properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
+ setProperties(bind, topic);
+ }
+
+
+ consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(Arrays.asList(topic));
+ System.out.println("listening on " + bind);
+ int nprocess = 0;
+ while (true) {
+ // Read envelope with address
+ ConsumerRecords records =
+ consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord record : records) {
+ System.out.println("Key: " + record.key() + ", Value: " + record.value());
+ System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
+ String address = record.key();
+ // Read message contents
+ byte[] contents = record.value().getBytes();
+ if (pbdir == null)
+ process(address, new String(contents));
+ else
+ processToDir(pbdir, address, contents);
+
+ nprocess++;
+ if (pblimit > 0 && nprocess >= pblimit)
+ break;
+ }
+ }
+ }
+ private static void process(String address, String contents) {
+ System.out.println(address + " : " + toDate(new Date()) + " : " + contents);
+ }
+
+ private static void processToDir(String dir, String address, byte[] contents) {
+ writeToFile(dir + "/" + toDate(new Date()) + ".pb", contents);
+ }
+
+ protected static void writeToFile(String filename, byte[] contents) {
+ File file = new File(filename);
+ if (file.getParentFile() != null)
+ file.getParentFile().mkdirs();
+ try {
+ DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
+ dos.write(contents);
+ dos.close();
+ } catch(IOException ex) {
+ ex.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+ private static String toDate(Date date) {
+ if (date != null) {
+ return dateFormatter.format(date);
+ }
+ return null;
+ }
+
+ private static void setProperties(String bind, String queueName){
+
+ try {
+
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, queueName);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ }catch(Exception e){
+ System.out.println("exception = " + e + "");
+ }
+
+ }
+
+}
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaTripUpdateSubscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaTripUpdateSubscriber.java
new file mode 100644
index 0000000000..19ab3503c7
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/KafkaTripUpdateSubscriber.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import com.google.transit.realtime.GtfsRealtime.FeedEntity;
+import com.google.transit.realtime.GtfsRealtime.FeedMessage;
+import com.google.transit.realtime.GtfsRealtime.TripUpdate;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onebusaway.gtfs.model.AgencyAndId;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaTripUpdateSubscriber implements ITripUpdateSubscriber{
+ private static final String HOST_KEY = "mq.host";
+ private static final String PORT_KEY = "mq.port";
+ private static final String TOPIC_KEY = "mq.topic";
+ private static final String SINGLE_MODE_KEY = "tu.single";
+ private static final String ROUTE_KEY = "tu.route";
+ private static final String OUTPUT_DIR_KEY = "tu.output";
+ private static final String DEFAULT_HOST = "queue.dev.obanyc.com";
+ private static final int DEFAULT_PORT = 5569;
+ private static final String DEFAULT_TOPIC = "time";
+ private static final String DEFAULT_OUTPUT = ".";
+ private static final String DEFAULT_SINGLE_MODE = "false";
+
+ private Properties properties = new Properties();
+
+ public void main(String[] args) {
+
+ String route = System.getProperty(ROUTE_KEY);
+ String outputDir = System.getProperty(OUTPUT_DIR_KEY, DEFAULT_OUTPUT);
+ boolean singleMode = Boolean.parseBoolean(System.getProperty(SINGLE_MODE_KEY, DEFAULT_SINGLE_MODE));
+
+ System.out.println("listen for route " + route + ", writing to dir " + outputDir + ", singleMode=" + singleMode);
+
+ String host = System.getProperty(HOST_KEY, DEFAULT_HOST);
+ int port = defaultOrProperty(PORT_KEY, DEFAULT_PORT);
+ String topic = System.getProperty(TOPIC_KEY, DEFAULT_TOPIC);
+
+ String bind = "tcp://" + host + ":" + port;
+
+ setProperties(bind, topic);
+ KafkaConsumer consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(Arrays.asList(topic));
+
+ System.out.println("TU subscriber listening on " + bind);
+
+ while (true) {
+ // Read envelope with address
+
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(250));
+
+ for (ConsumerRecord record : records){
+ try {
+ FeedMessage message = FeedMessage.parseFrom(record.value().getBytes());
+ FeedEntity entity = message.getEntity(0);
+ if (entity.hasTripUpdate()) {
+ TripUpdate tripUpdate = entity.getTripUpdate();
+ AgencyAndId routeId = AgencyAndId.convertFromString(tripUpdate.getTrip().getRouteId());
+ if (route == null || routeId.getId().equals(route)) {
+ // name by trip
+ AgencyAndId tripId = AgencyAndId.convertFromString(tripUpdate.getTrip().getTripId());
+ long timestamp = message.getHeader().getTimestamp()/1000;
+ String vehicle = AgencyAndId.convertFromString(tripUpdate.getVehicle().getId()).getId();
+ String filename = outputDir + "/" + tripId.getId() + ".pb";
+ System.out.println("timestamp=" + timestamp + " vehicleid=" + vehicle + " filename=" + filename);
+ KafkaSubscriber.writeToFile(filename, record.value().getBytes());
+ if (singleMode)
+ break;
+ }
+ }
+ else {
+ System.out.println("no trip update");
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+
+ private static int defaultOrProperty(String prop, int def) {
+ if (System.getProperty(prop) != null) {
+ try {
+ return Integer.parseInt(System.getProperty(prop));
+ } catch (NumberFormatException nfe) {
+ //
+ }
+ }
+ return def;
+ }
+
+ private void setProperties(String bind, String queueName){
+
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, queueName);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ }
+
+
+
+}
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
index 4e81fef227..fa49bf10ef 100644
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
@@ -38,10 +38,9 @@
* Base class for listeners that subscribe to ZeroMQ. Provides a simple
* re-connection mechanism if the IP changes.
*/
-public abstract class QueueListenerTask {
+public class QueueListenerTask implements IQueueListenerTask{
+
- protected static Logger _log = LoggerFactory
- .getLogger(QueueListenerTask.class);
@Autowired
protected ConfigurationService _configurationService;
@Autowired
@@ -56,25 +55,6 @@ public abstract class QueueListenerTask {
protected ZMQ.Poller _poller = null;
protected int _countInterval = 10000;
- public abstract boolean processMessage(String address, byte[] buff) throws Exception;
-
- public abstract void startListenerThread();
-
- public abstract String getQueueHost();
-
- public abstract String getQueueName();
-
- /**
- * Return the name of the queue for display of statistics in logs.
- */
- public abstract String getQueueDisplayName();
-
- public abstract Integer getQueuePort();
-
- public void setCountInterval(int countInterval) {
- this._countInterval = countInterval;
- }
-
public void startDNSCheckThread() {
String host = getQueueHost();
_resolver = new DNSResolver(host);
@@ -172,8 +152,8 @@ protected void reinitializeQueue() {
}
// (re)-initialize ZMQ with the given args
- protected synchronized void initializeQueue(String host, String queueName,
- Integer port) throws InterruptedException {
+ public synchronized void initializeQueue(String host, String queueName,
+ Integer port) throws InterruptedException {
String bind = "tcp://" + host + ":" + port;
_log.warn("binding to " + bind + " with topic=" + queueName);
@@ -203,6 +183,36 @@ protected synchronized void initializeQueue(String host, String queueName,
}
+ @Override
+ public boolean processMessage(String address, byte[] buff) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void startListenerThread() {
+
+ }
+
+ @Override
+ public String getQueueHost() {
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueHost", null);
+ }
+
+ @Override
+ public String getQueueName() {
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueName", null);
+ }
+
+ @Override
+ public String getQueueDisplayName() {
+ return null;
+ }
+
+ @Override
+ public Integer getQueuePort() {
+ return _configurationService.getConfigurationValueAsInteger("tds.inputQueuePort", 5564);
+ }
+
private class DNSCheckThread extends TimerTask {
@Override
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/Subscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/Subscriber.java
index 7aa9d25c25..77281a4fba 100644
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/Subscriber.java
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/Subscriber.java
@@ -25,7 +25,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
-public class Subscriber {
+public class Subscriber implements ISubscriber{
public static final String HOST_KEY = "mq.host";
public static final String PORT_KEY = "mq.port";
public static final String TOPIC_KEY = "mq.topic";
@@ -36,7 +36,7 @@ public class Subscriber {
private static final String DEFAULT_TOPIC = "bhs_queue";
private static final int DEFAULT_PB_LIMIT = -1;
- public static void main(String[] args) {
+ public void main(String[] args) {
// Prepare our context and subscriber
ZMQ.Context context = ZMQ.context(1);
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/TripUpdateSubscriber.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/TripUpdateSubscriber.java
index ec8e89b8e3..f6b7d841f0 100644
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/TripUpdateSubscriber.java
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/TripUpdateSubscriber.java
@@ -20,7 +20,7 @@
import org.onebusaway.gtfs.model.AgencyAndId;
import org.zeromq.ZMQ;
-public class TripUpdateSubscriber {
+public class TripUpdateSubscriber implements ITripUpdateSubscriber{
private static final String HOST_KEY = "mq.host";
private static final String PORT_KEY = "mq.port";
private static final String TOPIC_KEY = "mq.topic";
@@ -33,7 +33,7 @@ public class TripUpdateSubscriber {
private static final String DEFAULT_OUTPUT = ".";
private static final String DEFAULT_SINGLE_MODE = "false";
- public static void main(String[] args) {
+ public void main(String[] args) {
String route = System.getProperty(ROUTE_KEY);
String outputDir = System.getProperty(OUTPUT_DIR_KEY, DEFAULT_OUTPUT);
diff --git a/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTask.java b/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTask.java
index 00adf2a3f1..c64bad69de 100644
--- a/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTask.java
+++ b/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTask.java
@@ -21,19 +21,26 @@
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.onebusaway.container.refresh.Refreshable;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.report.model.ArchivedInferredLocationRecord;
import org.onebusaway.nyc.report.services.InferencePersistenceService;
import org.onebusaway.nyc.report.services.CcAndInferredLocationDao;
import org.onebusaway.nyc.report.services.RecordValidationService;
import org.onebusaway.nyc.transit_data.model.NycQueuedInferredLocationBean;
import org.onebusaway.nyc.transit_data_federation.impl.queue.InferenceQueueListenerTask;
+import org.onebusaway.nyc.transit_data_federation.impl.queue.interfaces.InferenceQueueListenerInterface;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
-public class ArchivingInferenceQueueListenerTask extends
- InferenceQueueListenerTask {
+public abstract class ArchivingInferenceQueueListenerTask implements
+ InferenceQueueListenerInterface, IQueueListenerTask {
+
+ protected boolean _initialized = false;
private static Logger _log = LoggerFactory.getLogger(ArchivingInferenceQueueListenerTask.class);
@@ -43,6 +50,11 @@ public class ArchivingInferenceQueueListenerTask extends
private InferencePersistenceService persister;
+ private final ObjectMapper _mapper = new ObjectMapper();
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
@Autowired
public void setLocationDao(CcAndInferredLocationDao locationDao) {
@@ -80,7 +92,8 @@ public void startListenerThread() {
_log.info("inference archive listening on " + host + ":" + port
+ ", queue=" + queueName);
try {
- initializeQueue(host, queueName, port);
+
+ _queueListenerTask.initializeQueue(host, queueName, port);;
_log.warn("queue config:" + queueName + " COMPLETE");
} catch (InterruptedException ie) {
_log.error("queue " + queueName + " interrupted");
@@ -90,7 +103,6 @@ public void startListenerThread() {
}
}
- @Override
// this method must throw exceptions to force a transaction rollback
protected void processResult(NycQueuedInferredLocationBean inferredResult,
String contents) {
@@ -122,33 +134,13 @@ private void discardRecord(String vehicleId, String contents) {
_locationDao.handleException(contents, e, new Date());
}
- @Override
- public String getQueueHost() {
- return _configurationService.getConfigurationValueAsString(
- "tds.inputQueueHost", null);
- }
-
- @Override
- public String getQueueName() {
- return _configurationService.getConfigurationValueAsString(
- "tds.inputQueueName", null);
- }
-
public String getQueueDisplayName() {
return "archive_inference";
}
- @Override
- public Integer getQueuePort() {
- return _configurationService.getConfigurationValueAsInteger(
- "tds.inputQueuePort", 5567);
- }
-
-
-
@PostConstruct
public void setup() {
- super.setup();
+ _queueListenerTask.setup();
// make parsing lenient
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
@@ -157,7 +149,7 @@ public void setup() {
@PreDestroy
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
}
}
diff --git a/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTask.java b/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTask.java
index 732a38a5bf..9c5a7f6118 100644
--- a/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTask.java
+++ b/onebusaway-nyc-report-archive/src/main/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTask.java
@@ -26,7 +26,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
import org.onebusaway.nyc.report.impl.CcLocationCache;
import org.onebusaway.nyc.report.model.CcLocationReportRecord;
@@ -34,11 +34,13 @@
import org.onebusaway.nyc.report_archive.services.CcLocationReportDao;
import org.onebusaway.nyc.report_archive.services.EmergencyStatusNotificationService;
import org.onebusaway.nyc.report_archive.services.RealtimePersistenceService;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
-public class ArchivingInputQueueListenerTask extends QueueListenerTask {
+public abstract class ArchivingInputQueueListenerTask implements IQueueListenerTask {
public static final int DELAY_THRESHOLD = 10 * 1000;
@@ -50,6 +52,16 @@ public class ArchivingInputQueueListenerTask extends QueueListenerTask {
private CcLocationCache _ccLocationCache;
+ @Qualifier("configurationService")
+ @Autowired
+ private ConfigurationService _configurationService;
+
+ protected boolean _initialized = false;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
@Autowired
public void setCcLocationCache(CcLocationCache cache) {
_ccLocationCache = cache;
@@ -214,7 +226,7 @@ record = new CcLocationReportRecord(envelope, contents, getZoneOffset());
@PostConstruct
public void setup() {
- super.setup();
+ _queueListenerTask.setup();
// make parsing lenient
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
@@ -225,7 +237,7 @@ public void setup() {
@PreDestroy
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
}
/**
diff --git a/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTaskTest.java b/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTaskTest.java
index cc090b339c..aae6d5272a 100644
--- a/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTaskTest.java
+++ b/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInferenceQueueListenerTaskTest.java
@@ -71,7 +71,6 @@ public void setUp() throws Exception {
when(inferredResult.getInferredLatitude()).thenReturn(1D);
when(inferredResult.getInferredLongitude()).thenReturn(-1D);
- inferenceQueueListenertask = new ArchivingInferenceQueueListenerTask();
inferenceQueueListenertask.setLocationDao(locationDao);
inferenceQueueListenertask.setValidationService(validationService);
inferenceQueueListenertask.setInferencePersistenceService(persister);
diff --git a/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTaskTest.java b/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTaskTest.java
index 5f518e4871..60fbdc272a 100644
--- a/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTaskTest.java
+++ b/onebusaway-nyc-report-archive/src/test/java/org/onebusaway/nyc/report_archive/queue/ArchivingInputQueueListenerTaskTest.java
@@ -34,7 +34,17 @@ public class ArchivingInputQueueListenerTaskTest {
CcLocationReportDao dao;
@InjectMocks
- ArchivingInputQueueListenerTask t = new ArchivingInputQueueListenerTask();
+ ArchivingInputQueueListenerTask t = new ArchivingInputQueueListenerTask() {
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
+ @Override
+ public void startDNSCheckThread() {
+
+ }
+ };
@Test
public void testProcessMessage() throws IOException {
diff --git a/onebusaway-nyc-report/src/main/java/org/onebusaway/nyc/report/impl/CloudWatchServiceImpl.java b/onebusaway-nyc-report/src/main/java/org/onebusaway/nyc/report/impl/CloudWatchServiceImpl.java
index 8f5515f665..47e72b3690 100644
--- a/onebusaway-nyc-report/src/main/java/org/onebusaway/nyc/report/impl/CloudWatchServiceImpl.java
+++ b/onebusaway-nyc-report/src/main/java/org/onebusaway/nyc/report/impl/CloudWatchServiceImpl.java
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@@ -42,6 +43,7 @@
public class CloudWatchServiceImpl implements CloudWatchService, ServletContextAware {
@Autowired
+ @Qualifier("configurationService")
ConfigurationService _configurationService;
AmazonCloudWatchClient cloudWatch;
diff --git a/onebusaway-nyc-transit-data-federation-webapp/src/main/resources/data-sources.xml b/onebusaway-nyc-transit-data-federation-webapp/src/main/resources/data-sources.xml
index 587af3d125..bb846c0d1f 100644
--- a/onebusaway-nyc-transit-data-federation-webapp/src/main/resources/data-sources.xml
+++ b/onebusaway-nyc-transit-data-federation-webapp/src/main/resources/data-sources.xml
@@ -77,12 +77,12 @@
-->
-
+
-
+
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImpl.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImpl.java
index aa95e361a5..90bd896a23 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImpl.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImpl.java
@@ -25,6 +25,7 @@
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.onebusaway.gtfs.model.AgencyAndId;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data.model.NycVehicleLoadBean;
import org.onebusaway.nyc.transit_data.services.NycTransitDataService;
import org.onebusaway.nyc.transit_data_federation.impl.queue.ApcQueueListenerTask;
@@ -38,6 +39,7 @@
import org.onebusaway.transit_data_federation.services.realtime.BlockLocationService;
import org.onebusaway.transit_data_federation.services.transit_graph.TripEntry;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import javax.annotation.PostConstruct;
@@ -53,11 +55,18 @@
* Support levels of crowding and / or raw passenger count integration
* and inject into OneBusAway TDS.
*/
-public class ApcIntegrationServiceImpl extends ApcQueueListenerTask {
+public abstract class ApcIntegrationServiceImpl extends ApcQueueListenerTask {
// how recent the occupancy record needs to be for usage
public static final int MAX_AGE_MILLIS = 6 * 60 * 1000; // 6 minutes
+ @Autowired
+ protected ConfigurationService _configurationService;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
// in progress read connection timeout in millis
private static final int TIMEOUT_CONNECTION = 5000;
// socket connection timeout in millis
@@ -95,7 +104,9 @@ public String getRawCountUrl() {
@PostConstruct
public void setup() {
- super.setup();
+
+ _queueListenerTask.setup();
+
if (!getRawCountsViaWebService()) {
_log.error("url not configured, Raw Count polling via webservice disabled.");
return;
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/NycRouteTypeService.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/NycRouteTypeService.java
index 481c3d4a44..64deb17fb8 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/NycRouteTypeService.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/NycRouteTypeService.java
@@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -49,6 +50,7 @@ public class NycRouteTypeService {
@Autowired
private ThreadPoolTaskScheduler _taskScheduler;
+ @Qualifier("configurationService")
@Autowired
private ConfigurationService _configurationService;
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImpl.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImpl.java
index 27b2b0a150..562295f268 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImpl.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImpl.java
@@ -29,6 +29,7 @@
import com.google.common.cache.CacheBuilder;
import org.onebusaway.container.refresh.Refreshable;
import org.onebusaway.gtfs.model.AgencyAndId;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data_federation.impl.queue.TimeQueueListenerTask;
import org.onebusaway.nyc.transit_data_federation.services.predictions.PredictionIntegrationService;
import org.onebusaway.nyc.util.configuration.ConfigurationService;
@@ -56,7 +57,7 @@
* @author sheldonabrown
*
*/
-public class QueuePredictionIntegrationServiceImpl extends
+public abstract class QueuePredictionIntegrationServiceImpl extends
TimeQueueListenerTask implements PredictionIntegrationService {
private static final int DEFAULT_CACHE_TIMEOUT = 2 * 60; // seconds
@@ -74,7 +75,6 @@ public class QueuePredictionIntegrationServiceImpl extends
@Autowired
private ConfigurationService _configurationService;
-
private Cache> _cache = null;
private Boolean _checkPredictionAge;
@@ -85,6 +85,11 @@ public class QueuePredictionIntegrationServiceImpl extends
private long predictionRecordAverageLatency = 0;
private Long _serviceTime = null; // leave empty for now, set for tests
+ protected boolean _initialized = false;
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
public void setCheckPredictionAge(Boolean checkAge) {
_checkPredictionAge = checkAge;
}
@@ -247,9 +252,15 @@ public boolean isEnabled() {
}
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
_log.warn("destroy called!");
}
+
+ @Override
+ public void setup() {
+
+ }
+
private boolean enableCheckPredictionLatency() {
if (_checkPredictionLatency == null) {
refreshConfig();
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
index eaac119669..7f51bb570a 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
@@ -16,17 +16,25 @@
package org.onebusaway.nyc.transit_data_federation.impl.queue;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data.model.NycVehicleLoadBean;
import org.apache.commons.lang.StringUtils;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
/**
* Base class for attaching to APC Queue.
*/
-public abstract class ApcQueueListenerTask extends QueueListenerTask {
+public class ApcQueueListenerTask implements IQueueListenerTask {
+
+ protected boolean _initialized = false;
+
+ @Autowired
+ protected ConfigurationService _configurationService;
public enum Status {
ENABLED, // read from queue
@@ -34,6 +42,10 @@ public enum Status {
DISABLED; // totally disabled
};
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
private Status status = Status.ENABLED;
public void setStatus(Status status) { this.status = status; }
@@ -42,7 +54,7 @@ public enum Status {
.getLogger(ApcQueueListenerTask.class);
- protected abstract void processResult(NycVehicleLoadBean message, String contents);
+ protected void processResult(NycVehicleLoadBean message, String contents){};
public Boolean useApcIfAvailable() {
@@ -51,6 +63,9 @@ public Boolean useApcIfAvailable() {
}
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {}
+
@Override
public boolean processMessage(String contents, byte[] buff) throws Exception {
if(!useApcIfAvailable()){
@@ -101,7 +116,7 @@ public void startListenerThread() {
_log.info("apc input queue listening on " + host + ":" + port + ", queue=" + queueName);
try {
- initializeQueue(host, queueName, port);
+ _queueListenerTask.initializeQueue(host, queueName, port);
} catch (InterruptedException ie) {
return;
}
@@ -136,6 +151,11 @@ public void startDNSCheckThread() {
return;
}
_log.info("starting DNS check for APC queue " + getQueueName());
- super.startDNSCheckThread();
+ _queueListenerTask.startDNSCheckThread();;
}
+
+ @Override
+ public void destroy() {}
+ @Override
+ public void setup() {}
}
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTask.java
index 4cdc16f6d1..b9b98a280c 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTask.java
@@ -18,6 +18,7 @@
import org.onebusaway.container.refresh.Refreshable;
import org.onebusaway.nyc.transit_data.model.NycQueuedInferredLocationBean;
import org.onebusaway.nyc.transit_data_federation.services.predictions.PredictionIntegrationService;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.onebusaway.nyc.util.impl.tdm.ConfigurationServiceImpl;
import org.onebusaway.realtime.api.EVehiclePhase;
import org.onebusaway.realtime.api.VehicleLocationListener;
@@ -25,6 +26,7 @@
import org.onebusaway.util.AgencyAndIdLibrary;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
/**
* This component listens to the inference output queue and injects records into
@@ -48,6 +50,10 @@ public class InferenceInputQueueListenerTask extends InferenceQueueListenerTask
private int ageLimit = 300;
private boolean refreshCheck;
+ @Autowired
+ @Qualifier("configurationService")
+ ConfigurationService _configurationService;
+
public InferenceInputQueueListenerTask() {
setConfigurationService(new ConfigurationServiceImpl());
refreshCache();
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceQueueListenerTask.java
index 8d359703ee..8e650f8484 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceQueueListenerTask.java
@@ -17,24 +17,43 @@
import com.fasterxml.jackson.databind.ObjectReader;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data.model.NycQueuedInferredLocationBean;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import org.onebusaway.nyc.transit_data_federation.impl.queue.interfaces.InferenceQueueListenerInterface;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-public abstract class InferenceQueueListenerTask extends QueueListenerTask {
+public class InferenceQueueListenerTask implements InferenceQueueListenerInterface, IQueueListenerTask {
- protected abstract void processResult(NycQueuedInferredLocationBean inferredResult, String contents);
+ protected void processResult(NycQueuedInferredLocationBean inferredResult, String contents){};
protected ObjectReader _reader;
+ protected boolean _initialized = false;
+
+ @Autowired
+ @Qualifier("configurationService")
+ ConfigurationService _configurationService;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
public InferenceQueueListenerTask() {
_reader = _mapper.reader(NycQueuedInferredLocationBean.class);
}
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
@Override
public boolean processMessage(String address, byte[] buff) {
String contents = null;
@@ -58,23 +77,34 @@ public boolean processMessage(String address, byte[] buff) {
@Override
public String getQueueHost() {
- return _configurationService.getConfigurationValueAsString("tds.inputQueueHost", null);
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueHost", "localhost");
}
@Override
public String getQueueName() {
- return _configurationService.getConfigurationValueAsString("tds.inputQueueName", null);
+ return _configurationService.getConfigurationValueAsString("tds.inputQueueName", "bhs_queue");
+ }
+
+ @Override
+ public String getQueueDisplayName() {
+ return null;
}
@Override
public Integer getQueuePort() {
- return _configurationService.getConfigurationValueAsInteger("tds.inputQueuePort", 5564);
+ return _configurationService.getConfigurationValueAsInteger("tds.inputQueuePort", 9092);
+ }
+
+ @Override
+ public void startDNSCheckThread() {
+
}
@SuppressWarnings("deprecation")
@PostConstruct
public void setup() {
- super.setup();
+
+ _queueListenerTask.setup();
// use JAXB annotations so that we pick up anything from the
// auto-generated XML classes
@@ -86,7 +116,8 @@ public void setup() {
@PreDestroy
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
+
}
@Refreshable(dependsOn = { "tds.inputQueueHost", "tds.inputQueuePort", "tds.inputQueueName" })
@@ -108,11 +139,10 @@ public void startListenerThread() {
_log.info("queue listening on " + host + ":" + port + ", queue=" + queueName);
try {
- initializeQueue(host, queueName, port);
+ _queueListenerTask.initializeQueue(host, queueName, port);;
} catch (InterruptedException ie) {
return;
}
-
_initialized = true;
}
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
index dd2756bfc9..aa4496cc39 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
@@ -17,11 +17,22 @@
package org.onebusaway.nyc.transit_data_federation.impl.queue;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
-public abstract class TimeQueueListenerTask extends QueueListenerTask {
+public class TimeQueueListenerTask implements IQueueListenerTask {
+
+ @Autowired
+ protected ConfigurationService _configurationService;
+ protected boolean _initialized = false;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
public enum Status {
ENABLED, // read from queue
@@ -29,7 +40,10 @@ public enum Status {
DISABLED; // totally disabled
};
- protected abstract void processResult(FeedMessage message);
+ protected void processResult(FeedMessage message){};
+
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {}
@Override
public boolean processMessage(String address, byte[] buff) {
@@ -120,7 +134,13 @@ public void startDNSCheckThread() {
if (!useTimePredictionsIfAvailable()) {
_log.error("time predictions disabled -- exiting");
return;
- }
- super.startDNSCheckThread();
+ }
+ _queueListenerTask.startDNSCheckThread();
}
+
+ @Override
+ public void destroy() {}
+
+ @Override
+ public void setup() {}
}
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/interfaces/InferenceQueueListenerInterface.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/interfaces/InferenceQueueListenerInterface.java
new file mode 100644
index 0000000000..b52fbff797
--- /dev/null
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/interfaces/InferenceQueueListenerInterface.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onebusaway.nyc.transit_data_federation.impl.queue.interfaces;
+
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+public interface InferenceQueueListenerInterface {
+
+ @Autowired
+ ThreadPoolTaskScheduler _taskScheduler = new ThreadPoolTaskScheduler();
+
+ boolean processMessage(String address, byte[] buff);
+ String getQueueHost();
+
+ public String getQueueName();
+
+ Integer getQueuePort();
+
+ void setup();
+ void destroy();
+
+ void startListenerThread();
+}
+
diff --git a/onebusaway-nyc-transit-data-federation/src/main/resources/org/onebusaway/nyc/transit_data_federation/application-context.xml b/onebusaway-nyc-transit-data-federation/src/main/resources/org/onebusaway/nyc/transit_data_federation/application-context.xml
index c6debb9f66..659fe55dea 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/resources/org/onebusaway/nyc/transit_data_federation/application-context.xml
+++ b/onebusaway-nyc-transit-data-federation/src/main/resources/org/onebusaway/nyc/transit_data_federation/application-context.xml
@@ -32,6 +32,8 @@
+
+
diff --git a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImplTest.java b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImplTest.java
index e1b89420b2..406596c189 100644
--- a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImplTest.java
+++ b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/nyc/ApcIntegrationServiceImplTest.java
@@ -44,7 +44,6 @@ public void testGetFeed() throws Exception {
//feed is object map key:vehicleId, value: ApcData
String url = "http://example.com/feed"; // we mock out the results, this isn't used
Map map = new HashMap();
- ApcIntegrationServiceImpl impl = new ApcIntegrationServiceImpl();
ApcLoadLevelCalculator calculator = new ApcLoadLevelCalculator();
final HttpClient httpClient = Mockito.mock(HttpClient.class);
HttpResponse response = Mockito.mock(HttpResponse.class);
diff --git a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImplTest.java b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImplTest.java
index ba7c158cd5..aa2e057272 100644
--- a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImplTest.java
+++ b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/predictions/QueuePredictionIntegrationServiceImplTest.java
@@ -56,7 +56,6 @@ public void before() {
when(config.getConfigurationValueAsString(anyString(), anyString()))
.thenAnswer(AdditionalAnswers.returnsSecondArg());
- service = new QueuePredictionIntegrationServiceImpl();
service.setConfigurationService(config);
service.setTransitDataService(tds);
service.setCache(CacheBuilder.newBuilder().>build());
diff --git a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTaskTest.java b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTaskTest.java
index 14682dfafa..6834b40f32 100644
--- a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTaskTest.java
+++ b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTaskTest.java
@@ -52,11 +52,26 @@ public void testProcessResult() throws Exception {
private ApcQueueListenerTask getTask() {
return new ApcQueueListenerTask() {
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
@Override
public String getQueueName() {
return "apc";
}
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void setup() {
+
+ }
+
@Override
public Boolean useApcIfAvailable() {
return Boolean.TRUE;
diff --git a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTaskTest.java b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTaskTest.java
index 98d8ced290..70da6ae5ca 100644
--- a/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTaskTest.java
+++ b/onebusaway-nyc-transit-data-federation/src/test/java/org/onebusaway/nyc/transit_data_federation/impl/queue/InferenceInputQueueListenerTaskTest.java
@@ -97,7 +97,6 @@ public NycQueuedInferredLocationBean initializeSpookingBean(){
@Before
public void setupApiLibrary() throws Exception {
- task = new InferenceInputQueueListenerTask();
task.setConfigurationService(service);
listener = mock(VehicleLocationListener.class);
pService = mock(QueuePredictionIntegrationServiceImpl.class);
diff --git a/onebusaway-nyc-util/src/main/java/org/onebusaway/nyc/util/impl/S3Utility.java b/onebusaway-nyc-util/src/main/java/org/onebusaway/nyc/util/impl/S3Utility.java
index f4ff636ef1..aef5591d67 100644
--- a/onebusaway-nyc-util/src/main/java/org/onebusaway/nyc/util/impl/S3Utility.java
+++ b/onebusaway-nyc-util/src/main/java/org/onebusaway/nyc/util/impl/S3Utility.java
@@ -25,8 +25,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.context.ServletContextAware;
import javax.annotation.PostConstruct;
+import javax.servlet.ServletContext;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -34,7 +36,7 @@
// S3Utility class, mostly coppied over from FileServiceImpl
-public class S3Utility {
+public class S3Utility implements ServletContextAware {
private static Logger _log = LoggerFactory.getLogger(S3Utility.class);
private AWSCredentials _credentials;
@@ -58,6 +60,11 @@ public void setBucketName(String bucketName) {
this._bucketName = bucketName;
}
+ @Autowired
+ public String getBucketName() {
+ return this._bucketName;
+ }
+
public S3Utility(String username, String password){
_password = password;
_username = username;
@@ -85,6 +92,28 @@ public void setup() {
}
+ @Override
+ public void setServletContext(ServletContext servletContext) {
+ if (servletContext != null) {
+ String user = servletContext.getInitParameter("s3.user");
+ _log.info("servlet context provided s3.user=" + user);
+ if (user != null) {
+ setS3User(user);
+ }
+ String password = servletContext.getInitParameter("s3.password");
+ if (password != null) {
+ setS3Password(password);
+ }
+ String bucketName = servletContext.getInitParameter("s3.bundle.bucketName");
+ if (bucketName != null) {
+ _log.info("servlet context provided bucketName=" + bucketName);
+ setBucketName(bucketName);
+ } else {
+ _log.info("servlet context missing bucketName, using " + getBucketName());
+ }
+ }
+ }
+
/**
* delete an object from s3
diff --git a/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml b/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
index 9fc90d4b0c..df385b1a88 100644
--- a/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
+++ b/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
@@ -92,7 +92,7 @@
-
+
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/monitoring/CloudWatchServiceImpl.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/monitoring/CloudWatchServiceImpl.java
index 4fd57bf4f1..665591086e 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/monitoring/CloudWatchServiceImpl.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/monitoring/CloudWatchServiceImpl.java
@@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@@ -43,6 +44,7 @@
public class CloudWatchServiceImpl implements CloudWatchService, ServletContextAware {
@Autowired
+ @Qualifier("configurationService")
ConfigurationService _configurationService;
AmazonCloudWatchClient cloudWatch;
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyPartitionedInputQueueListenerTask.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyPartitionedInputQueueListenerTask.java
index f6bebdbef9..d9901f7d24 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyPartitionedInputQueueListenerTask.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyPartitionedInputQueueListenerTask.java
@@ -16,6 +16,7 @@
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
+import org.onebusaway.nyc.queue.DNSResolver;
import org.onebusaway.nyc.vehicle_tracking.services.queue.PartitionedInputQueueListener;
/**
@@ -31,17 +32,47 @@ public String getDepotPartitionKey() {
return null;
}
+ protected DNSResolver _outputQueueResolver;
+
+ protected boolean _initialized = false;
+
@Override
public void setDepotPartitionKey(String depotPartitionKey) {
}
- @Override
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
+ @Override
public boolean processMessage(String address, byte[] buff) {
return true;
}
public void startListenerThread() {
+ if (_initialized == true) {
+ _log.warn("Configuration service tried to reconfigure inference output queue service; this service is not reconfigurable once started.");
+ return;
+ }
+
+ final String host = "localhost";
+ final String queueName = "bhs_queue";
+ final Integer port = 9092;
+ //"localhost", "demo_java", 9092
+
+ if (host == null || queueName == null || port == null) {
+ _log.info("Inference output queue is not attached; output hostname was not available via configuration service.");
+ return;
+ }
+
+ try {
+ initializeQueue(host, queueName, port);
+ } catch (Exception any) {
+ System.out.println("exception = " + any + "");
+ _outputQueueResolver.reset();
+ }
}
public String getQueueHost() {
@@ -60,5 +91,10 @@ public Integer getQueuePort() {
return -1;
}
+ @Override
+ public void startDNSCheckThread() {
+
+ }
+
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/InputQueueListenerTask.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/InputQueueListenerTask.java
index be2bdfe970..82bb319252 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/InputQueueListenerTask.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/InputQueueListenerTask.java
@@ -16,10 +16,13 @@
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.QueueListenerTask;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
+import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.onebusaway.nyc.vehicle_tracking.services.queue.InputService;
import org.onebusaway.nyc.vehicle_tracking.services.queue.InputTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
@@ -28,10 +31,22 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-public abstract class InputQueueListenerTask extends QueueListenerTask implements InputTask{
+public class InputQueueListenerTask implements InputTask, IQueueListenerTask {
InputService _inputService;
+ protected static Logger _log = LoggerFactory
+ .getLogger(InputQueueListenerTask.class);
+
+ protected boolean _initialized = false;
+
+ @Autowired
+ private ConfigurationService _configurationService;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
@SuppressWarnings("deprecation")
public InputQueueListenerTask() {
/*
@@ -52,7 +67,17 @@ public RealtimeEnvelope deserializeMessage(String contents){
return _inputService.deserializeMessage(contents);
}
- @Override
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
+ @Override
+ public boolean processMessage(String address, byte[] buff) throws Exception {
+ return false;
+ }
+
+ @Override
@Refreshable(dependsOn = {
"inference-engine.inputQueueHost", "inference-engine.inputQueuePort",
"inference-engine.inputQueueName"})
@@ -62,6 +87,7 @@ public void startListenerThread() {
return;
}
+
final String host = getQueueHost();
final String queueName = getQueueName();
final Integer port = getQueuePort();
@@ -74,7 +100,7 @@ public void startListenerThread() {
_log.info("realtime archive listening on " + host + ":" + port + ", queue="
+ queueName);
try {
- initializeQueue(host, queueName, port);
+ _queueListenerTask.initializeQueue(host, queueName, port);
} catch (final InterruptedException ie) {
return;
}
@@ -96,23 +122,34 @@ public String getQueueName() {
"inference-engine.inputQueueName", null);
}
- @Override
+ @Override
+ public String getQueueDisplayName() {
+ return null;
+ }
+
+ @Override
public Integer getQueuePort() {
return _configurationService.getConfigurationValueAsInteger(
"inference-engine.inputQueuePort", 5563);
}
- @Override
+ @Override
+ public void startDNSCheckThread() {
+
+ }
+
+ @Override
@PostConstruct
public void setup() {
-
- super.setup();
+
+ _queueListenerTask.setup();
+
}
@Override
@PreDestroy
public void destroy() {
- super.destroy();
+ _queueListenerTask.destroy();
}
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
index a6774be718..a06fe83786 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
@@ -15,6 +15,10 @@
*/
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.onebusaway.container.refresh.Refreshable;
import org.onebusaway.nyc.queue.DNSResolver;
import org.onebusaway.nyc.transit_data.model.NycQueuedInferredLocationBean;
@@ -29,11 +33,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.context.ServletContextAware;
-import org.zeromq.ZMQ;
import java.io.IOException;
import java.io.StringWriter;
+import java.time.Duration;
+import java.util.Arrays;
import java.util.Date;
+import java.util.Properties;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -81,10 +87,6 @@ public class OutputQueueSenderServiceImpl implements OutputQueueSenderService,
protected DNSResolver _primaryResolver = null;
- protected ZMQ.Context _context = null;
-
- protected ZMQ.Socket _socket = null;
-
protected int _countInterval = 10000;
@Autowired
@@ -93,6 +95,12 @@ public class OutputQueueSenderServiceImpl implements OutputQueueSenderService,
@Autowired
private ThreadPoolTaskScheduler _taskScheduler;
+ protected Properties properties = new Properties();
+
+ protected KafkaProducer producer = null;
+
+ protected ProducerRecord producerRecord = null;
+
@Override
public void setServletContext(ServletContext servletContext) {
// check for primaryHost name
@@ -116,13 +124,13 @@ private class SendThread implements Runnable {
Date markTimestamp = new Date();
- private ZMQ.Socket _zmqSocket = null;
+ private KafkaProducer _producer = null;
- private byte[] _topicName = null;
+ private String _topicName = null;
- public SendThread(ZMQ.Socket socket, String topicName) {
- _zmqSocket = socket;
- _topicName = topicName.getBytes();
+ public SendThread(KafkaProducer producer, String topicName) {
+ _producer = producer;
+ _topicName = topicName;
}
@Override
@@ -136,18 +144,11 @@ public void run() {
return;
}
- if (r != null) {
- if (_isPrimaryInferenceInstance) {
- _zmqSocket.send(_topicName, ZMQ.SNDMORE);
- _zmqSocket.send(r.getBytes(), 0);
- }
- }
-
final String h = _heartbeatBuffer.poll();
+ producerRecord = new ProducerRecord<>( _topicName, h);
if (h != null) {
if (_isPrimaryInferenceInstance) {
- _zmqSocket.send(HEARTBEAT_TOPIC.getBytes(), ZMQ.SNDMORE);
- _zmqSocket.send(h.getBytes(), 0);
+ producer.send(producerRecord);
_log.debug("heartbeat=" + h);
}
}
@@ -238,6 +239,7 @@ public void run() {
_log.warn("Primary inference status changed to " + primaryValue);
_isPrimaryInferenceInstance = primaryValue;
}
+
} catch (final Exception e) {
_log.error(e.toString());
}
@@ -327,27 +329,36 @@ protected void reinitializeQueue() {
protected synchronized void initializeQueue(String host, String queueName,
Integer port) throws InterruptedException {
- final String bind = "tcp://" + host + ":" + port;
- _log.warn("binding to " + bind);
- if (_context == null) {
- _context = ZMQ.context(1);
+
+ String bind = host + ":" + port;
+ _log.warn("binding to " + bind + " with topic=" + queueName);
+
+ setProperties(bind, queueName);
+
+ producer = new KafkaProducer<>(properties);
+
+ if (properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
+ setProperties(bind, queueName);
+ producerRecord = new ProducerRecord<>(queueName, null);
}
- if (_socket != null) {
+
+ if (!producerRecord.toString().isBlank()) {
_executorService.shutdownNow();
_heartbeatService.shutdownNow();
Thread.sleep(1 * 1000);
- _log.warn("_executorService.isTerminated="
- + _executorService.isTerminated());
- _socket.close();
+ _log.debug("_executorService.isTerminated="
+ + _executorService.isTerminated());
+ producer.close();
_executorService = Executors.newFixedThreadPool(1);
- _heartbeatService = Executors.newFixedThreadPool(1);
+ _heartbeatService.execute(new HeartbeatThread(HEARTBEAT_INTERVAL));
}
- _log.info("binding to " + bind);
- _socket = _context.socket(ZMQ.PUB);
- _socket.connect(bind);
- _executorService.execute(new SendThread(_socket, queueName));
- _heartbeatService.execute(new HeartbeatThread(HEARTBEAT_INTERVAL));
+ producer.send(producerRecord);
+
+ _executorService.execute(new SendThread(producer, bind));
+
+ producer.flush();
+ producer.close();
_log.info("Inference output queue is sending to " + bind);
_initialized = true;
@@ -355,17 +366,17 @@ protected synchronized void initializeQueue(String host, String queueName,
public String getQueueHost() {
return _configurationService.getConfigurationValueAsString(
- "inference-engine.outputQueueHost", null);
+ "inference-engine.outputQueueHost", "localhost");
}
public String getQueueName() {
return _configurationService.getConfigurationValueAsString(
- "inference-engine.outputQueueName", null);
+ "inference-engine.outputQueueName", "bhs_queue");
}
public Integer getQueuePort() {
return _configurationService.getConfigurationValueAsInteger(
- "inference-engine.outputQueuePort", 5566);
+ "inference-engine.outputQueuePort", 9092);
}
@Override
@@ -388,4 +399,14 @@ public String getPrimaryHostname() {
return _primaryHostname;
}
+ private void setProperties(String bind, String queueName){
+
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bind);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, queueName);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ }
+
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTask.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTask.java
index b7bdc2e27c..436df2ef12 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTask.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTask.java
@@ -16,20 +16,12 @@
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
-import org.onebusaway.gtfs.model.AgencyAndId;
-import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
-import org.onebusaway.nyc.transit_data_federation.services.tdm.VehicleAssignmentService;
-import org.onebusaway.nyc.vehicle_tracking.services.inference.VehicleLocationInferenceService;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.vehicle_tracking.services.queue.InputService;
import org.onebusaway.nyc.vehicle_tracking.services.queue.PartitionedInputQueueListener;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.context.ServletContextAware;
-
-import tcip_final_3_0_5_1.CPTVehicleIden;
-import tcip_final_3_0_5_1.CcLocationReport;
-
-import java.util.ArrayList;
-
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContext;
@@ -41,11 +33,17 @@
* @author jmaki
*
*/
-public class PartitionedInputQueueListenerTask extends InputQueueListenerTask
- implements PartitionedInputQueueListener, ServletContextAware {
+public class PartitionedInputQueueListenerTask
+ implements PartitionedInputQueueListener, ServletContextAware, IQueueListenerTask {
private String _depotPartitionKey;
-
+
+ InputService _inputService;
+
+ @Autowired
+ @Qualifier("listener")
+ IQueueListenerTask _queueListenerTask;
+
@Override
public void setServletContext(ServletContext servletContext) {
// check for depot partition keys in the servlet context
@@ -65,29 +63,50 @@ public String getDepotPartitionKey() {
return _depotPartitionKey;
}
+ @Override
+ public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {
+
+ }
+
@Override
public boolean processMessage(String address, byte[] buff) throws Exception{
return _inputService.processMessage(address, buff);
}
-
+
+ @Override
+ public void startListenerThread() {}
+
+ @Override
+ public String getQueueHost() {return null;}
+
+ @Override
+ public String getQueueName() {return null;}
+
@Override
public String getQueueDisplayName() {
return "PartitionedInputQueueListenerTask";
}
+ @Override
+ public Integer getQueuePort() {return null;}
+
+ @Override
+ public void startDNSCheckThread() {}
+
@Override
@PostConstruct
public void setup() {
_inputService.setDepotPartitionKey(_depotPartitionKey);
- super.setup();
+ _queueListenerTask.setup();
}
@Override
@PreDestroy
public void destroy() {
- super.destroy();
+ _inputService.setDepotPartitionKey(_depotPartitionKey);
+ _queueListenerTask.destroy();
}
-
+
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/SingleVehicleInputQueueListenerTask.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/SingleVehicleInputQueueListenerTask.java
index 43f1cf5859..002d8582ba 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/SingleVehicleInputQueueListenerTask.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/SingleVehicleInputQueueListenerTask.java
@@ -17,6 +17,7 @@
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
import org.onebusaway.gtfs.model.AgencyAndId;
+import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
import org.onebusaway.nyc.vehicle_tracking.services.inference.VehicleLocationInferenceService;
import org.onebusaway.nyc.vehicle_tracking.services.queue.InputService;
@@ -36,12 +37,12 @@
*
* @author jmaki
*/
-public class SingleVehicleInputQueueListenerTask extends InputQueueListenerTask
- implements PartitionedInputQueueListener {
+public abstract class SingleVehicleInputQueueListenerTask
+ implements PartitionedInputQueueListener, IQueueListenerTask {
+ InputService _inputService;
@Autowired
@Qualifier("singleVehicleInputService")
- @Override
public void setInputService(InputService inputService){
_inputService = inputService;
}
@@ -56,18 +57,6 @@ public boolean processMessage(String address, byte[] buff) throws Exception {
return _inputService.processMessage(address, buff);
}
- @Override
- @PostConstruct
- public void setup() {
- super.setup();
- }
-
- @Override
- @PreDestroy
- public void destroy() {
- super.destroy();
- }
-
@Override
public String getDepotPartitionKey() {
return null;
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/resources/org/onebusaway/nyc/vehicle_tracking/application-context.xml b/onebusaway-nyc-vehicle-tracking/src/main/resources/org/onebusaway/nyc/vehicle_tracking/application-context.xml
index 3ec54aea0f..a5449a8011 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/resources/org/onebusaway/nyc/vehicle_tracking/application-context.xml
+++ b/onebusaway-nyc-vehicle-tracking/src/main/resources/org/onebusaway/nyc/vehicle_tracking/application-context.xml
@@ -55,5 +55,7 @@
+
+
diff --git a/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceTest.java b/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceTest.java
new file mode 100644
index 0000000000..f0a7a9b70a
--- /dev/null
+++ b/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceTest.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.vehicle_tracking.impl.queue;
+
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.DefaultConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onebusaway.gtfs.model.AgencyAndId;
+import org.onebusaway.nyc.transit_data_federation.services.tdm.VehicleAssignmentService;
+import org.onebusaway.nyc.vehicle_tracking.services.inference.VehicleLocationInferenceService;
+import org.onebusaway.util.AgencyAndIdLibrary;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class OutputQueueSenderServiceTest {
+
+ @Mock
+ private VehicleAssignmentService vehicleAssignmentService;
+
+ @SuppressWarnings("unused")
+ @Mock
+ private VehicleLocationInferenceService vehicleLocationInferenceService;
+
+ @InjectMocks
+ private DummyPartitionedInputQueueListenerTask service;
+
+ @InjectMocks
+ private FileInputServiceImpl inputService;
+
+ @Before
+ public void setup() throws Exception {
+ Configurator.initialize(new DefaultConfiguration());
+
+ final ArrayList list = new ArrayList();
+ list.add(AgencyAndIdLibrary.convertFromString("MTA NYCT_8140"));
+
+ when(vehicleAssignmentService.getAssignedVehicleIdsForDepot("ZZ")).thenReturn(
+ list);
+ when(vehicleAssignmentService.getAssignedVehicleIdsForDepot("JG")).thenReturn(
+ list);
+
+ inputService.setup();
+ inputService.setDepotPartitionKey("JG");
+ service.setInputService(inputService);
+ }
+
+ @Test
+ public void testDeserializeMessage() throws Exception {
+ final String message = "{\"RealtimeEnvelope\": {\"UUID\":\"f43530c0-ec7a-11e5-a081-22000b028187\",\"timeReceived\": 1458244853196,\"CcLocationReport\": {\"request-id\":250,\"vehicle\":{\"vehicle-id\":8140,\"agency-id\":2008,\"agencydesignator\":\"MTA NYCT\"},\"status-info\":0,\"time-reported\":\"2016-03-17T20:00:50.860-00:00\",\"latitude\":40530910,\"longitude\":-74236203,\"direction\":{\"deg\":71.50},\"speed\":36,\"manufacturer-data\":\"VFTP155-600-826\",\"operatorID\":{\"operator-id\":0,\"designator\":\"0\"},\"runID\":{\"run-id\":0,\"designator\":\"000\"},\"destSignCode\":12,\"routeID\":{\"route-id\":0,\"route-designator\":\"0\"},\"localCcLocationReport\":{\"NMEA\":{\"sentence\":[\"$GPGGA,175251.000,4031.85833,N,07414.16762,W,1,08,01.2,+00031.0,M,,M,,*46\",\"$GPRMC,175251.00,A,4031.858330,N,07414.167620,W,006.955,033.61\"]}}}}}";
+ service.startListenerThread();
+ assertNotNull(inputService.deserializeMessage(message));
+ }
+}
diff --git a/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTaskTest.java b/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTaskTest.java
index 6d52c7e49f..d4ab623a3d 100644
--- a/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTaskTest.java
+++ b/onebusaway-nyc-vehicle-tracking/src/test/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/PartitionedInputQueueListenerTaskTest.java
@@ -67,7 +67,7 @@ public void setup() throws Exception {
inputService.setup();
inputService.setDepotPartitionKey("JG");
- service.setInputService(inputService);
+ //service.setInputService(inputService);
}
@Test
@@ -100,7 +100,7 @@ public void testNoNMEA() throws Exception {
public void testIllegalUnquotedCharacterProcessing() throws Exception {
final char DEVICE_CONTROL = 0x13;
final String message = "{\"RealtimeEnvelope\": {\"UUID\":\"f43530c0-ec7a-11e5-a081-22000b028187\",\"timeReceived\": 1458244853196,\"CcLocationReport\": {\"request-id\":250,\"vehicle\":{\"vehicle-id\":8140,\"agency-id\":2008,\"agencydesignator\":\"MTA NYCT\"},\"status-info\":0,\"time-reported\":\"2016-03-17T20:00:50.860-00:00\",\"latitude\":40530910,\"longitude\":-74236203,\"direction\":{\"deg\":71.50},\"speed\":36,\"manufacturer-data\":\"VFTP155-600-826\",\"operatorID\":{\"operator-id\":0,\"designator\":\"0\"},\"runID\":{\"run-id\":0,\"designator\":\"000\"},\"destSignCode\":12,\"routeID\":{\"route-id\":0,\"route-designator\":\"0\"},\"localCcLocationReport\":{\"NMEA\":{\"sentence\":[\"$GPGGA,175251.000,4031.85833,N,07414.16762,W,1,08,01.2,+00031.0,M,,M,,*46\",\"$GPRMC,175251.00,A,4031.858330,N,07414.167620,W,006.955,033.61" + DEVICE_CONTROL + "\"]}}}}}";
- service.deserializeMessage(message);
+ //service.deserializeMessage(message);
assertEquals(true, service.processMessage(null, message.getBytes()));
}
diff --git a/pom.xml b/pom.xml
index a728352ff1..490e7fb921 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,8 @@
2.5.33
1.0.1
2.3.1
+ KafkaPublisher
+ KafkaSubscriber