Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4de34f8
First bit of conversion done. Still not done with Output Queue Sender…
rjasmin-camsys Apr 17, 2025
a5ee5c3
Second bit of conversion done. I think it might be about ready to tes…
rjasmin-camsys Apr 18, 2025
a121c97
Final bit of conversion and creation of KafkaSubscriber class since t…
rjasmin-camsys Apr 24, 2025
b16c5fe
Modifications for code clarity and usage
rjasmin-camsys Apr 24, 2025
6d379f7
Code tested successfully!
rjasmin-camsys Apr 24, 2025
08ffcb5
Modifications to allow "switching" between kafka and ZMQ without code…
rjasmin-camsys Apr 24, 2025
f6d2a40
Modification to allow the TDS to use kafka or zmq. Still needs tested…
rjasmin-camsys Apr 28, 2025
703cea3
Testing didn't go perfectly, still working on getting the webapp to run.
rjasmin-camsys Apr 30, 2025
cb93b2d
Having maven build issues
rjasmin-camsys May 1, 2025
75c6dad
Adding license header
carabalb May 1, 2025
405fe88
Fixes
carabalb May 1, 2025
f2ee71e
Change to use app-context xml for switching
rjasmin-camsys May 1, 2025
da380e7
Change to use app-context xml for switching
rjasmin-camsys May 5, 2025
9d65c86
Change to use app-context xml for switching, plus some cleanup
rjasmin-camsys May 5, 2025
9389e24
Using app context for switching, but beans are no longer autowiring.
rjasmin-camsys May 5, 2025
6c9919a
clean up
rjasmin-camsys May 6, 2025
c08200a
runs properly
rjasmin-camsys May 6, 2025
4118b97
Fixes for autowiring beans.
rjasmin-camsys May 6, 2025
f472115
Fixes for extending or implementing the wrong methods
rjasmin-camsys May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions onebusaway-nyc-admin-webapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

<repositories>
<!-- Required for the xwiki-rendering-macros -->
<repository>
<!--<repository>
<id>xwiki</id>
<url>https://maven.xwiki.org/releases</url>
</repository>
</repository>-->
</repositories>
<dependencies>
<!-- Jersey Depedencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

<!-- Time Queue Configuration -->
<!-- This option integrates with an external time prediction queue -->
<bean id="timeInputQueue"
<bean id="timeInputQueue" abstract="true"
class="org.onebusaway.nyc.transit_data_federation.impl.predictions.QueuePredictionIntegrationServiceImpl"
destroy-method="destroy">
<property name="status" value="TESTING"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@

<!-- Time Queue Configuration -->
<!-- This option integrates with an external time prediction queue -->
<bean id="timeInputQueue"
<bean id="timeInputQueue" abstract="true"
class="org.onebusaway.nyc.transit_data_federation.impl.predictions.QueuePredictionIntegrationServiceImpl"
destroy-method="destroy">
<property name="checkPredictionAge" value="false"/>
</bean>

<!-- APC Queue Configuration -->
<bean id="apcInputQueue" class="org.onebusaway.nyc.transit_data_federation.impl.nyc.ApcIntegrationServiceImpl">
<bean id="apcInputQueue" abstract="true" class="org.onebusaway.nyc.transit_data_federation.impl.nyc.ApcIntegrationServiceImpl">
<property name="status" value="ENABLED"/>
</bean>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public TripUpdateTest(String gtfsFile, String defaultAgencyId, String blockTripM
TripUpdateFeedBuilderImpl feedBuilder = new TripUpdateFeedBuilderImpl();
_feedBuilder = feedBuilder;

_predictionIntegrationService = new QueuePredictionIntegrationServiceImpl();
_predictionIntegrationService.setTransitDataService(_transitDataService);
_predictionIntegrationService.setConfigurationService(new MockConfigurationService());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

<!-- Time Queue Configuration -->
<!-- This option integrates with an external time prediction queue -->
<bean id="timeInputQueue" class="org.onebusaway.nyc.transit_data_federation.impl.predictions.QueuePredictionIntegrationServiceImpl">
<bean id="timeInputQueue" abstract="true" class="org.onebusaway.nyc.transit_data_federation.impl.predictions.QueuePredictionIntegrationServiceImpl">
<property name="status" value="DISABLED"/>
</bean>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,11 @@ private void discardRecord(String vehicleId, String contents) {
_locationDao.handleException(contents, e, new Date());
}

@Override
public String getQueueHost() {
return _configurationService.getConfigurationValueAsString(
"tds.inputQueueHost", null);
}

@Override
public String getQueueName() {
return _configurationService.getConfigurationValueAsString(
"tds.inputQueueName", null);
}

public String getQueueDisplayName() {
return "archive_inference";
}

@Override
public Integer getQueuePort() {
return _configurationService.getConfigurationValueAsInteger(
"tds.inputQueuePort", 5567);
}

public void logStatus() {
_log.info("obanyc_inferredlocation records count : " + _locationDao.getArchiveInferredLocationCount());
Expand All @@ -180,7 +164,6 @@ public void run() {
}
}


@SuppressWarnings("unchecked")
@PostConstruct
public void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import org.onebusaway.container.refresh.Refreshable;
import org.onebusaway.nyc.queue.QueueListenerTask;
import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.queue.model.RealtimeEnvelope;
import org.onebusaway.nyc.report.impl.CcLocationCache;
import org.onebusaway.nyc.report.model.CcLocationReportRecord;
import org.onebusaway.nyc.report.services.RecordValidationService;
import org.onebusaway.nyc.util.configuration.ConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class OpsInputQueueListenerTask extends QueueListenerTask {
public abstract class OpsInputQueueListenerTask implements IQueueListenerTask {

public static final int DELAY_THRESHOLD = 10 * 1000;

Expand All @@ -51,6 +53,10 @@ public void setCcLocationCache(CcLocationCache cache) {
_ccLocationCache = cache;
}

@Qualifier("configurationService")
@Autowired
private ConfigurationService _configurationService;

@Autowired
public void setValidationService(RecordValidationService validationService) {
this.validationService = validationService;
Expand All @@ -61,6 +67,12 @@ public void setValidationService(RecordValidationService validationService) {
private String _systemTimeZone = null;
private long zoneOffsetWindow = System.currentTimeMillis();

protected boolean _initialized = false;

@Autowired
@Qualifier("listener")
IQueueListenerTask _queueListenerTask;

public OpsInputQueueListenerTask() {
/*
* use Jaxb annotation interceptor so we pick up autogenerated annotations
Expand Down Expand Up @@ -183,7 +195,7 @@ record = new CcLocationReportRecord(envelope, contents, getZoneOffset());

@PostConstruct
public void setup() {
super.setup();
_queueListenerTask.setup();
// make parsing lenient
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
Expand All @@ -194,7 +206,7 @@ public void setup() {

@PreDestroy
public void destroy() {
super.destroy();
_queueListenerTask.destroy();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public void setUp() throws Exception {
when(inferredResult.getInferredLatitude()).thenReturn(1D);
when(inferredResult.getInferredLongitude()).thenReturn(-1D);

inferenceQueueListenertask = new OpsInferenceQueueListenerTask();
inferenceQueueListenertask.setLocationDao(locationDao);
inferenceQueueListenertask.setLocationService(locationService);
inferenceQueueListenertask.setValidationService(validationService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import org.apache.commons.lang.StringUtils;
import org.onebusaway.gtfs.model.AgencyAndId;
import org.onebusaway.nyc.queue.QueueListenerTask;
import org.onebusaway.nyc.queue.IQueueListenerTask;
import org.onebusaway.nyc.transit_data.model.NycVehicleLoadBean;
import org.onebusaway.realtime.api.VehicleOccupancyRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.Date;

Expand All @@ -41,7 +43,7 @@
* -Dapc.value=my_cloudwatch_secret \
* >apc.log 2>&1 &
*/
public class ApcMonitor extends QueueListenerTask {
public class ApcMonitor implements IQueueListenerTask {

public static final String DEFAULT_ENV = "Obanyc:qa";
public static final String DEFAULT_KEY = "my_cw_key";
Expand All @@ -51,8 +53,16 @@ public class ApcMonitor extends QueueListenerTask {
public static final String DEFAULT_DISPLAY_NAME = DEFAULT_NAME;
public static final int DEFAULT_PORT = 5576;

@Autowired
@Qualifier("listener")
IQueueListenerTask _queueListenerTask;
protected boolean _initialized = false;

private AmazonCloudWatchClient cloudWatch = new AmazonCloudWatchClient(new BasicAWSCredentials(getKey(), getValue()));

@Override
public void initializeQueue(String host, String queueName, Integer port) throws InterruptedException {}

@Override
public boolean processMessage(String contents, byte[] buff) throws Exception {
if(StringUtils.isBlank(contents)){
Expand All @@ -79,7 +89,8 @@ public void startListenerThread() {

System.out.println("connecting to " + queueName + " queue at " + host + ":" + port);
try {
initializeQueue(host, queueName, port);
_queueListenerTask.initializeQueue(host, queueName, port);

} catch (InterruptedException ie) {
return;
}
Expand Down Expand Up @@ -125,6 +136,15 @@ public Integer getQueuePort() {
return DEFAULT_PORT;
}

@Override
public void startDNSCheckThread() {}

@Override
public void destroy() {}

@Override
public void setup() {}


private void processResult(NycVehicleLoadBean message, String contents) {
VehicleOccupancyRecord vor = toVehicleOccupancyRecord(message);
Expand All @@ -134,10 +154,9 @@ private void processResult(NycVehicleLoadBean message, String contents) {
}


public static void main(String[] args) {
public void main(String[] args) {
System.out.println("starting up....");
ApcMonitor monitor = new ApcMonitor();
monitor.setup();
_queueListenerTask.setup();
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.onebusaway.nyc.queue_broker;

public interface ISimpleBroker {
void main(String[] args);
void run();
void setInPort(int inPort);
void setOutPort(int outPort);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Copyright (C) 2011 Metropolitan Transportation Authority
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.onebusaway.nyc.queue_broker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

import java.util.concurrent.ExecutorService;

/**
* Simple Brokering impelmented in ZeroMQ. Listen for anything on one socket,
* and send it back out on another socket.
*/
public class KafkaSimpleBroker implements ISimpleBroker {
Logger logger = LoggerFactory.getLogger(KafkaSimpleBroker.class);

private static final int DEFAULT_IN_PORT = 5566;
private static final int DEFAULT_OUT_PORT = 5567;
private static final int HWM_VALUE = 50000; // High Water Mark
private ExecutorService _executorService = null;
private int inPort;
private int outPort;

public void main(String[] args) {
KafkaSimpleBroker broker = new KafkaSimpleBroker();
if (args.length > 0) {
broker.setInPort(Integer.parseInt(args[0]));
} else {
broker.setInPort(DEFAULT_IN_PORT);
}
if (args.length > 1) {
broker.setOutPort(Integer.parseInt(args[1]));
} else {
broker.setOutPort(DEFAULT_OUT_PORT);
}

broker.run();
}

public KafkaSimpleBroker() {
logger.info("Starting up SimpleBroker");
}

public void run() {
// Prepare our context and subscriber
ZMQ.Context context = ZMQ.context(1); // 1 = number of threads

ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
String inBind = "tcp://*:" + inPort;

logger.info("subscribing to queue at " + inBind);
subscriber.bind(inBind);
// subscribe to everything
subscriber.subscribe(new byte[0]); // was inTopic.getBytes()

ZMQ.Socket publisher = context.socket(ZMQ.PUB);

//Set to prevent broker memory from growing indefinitely if client falls behind
publisher.setHWM(HWM_VALUE);

String outBind = "tcp://*:" + outPort;

logger.info("publishing to queue at " + outBind);
publisher.bind(outBind);

ZMQ.proxy(subscriber, publisher, null);

}

public void setInPort(int inPort) {
this.inPort = inPort;
}

public void setOutPort(int outPort) {
this.outPort = outPort;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Simple Brokering impelmented in ZeroMQ. Listen for anything on one socket,
* and send it back out on another socket.
*/
public class SimpleBroker {
public class SimpleBroker implements ISimpleBroker {
Logger logger = LoggerFactory.getLogger(SimpleBroker.class);

private static final int DEFAULT_IN_PORT = 5566;
Expand All @@ -38,7 +38,7 @@ public class SimpleBroker {
private int inPort;
private int outPort;

public static void main(String[] args) {
public void main(String[] args) {
SimpleBroker broker = new SimpleBroker();
if (args.length > 0) {
broker.setInPort(Integer.parseInt(args[0]));
Expand Down
4 changes: 2 additions & 2 deletions onebusaway-nyc-queue-http-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
</testResources>
<plugins>
<!-- jetty -->
<plugin>
<!-- <plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>${jetty.version}</version>
Expand Down Expand Up @@ -151,7 +151,7 @@
</execution>
</executions>

</plugin>
</plugin>-->
<!-- integration testing -->
<!--
<plugin>
Expand Down
Loading