Skip to content

Commit

Permalink
Port to Memgraph 2.0 features (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
gitbuda authored Oct 26, 2021
1 parent 22fb328 commit 3244980
Show file tree
Hide file tree
Showing 17 changed files with 224 additions and 167 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@

This repository serves as a point of reference when developing a streaming application with [Memgraph](https://memgraph.com) and a message broker such as [Kafka](https://kafka.apache.org).

![drawing](https://i.imgur.com/5YMlN8M.png)
![Example Streaming App](https://user-images.githubusercontent.com/4950251/137717495-fab38a69-b087-44ef-90b4-188a7187fbab.png)

*KafkaProducer* represents the source of your data.
That can be transactions, queries, metadata or something different entirely.
In this minimal example we propose using a [special string format](./kafka) that is easy to parse.
The data is sent from the *KafkaProducer* to *Kafka* under a topic aptly named *topic*.
The *Backend* implements a *KafkaConsumer*.
It takes data from *Kafka*, parses it and sends it to *Memgraph* for graph analysis, feature extraction or storage.
It consumes data from *Kafka* and also queries *Memgraph* for graph analysis, feature extraction or storage.

## Installation
Install [Kafka](./kafka) and [Memgraph](./memgraph) using the instructions in the directories of the same name.
Then choose a programming language from the list of supported languages and follow the instructions given there.

### List of supported programming languages
- [c#](./backend/cs)
- [go](./backend/go)
- [java](./backend/java)
- [node](./backend/node)
- [python](./backend/python)
- [java](./backend/java)
- [go](./backend/go)
- [c#](./backend/cs)
- [rust](./backend/rust)

## How does it work, *exactly*?
### KafkaProducer
Expand Down
32 changes: 5 additions & 27 deletions backend/cs/memgraph-streaming/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ class Program
{
static void Main(string[] args)
{
var cypherNodeCommand = "MERGE (node:{0} {1}) "
+ "SET node += {2}";
var cypherEdgeCommand = "MERGE (node1:{0} {1}) "
+ "MERGE (node2:{2} {3}) "
+ "MERGE (node1)-[:{4} {5}]->(node2)";

using var driver = GraphDatabase.Driver("bolt://localhost:7687", AuthTokens.None);
using var session = driver.Session();

Expand All @@ -30,32 +24,16 @@ static void Main(string[] args)
var message = consumer.Consume().Message.Value;
System.Console.WriteLine("received message: " + message);
var arr = message.Split("|");
var cypherCommand = "";
switch (arr[0])
{
case "node":
cypherCommand = string.Format(cypherNodeCommand, arr[1], arr[2], arr[3]);
break;
case "edge":
cypherCommand = string.Format(cypherEdgeCommand, arr[1], arr[2], arr[5], arr[6], arr[3], arr[4]);
break;
default:
throw new InvalidOperationException(
string.Format("Command '{0}' not supported.", message)
);
}
System.Console.WriteLine(cypherCommand);
session.WriteTransaction(tx =>
{
tx.Run(cypherCommand);
return "";
});
if (arr[0] == "node") {
var neighbors = session.WriteTransaction(tx =>
{
return tx.Run(string.Format("MATCH (node:{0} {1}) RETURN node.neighbors AS neighbors", arr[1], arr[2])).Peek();
});
Console.WriteLine(string.Format("Node (node:{0} {1}) has {2} neighbors.", arr[1], arr[2], neighbors.Values["neighbors"]));
if (neighbors != null) {
Console.WriteLine(string.Format("Node (node:{0} {1}) has {2} neighbors.", arr[1], arr[2], neighbors.Values["neighbors"]));
} else {
Console.WriteLine("Neighbors number is null. Triggers are not defined or not yet executed.");
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions backend/cs/memgraph-streaming/memgraph-streaming.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.7.0" />
<PackageReference Include="Neo4j.Driver.Simple" Version="4.3.1" />
<PackageReference Include="Confluent.Kafka" Version="1.8.1" />
<PackageReference Include="Neo4j.Driver.Simple" Version="4.3.2" />
</ItemGroup>

</Project>
21 changes: 0 additions & 21 deletions backend/go/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,22 @@ func main() {
}
defer driver.Close()

cypherNodeCommand := "MERGE (node:%s %s) " +
"SET node += %s"
cypherEdgeCommand := "MERGE (node1:%s %s) " +
"MERGE (node2:%s %s) " +
"MERGE (node1)-[:%s %s]->(node2)"

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic",
MinBytes: 0,
MaxBytes: 10e6,
})
defer kafkaReader.Close()
kafkaLoop:
for {
kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
if err != nil {
fmt.Println("nothing to read...")
break
}
message := string(kafkaMessage.Value)
cypherCommand := ""
arr := strings.Split(message, "|")

switch arr[0] {
case "node":
cypherCommand = fmt.Sprintf(cypherNodeCommand, arr[1], arr[2], arr[3])
case "edge":
cypherCommand = fmt.Sprintf(cypherEdgeCommand, arr[1], arr[2], arr[5], arr[6], arr[3], arr[4])
default:
fmt.Printf("invalid kafka message: `%s`", message)
break kafkaLoop
}
_, err = runCypherCommand(driver, cypherCommand)
if err != nil {
panic(err)
}
if arr[0] == "node" {
result, err := runCypherCommand(
driver,
Expand Down
16 changes: 0 additions & 16 deletions backend/java/memgraph-streaming/src/main/java/memgraph/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

public class App {
public static void main(String[] args) throws Exception {
String nodeQuery = "MERGE (node:%s %s) "
+ "SET node += %s";
String edgeQuery = "MERGE (node1:%s %s) "
+ "MERGE (node2:%s %s) "
+ "MERGE (node1)-[:%s %s]->(node2)";

try (Driver driver = GraphDatabase.driver("bolt://localhost:7687");
Session session = driver.session();
Expand All @@ -38,24 +33,13 @@ public static void main(String[] args) throws Exception {
public String execute(Transaction tx) {
switch (command[0]) {
case "node":
tx.run(String.format(nodeQuery, command[1], command[2],
command[3]));
Result result = tx.run(String.format(
"MATCH (node:%s %s) RETURN node.neighbors AS neighbors",
command[1], command[2]));
System.out.printf("Node (node:%s %s) has %d neighbors.\n",
command[1], command[2],
result.single().get(0).asInt());
break;
case "edge":
tx.run(String.format(edgeQuery, command[1], command[2],
command[5], command[6], command[3],
command[4]));
break;
default:
System.out.printf("Error: unknown command `%s`\n",
record.value());
return null;
}
System.out.printf("%s\n", record.value());
return null;
Expand Down
26 changes: 1 addition & 25 deletions backend/node/src/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
/**
* This is a generic Kafka Consumer that will enable you to store data from
* Kafka to Memgraph.
*
* The data pushed to Kafka has to be in the following format:
* command|label|unique_fields|fields
* command|label1|unique_fields1|edge_type|edge_fields|label2|unique_fields2
*
* command - string: "edge", or "node"
* label - string: type(s) of a node e.g. "Person", or "Machine:Vehicle:Car"
* edge_type - string: type of an edge e.g. "CONNECTED_WITH"
* fields - string in form of a json/python dictionary representing the
* properties of a node or edge:
* `{age: 53}` or `{id: 4, name: "hero", alive: true}`
* This is a generic Kafka Consumer + an example express app.
*/
const express = require('express');
const app = express();
Expand Down Expand Up @@ -56,18 +44,6 @@ async function runConsumer() {
\n - partition ${partition} \
\n - offset ${offset}. \
\nUpdated total count to ${++kafkaCounter}`);
const [type, ...rest] = value.toString().split('|');
const session = driver.session();
if (type === 'node') {
const [label, uniqueFields, fields] = rest;
await session.run(`MERGE (n:${label} ${uniqueFields}) SET n += ${fields};`);
} else if (type === 'edge') {
const [n1l, n1u, edgeType, edgeFields, n2l, n2u] = rest;
await session.run(`MERGE (n1:${n1l} ${n1u}) MERGE (n2:${n2l} ${n2u}) \
MERGE (n1)-[:${edgeType} ${edgeFields}]->(n2);`);
} else {
throw new Error('Unknown message type.');
}
}),
);
}
Expand Down
61 changes: 13 additions & 48 deletions backend/python/app.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,26 @@
"""This is a generic Kafka Consumer and an example Python code on how to query
Memgraph.
"""
This is a generic Kafka Consumer that will enable you to store data from Kafka
to Memgraph.
The data pushed to Kafka has to be in the following format:
command|label|unique_fields|fields
command|label1|unique_fields1|edge_type|edge_fields|label2|unique_fields2
command - string: "edge", or "node"
label - string: type(s) of a node e.g. "Person", or "Machine:Vehicle:Car"
edge_type - string: type of an edge e.g. "CONNECTED_WITH"
fields - string in form of a json/python dictionary representing the
properties of a node or edge:
`{age: 53}` or `{id: 4, name: "hero", alive: true}`
"""
import csv
import logging
import csv

from gqlalchemy import Memgraph
from kafka import KafkaConsumer


def process(message: str, db: Memgraph):
"""Takes graph database `db` and a string message in the following format:
command|label|unique_fields|fields
command|label1|unique_fields1|edge_type|edge_fields|label2|unique_fields2
command - string: "edge", or "node"
label - string: type of a node e.g. "Person" or "Machine:Vehicle:Car"
edge_type - string: type of an edge e.g. "CONNECTED_WITH"
fields - string in form of a json/python dictionary representing the
properties of a node or edge:
`{age: 53}` or `{id: 4, name: "hero", alive: true}`
Throws a ValueError if the command isn't recognised.
"""
"""Prints the number of neighbors."""
logging.info(f"Received `{message}`")
payload = next(csv.reader([message], delimiter="|"))
command, *payload = payload

if command == "node":
label, unique_fields, fields = payload
db.execute_query(f"merge (a:{label} {unique_fields}) set a += {fields}")
neighbors = next(db.execute_and_fetch(
f"match (a:{label} {unique_fields}) return a.neighbors as n"
))['n']
neighbors = next(
db.execute_and_fetch(
f"match (a:{label} {unique_fields}) return a.neighbors as n"
)
)["n"]
if neighbors is None:
print(
"The neighbors variable isn't set. "
Expand All @@ -51,32 +29,20 @@ def process(message: str, db: Memgraph):
else:
print(f"(node:{label} {unique_fields}) has {neighbors} neighbors.")
elif command == "edge":
(
label1,
unique_fields1,
edge_type,
edge_fields,
label2,
unique_fields2,
) = payload
db.execute_query(
f"merge (a:{label1} {unique_fields1}) "
f"merge (b:{label2} {unique_fields2}) "
f"merge (a)-[:{edge_type} {edge_fields}]->(b)"
)
pass
else:
raise ValueError(f"Command `{command}` not recognized.")
logging.info(f"`{message}`, Successfully entered {command} in Memgraph.")


if __name__ == "__main__":
logging.basicConfig(
filename="info.log",
level=logging.INFO,
format="%(levelname)s: %(asctime)s %(message)s",
)

db = Memgraph(host="localhost", port=7687)
db.drop_database()

consumer = KafkaConsumer("topic", bootstrap_servers=["localhost:9092"])
try:
for message in consumer:
Expand All @@ -86,6 +52,5 @@ def process(message: str, db: Memgraph):
except Exception as error:
logging.error(f"`{message}`, {repr(error)}")
continue

except KeyboardInterrupt:
pass
Loading

0 comments on commit 3244980

Please sign in to comment.