Skip to content

Commit 90d31b4

Browse files
author
Vishal M Yadav
committed
finally completed kafka broker feature
1 parent d1bb7a9 commit 90d31b4

File tree

6 files changed

+323
-0
lines changed

6 files changed

+323
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.gatomalvado.eventbus;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import java.util.concurrent.CopyOnWriteArrayList;
6+
7+
public class Broker {
8+
9+
private Map<String, Topic> topics;
10+
private Map<String, CopyOnWriteArrayList<Producer>> producers;
11+
12+
public Broker() {
13+
this.topics = new ConcurrentHashMap<>();
14+
this.producers = new ConcurrentHashMap<>();
15+
}
16+
17+
public void publish(String message, String topicName) {
18+
Topic topic = topics.get(topicName);
19+
topic.addElement(message);
20+
}
21+
22+
public Topic subscribe(String topicName) {
23+
Topic topic = topics.get(topicName);
24+
if (topic == null) {
25+
throw new IllegalArgumentException("Topic " + topicName + " not found");
26+
}
27+
return this.topics.get(topicName);
28+
}
29+
30+
public void createTopic(String topicName) {
31+
topics.putIfAbsent(topicName, new Topic(topicName));
32+
}
33+
34+
public synchronized void registerProducer(String topicName, Producer producer) {
35+
if(!topics.containsKey(topicName)) {
36+
return;
37+
}
38+
CopyOnWriteArrayList<Producer> producerList = this.producers.computeIfAbsent(
39+
topicName,
40+
k -> new CopyOnWriteArrayList<>()
41+
);
42+
producerList.add(producer);
43+
}
44+
45+
public synchronized void deRegisterProducer(String topicName, Producer producer) {
46+
CopyOnWriteArrayList<Producer> producerList = this.producers.get(topicName);
47+
if (producerList == null || producerList.size() == 0 || !producerList.contains(producer)) {
48+
return;
49+
}
50+
producerList.remove(producer);
51+
if(producerList.size() == 0) {
52+
Topic topic = topics.get(topicName);
53+
if(topic != null) {
54+
topic.closeTopic();
55+
}
56+
}
57+
}
58+
59+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.gatomalvado.eventbus;
2+
3+
public class Consumer {
4+
5+
private final String consumerId;
6+
private int offset;
7+
private final Broker broker;
8+
private Topic topic;
9+
10+
public Consumer(String consumerId, Broker broker) {
11+
this.consumerId = consumerId;
12+
this.offset = 0;
13+
this.broker = broker;
14+
}
15+
16+
public void startConsumer() throws InterruptedException {
17+
if(topic==null) {
18+
throw new RuntimeException("topic is not subscribed yet");
19+
}
20+
while(true) {
21+
String element = topic.getElement(offset);
22+
if(element==null) {
23+
break;
24+
}
25+
System.out.println(this.consumerId + " : consumed " + topic.getElement(offset));
26+
offset += 1;
27+
}
28+
}
29+
30+
public void subscribeToTopic(String topicName) {
31+
Topic topic = broker.subscribe(topicName);
32+
if(topic==null) {
33+
throw new RuntimeException("topic does not exist");
34+
}
35+
this.topic = topic;
36+
this.offset = 0;
37+
}
38+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package com.gatomalvado.eventbus;
2+
3+
public class Main {
4+
5+
public static void main(String[] args) throws InterruptedException {
6+
System.out.println("Pub sub kafka");
7+
8+
// create kafka broker
9+
Broker broker = new Broker();
10+
broker.createTopic("topicA");
11+
broker.createTopic("topicB");
12+
13+
Producer producer1 = new Producer("P_1 ", broker);
14+
Producer producer2 = new Producer("P_2 ", broker);
15+
16+
Consumer consumer1 = new Consumer("C_1 ", broker);
17+
Consumer consumer2 = new Consumer("C_2 ", broker);
18+
Consumer consumer3 = new Consumer("C_3 ", broker);
19+
Consumer consumer4 = new Consumer("C_4 ", broker);
20+
Consumer consumer5 = new Consumer("C_5 ", broker);
21+
22+
23+
24+
Thread pt1 = new Thread(() -> {
25+
producer1.register("topicA");
26+
producer1.register("topicB");
27+
for(int i=1; i<=3; i++) {
28+
if(i%2 == 0) {
29+
producer1.publish("message_"+i, "topicA");
30+
} else {
31+
producer1.publish("message_"+i, "topicB");
32+
}
33+
}
34+
producer1.deregister("topicA");
35+
producer1.deregister("topicB");
36+
});
37+
38+
Thread pt2 = new Thread(() -> {
39+
producer2.register("topicA");
40+
producer2.register("topicB");
41+
for(int i=4; i<=7; i++) {
42+
if(i%2 == 0) {
43+
producer2.publish("message_"+i, "topicA");
44+
} else {
45+
producer2.publish("message_"+i, "topicB");
46+
}
47+
}
48+
producer2.deregister("topicA");
49+
producer2.deregister("topicB");
50+
});
51+
52+
Thread ct1 = new Thread(() -> {
53+
consumer1.subscribeToTopic("topicA");
54+
try {
55+
consumer1.startConsumer();
56+
} catch (InterruptedException e) {
57+
throw new RuntimeException(e);
58+
}
59+
});
60+
61+
Thread ct2 = new Thread(() -> {
62+
consumer2.subscribeToTopic("topicA");
63+
try {
64+
consumer2.startConsumer();
65+
} catch (InterruptedException e) {
66+
throw new RuntimeException(e);
67+
}
68+
});
69+
70+
Thread ct3 = new Thread(() -> {
71+
consumer3.subscribeToTopic("topicA");
72+
try {
73+
consumer3.startConsumer();
74+
} catch (InterruptedException e) {
75+
throw new RuntimeException(e);
76+
}
77+
});
78+
79+
Thread ct4 = new Thread(() -> {
80+
consumer4.subscribeToTopic("topicA");
81+
try {
82+
consumer4.startConsumer();
83+
} catch (InterruptedException e) {
84+
throw new RuntimeException(e);
85+
}
86+
});
87+
88+
Thread ct5 = new Thread(() -> {
89+
consumer5.subscribeToTopic("topicA");
90+
try {
91+
consumer5.startConsumer();
92+
} catch (InterruptedException e) {
93+
throw new RuntimeException(e);
94+
}
95+
});
96+
97+
ct1.start(); ct2.start(); ct3.start(); ct4.start(); ct5.start();
98+
pt1.start(); pt2.start();
99+
100+
ct1.join(); ct2.join(); ct3.join(); ct4.join(); ct5.join();
101+
pt1.join(); pt2.join();
102+
103+
Thread ct11 = new Thread(() -> {
104+
consumer1.subscribeToTopic("topicB");
105+
try{
106+
consumer1.startConsumer();
107+
}catch (InterruptedException e) {
108+
throw new RuntimeException(e);
109+
}
110+
});
111+
112+
Thread ct12 = new Thread(() -> {
113+
consumer2.subscribeToTopic("topicB");
114+
try{
115+
consumer2.startConsumer();
116+
}catch (InterruptedException e) {
117+
throw new RuntimeException(e);
118+
}
119+
});
120+
121+
Thread ct13 = new Thread(() -> {
122+
consumer3.subscribeToTopic("topicB");
123+
try{
124+
consumer3.startConsumer();
125+
}catch (InterruptedException e) {
126+
throw new RuntimeException(e);
127+
}
128+
});
129+
130+
Thread ct14 = new Thread(() -> {
131+
consumer4.subscribeToTopic("topicB");
132+
try{
133+
consumer4.startConsumer();
134+
}catch (InterruptedException e) {
135+
throw new RuntimeException(e);
136+
}
137+
});
138+
139+
Thread ct15 = new Thread(() -> {
140+
consumer5.subscribeToTopic("topicB");
141+
try{
142+
consumer5.startConsumer();
143+
}catch (InterruptedException e) {
144+
throw new RuntimeException(e);
145+
}
146+
});
147+
148+
ct11.start(); ct12.start(); ct13.start(); ct14.start(); ct15.start();
149+
ct11.join(); ct12.join(); ct13.join(); ct14.join();
150+
}
151+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.gatomalvado.eventbus;
2+
3+
public class Producer {
4+
5+
private final String producerName;
6+
7+
private final Broker broker;
8+
9+
public Producer(String producerName, Broker broker) {
10+
this.producerName = producerName;
11+
this.broker = broker;
12+
}
13+
14+
public void publish(String message, String topicName) {
15+
broker.publish(message, topicName);
16+
}
17+
18+
public void deregister(String topicName) {
19+
broker.deRegisterProducer(topicName, this);
20+
}
21+
22+
public void register(String topicName) {
23+
broker.registerProducer(topicName, this);
24+
}
25+
}

src/main/java/com/gatomalvado/eventbus/README.md

Whitespace-only changes.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.gatomalvado.eventbus;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.CopyOnWriteArrayList;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
public class Topic {
9+
10+
private String topicName;
11+
private AtomicBoolean status;
12+
private List<String> elements;
13+
14+
public Topic(String topicName) {
15+
this.topicName = topicName;
16+
this.status = new AtomicBoolean(false);
17+
this.elements = new CopyOnWriteArrayList<>();
18+
}
19+
20+
public void addElement(String element) {
21+
elements.add(element);
22+
}
23+
24+
public String getElement(int index) throws InterruptedException {
25+
synchronized (elements) {
26+
if(index < 0) {
27+
throw new IndexOutOfBoundsException();
28+
}
29+
while(index >= elements.size() && !status.get()) {
30+
elements.wait();
31+
}
32+
if(index >= elements.size() && status.get()) {
33+
return null;
34+
}
35+
return elements.get(index);
36+
}
37+
}
38+
39+
public boolean isClosed() {
40+
return status.get();
41+
}
42+
43+
public synchronized void closeTopic(){
44+
status.set(true);
45+
synchronized (elements) {
46+
elements.notifyAll();
47+
}
48+
}
49+
50+
}

0 commit comments

Comments
 (0)