Processes a stream of data in real time and triggers data-driven actions.
This is a notification system for the Video Stream Processing Unit (VSPU).
The goal is to build a solution that can consume data from any source, process it, and finally trigger custom actions.
Each part is built so that custom data sources, data processors, and actions may be implemented.
A few examples are provided below.
- Live plot (matplotlib):
Plots a stream of incoming data provided by a callable object.
A limit value can be set (either static or dynamic).
Based on matplotlib (see the live-plot sketch after this list).
- Email notification (Gmail SMTP):
Sends an email based on the incoming data stream and either a dynamic or static limit value.
The system sources data from a Kafka topic and compares each value to the current limit.
If a value exceeds the limit, an email is sent (see the email sketch after this list).
- Database for IPC:
Redis database for quick inter-process communication.
Variables can be modified by external users (see the sketch after the Redis commands below).
This component is optional.
- SumTheAge:
A Kafka Streams (Java) application.
Receives data from one Kafka topic, processes it, and passes it to another Kafka topic.
Values are filtered, and then the sum of all 'age' values per key is evaluated in real time as data comes in (see the aggregation sketch after this list).
- Mockup script:
A Python script that pushes data to a Kafka topic. The data is flat JSON in the format (string_key, JSON_value); see the producer sketch at the end of this document.
- Kafka CLI consumer:
Visualizes the data flow through the Kafka server.
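For illustration, here is a minimal sketch of the live-plot idea in Python. The names data_source and limit_source, the window size, and the refresh interval are assumptions for this example, not the repository's actual API:

import collections
import random

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

def data_source():
    # Stand-in for a Kafka-backed callable; returns the next value.
    return random.randint(-5, 7)

def limit_source():
    # Stand-in for a dynamic limit, e.g. one read from Redis.
    return 5

window = collections.deque(maxlen=50)  # sliding window of recent values
fig, ax = plt.subplots()
line, = ax.plot([], [], label="data")
limit_line = ax.axhline(limit_source(), color="red", label="limit")

def update(_frame):
    # Poll the data source, refresh the window and the limit line, rescale.
    window.append(data_source())
    line.set_data(range(len(window)), list(window))
    limit_line.set_ydata([limit_source()] * 2)
    ax.relim()
    ax.autoscale_view()
    return line, limit_line

ani = FuncAnimation(fig, update, interval=300)
plt.legend()
plt.show()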
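A minimal sketch of the email action over Gmail SMTP (SSL). The addresses and the app password are placeholders; Gmail requires an app password for SMTP logins:

import smtplib
from email.message import EmailMessage

def send_alert(value, limit):
    # Build and send a notification email when a value exceeds the limit.
    msg = EmailMessage()
    msg["Subject"] = f"Limit exceeded: {value} > {limit}"
    msg["From"] = "sender@gmail.com"          # placeholder address
    msg["To"] = "recipient@example.com"       # placeholder address
    msg.set_content(f"Incoming value {value} exceeded the limit {limit}.")
    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login("sender@gmail.com", "app-password")  # placeholder credentials
        server.send_message(msg)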
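SumTheAge itself is a Java Kafka Streams application; purely for illustration, the same per-key running sum can be sketched with kafka-python. The filter predicate here is an assumption, and the output values are encoded to match the IntegerDeserializer used by the console consumer below:

import json
from collections import defaultdict

from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "example.001",
    bootstrap_servers="127.0.0.1:9092",
    key_deserializer=lambda k: k.decode("utf-8"),
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=str.encode,
    # 4-byte big-endian ints, matching Kafka's IntegerSerializer wire format.
    value_serializer=lambda v: v.to_bytes(4, "big", signed=True),
)

sums = defaultdict(int)
for record in consumer:
    age = record.value.get("age")
    if age is not None and age >= 0:  # assumed filter; the real predicate may differ
        sums[record.key] += age
        producer.send("example.001.age.sum", key=record.key, value=sums[record.key])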
NOTE: It is assumed that the Kafka server is up and running and that its bin directory has been added to PATH.
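# Create the input topic for raw data: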
kafka-topics.sh --zookeeper 127.0.0.1:2181 \
--topic example.001 \
--create \
--partitions 1 \
--replication-factor 1
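# Create the output topic for the aggregated sums: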
kafka-topics.sh --zookeeper 127.0.0.1:2181 \
--topic example.001.age.sum \
--create \
--partitions 1 \
--replication-factor 1
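# Start Redis (optional) with password authentication: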
docker run -d --name redis-db -p 6379:6379 redis redis-server --requirepass "password"
NOTE: To manipulate Redis data (e.g. set the 'limit' key to 150):
docker exec -it redis-db redis-cli -a password SET limit 150
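To read such a value from Python (e.g. as a dynamic limit), a minimal sketch using the redis-py client could look like this; the fallback default of 100 is an arbitrary placeholder:

import redis

# Connect to the local Redis started above.
client = redis.Redis(host="127.0.0.1", port=6379, password="password")

def current_limit(default=100):
    # Read the dynamic limit; fall back to a default if the key is unset.
    value = client.get("limit")
    return int(value) if value is not None else default

print(current_limit())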
git clone https://github.com/patryklaskowski/vspu-notification-system.git &&
cd vspu-notification-system
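# Consume the aggregated sums (string keys, integer values):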
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001.age.sum \
--group example.001.age.sum.vis.app \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
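# Consume the raw input topic (string keys and values):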
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001 \
--group example.001.vis.app \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
TODO
NOTE: To reset group offset to very beginning
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001 \
--group example.001.vis.app \
--reset-offsets --to-earliest \
--execute
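# Run the SumTheAge Kafka Streams application: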
java -jar kafka-streams/sumTheAge-kafka-streams/target/sumTheAge-kafka-streams-1.0-jar-with-dependencies.jar
NOTE: To reset group offset to very beginning
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--topic example.001.age.sum \
--group example.001.age.sum.vis.app \
--reset-offsets --to-earliest \
--execute
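# Set up a virtual environment and start the live-plot consumer: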
cd python-kafka-consumer &&
python3.7 -m venv env &&
source env/bin/activate &&
python3 -m pip install -r requirements.txt &&
python3 live_plot_consumer.py --limit 10000 --window 50 --interval 300
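# In a separate terminal, set up and start the mockup producer: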
cd python-kafka-producer-mockup &&
python3.7 -m venv env &&
source env/bin/activate &&
python3 -m pip install -r requirements.txt &&
python3 kafka-python-sumTheAge-producer.py --min -5 --max 7 --sleep 0.2 -n 200
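For reference, a minimal sketch of such a mockup producer using kafka-python. The flag handling mirrors the command above and the 'age' payload mirrors the SumTheAge description, but the repository's actual script may differ in detail:

import argparse
import json
import random
import time

from kafka import KafkaProducer

parser = argparse.ArgumentParser()
parser.add_argument("--min", type=int, default=-5)
parser.add_argument("--max", type=int, default=7)
parser.add_argument("--sleep", type=float, default=0.2)
parser.add_argument("-n", type=int, default=200)
args = parser.parse_args()

producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for i in range(args.n):
    # Flat JSON value keyed by a string, as described above.
    value = {"age": random.randint(args.min, args.max)}
    producer.send("example.001", key=f"key-{i % 3}", value=value)
    time.sleep(args.sleep)

producer.flush()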