diff --git a/onebusaway-nyc-queue-http-proxy/src/main/java/org/onebusaway/nyc/queue_http_proxy/BHSListenerServlet.java b/onebusaway-nyc-queue-http-proxy/src/main/java/org/onebusaway/nyc/queue_http_proxy/BHSListenerServlet.java
index 45d797e9a3..bf22be3df7 100644
--- a/onebusaway-nyc-queue-http-proxy/src/main/java/org/onebusaway/nyc/queue_http_proxy/BHSListenerServlet.java
+++ b/onebusaway-nyc-queue-http-proxy/src/main/java/org/onebusaway/nyc/queue_http_proxy/BHSListenerServlet.java
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.TimerTask;
import javax.servlet.ServletContext;
@@ -30,7 +29,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.compress.utils.IOUtils;
-import org.onebusaway.nyc.queue.DNSResolver;
+import org.onebusaway.nyc.queue.LeadershipElectionResolver;
import org.onebusaway.nyc.queue.IPublisher;
import org.onebusaway.nyc.queue.Publisher;
import org.slf4j.Logger;
@@ -56,7 +55,7 @@ public class BHSListenerServlet extends HttpServlet {
private static final long serialVersionUID = 245140554274414196L;
private static Logger _log = LoggerFactory.getLogger(BHSListenerServlet.class);
- protected DNSResolver _resolver = null;
+ protected LeadershipElectionResolver _resolver = null;
@Autowired
private ThreadPoolTaskScheduler _taskScheduler;
@@ -67,17 +66,6 @@ public class BHSListenerServlet extends HttpServlet {
private static final String DEFAULT_HOST = "*";
private static final int DEFAULT_PORT = 5563;
- public void startDNSCheckThread() {
- String host = getHost();
- _log.info("listening on interface " + host);
- _resolver = new DNSResolver(host);
-
- if (_taskScheduler != null) {
- DNSCheckThread dnsCheckThread = new DNSCheckThread();
- // ever 10 seconds
- _taskScheduler.scheduleWithFixedDelay(dnsCheckThread, 10 * 1000);
- }
- }
private String getHost() {
try {
@@ -89,7 +77,6 @@ private String getHost() {
}
public synchronized void init() throws ServletException {
- startDNSCheckThread();
IPublisher publisher = (IPublisher) getServletConfig().getServletContext().getAttribute(
PUBLISHER_KEY);
@@ -185,19 +172,4 @@ private int getInitParameter(String key, int defaultValue) {
return valueAsInt;
}
- private class DNSCheckThread extends TimerTask {
-
- @Override
- public void run() {
- try {
- if (_resolver.hasAddressChanged()) {
- _log.warn("Resolver Changed -- re-binding queue connection");
- init();
- }
- } catch (Exception e) {
- _log.error(e.toString());
- _resolver.reset();
- }
- }
- }
}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/pom.xml b/onebusaway-nyc-queue-subscriber/pom.xml
index cf61b31e4d..75eb0ced74 100644
--- a/onebusaway-nyc-queue-subscriber/pom.xml
+++ b/onebusaway-nyc-queue-subscriber/pom.xml
@@ -16,6 +16,11 @@
onebusaway-nyc-util
${project.version}
+
+ org.onebusaway
+ onebusaway-cloud-aws
+ 0.0.13-SNAPSHOT
+
org.zeromq
jeromq
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/DNSResolver.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/DNSResolver.java
deleted file mode 100644
index ac2bdf6f66..0000000000
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/DNSResolver.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Copyright (C) 2011 Metropolitan Transportation Authority
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onebusaway.nyc.queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Utility class to test if DNS resolution of a hostname has changed.
- */
-public class DNSResolver {
- protected static Logger _log = LoggerFactory.getLogger(DNSResolver.class);
- private InetAddress currentAddress = null;
- private String host = null;
-
- public DNSResolver(String host) {
- this.host = host;
- currentAddress = getInetAddressByName(host);
- }
-
- /**
- * Reset any cached state. Useful for forcing a comparison/state change in
- * the next call.
- */
- public synchronized void reset() {
- currentAddress = null;
- }
-
- /**
- * test if the host this object was constructed with resolves to the same
- * IP.
- */
- public synchronized boolean hasAddressChanged() {
- InetAddress newAddress = getInetAddressByName(host);
- // test if not previously resolved
- if (currentAddress == null) {
- if (newAddress != null) {
- _log.warn("Previous unresolvable address resolved to " + newAddress);
- currentAddress = newAddress;
- return true;
- }
- } else if (!currentAddress.equals(newAddress)) {
- _log.warn("Resolver changed from " + currentAddress + " to " + newAddress);
- currentAddress = newAddress;
- return true;
- }
- return false;
- }
-
- /**
- * Tests if this host is at the IP corresponding to the DNS address.
- */
- public boolean isPrimary() {
- InetAddress newAddress = getInetAddressByName(host);
- if (newAddress == null) {
- _log.warn("Primary host did not resolve, assuming primary. host=" + host);
- return true;
- }
- try {
- if (InetAddress.getLocalHost().equals(newAddress)) { // compares IP
- return true;
- }
- } catch (UnknownHostException uhe) {
- _log.error("misconfigured host, unable to resolve localhost");
- _log.error(uhe.toString());
- }
- return false;
- }
-
- /**
- * convenience null-safe wrapper for InetAddress.getLocalhost.
- */
- public InetAddress getLocalHost() {
- try {
- return InetAddress.getLocalHost();
- } catch (UnknownHostException uhe) {
- _log.error(uhe.toString());
- }
- return null;
- }
-
- /**
- * null-safe toString of InetAddress.getLocalhost.
- */
- public String getLocalHostString() {
- InetAddress local = getLocalHost();
- if (local != null) {
- return local.toString();
- }
- return "unknown";
- }
-
- /**
- * null-safe lookup of a host.
- */
- public InetAddress getInetAddressByName(String host) {
- InetAddress address = null;
- try {
- address = InetAddress.getByName(host);
- } catch (UnknownHostException uhe) {
- System.out.println("unknown host=" + host);
- }
- return address;
- }
-
-
-}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/LeadershipElectionResolver.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/LeadershipElectionResolver.java
new file mode 100644
index 0000000000..e2896920b5
--- /dev/null
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/LeadershipElectionResolver.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2011 Metropolitan Transportation Authority
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onebusaway.nyc.queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.onebusaway.cloud.api.ExternalServices;
+import org.onebusaway.cloud.api.ExternalServicesBridgeFactory;
+
+/**
+ * Utility class to test if DNS resolution of a hostname has changed.
+ */
+public class LeadershipElectionResolver {
+ protected static Logger _log = LoggerFactory.getLogger(LeadershipElectionResolver.class);
+ private ExternalServices externalServices = new ExternalServicesBridgeFactory().getExternalServices();
+
+ boolean primaryHasChanged = false;
+ boolean isPrimary;
+
+
+ /**
+ * Tests if this host is at the IP corresponding to the DNS address.
+ */
+ public boolean isPrimary() {
+ boolean result = externalServices.isInstancePrimary();
+ primaryHasChanged = (result == isPrimary);
+ isPrimary = result;
+ return isPrimary;
+ }
+
+ /**
+ * convenience null-safe wrapper for InetAddress.getLocalhost.
+ */
+ public InetAddress getLocalHost() {
+ try {
+ return InetAddress.getLocalHost();
+ } catch (UnknownHostException uhe) {
+ _log.error(uhe.toString());
+ }
+ return null;
+ }
+
+ /**
+ * null-safe toString of InetAddress.getLocalhost.
+ */
+ public String getLocalHostString() {
+ InetAddress local = getLocalHost();
+ if (local != null) {
+ return local.toString();
+ }
+ return "unknown";
+ }
+
+public boolean getPrimaryHasChanged(){
+ return primaryHasChanged;
+ }
+
+
+}
\ No newline at end of file
diff --git a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
index 4e81fef227..e6a7b59c07 100644
--- a/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
+++ b/onebusaway-nyc-queue-subscriber/src/main/java/org/onebusaway/nyc/queue/QueueListenerTask.java
@@ -26,7 +26,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule;
-import org.onebusaway.nyc.queue.DNSResolver;
import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +49,7 @@ public abstract class QueueListenerTask {
protected boolean _initialized = false;
protected ObjectMapper _mapper = new ObjectMapper().registerModule(new JaxbAnnotationModule());
- protected DNSResolver _resolver = null;
+ protected LeadershipElectionResolver _resolver = null;
protected ZMQ.Context _context = null;
protected ZMQ.Socket _socket = null;
protected ZMQ.Poller _poller = null;
@@ -74,16 +73,6 @@ public abstract class QueueListenerTask {
public void setCountInterval(int countInterval) {
this._countInterval = countInterval;
}
-
- public void startDNSCheckThread() {
- String host = getQueueHost();
- _resolver = new DNSResolver(host);
- if (_taskScheduler != null) {
- DNSCheckThread dnsCheckThread = new DNSCheckThread();
- // ever 10 seconds
- _taskScheduler.scheduleWithFixedDelay(dnsCheckThread, 10 * 1000);
- }
- }
private class ReadThread implements Runnable {
@@ -151,7 +140,6 @@ public void run() {
public void setup() {
_executorService = Executors.newFixedThreadPool(1);
startListenerThread();
- startDNSCheckThread();
_log.warn("threads started for queue " + getQueueName());
}
@@ -203,20 +191,5 @@ protected synchronized void initializeQueue(String host, String queueName,
}
- private class DNSCheckThread extends TimerTask {
-
- @Override
- public void run() {
- try {
- if (_resolver.hasAddressChanged()) {
- _log.warn("Resolver Changed -- re-binding queue connection");
- reinitializeQueue();
- }
- } catch (Exception e) {
- _log.error(e.toString());
- _resolver.reset();
- }
- }
- }
}
\ No newline at end of file
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
index eaac119669..b4debd692d 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/ApcQueueListenerTask.java
@@ -129,13 +129,4 @@ public Integer getQueuePort() {
return _configurationService.getConfigurationValueAsInteger("tds.apcQueueOutputPort", 5576);
}
- @Override
- public void startDNSCheckThread() {
- if (!useApcIfAvailable()) {
- _log.error("apc integration disabled; DNS check exiting");
- return;
- }
- _log.info("starting DNS check for APC queue " + getQueueName());
- super.startDNSCheckThread();
- }
}
diff --git a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
index dd2756bfc9..8900eec860 100644
--- a/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
+++ b/onebusaway-nyc-transit-data-federation/src/main/java/org/onebusaway/nyc/transit_data_federation/impl/queue/TimeQueueListenerTask.java
@@ -115,12 +115,4 @@ public void startListenerThread() {
_initialized = true;
}
- @Override
- public void startDNSCheckThread() {
- if (!useTimePredictionsIfAvailable()) {
- _log.error("time predictions disabled -- exiting");
- return;
- }
- super.startDNSCheckThread();
- }
}
diff --git a/onebusaway-nyc-vehicle-tracking-webapp/pom.xml b/onebusaway-nyc-vehicle-tracking-webapp/pom.xml
index 5cebb4bf98..e1ce462b73 100644
--- a/onebusaway-nyc-vehicle-tracking-webapp/pom.xml
+++ b/onebusaway-nyc-vehicle-tracking-webapp/pom.xml
@@ -148,8 +148,8 @@
org.hibernate.dialect.HSQLDialect
-->
- PartitionedInputQueueListenerTask
- DummyOutputQueueSenderServiceImpl
+ PartitionedInputQueueListenerTask
+ OutputQueueSenderServiceImpl
tdm.dev.obanyc.com
80
/api/
@@ -159,7 +159,7 @@
empty.xml
/tmp/oba-bundle
- true
+ false
stdout
false
diff --git a/onebusaway-nyc-vehicle-tracking-webapp/src/main/java/org/onebusaway/nyc/vehicle_tracking/webapp/controllers/StatusController.java b/onebusaway-nyc-vehicle-tracking-webapp/src/main/java/org/onebusaway/nyc/vehicle_tracking/webapp/controllers/StatusController.java
index 368bbec914..c38d665be2 100644
--- a/onebusaway-nyc-vehicle-tracking-webapp/src/main/java/org/onebusaway/nyc/vehicle_tracking/webapp/controllers/StatusController.java
+++ b/onebusaway-nyc-vehicle-tracking-webapp/src/main/java/org/onebusaway/nyc/vehicle_tracking/webapp/controllers/StatusController.java
@@ -75,7 +75,6 @@ private StatusModel getStatus() {
}
StatusModel status = new StatusModel();
status.setOutputService(queueSenderService.getClass().getName());
- status.setHostname(queueSenderService.getPrimaryHostname());
status.setPrimary(queueSenderService.getIsPrimaryInferenceInstance());
status.setListenerTask(inputTask.getClass().getName());
status.setDepotList(inputTask.getDepotPartitionKey());
@@ -119,7 +118,6 @@ private String slurp(String urlString) {
public static class StatusModel {
- private String hostname;
private boolean isPrimary;
private String depotList;
private String outputService;
@@ -131,12 +129,7 @@ public static class StatusModel {
private String publicHostname;
private String internalHostname;
- public String getHostname() {
- return hostname;
- }
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
+
public void setPrimary(boolean isPrimary) {
this.isPrimary = isPrimary;
}
@@ -203,7 +196,6 @@ public String toString() {
return "Status:{"
+ "listenerTask=" + this.listenerTask
+ ", outputService=" + this.outputService
- + ", hostname=" + this.hostname
+ ", depotList=" + this.depotList + "}";
}
}
diff --git a/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml b/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
index 9fc90d4b0c..4b8139d3ad 100644
--- a/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
+++ b/onebusaway-nyc-vehicle-tracking-webapp/src/main/resources/data-sources.xml
@@ -87,7 +87,6 @@
-
diff --git a/onebusaway-nyc-vehicle-tracking/pom.xml b/onebusaway-nyc-vehicle-tracking/pom.xml
index 02b6c4c8d9..09aa1621c4 100644
--- a/onebusaway-nyc-vehicle-tracking/pom.xml
+++ b/onebusaway-nyc-vehicle-tracking/pom.xml
@@ -121,6 +121,10 @@
org.codehaus.jackson
jackson-mapper-asl
+
+ org.apache.httpcomponents
+ httpclient
+
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyOutputQueueSenderServiceImpl.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyOutputQueueSenderServiceImpl.java
index ed19ac39ae..fc6e60c1d3 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyOutputQueueSenderServiceImpl.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/DummyOutputQueueSenderServiceImpl.java
@@ -79,14 +79,4 @@ public boolean getIsPrimaryInferenceInstance() {
return _isPrimaryInferenceInstance;
}
- @Override
- public void setPrimaryHostname(String hostname) {
- _primaryHostname = hostname;
- }
-
- @Override
- public String getPrimaryHostname() {
- return _primaryHostname;
- }
-
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
index a6774be718..1df12a11e2 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/impl/queue/OutputQueueSenderServiceImpl.java
@@ -16,7 +16,7 @@
package org.onebusaway.nyc.vehicle_tracking.impl.queue;
import org.onebusaway.container.refresh.Refreshable;
-import org.onebusaway.nyc.queue.DNSResolver;
+import org.onebusaway.nyc.queue.LeadershipElectionResolver;
import org.onebusaway.nyc.transit_data.model.NycQueuedInferredLocationBean;
import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.onebusaway.nyc.vehicle_tracking.services.queue.OutputQueueSenderService;
@@ -28,7 +28,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.web.context.ServletContextAware;
import org.zeromq.ZMQ;
import java.io.IOException;
@@ -42,7 +41,6 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.servlet.ServletContext;
/**
* Sends inference output to the inference output queue.
@@ -50,8 +48,7 @@
* @author sheldonabrown
*
*/
-public class OutputQueueSenderServiceImpl implements OutputQueueSenderService,
- ServletContextAware {
+public class OutputQueueSenderServiceImpl implements OutputQueueSenderService {
private static Logger _log = LoggerFactory.getLogger(OutputQueueSenderServiceImpl.class);
@@ -73,13 +70,9 @@ public class OutputQueueSenderServiceImpl implements OutputQueueSenderService,
public boolean _isPrimaryInferenceInstance = true;
- public String _primaryHostname = null;
-
private final ObjectMapper _mapper = new ObjectMapper();
- protected DNSResolver _outputQueueResolver = null;
-
- protected DNSResolver _primaryResolver = null;
+ protected LeadershipElectionResolver _primaryResolver = null;
protected ZMQ.Context _context = null;
@@ -92,19 +85,6 @@ public class OutputQueueSenderServiceImpl implements OutputQueueSenderService,
@Autowired
private ThreadPoolTaskScheduler _taskScheduler;
-
- @Override
- public void setServletContext(ServletContext servletContext) {
- // check for primaryHost name
- String hostname = null;
- if (servletContext != null) {
- hostname = servletContext.getInitParameter("primary.host.name");
- _log.info("servlet context provied primary.host.name=" + hostname);
- }
- if (hostname != null) {
- setPrimaryHostname(hostname);
- }
- }
public void setCountInterval(int countInterval) {
this._countInterval = countInterval;
@@ -189,7 +169,7 @@ public void run() {
while (!Thread.currentThread().isInterrupted()) {
final long markTimestamp = System.currentTimeMillis();
if (_isPrimaryInferenceInstance) {
- final String msg = getHeartbeatMessage(getPrimaryHostname(),
+ final String msg = getHeartbeatMessage(_isPrimaryInferenceInstance,
markTimestamp, _interval);
_heartbeatBuffer.put(msg);
}
@@ -200,33 +180,32 @@ public void run() {
}
}
- private String getHeartbeatMessage(String hostname, long timestamp,
+ private String getHeartbeatMessage(Boolean isPrimary, long timestamp,
long interval) {
/*
* remember that only one IE for each depot will be transmitting at a
* time, so we can use the primaryhostname to identify this IE in the
* heartbeat message.
*/
- final String msg = "{\"heartbeat\": {\"hostname\":\"%1$s\",\"heartbeat_timestamp\":%2$s,\"heartbeat_interval\":%3$s}}";
- return String.format(msg, getPrimaryHostname(), timestamp, interval);
+ final String msg = "{\"heartbeat\": {\"isPrimary\":\"%1$b\",\"heartbeat_timestamp\":%2$s,\"heartbeat_interval\":%3$s}}";
+ return String.format(msg, isPrimary, timestamp, interval);
}
}
- private class OutputQueueCheckThread extends TimerTask {
-
- @Override
- public void run() {
- try {
- if (_outputQueueResolver.hasAddressChanged()) {
- _log.warn("Resolver Changed");
- reinitializeQueue();
- }
- } catch (final Exception e) {
- _log.error(e.toString());
- _outputQueueResolver.reset();
- }
- }
- }
+// private class OutputQueueCheckThread extends TimerTask {
+//
+// @Override
+// public void run() {
+// try {
+// if (_outputQueueResolver.getPrimaryHasChanged()) {
+// _log.warn("Primary has changed");
+// reinitializeQueue();
+// }
+// } catch (final Exception e) {
+// _log.error(e.toString(),e);
+// }
+// }
+// }
private class PrimaryCheckThread extends TimerTask {
@@ -239,7 +218,7 @@ public void run() {
_isPrimaryInferenceInstance = primaryValue;
}
} catch (final Exception e) {
- _log.error(e.toString());
+ _log.error(e.toString(),e);
}
}
}
@@ -265,19 +244,13 @@ public void enqueue(NycQueuedInferredLocationBean r) {
@PostConstruct
public void setup() {
- _outputQueueResolver = new DNSResolver(getQueueHost());
- final OutputQueueCheckThread outputQueueCheckThread = new OutputQueueCheckThread();
+ _primaryResolver = new LeadershipElectionResolver();
+ //final OutputQueueCheckThread outputQueueCheckThread = new OutputQueueCheckThread();
// every 10 seconds
- _taskScheduler.scheduleWithFixedDelay(outputQueueCheckThread, 10 * 1000);
-
- if (getPrimaryHostname() != null && getPrimaryHostname().length() > 0) {
- _primaryResolver = new DNSResolver(getPrimaryHostname());
- _log.warn("Primary Inference instance configured to be "
- + getPrimaryHostname() + " on "
- + _primaryResolver.getLocalHostString());
- final PrimaryCheckThread primaryCheckThread = new PrimaryCheckThread();
- _taskScheduler.scheduleWithFixedDelay(primaryCheckThread, 10 * 1000);
- }
+ //_taskScheduler.scheduleWithFixedDelay(outputQueueCheckThread, 10 * 1000);
+
+ final PrimaryCheckThread primaryCheckThread = new PrimaryCheckThread();
+ _taskScheduler.scheduleWithFixedDelay(primaryCheckThread, 10 * 1000);
_executorService = Executors.newFixedThreadPool(1);
_heartbeatService = Executors.newFixedThreadPool(1);
startListenerThread();
@@ -310,7 +283,6 @@ public void startListenerThread() {
try {
initializeQueue(host, queueName, port);
} catch (final Exception any) {
- _outputQueueResolver.reset();
}
}
@@ -320,7 +292,6 @@ protected void reinitializeQueue() {
initializeQueue(getQueueHost(), getQueueName(), getQueuePort());
} catch (final InterruptedException ie) {
_log.error("reinitialize failed:", ie);
- _outputQueueResolver.reset();
return;
}
}
@@ -378,14 +349,4 @@ public boolean getIsPrimaryInferenceInstance() {
return _isPrimaryInferenceInstance;
}
- @Override
- public void setPrimaryHostname(String hostname) {
- _primaryHostname = hostname;
- }
-
- @Override
- public String getPrimaryHostname() {
- return _primaryHostname;
- }
-
}
diff --git a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/services/queue/OutputQueueSenderService.java b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/services/queue/OutputQueueSenderService.java
index a0123d21b3..65e21d32dd 100644
--- a/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/services/queue/OutputQueueSenderService.java
+++ b/onebusaway-nyc-vehicle-tracking/src/main/java/org/onebusaway/nyc/vehicle_tracking/services/queue/OutputQueueSenderService.java
@@ -25,8 +25,4 @@ public interface OutputQueueSenderService {
public boolean getIsPrimaryInferenceInstance();
- public void setPrimaryHostname(String hostname);
-
- public String getPrimaryHostname();
-
}
diff --git a/pom.xml b/pom.xml
index 879d800864..9f5eeaeae9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
8.0.1
- 1.3.9
+ 1.11.602
1.9.2
23.0
99999