Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved decoding, example match DSL #2

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,15 @@ This repository contains small app that starts Neo4ex connection and Neo4j serve
After running `docker-compose up` go to the web interface (`http://localhost:7474`) and execute import command:
```
LOAD CSV WITH HEADERS FROM 'file:///example_data/customers-10000.csv' AS row
CALL apoc.create.node(['Customer'], row) YIELD node
CALL apoc.create.node(['Elixir.ExampleApp.Schema.Customer'], row) YIELD node
RETURN node

LOAD CSV WITH HEADERS FROM 'file:///example_data/organizations-10000.csv' AS row
CALL apoc.create.node(['Elixir.ExampleApp.Schema.Organization'], row) YIELD node
RETURN node

MATCH (c:`Elixir.ExampleApp.Schema.Customer`), (o:`Elixir.ExampleApp.Schema.Organization`) WHERE c.country = o.country
MERGE (c)-[:CUSTOMER_OF]->(o)
```
After that you can start application located in `example_app` and play with the data.

Expand Down
41 changes: 29 additions & 12 deletions example_app/lib/example_app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,36 @@ defmodule ExampleApp do
Documentation for `ExampleApp`.
"""

import Neo4ex.Cypher.Query

alias Neo4ex.BoltProtocol.Structure.Message.Summary.Success
alias Neo4ex.BoltProtocol.Structure.Graph.Node
alias Neo4ex.BoltProtocol.Structure.Graph.{Node, Path}
alias Neo4ex.Cypher
alias ExampleApp.Connector
alias ExampleApp.Schema.Customer
alias ExampleApp.Schema.{Customer, Organization}

def hello do
query = """
MATCH (customer:Customer {company: $company})
RETURN customer
LIMIT 10
"""
country = "Nepal"

{:ok, _query, results} =
Connector.run(%Cypher.Query{query: query, params: %{company: "Davenport Inc"}})
query =
match(
customer: %Organization{country: country} <- [:customer_of] - %Customer{},
org: %Organization{},
nothing: %{},
return: customer,
limit: 5
)

{:ok, _query, results} = Connector.run(query)

results
|> Enum.reject(fn msg -> match?(%Success{}, msg) end)
|> Enum.map(fn [%Node{properties: properties}] ->
properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end)
struct(Customer, properties)
|> Enum.map(fn
[%Path{nodes: nodes}] ->
Enum.map(nodes, &node_to_struct/1)

[%Node{}] = node ->
node_to_struct(node)
end)
end

Expand All @@ -49,4 +58,12 @@ defmodule ExampleApp do
end)
end)
end

defp node_to_struct(%{labels: [label], properties: properties}) do
properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end)

label
|> String.to_existing_atom()
|> struct(properties)
end
end
12 changes: 12 additions & 0 deletions example_app/lib/example_app/schema/organization.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule ExampleApp.Schema.Organization do
defstruct [
:country,
:description,
:founded,
:index,
:industry,
:name,
:"organization id",
:"number of employees"
]
end
10,001 changes: 10,001 additions & 0 deletions example_data/organizations-10000.csv

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion lib/neo4ex/connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Neo4ex.Connector do
Module responsible for communication with the database engine
"""
import Kernel, except: [send: 2]
import Bitwise

alias Neo4ex.Connector.Socket
alias Neo4ex.Cypher
Expand Down Expand Up @@ -148,7 +149,7 @@ defmodule Neo4ex.Connector do
def send_noop(%Socket{sock: sock}), do: Socket.send(sock, @noop)

def send(message, %Socket{sock: sock, bolt_version: bolt_version}) do
max_chunk_size = Integer.pow(2, @chunk_size)
max_chunk_size = 1 <<< @chunk_size

message
|> Encoder.encode(bolt_version)
Expand Down
88 changes: 87 additions & 1 deletion lib/neo4ex/cypher/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,93 @@ defmodule Neo4ex.Cypher.Query do

alias Neo4ex.Cypher.Query

defstruct query: "", params: %{}, opts: []
defstruct query: [], params: %{}, opts: []

## Top level operations ##

@match_ops [:return, :delete, :with, :unwind, :limit]
defmacro match(query_parts) do
{ast, query_parts} =
Macro.postwalk(query_parts, [], fn
{match_op, value}, acc when match_op in @match_ops ->
match_op = match_op |> Atom.to_string() |> String.upcase()
value = extract_value(value)
{nil, ["#{match_op} #{value}" | acc]}

{:<-, _, [to, {:-, _, [[rel], from]}]}, acc ->
rel = rel |> Atom.to_string() |> String.upcase()
from_ast = extract_label_with_filters(from, __CALLER__)
to_ast = extract_label_with_filters(to, __CALLER__)

expr =
quote do
"(:#{unquote(from_ast)})-[:#{unquote(rel)}]->(:#{unquote(to_ast)})"
end

{nil, [expr | acc]}

ast, acc ->
{ast, acc}
end)

match_ops =
Enum.map(@match_ops, fn match_op -> match_op |> Atom.to_string() |> String.upcase() end)

outstanding_identifiers =
Enum.filter(ast, fn
{identifier, nil} -> is_atom(identifier)
_ -> false
end)

query_string =
query_parts
|> Enum.reverse()
|> Enum.with_index()
|> Enum.map(fn {elem, idx} ->
if is_binary(elem) && Enum.any?(match_ops, &String.starts_with?(elem, &1)) do
elem
else
{identifier, nil} = Enum.at(outstanding_identifiers, idx)

quote do
"#{unquote(identifier)} = #{unquote(elem)}"
end
end
end)

quote do
%Query{
query: "MATCH " <> Enum.join(unquote(query_string), " ")
}
end
end

defp extract_value({var, _, _}), do: var
defp extract_value(val), do: val

defp extract_label_with_filters(ast, env) do
{_, label} =
Macro.prewalk(ast, nil, fn
{:__aliases__, _, _} = ast, _acc ->
{nil, "`#{Macro.expand(ast, env)}`"}

ast, acc ->
{ast, acc}
end)

{_, filters} =
Macro.prewalk(ast, [], fn
{:%{}, _, filters}, _acc ->
{nil, filters}

ast, acc ->
{ast, acc}
end)

quote do
"#{unquote(label)}{#{Enum.map(unquote(filters), fn {k, v} -> "#{k}: \"#{v}\"" end)}}"
end
end

defimpl DBConnection.Query do
def parse(%Query{params: %{}} = query, _opts), do: query
Expand Down
34 changes: 12 additions & 22 deletions lib/neo4ex/pack_stream/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,40 @@ defmodule Neo4ex.PackStream.Decoder do

import Neo4ex.PackStream.DecoderBuilder

alias Neo4ex.Utils

@spec decode(binary()) :: {term(), binary()}
def decode(data), do: do_decode(data)

# those are being generated by the macro
register_decoder(nil, marker, data, do: {nil, data})
register_decoder(false, marker, data, do: {false, data})
register_decoder(true, marker, data, do: {true, data})
register_decoder(nil, data, do: {nil, data})
register_decoder(false, data, do: {false, data})
register_decoder(true, data, do: {true, data})

register_decoder(Float, marker, data) do
<<num::float, rest::binary>> = data
register_decoder(Float, <<num::float, rest::binary>>) do
{num, rest}
end

register_decoder(Integer, marker, data) do
num_size = Utils.bit_size_for_term_size(marker, Integer)
<<num::size(num_size), rest::binary>> = data
register_decoder(Integer, <<num::signed-integer-size(_), rest::binary>>) do
{num, rest}
end

register_decoder(String, marker, data) do
marker_size = Utils.bit_size_for_term_size(marker, String)
<<str_length::size(marker_size), str::binary-size(str_length), rest::binary>> = data
register_decoder(String, <<str_length::size(_), str::binary-size(str_length), rest::binary>>) do
{str, rest}
end

register_decoder(BitString, marker, data) do
marker_size = Utils.bit_size_for_term_size(marker, BitString)
<<bytes_length::size(marker_size), bytes::binary-size(bytes_length), rest::binary>> = data
register_decoder(
BitString,
<<bytes_length::size(_), bytes::binary-size(bytes_length), rest::binary>>
) do
{bytes, rest}
end

# PackStream only informs that the List/Map starts
# it can't decode its items since those can be ANY type (some of them may need Bolt version information to be decoded)
register_decoder(List, marker, data) do
marker_size = Utils.bit_size_for_term_size(marker, List)
<<list_length::size(marker_size), rest::binary>> = data
register_decoder(List, <<list_length::size(_), rest::binary>>) do
{List.duplicate(nil, list_length), rest}
end

register_decoder(Map, marker, data) do
marker_size = Utils.bit_size_for_term_size(marker, Map)
<<map_size::size(marker_size), rest::binary>> = data
register_decoder(Map, <<map_size::size(_), rest::binary>>) do
# that's not pretty but compact
{%{size: map_size}, rest}
end
Expand Down
30 changes: 22 additions & 8 deletions lib/neo4ex/pack_stream/decoder_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@ defmodule Neo4ex.PackStream.DecoderBuilder do
alias Neo4ex.Utils
alias Neo4ex.PackStream.Markers

defmacro register_decoder(type, marker_var, data_var, do: block) do
defmacro register_decoder(type, data_var, do: block) do
type = Macro.expand(type, __ENV__)

type
|> Macro.expand(__ENV__)
|> Markers.get!()
|> List.wrap()
|> Enum.map(&build_decode_fn(&1, marker_var, data_var, block))
# if consecutive markers repeat it means that the values encoded should fall into the bigger limit
# this happens for BitStrings which have the same marker up to 255 bytes of data and we have to remove repeating markers to get proper index
# while other types tend to have one more marker for small data up to 16 bytes
|> Enum.dedup()
|> Enum.with_index(fn element, index ->
{element, Utils.bit_size_for_term_size(index, type)}
end)
|> Enum.map(&build_decode_fn(&1, data_var, block))
end

defp build_decode_fn(marker_value, marker_var, data_var, block) do
marker_size = Utils.bit_size_for_integer(marker_value)
defp build_decode_fn({marker_value, data_size}, data_var, block) do
marker_size = Utils.count_bits(marker_value)

data_var =
Macro.prewalk(data_var, fn
# inject size of data in places where size has placeholder
{:size, meta, [{:_, _, _}]} -> {:size, meta, [data_size]}
ast -> ast
end)

quote do
defp do_decode(
<<unquote(marker_var)::size(unquote(marker_size)), unquote(data_var)::bitstring>>
)
when unquote(marker_var) == unquote(marker_value) do
<<unquote(marker_value)::size(unquote(marker_size)), unquote(data_var)::bitstring>>
) do
unquote(block)
end
end
Expand Down
46 changes: 28 additions & 18 deletions lib/neo4ex/pack_stream/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,50 @@ defprotocol Neo4ex.PackStream.Encoder do
end

defimpl Neo4ex.PackStream.Encoder, for: Integer do
import Bitwise

alias Neo4ex.Utils
alias Neo4ex.PackStream.Markers

def encode(number) do
# get real size of number to retrieve MSB
byte_count = Utils.byte_size_for_integer(number)
bits = byte_count * 8
byte_count = byte_size_for_integer(number)

# we read one more bit from the beginning of number and check if it matches MSB
# if it doesn't then we have to add one byte to properly write number to PackStream
# for small ints we follow the rule from documentation and when we detect it then we don't add any marker
pack_byte_diff =
case <<number::size(bits + 1)>> do
<<1::1, 0xF::4, _::bitstring>> when bits == 8 -> -1
<<0::2, _::bitstring>> when bits == 8 -> -1
<<msb::1, msb::1, _::bitstring>> -> 0
_ -> 1
tiny_int =
case <<number::size(byte_count <<< 3)>> do
<<0xF::4, _::4>> -> true
<<0::1, _::7>> -> true
_ -> false
end

if pack_byte_diff < 0 do
if tiny_int do
<<number>>
else
marker_index =
byte_count
|> Kernel.+(pack_byte_diff)
|> :math.log2()
|> ceil()

marker_index = ceil_log2(byte_count)
marker = @for |> Markers.get!() |> Enum.at(marker_index)
byte_count = Integer.pow(2, marker_index)
byte_count = 1 <<< marker_index

<<marker, number::size(byte_count * 8)>>
end
end

defp byte_size_for_integer(0), do: 1

defp byte_size_for_integer(number) do
((Utils.count_bits(number, true) - 1) >>> 3) + 1
end

defp ceil_log2(x) do
bit_length = Utils.count_bits(x)

# Check if x is already a power of 2
if Utils.power_of_two?(x) do
bit_length - 1
else
bit_length
end
end
end

defimpl Neo4ex.PackStream.Encoder, for: Float do
Expand Down
Loading