Skip to content

consistant spacing instead of mixed spaces and tabs #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/jvm/yieldbot/storm/RedisPubSubTopology.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
import backtype.storm.utils.Utils;

public class RedisPubSubTopology {
public static void main(String[] args) {
public static void main(String[] args) {
String host = args[0];
int port = Integer.parseInt(args[1]);
String pattern = args[2];
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("pubsub1", new RedisPubSubSpout(host,port,pattern));

Config conf = new Config();
conf.setDebug(true);


LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
Expand Down
212 changes: 106 additions & 106 deletions src/jvm/yieldbot/storm/spout/RedisPubSubSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,120 +20,120 @@
import backtype.storm.utils.Utils;

public class RedisPubSubSpout extends BaseRichSpout {
static final long serialVersionUID = 737015318988609460L;
static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);
SpoutOutputCollector _collector;
final String host;
final int port;
final String pattern;
LinkedBlockingQueue<String> queue;
JedisPool pool;
public RedisPubSubSpout(String host, int port, String pattern) {
this.host = host;
this.port = port;
this.pattern = pattern;
}
class ListenerThread extends Thread {
LinkedBlockingQueue<String> queue;
JedisPool pool;
String pattern;
public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool, String pattern) {
this.queue = queue;
this.pool = pool;
this.pattern = pattern;
}
public void run() {
JedisPubSub listener = new JedisPubSub() {

@Override
public void onMessage(String channel, String message) {
queue.offer(message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
queue.offer(message);
}

@Override
public void onPSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}

@Override
public void onPUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
};
Jedis jedis = pool.getResource();
try {
jedis.psubscribe(listener, pattern);
} finally {
pool.returnResource(jedis);
}
}
};
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
queue = new LinkedBlockingQueue<String>(1000);
pool = new JedisPool(new JedisPoolConfig(),host,port);
ListenerThread listener = new ListenerThread(queue,pool,pattern);
listener.start();
}

public void close() {
pool.destroy();
}

public void nextTuple() {
String ret = queue.poll();

static final long serialVersionUID = 737015318988609460L;
static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);

SpoutOutputCollector _collector;
final String host;
final int port;
final String pattern;
LinkedBlockingQueue<String> queue;
JedisPool pool;

public RedisPubSubSpout(String host, int port, String pattern) {
this.host = host;
this.port = port;
this.pattern = pattern;
}

class ListenerThread extends Thread {
LinkedBlockingQueue<String> queue;
JedisPool pool;
String pattern;

public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool, String pattern) {
this.queue = queue;
this.pool = pool;
this.pattern = pattern;
}

public void run() {

JedisPubSub listener = new JedisPubSub() {

@Override
public void onMessage(String channel, String message) {
queue.offer(message);
}

@Override
public void onPMessage(String pattern, String channel, String message) {
queue.offer(message);
}

@Override
public void onPSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub

}

@Override
public void onPUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub

}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub

}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub

}
};

Jedis jedis = pool.getResource();
try {
jedis.psubscribe(listener, pattern);
} finally {
pool.returnResource(jedis);
}
}
};

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
queue = new LinkedBlockingQueue<String>(1000);
pool = new JedisPool(new JedisPoolConfig(),host,port);

ListenerThread listener = new ListenerThread(queue,pool,pattern);
listener.start();

}

public void close() {
pool.destroy();
}

public void nextTuple() {
String ret = queue.poll();
if(ret==null) {
Utils.sleep(50);
} else {
_collector.emit(tuple(ret));
_collector.emit(tuple(ret));
}
}
}

public void ack(Object msgId) {
// TODO Auto-generated method stub
public void ack(Object msgId) {
// TODO Auto-generated method stub

}
}

public void fail(Object msgId) {
// TODO Auto-generated method stub
public void fail(Object msgId) {
// TODO Auto-generated method stub

}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}

public boolean isDistributed() {
return false;
}
public boolean isDistributed() {
return false;
}
}