diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2430ceb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +rust/target +.idea \ No newline at end of file diff --git a/LICENSE.md/LICENSE.md b/LICENSE.md similarity index 100% rename from LICENSE.md/LICENSE.md rename to LICENSE.md diff --git a/go/go.mod b/go/go.mod new file mode 100644 index 0000000..5f8d9fe --- /dev/null +++ b/go/go.mod @@ -0,0 +1,5 @@ +module vm + +go 1.13 + +require github.com/cespare/xxhash v1.1.0 diff --git a/go/go.sum b/go/go.sum new file mode 100644 index 0000000..6036232 --- /dev/null +++ b/go/go.sum @@ -0,0 +1,3 @@ +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/go/oracle_cache.go b/go/oracle_cache.go new file mode 100644 index 0000000..0f7cfb7 --- /dev/null +++ b/go/oracle_cache.go @@ -0,0 +1,159 @@ +package _go + +import ( + "encoding/binary" + "fmt" + "github.com/cespare/xxhash" + "strings" + "vm/shm" +) + +const LenSize uint32 = 4 +const ItemSize uint32 = 16 +const TickerSize uint32 = 8 + +type OracleCache struct { + mem *shm.Memory + capacity uint32 +} + +func CreateOracleCache(name string, capacity uint32) (*OracleCache, error) { + bufferSize := int32(LenSize + (capacity * ItemSize)) + mem, err := shm.Open(name, bufferSize) + if err != nil { + mem, err = shm.Create(name, bufferSize) + if err != nil { + return nil, err + } + } + + return &OracleCache{mem, capacity}, nil +} + +func (o OracleCache) Clear() { + o.setSize(0) +} + +func (o OracleCache) PutPrice(ticker string, price uint64) bool { + hash := xxTicker(ticker) + priceBytes := uint64ToBytes(price) + first, last := o.findIndex(hash) + if first == last { + o.storeAtIndex(first, uint64ToBytes(hash), priceBytes) + return true + } else { + if o.Len() == o.capacity { + return false + } else { + o.setSize(o.Len() + 1) + o.shiftRight(first) + o.storeAtIndex(first, uint64ToBytes(hash), priceBytes) + return true + } + } +} + +func (o OracleCache) GetPrice(ticker string) *uint64 { + hash := xxTicker(ticker) + first, last := o.findIndex(hash) + if first == last { + price := bytesToUint64(o.getByIndex(first)[TickerSize:]) + return &price + } else { + return nil + } +} + +func (o OracleCache) Len() uint32 { + bs := make([]byte, 4) + o.mem.ReadAt(bs, 0) + return binary.LittleEndian.Uint32(bs) +} + +func (o OracleCache) Close() (err error) { + return o.mem.Close() +} + +func (o OracleCache) ToString() string { + buff := "[" + l := o.Len() + for i := 0; i < int(l); i++ { + value := o.getByIndex(uint32(i)) + ticker := bytesToUint64(value[0 : TickerSize+1]) + price := bytesToUint64(value[TickerSize:]) + buff += fmt.Sprintf("%d -> %d, ", ticker, price) + } + buff += "]" + return buff +} + +func (o OracleCache) findIndex(ticker uint64) (uint32, uint32) { + len := o.Len() + if len == 0 { + return 0, 1 + } + + first := uint32(0) + last := len + for { + middle := (first + last) / 2 + middleTicker := bytesToUint64(o.getByIndex(middle)[0:TickerSize]) + if ticker == middleTicker { + return middle, middle + } else if ticker < middleTicker { + last = middle + } else { + first = middle + 1 + } + + if first >= last { + last += 1 + break + } + } + + return first, last +} + +func offset(index uint32) uint32 { + return ItemSize*index + LenSize +} + +func (o OracleCache) storeAtIndex(index uint32, ticker []byte, price []byte) { + offset := offset(index) + o.mem.WriteAt(ticker, int64(offset)) + o.mem.WriteAt(price, int64(offset+TickerSize)) +} + +func (o OracleCache) getByIndex(index uint32) []byte { + offset := offset(index) + return o.mem.Slice(int64(offset), int64(ItemSize+offset)) +} + +func (o OracleCache) setSize(size uint32) { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, size) + o.mem.WriteAt(bs, 0) +} + +func (o OracleCache) shiftRight(index uint32) { + startOffset := offset(index) + endOffset := offset(o.Len() - 1) + bs := make([]byte, endOffset-startOffset) + o.mem.ReadAt(bs, int64(startOffset)) + o.mem.WriteAt(bs, int64(offset(index+1))) +} + +func xxTicker(ticker string) uint64 { + return xxhash.Sum64String(strings.ToLower(ticker)) +} + +func uint64ToBytes(int uint64) []byte { + bs := make([]byte, 8) + binary.LittleEndian.PutUint64(bs, int) + return bs +} + +func bytesToUint64(price []byte) uint64 { + return binary.LittleEndian.Uint64(price) +} diff --git a/go/oracle_cache_test.go b/go/oracle_cache_test.go new file mode 100644 index 0000000..b67edff --- /dev/null +++ b/go/oracle_cache_test.go @@ -0,0 +1,64 @@ +package _go + +import ( + "fmt" + "testing" +) + +func TestOracleCache(t *testing.T) { + cache, err := CreateOracleCache("/test", 100) + if err != nil { + t.Errorf("Failed to create cache") + } + defer cache.Close() + cache.Clear() + + cache.PutPrice("BTCUSD", 8000) + cache.PutPrice("USDRUB", 70) + + if *cache.GetPrice("USDRUB") != 70 { + t.Errorf("Invalid cache data") + } + + if *cache.GetPrice("BTCuSD") != 8000 { + t.Errorf("Invalid cache data") + } + + if cache.GetPrice("CuSD") != nil { + t.Errorf("Invalid cache data") + } + + cache.PutPrice("USDRUB", 80) + if *cache.GetPrice("USDRUB") != 80 { + t.Errorf("Invalid cache data") + } +} + +func TestCreateOracleCapacity(t *testing.T) { + cache, err := CreateOracleCache("/test_1", 100) + if err != nil { + t.Errorf("Failed to create cache") + } + defer cache.Close() + cache.Clear() + + for i := 0; i < 100; i++ { + if !cache.PutPrice(fmt.Sprintf("T:%d", i), uint64(i)) { + t.Errorf("Failed to put oracle value") + } + } + + if cache.Len() != 100 { + t.Errorf("Invalid cache len") + } + + if cache.PutPrice("T:101", 101) { + t.Errorf("Cache overflow") + } + + for i := 0; i < 100; i++ { + if *cache.GetPrice(fmt.Sprintf("T:%d", i)) != uint64(i) { + t.Errorf("Invalid cache value") + } + } +} diff --git a/go/shm/LICENSE b/go/shm/LICENSE new file mode 100644 index 0000000..d71cc2c --- /dev/null +++ b/go/shm/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 hidez8891 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/go/shm/shm.go b/go/shm/shm.go new file mode 100644 index 0000000..91582dd --- /dev/null +++ b/go/shm/shm.go @@ -0,0 +1,49 @@ +package shm + +// Memory is shared memory struct +type Memory struct { + m *shmi +} + +// Create is create shared memory +func Create(name string, size int32) (*Memory, error) { + m, err := create(name, size) + if err != nil { + return nil, err + } + return &Memory{m}, nil +} + +// Open is open exist shared memory +func Open(name string, size int32) (*Memory, error) { + m, err := open(name, size) + if err != nil { + return nil, err + } + return &Memory{m}, nil +} + +// Close is close & discard shared memory +func (o *Memory) Close() (err error) { + if o.m != nil { + err = o.m.close() + if err == nil { + o.m = nil + } + } + return err +} + +func (o *Memory) Slice(off int64, size int64) []byte { + return o.m.memRef(off, size) +} + +// ReadAt is read shared memory (offset) +func (o *Memory) ReadAt(p []byte, off int64) int { + return o.m.readAt(p, off) +} + +// WriteAt is write shared memory (offset) +func (o *Memory) WriteAt(p []byte, off int64) int { + return o.m.writeAt(p, off) +} diff --git a/go/shm/shmi_darwin.go b/go/shm/shmi_darwin.go new file mode 100644 index 0000000..d4d2fbc --- /dev/null +++ b/go/shm/shmi_darwin.go @@ -0,0 +1,153 @@ +package shm + +/* +#include +#include +#include +#include +#include +#include +#include + +extern int errno; + +int _create(const char* name, int size, int flag) { + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + int fd = shm_open(name, flag, mode); + if (fd < 0) { + return -1; + } + + struct stat mapstat; + if (-1 != fstat(fd, &mapstat) && mapstat.st_size == 0) { + if (ftruncate(fd, size) != 0) { + close(fd); + return -2; + } + } + return fd; +} + +int shm_create(const char* name, int size) { + int flag = O_RDWR | O_CREAT; + return _create(name, size, flag); +} + +int _shm_open(const char* name, int size) { + int flag = O_RDWR; + return _create(name, size, flag); +} + +void* shm_map(int fd, int size) { + void* p = mmap( + NULL, size, + PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (p == MAP_FAILED) { + return NULL; + } + return p; +} + +void shm_close(int fd, void* p, int size) { + if (p != NULL) { + munmap(p, size); + } + if (fd != 0) { + close(fd); + } +} + +void shm_delete(const char* name) { + shm_unlink(name); +} +*/ +import "C" + +import ( + "fmt" + "reflect" + "unsafe" +) + +type shmi struct { + name string + fd C.int + v unsafe.Pointer + size int32 + parent bool +} + +// create shared memory. return shmi object. +func create(name string, size int32) (*shmi, error) { + name = "/" + name + + fd := C.shm_create(C.CString(name), C.int(size)) + if fd < 0 { + return nil, fmt.Errorf("create:%v", fd) + } + + v := C.shm_map(fd, C.int(size)) + if v == nil { + C.shm_close(fd, nil, C.int(size)) + C.shm_delete(C.CString(name)) + } + + return &shmi{name, fd, v, size, true}, nil +} + +// open shared memory. return shmi object. +func open(name string, size int32) (*shmi, error) { + name = "/" + name + + fd := C._shm_open(C.CString(name), C.int(size)) + if fd < 0 { + return nil, fmt.Errorf("open:%v", fd) + } + + v := C.shm_map(fd, C.int(size)) + if v == nil { + C.shm_close(fd, nil, C.int(size)) + C.shm_delete(C.CString(name)) + } + + return &shmi{name, fd, v, size, false}, nil +} + +func (o *shmi) close() error { + if o.v != nil { + C.shm_close(o.fd, o.v, C.int(o.size)) + o.v = nil + } + if o.parent { + C.shm_delete(C.CString(o.name)) + } + return nil +} + +// read shared memory. return read size. +func (o *shmi) readAt(p []byte, off int64) int { + if max := int64(o.size) - off; int64(len(p)) > max { + p = p[:max] + } + return copyPtr2Slice(uintptr(o.v), p, off, o.size) +} + +// write shared memory. return write size. +func (o *shmi) writeAt(p []byte, off int64) int { + if max := int64(o.size) - off; int64(len(p)) > max { + p = p[:max] + } + return copySlice2Ptr(p, uintptr(o.v), off, o.size) +} + +func (o *shmi) memRef(off int64, size int64) []byte { + h := reflect.SliceHeader{} + h.Cap = int(o.size) + h.Len = int(o.size) + h.Data = uintptr(o.v) + bb := *(*[]byte)(unsafe.Pointer(&h)) + + return bb[off:size] +} diff --git a/go/shm/shmi_linux.go b/go/shm/shmi_linux.go new file mode 100644 index 0000000..6bb8afe --- /dev/null +++ b/go/shm/shmi_linux.go @@ -0,0 +1,150 @@ +// +build linux,cgo + +package shm + +/* +#cgo LDFLAGS: -lrt + +#include +#include +#include +#include +#include +#include + +int _create(const char* name, int size, int flag) { + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + int fd = shm_open(name, flag, mode); + if (fd < 0) { + return -1; + } + + if (ftruncate(fd, size) != 0) { + close(fd); + return -2; + } + return fd; +} + +int shm_create(const char* name, int size) { + int flag = O_RDWR | O_CREAT; + return _create(name, size, flag); +} + +int _shm_open(const char* name, int size) { + int flag = O_RDWR; + return _create(name, size, flag); +} + +void* shm_mmap(int fd, int size) { + void* p = mmap( + NULL, size, + PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (p == MAP_FAILED) { + return NULL; + } + return p; +} + +void shm_close(int fd, void* p, int size) { + if (p != NULL) { + munmap(p, size); + } + if (fd != 0) { + close(fd); + } +} + +void shm_delete(const char* name) { + shm_unlink(name); +} +*/ +import "C" + +import ( + "fmt" + "unsafe" +) + +type shmi struct { + name string + fd C.int + v unsafe.Pointer + size int32 + parent bool +} + +// create shared memory. return shmi object. +func create(name string, size int32) (*shmi, error) { + name = "/" + name + + fd := C.shm_create(C.CString(name), C.int(size)) + if fd < 0 { + return nil, fmt.Errorf("create:%v", fd) + } + + v := C.shm_map(fd, C.int(size)) + if v == nil { + C.shm_close(fd, nil, C.int(size)) + C.shm_delete(C.CString(name)) + } + + return &shmi{name, fd, v, size, true}, nil +} + +// open shared memory. return shmi object. +func open(name string, size int32) (*shmi, error) { + name = "/" + name + + fd := C._shm_open(C.CString(name), C.int(size)) + if fd < 0 { + return nil, fmt.Errorf("open:%v", fd) + } + + v := C.shm_map(fd, C.int(size)) + if v == nil { + C.shm_close(fd, nil, C.int(size)) + C.shm_delete(C.CString(name)) + } + + return &shmi{name, fd, v, size, false}, nil +} + +func (o *shmi) close() error { + if o.v != nil { + C.shm_close(o.fd, o.v, C.int(o.size)) + o.v = nil + } + if o.parent { + C.shm_delete(C.CString(o.name)) + } + return nil +} + +// read shared memory. return read size. +func (o *shmi) readAt(p []byte, off int64) int { + if max := int64(o.size) - off; int64(len(p)) > max { + p = p[:max] + } + return copyPtr2Slice(uintptr(o.v), p, off, o.size) +} + +// write shared memory. return write size. +func (o *shmi) writeAt(p []byte, off int64) int { + if max := int64(o.size) - off; int64(len(p)) > max { + p = p[:max] + } + return copySlice2Ptr(p, uintptr(o.v), off, o.size) +} + +func (o *shmi) memRef(off int64, size int64) []byte { + h := reflect.SliceHeader{} + h.Cap = int(o.size) + h.Len = int(o.size) + h.Data = uintptr(o.v) + bb := *(*[]byte)(unsafe.Pointer(&h)) + + return bb[off:size] +} diff --git a/go/shm/tools.go b/go/shm/tools.go new file mode 100644 index 0000000..9830499 --- /dev/null +++ b/go/shm/tools.go @@ -0,0 +1,26 @@ +package shm + +import ( + "reflect" + "unsafe" +) + +func copySlice2Ptr(b []byte, p uintptr, off int64, size int32) int { + h := reflect.SliceHeader{} + h.Cap = int(size) + h.Len = int(size) + h.Data = p + + bb := *(*[]byte)(unsafe.Pointer(&h)) + return copy(bb[off:], b) +} + +func copyPtr2Slice(p uintptr, b []byte, off int64, size int32) int { + h := reflect.SliceHeader{} + h.Cap = int(size) + h.Len = int(size) + h.Data = p + + bb := *(*[]byte)(unsafe.Pointer(&h)) + return copy(b, bb[off:size]) +} diff --git a/rust/Cargo.lock b/rust/Cargo.lock new file mode 100644 index 0000000..51ad905 --- /dev/null +++ b/rust/Cargo.lock @@ -0,0 +1,150 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "anyhow" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + +[[package]] +name = "cc" +version = "1.0.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "libc" +version = "0.2.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "nix" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363" +dependencies = [ + "bitflags", + "cc", + "cfg-if", + "libc", + "void", +] + +[[package]] +name = "oraclesh" +version = "0.1.0" +dependencies = [ + "anyhow", + "byteorder", + "libc", + "log", + "nix", + "twox-hash", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + +[[package]] +name = "twox-hash" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bfd5b7557925ce778ff9b9ef90e3ade34c524b5ff10e239c69a42d546d2af56" +dependencies = [ + "rand", +] + +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" diff --git a/rust/Cargo.toml b/rust/Cargo.toml new file mode 100644 index 0000000..8a1776c --- /dev/null +++ b/rust/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "oraclesh" +version = "0.1.0" +authors = [ + "Alex Koz. ", + "Dm. Yakushev ", + "Maxim Kurnikov " +] +edition = "2018" + +[dependencies] +anyhow = "1.0.31" +libc = "0.2.71" +nix = "0.17.0" +byteorder = "1.3.2" +log = "0.4.8" +twox-hash = "1.5.0" + diff --git a/rust/src/lib.rs b/rust/src/lib.rs new file mode 100644 index 0000000..63f56f4 --- /dev/null +++ b/rust/src/lib.rs @@ -0,0 +1,7 @@ +#[macro_use] +extern crate log; + +pub mod map; +pub mod oracle; +pub mod shm; +pub mod sorted_set; diff --git a/rust/src/map.rs b/rust/src/map.rs new file mode 100644 index 0000000..9db015e --- /dev/null +++ b/rust/src/map.rs @@ -0,0 +1,165 @@ +use crate::shm::Memory; +use crate::sorted_set::{Binary, BinaryCmp, Cmp, SetIterator, SortedSet}; +use std::cmp::Ordering; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::marker::PhantomData; + +/// Fixed size elements shared memory map. +pub struct ShmMap<'a, K, V> +where + K: Binary + BinaryCmp, + V: Binary, +{ + set: SortedSet<'a, Entry>, +} + +impl<'a, K, V> ShmMap<'a, K, V> +where + K: Binary + BinaryCmp, + V: Binary, +{ + /// Create a new shared memory map with given memory reference. + pub fn new(memory: Memory<'a>) -> ShmMap<'a, K, V> { + let set = SortedSet::new(memory); + ShmMap { set } + } + + /// Put a value to the map. + pub fn put(&mut self, key: K, value: V) { + self.set.add(Entry { key, value }); + } + + /// Retrieve the value associated with the key. + pub fn get(&self, key: K) -> Option { + let mut buffer = vec![0; K::const_size() as usize]; + key.to_bytes(&mut buffer); + + self.set.find(KeyCmp::new(key)).map(|e| e.value) + } + + /// Removes all keys. + pub fn clear(&mut self) { + self.set.clear(); + } + + /// Create map iterator. + pub fn iter(&self) -> MapIterator<'_, '_, K, V> { + MapIterator { + iter: self.set.iter(), + } + } +} + +/// Map iterator. +pub struct MapIterator<'a, 'b, K, V> +where + K: Binary + BinaryCmp, + V: Binary, +{ + iter: SetIterator<'a, 'b, Entry>, +} + +impl<'a, 'b, K, V> Iterator for MapIterator<'a, 'b, K, V> +where + K: Binary + BinaryCmp, + V: Binary, +{ + type Item = (K, V); + + fn next(&mut self) -> Option { + self.iter.next().map(|e| (e.key, e.value)) + } +} + +impl<'a, K, V> Display for ShmMap<'a, K, V> +where + K: Binary + BinaryCmp + Display, + V: Binary + BinaryCmp + Display, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for item in self.iter() { + write!(f, "({}->{}),", item.0, item.1)?; + } + write!(f, "]") + } +} + +struct KeyCmp +where + K: Binary + BinaryCmp, +{ + key: Vec, + _type: PhantomData, +} + +impl KeyCmp +where + K: Binary + BinaryCmp, +{ + pub fn new(key: K) -> KeyCmp { + let mut buffer = vec![0; K::const_size() as usize]; + key.to_bytes(&mut buffer); + + KeyCmp { + key: buffer, + _type: PhantomData, + } + } +} + +impl Cmp for KeyCmp +where + K: Binary + BinaryCmp, +{ + fn cmp(&self, order: &[u8]) -> Ordering { + K::cmp(&self.key, &order[0..K::const_size() as usize]) + } +} + +pub struct Entry { + pub key: K, + pub value: V, +} + +impl Binary for Entry +where + K: Binary, + V: Binary, +{ + fn const_size() -> u32 { + K::const_size() + V::const_size() + } + + fn to_bytes(&self, buffer: &mut [u8]) { + self.key.to_bytes(&mut buffer[0..K::const_size() as usize]); + self.value.to_bytes( + &mut buffer + [K::const_size() as usize..K::const_size() as usize + V::const_size() as usize], + ); + } + + fn from_bytes(buffer: &[u8]) -> Self { + Entry { + key: K::from_bytes(&buffer[0..K::const_size() as usize]), + value: V::from_bytes( + &buffer + [K::const_size() as usize..K::const_size() as usize + V::const_size() as usize], + ), + } + } +} + +impl BinaryCmp for Entry +where + K: Binary + BinaryCmp, + V: Binary, +{ + fn cmp(left: &[u8], right: &[u8]) -> Ordering { + K::cmp( + &left[0..K::const_size() as usize], + &right[0..K::const_size() as usize], + ) + } +} diff --git a/rust/src/oracle.rs b/rust/src/oracle.rs new file mode 100644 index 0000000..0737c5d --- /dev/null +++ b/rust/src/oracle.rs @@ -0,0 +1,119 @@ +use crate::map::ShmMap; +use crate::shm::Memory; +use crate::sorted_set::{Binary, BinaryCmp, LEN_SIZE}; +use byteorder::{ByteOrder, LittleEndian}; +use std::cmp::Ordering; +use std::hash::Hasher; +use twox_hash::XxHash64; + +/// Currency pair ticker. +#[derive(Clone, Copy, Debug, Eq, Ord, PartialOrd, PartialEq)] +pub struct Ticker(u64); + +impl Ticker { + /// Create a ticker with string. + pub fn new(ticker: &str) -> Ticker { + Ticker(str_xxhash(&ticker.to_ascii_lowercase())) + } +} + +/// Currency pair price. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Price(pub u64); + +/// Price oracle cache. +pub struct PriceOracleCache<'a> { + cache: ShmMap<'a, Ticker, Price>, +} + +impl<'a> PriceOracleCache<'a> { + /// Returns memory layout size for the given oracle pairs count. + pub fn calculate_memory_size(pair_count: usize) -> usize { + (Ticker::const_size() + Price::const_size()) as usize * pair_count + LEN_SIZE + } + + /// Create a new shared memory map with given memory reference. + pub fn new(memory: Memory<'a>) -> PriceOracleCache<'a> { + PriceOracleCache { + cache: ShmMap::new(memory), + } + } + + /// Put or update oracle ticker->price pair. + pub fn put(&mut self, ticker: Ticker, price: Price) { + self.cache.put(ticker, price) + } + + /// Return price by ticker. + pub fn get(&self, ticker: Ticker) -> Option { + self.cache.get(ticker) + } + + /// Clear oracle cache. + pub fn clear(&mut self) { + self.cache.clear(); + } +} + +impl Binary for Ticker { + fn const_size() -> u32 { + 8 + } + + fn to_bytes(&self, buffer: &mut [u8]) { + LittleEndian::write_u64(&mut buffer[0..8], self.0); + } + + fn from_bytes(buffer: &[u8]) -> Self { + Ticker(LittleEndian::read_u64(&buffer[0..8])) + } +} + +impl BinaryCmp for Ticker { + fn cmp(left: &[u8], right: &[u8]) -> Ordering { + Ticker::from_bytes(left).cmp(&Ticker::from_bytes(right)) + } +} + +impl Binary for Price { + fn const_size() -> u32 { + 8 + } + + fn to_bytes(&self, buffer: &mut [u8]) { + LittleEndian::write_u64(&mut buffer[0..8], self.0); + } + + fn from_bytes(buffer: &[u8]) -> Self { + Price(LittleEndian::read_u64(&buffer[0..8])) + } +} + +/// Calculate string hash. +fn str_xxhash(val: &str) -> u64 { + let mut hash = XxHash64::default(); + Hasher::write(&mut hash, val.as_bytes()); + Hasher::finish(&hash) +} + +#[cfg(test)] +mod tests { + use crate::oracle::{Price, PriceOracleCache, Ticker}; + use crate::shm::Shm; + + #[test] + fn test_oracle() { + let shm = Shm::open_or_create( + "/test_oracle", + PriceOracleCache::calculate_memory_size(10) as u32, + ) + .unwrap(); + let mut oracle = PriceOracleCache::new(shm.memory()); + oracle.clear(); + oracle.put(Ticker::new("BTCUSD"), Price(8000)); + oracle.put(Ticker::new("USDRUB"), Price(70)); + assert_eq!(Some(Price(70)), oracle.get(Ticker::new("USDRUB"))); + assert_eq!(Some(Price(8000)), oracle.get(Ticker::new("BTCuSD"))); + assert_eq!(None, oracle.get(Ticker::new("BTCR"))); + } +} diff --git a/rust/src/shm.rs b/rust/src/shm.rs new file mode 100644 index 0000000..de48eab --- /dev/null +++ b/rust/src/shm.rs @@ -0,0 +1,125 @@ +use anyhow::Error; +use nix::fcntl::OFlag; +use nix::sys::{ + mman::{mmap, munmap, shm_open, MapFlags, ProtFlags}, + stat::fstat, + stat::Mode, +}; +use nix::unistd::{close, ftruncate}; +use std::ffi::c_void; +use std::os::unix::io::RawFd; +use std::ptr; +use std::slice; + +pub type ShmResult = Result; + +/// Shared memory. +pub struct Shm { + fd: RawFd, + mem: *mut c_void, + size: u32, +} + +unsafe impl Send for Shm {} + +impl Shm { + /// Open shared memory file. + pub fn open(name: &str, size: u32) -> ShmResult { + Shm::new(name, size, OFlag::O_RDWR) + } + + /// Create shared memory file. + pub fn create(name: &str, size: u32) -> ShmResult { + Shm::new(name, size, OFlag::O_RDWR | OFlag::O_CREAT) + } + + /// Open or create shared memory file. + pub fn open_or_create(name: &str, size: u32) -> ShmResult { + match Shm::open(name, size) { + Ok(shm) => Ok(shm), + Err(_) => Shm::create(name, size), + } + } + + /// Create a new shared memory file. + fn new(name: &str, size: u32, flag: OFlag) -> ShmResult { + let name = format!("/{}", name); + let fd = Shm::shm_open(&name, size, flag)?; + match Shm::mmap_shm(fd, size) { + Ok(mem) => Ok(Shm { fd, mem, size }), + Err(err) => { + close(fd)?; + Err(err) + } + } + } + + /// Open shared memory file. + fn shm_open(name: &str, size: u32, flag: OFlag) -> ShmResult { + let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IWGRP; + let fd = shm_open(name, flag, mode)?; + if let Ok(stat) = fstat(fd) { + if stat.st_size == 0 { + if let Err(err) = ftruncate(fd, size as i64) { + close(fd)?; + return Err(err.into()); + } + } + } + + Ok(fd) + } + + fn mmap_shm(fd: RawFd, size: u32) -> ShmResult<*mut c_void> { + Ok(unsafe { + mmap( + ptr::null_mut(), + size as usize, + ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, + MapFlags::MAP_SHARED, + fd, + 0, + ) + }?) + } + + /// Returns shared memory reference. + pub fn memory(&self) -> Memory { + let mem = unsafe { slice::from_raw_parts_mut(self.mem as *mut u8, self.size as usize) }; + Memory { + inner: mem, + size: self.size as usize, + } + } +} + +/// Shared memory. +pub struct Memory<'a> { + inner: &'a mut [u8], + pub size: usize, +} + +impl<'a> Memory<'a> { + /// Returns memory reference. + pub fn mem_ref(&self) -> &[u8] { + self.inner + } + + /// Returns mutable memory reference. + pub fn mem_ref_mut(&mut self) -> &mut [u8] { + self.inner + } +} + +impl Drop for Shm { + fn drop(&mut self) { + unsafe { + if let Err(err) = munmap(self.mem, self.size as usize) { + error!("Failed to munmap:{:?}", err); + } + if let Err(err) = close(self.fd) { + error!("Failed to close shared memory file:{:?}", err); + } + } + } +} diff --git a/rust/src/sorted_set.rs b/rust/src/sorted_set.rs new file mode 100644 index 0000000..26226ee --- /dev/null +++ b/rust/src/sorted_set.rs @@ -0,0 +1,308 @@ +use crate::shm::Memory; +use byteorder::{ByteOrder, LittleEndian}; +use std::cmp::Ordering; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::marker::PhantomData; +use std::ops::Range; + +/// Set length size. +pub const LEN_SIZE: usize = 4; +/// Length bytes range; +pub const SIZE_RANGE: Range = 0..LEN_SIZE; + +/// Sorted set shared memory implementation. +pub struct SortedSet<'a, T: Binary + BinaryCmp> { + mem: Memory<'a>, + _item: PhantomData, +} + +impl<'a, T> SortedSet<'a, T> +where + T: Binary + BinaryCmp, +{ + /// Create a new sorted set with the given memory reference. + pub fn new(memory: Memory<'a>) -> SortedSet<'a, T> { + SortedSet { + mem: memory, + _item: PhantomData, + } + } + + /// Removes all elements from the set. + pub fn clear(&mut self) { + self.set_len(0); + } + + /// Add new elements to the set. + pub fn add(&mut self, item: T) { + let mut buffer = vec![0; T::const_size() as usize]; + item.to_bytes(&mut buffer); + let cmp: Comparator = Comparator { + buffer: &buffer, + t: PhantomData, + }; + + match self.find_index(cmp) { + Find::Index(index) => { + self.store_at_index(index, &buffer); + } + Find::LastRange((start, _)) => { + self.shift_right(start); + self.store_at_index(start, &buffer); + self.set_len(self.len() + 1); + } + } + } + + /// Get the element by the given index. + pub fn get(&self, index: usize) -> Option { + let len = self.len(); + if index >= len { + None + } else { + Some(T::from_bytes(self.get_by_index(index))) + } + } + + /// Returns set length. + pub fn len(&self) -> usize { + LittleEndian::read_u32(&self.mem.mem_ref()[SIZE_RANGE]) as usize + } + + /// Returns `true` if the set contains no elements. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Create vector with set elements. + pub fn to_vec(&self) -> Vec { + self.iter().collect() + } + + /// Finds the element by given comparator. + pub fn find(&self, order: impl Cmp) -> Option { + match self.find_index(order) { + Find::Index(index) => Some(T::from_bytes(self.get_by_index(index))), + Find::LastRange(_) => None, + } + } + + /// Returns iterator. + pub fn iter(&self) -> SetIterator { + SetIterator { + set: &self, + index: 0, + } + } + + /// Shifts all elements starting from the index to the right. + fn shift_right(&mut self, index: usize) { + let size = T::const_size() as usize; + for index in (index..self.len()).rev() { + let current_offset = self.offset(index); + let next_offset = current_offset + size as usize; + let mem = self.mem.mem_ref_mut(); + let (left, right) = mem.split_at_mut(next_offset); + right[0..size].copy_from_slice(&left[current_offset..next_offset]); + } + } + + /// Stores bytes by index. + fn store_at_index(&mut self, index: usize, buffer: &[u8]) { + let offset = self.offset(index); + let rf = &mut self.mem.mem_ref_mut()[offset..offset + T::const_size() as usize]; + rf.copy_from_slice(buffer); + } + + /// Find the element index or or the closest interval. + fn find_index(&self, order: impl Cmp) -> Find { + let len = self.len(); + if len == 0 { + return Find::LastRange((0, 0)); + } + + let mut list = (0, len); + loop { + let middle = (list.0 + list.1) / 2; + list = match order.cmp(self.get_by_index(middle)) { + Ordering::Equal => return Find::Index(middle), + Ordering::Less => (list.0, middle), + Ordering::Greater => (middle + 1, list.1), + }; + if list.0 >= list.1 { + break; + } + } + + Find::LastRange(list) + } + + /// Get bytes by index. + fn get_by_index(&self, index: usize) -> &[u8] { + let offset = self.offset(index); + &self.mem.mem_ref()[offset..offset + T::const_size() as usize] + } + + /// Create element offset by its index. + fn offset(&self, index: usize) -> usize { + T::const_size() as usize * index + LEN_SIZE + } + + /// Set list length. + fn set_len(&mut self, len: usize) { + LittleEndian::write_u32(&mut self.mem.mem_ref_mut()[SIZE_RANGE], len as u32); + } +} + +struct Comparator<'a, T: Binary + BinaryCmp> { + buffer: &'a [u8], + t: PhantomData, +} + +impl<'a, T> Cmp for Comparator<'a, T> +where + T: Binary + BinaryCmp, +{ + fn cmp(&self, order: &[u8]) -> Ordering { + T::cmp(self.buffer, order) + } +} + +enum Find { + Index(usize), + LastRange((usize, usize)), +} + +/// List element trait. +pub trait Binary { + fn const_size() -> u32; + fn to_bytes(&self, buffer: &mut [u8]); + fn from_bytes(buffer: &[u8]) -> Self; +} + +/// Binary comparator. +pub trait BinaryCmp { + fn cmp(left: &[u8], right: &[u8]) -> Ordering; +} + +/// Binary comparator. +pub trait Cmp { + fn cmp(&self, order: &[u8]) -> Ordering; +} + +pub struct SetIterator<'a, 'b, T> +where + T: Binary + BinaryCmp, +{ + set: &'b SortedSet<'a, T>, + index: usize, +} + +impl Iterator for SetIterator<'_, '_, T> +where + T: Binary + BinaryCmp, +{ + type Item = T; + + fn next(&mut self) -> Option { + let value = self.set.get(self.index); + self.index += 1; + value + } +} + +impl<'a, T> Display for SortedSet<'a, T> +where + T: Binary + BinaryCmp + Display, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for item in self.iter() { + write!(f, "{},", item)?; + } + write!(f, "]") + } +} + +#[cfg(test)] +mod tests { + use crate::shm::Shm; + use crate::sorted_set::{Binary, BinaryCmp, SortedSet}; + use byteorder::{ByteOrder, LittleEndian}; + use std::cmp::Ordering; + use std::fmt; + use std::fmt::Formatter; + + #[test] + fn test_list() { + let shm = Shm::open_or_create("/test", 1024).unwrap(); + let mut set = SortedSet::new(shm.memory()); + set.clear(); + set.add(Pair::new(1, 2)); + set.add(Pair::new(3, 1)); + set.add(Pair::new(0, 10)); + set.add(Pair::new(100, 100)); + set.add(Pair::new(10, 0)); + set.add(Pair::new(11, 0)); + set.add(Pair::new(11, 44)); + set.add(Pair::new(2, 2)); + + assert_eq!( + vec![ + Pair::new(0, 10), + Pair::new(1, 2), + Pair::new(2, 2), + Pair::new(3, 1), + Pair::new(10, 0), + Pair::new(11, 44), + Pair::new(100, 100), + ], + set.to_vec() + ); + } + + #[derive(Ord, PartialOrd, Eq, PartialEq, Debug)] + struct Pair { + key: u64, + value: u64, + } + + impl Pair { + pub fn new(key: u64, value: u64) -> Pair { + Pair { key, value } + } + } + + impl Binary for Pair { + fn const_size() -> u32 { + 16 + } + + fn to_bytes(&self, buffer: &mut [u8]) { + LittleEndian::write_u64(&mut buffer[0..8], self.key); + LittleEndian::write_u64(&mut buffer[8..16], self.value); + } + + fn from_bytes(buffer: &[u8]) -> Self { + Pair { + key: LittleEndian::read_u64(&buffer[0..8]), + value: LittleEndian::read_u64(&buffer[8..16]), + } + } + } + + impl BinaryCmp for Pair { + fn cmp(left: &[u8], right: &[u8]) -> Ordering { + let left = LittleEndian::read_u64(&left[0..8]); + let right = LittleEndian::read_u64(&right[0..8]); + left.cmp(&right) + } + } + + impl fmt::Display for Pair { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "({}:{})", self.key, self.value) + } + } +}