-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdh_questdb.py
98 lines (70 loc) · 3.06 KB
/
dh_questdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import deephaven.dtypes as dht
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import kafka_consumer as ck
from deephaven import pandas as dhpd
from deephaven.plot.figure import Figure
from dhquest import qdb # custom lib
########################################
# Pull historical data through the QuestDB helper wrappers (dhquest.qdb).
trades = qdb.get_trades(last_nticks=1000)  # presumably the latest 1000 trades
candles = qdb.get_candles(sample_by='1m')  # 1-minute OHLC bars
candles_btc = candles.where(['symbol == `BTC-USD`'])

# Static OHLC chart of the BTC candles: the data is a one-off QuestDB pull,
# so the plot does not tick.
plot_btc_candles = (
    Figure()
    .chart_title(title="BTC OHLC - 1min candles from QuestDB (non-ticking)")
    .plot_ohlc(
        series_name="BTC",
        t=candles_btc,
        x="ts",
        open="openp",
        high="highp",
        low="lowp",
        close="closep",
    )
    .show()
)
########################################
# Alternatively, send raw SQL straight to QuestDB. The negative LIMIT takes
# rows from the end of the result, i.e. the 200 most recent BTC-USD trades.
query = "\nSELECT * FROM trades\nWHERE symbol = 'BTC-USD'\nLIMIT -200\n"
trades_btc = qdb.run_query(query)
########################################
# ...or subscribe to the live 'trades' topic on the Kafka broker at
# redpanda:29092 and decode each message value as JSON.
trades_latest = (
    ck.consume(
        {'bootstrap.servers': 'redpanda:29092'},
        'trades',
        key_spec=KeyValueSpec.IGNORE,  # message keys are not used
        value_spec=ck.json_spec([
            ('ts', dht.DateTime),
            ('receipt_ts', dht.DateTime),
            ('symbol', dht.string),
            ('exchange', dht.string),
            ('side', dht.string),
            ('size', dht.double),
            ('price', dht.double),
        ]),
        table_type=TableType.ring(10000),  # bounded ring of recent rows
    )
    .update_view([
        "bin = upperBin(ts, MINUTE * 1)",  # 1-minute bucket for later grouping
        "latency_ms = (receipt_ts - ts) / 1e6",  # timestamp delta scaled to ms
    ])
    .tail_by(1000, ['symbol'])  # keep the last 1000 rows per symbol
    .drop_columns(['KafkaPartition', 'KafkaOffset', 'KafkaTimestamp'])
)
########################################
# create partitions and pivot them into columns
def get_partitions(org_table, partition_by: str):
    """Partition ``org_table`` on one column and list its partition keys.

    Args:
        org_table: Deephaven table to split.
        partition_by: name of the column whose distinct values define the
            partitions.

    Returns:
        ``(partitioned_table, keys_list)``: the result of
        ``org_table.partition_by([partition_by])`` and a Python list of the
        distinct key values. The list is read once, at call time, so keys
        that appear later in a ticking table are not included.
    """
    parts = org_table.partition_by([partition_by])
    # Single-column table holding one row per distinct key value.
    distinct_keys = parts.table.select_distinct(parts.key_columns)
    # Java-side iterator obtained through the jpy handle; drained explicitly
    # with hasNext()/next().
    java_iter = distinct_keys.j_object.columnIterator(partition_by)

    def _drain():
        # Yield each key value until the Java iterator is exhausted.
        while java_iter.hasNext():
            yield java_iter.next()

    return parts, list(_drain())
def create_pivots(partitioned_table, keys_list):
    """Pivot per-key constituents into one wide table keyed by ``bin``.

    Each key in ``keys_list`` becomes one output column holding that
    constituent's ``quote_size``. The column is named after the key,
    lower-cased with ``-`` replaced by ``_`` (``BTC-USD`` -> ``btc_usd``)
    so that it is a valid column name.

    Args:
        partitioned_table: partitioned table (as returned by
            ``get_partitions``) whose constituents are expected to have
            ``bin`` and ``quote_size`` columns with at most one row per
            ``bin``, as required by ``natural_join``.
        keys_list: partition keys to pivot, in output-column order.

    Returns:
        A table with a ``bin`` column (the distinct bins of every
        constituent, sorted by ``bin``) plus one column per key. A key with
        no data for a given bin has null in its column for that row.
    """
    # Seed with the distinct bins of *all* constituents rather than the first
    # key's table, so bins that the first symbol lacks are no longer dropped.
    output_final = partitioned_table.merge().select_distinct(['bin']).sort(['bin'])
    for key in keys_list:
        table_now = partitioned_table.get_constituent([key])
        symbol_now = key.lower().replace('-', '_')
        print(symbol_now)
        # Accumulate in a plain local. The previous locals()[...] writes relied
        # on unsupported mutation of the locals() dict; since Python 3.13
        # (PEP 667) each locals() call is a fresh snapshot, so the stored
        # tables were lost and the final lookup raised KeyError.
        output_final = output_final.natural_join(
            table_now.view(['bin', f'{symbol_now} = quote_size']), on=['bin'])
    return output_final
# Total traded notional (size * price) per bin and symbol, split by symbol,
# then pivoted so that each symbol becomes its own column.
table_to_partition = (
    trades_latest
    .view(['symbol', 'bin', 'quote_size = size*price'])
    .sum_by(['bin', 'symbol'])
)
partitioned_table, keys_list = get_partitions(table_to_partition, 'symbol')
print(keys_list)
pivot_table = create_pivots(partitioned_table, keys_list)