diff --git a/src/jvm/yieldbot/storm/RedisPubSubTopology.java b/src/jvm/yieldbot/storm/RedisPubSubTopology.java index 3123260..d55afc9 100644 --- a/src/jvm/yieldbot/storm/RedisPubSubTopology.java +++ b/src/jvm/yieldbot/storm/RedisPubSubTopology.java @@ -7,20 +7,20 @@ import backtype.storm.utils.Utils; public class RedisPubSubTopology { - public static void main(String[] args) { + public static void main(String[] args) { String host = args[0]; int port = Integer.parseInt(args[1]); String pattern = args[2]; TopologyBuilder builder = new TopologyBuilder(); - + builder.setSpout("pubsub1", new RedisPubSubSpout(host,port,pattern)); - + Config conf = new Config(); conf.setDebug(true); - - + + LocalCluster cluster = new LocalCluster(); - + cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); diff --git a/src/jvm/yieldbot/storm/spout/RedisPubSubSpout.java b/src/jvm/yieldbot/storm/spout/RedisPubSubSpout.java index 6d15e73..8576294 100644 --- a/src/jvm/yieldbot/storm/spout/RedisPubSubSpout.java +++ b/src/jvm/yieldbot/storm/spout/RedisPubSubSpout.java @@ -20,120 +20,120 @@ import backtype.storm.utils.Utils; public class RedisPubSubSpout extends BaseRichSpout { - - static final long serialVersionUID = 737015318988609460L; - static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); - - SpoutOutputCollector _collector; - final String host; - final int port; - final String pattern; - LinkedBlockingQueue queue; - JedisPool pool; - - public RedisPubSubSpout(String host, int port, String pattern) { - this.host = host; - this.port = port; - this.pattern = pattern; - } - - class ListenerThread extends Thread { - LinkedBlockingQueue queue; - JedisPool pool; - String pattern; - - public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { - this.queue = queue; - this.pool = pool; - this.pattern = pattern; - } - - public void run() { - - JedisPubSub listener = new JedisPubSub() { - - @Override - public void onMessage(String channel, String message) { - queue.offer(message); - } - - @Override - public void onPMessage(String pattern, String channel, String message) { - queue.offer(message); - } - - @Override - public void onPSubscribe(String channel, int subscribedChannels) { - // TODO Auto-generated method stub - - } - - @Override - public void onPUnsubscribe(String channel, int subscribedChannels) { - // TODO Auto-generated method stub - - } - - @Override - public void onSubscribe(String channel, int subscribedChannels) { - // TODO Auto-generated method stub - - } - - @Override - public void onUnsubscribe(String channel, int subscribedChannels) { - // TODO Auto-generated method stub - - } - }; - - Jedis jedis = pool.getResource(); - try { - jedis.psubscribe(listener, pattern); - } finally { - pool.returnResource(jedis); - } - } - }; - - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - queue = new LinkedBlockingQueue(1000); - pool = new JedisPool(new JedisPoolConfig(),host,port); - - ListenerThread listener = new ListenerThread(queue,pool,pattern); - listener.start(); - - } - - public void close() { - pool.destroy(); - } - - public void nextTuple() { - String ret = queue.poll(); + + static final long serialVersionUID = 737015318988609460L; + static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); + + SpoutOutputCollector _collector; + final String host; + final int port; + final String pattern; + LinkedBlockingQueue queue; + JedisPool pool; + + public RedisPubSubSpout(String host, int port, String pattern) { + this.host = host; + this.port = port; + this.pattern = pattern; + } + + class ListenerThread extends Thread { + LinkedBlockingQueue queue; + JedisPool pool; + String pattern; + + public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { + this.queue = queue; + this.pool = pool; + this.pattern = pattern; + } + + public void run() { + + JedisPubSub listener = new JedisPubSub() { + + @Override + public void onMessage(String channel, String message) { + queue.offer(message); + } + + @Override + public void onPMessage(String pattern, String channel, String message) { + queue.offer(message); + } + + @Override + public void onPSubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onPUnsubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + // TODO Auto-generated method stub + + } + }; + + Jedis jedis = pool.getResource(); + try { + jedis.psubscribe(listener, pattern); + } finally { + pool.returnResource(jedis); + } + } + }; + + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + queue = new LinkedBlockingQueue(1000); + pool = new JedisPool(new JedisPoolConfig(),host,port); + + ListenerThread listener = new ListenerThread(queue,pool,pattern); + listener.start(); + + } + + public void close() { + pool.destroy(); + } + + public void nextTuple() { + String ret = queue.poll(); if(ret==null) { Utils.sleep(50); } else { - _collector.emit(tuple(ret)); + _collector.emit(tuple(ret)); } - } + } - public void ack(Object msgId) { - // TODO Auto-generated method stub + public void ack(Object msgId) { + // TODO Auto-generated method stub - } + } - public void fail(Object msgId) { - // TODO Auto-generated method stub + public void fail(Object msgId) { + // TODO Auto-generated method stub - } + } - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("message")); - } + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("message")); + } - public boolean isDistributed() { - return false; - } + public boolean isDistributed() { + return false; + } }