-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb.py
102 lines (90 loc) · 3.04 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import json
import psycopg2
import uuid
import os
def init_db():
    """Create the MarketData table if it does not already exist.

    Connection settings are read from the POSTGRES_DB, POSTGRES_USER,
    POSTGRES_PASSWORD, POSTGRES_HOST and POSTGRES_PORT environment variables.

    Any error raised by psycopg2 while connecting or running the statement
    propagates to the caller; the connection is closed either way.
    """
    con = psycopg2.connect(
        dbname=os.getenv('POSTGRES_DB'),
        user=os.getenv('POSTGRES_USER'),
        password=os.getenv('POSTGRES_PASSWORD'),
        host=os.getenv('POSTGRES_HOST'),
        port=os.getenv('POSTGRES_PORT')
    )
    try:
        with con.cursor() as cur:
            cur.execute('''CREATE TABLE IF NOT EXISTS MarketData
                (id UUID PRIMARY KEY, symbol TEXT, interval TEXT, close REAL, high REAL, low REAL, timestamp BIGINT)''')
        con.commit()
    finally:
        # Close even when the DDL or the commit fails, so the connection
        # is never leaked.
        con.close()
def log_db(message):
    """Parse a kline message, print its fields and insert it into MarketData.

    Args:
        message: JSON text holding a "k" object with the keys "s" (symbol),
            "i" (interval), "c" (close), "h" (high), "l" (low) and
            "t" (time).

    Raises:
        ValueError: if ``message`` is not valid JSON, or a price/time field
            cannot be converted to a number.
        KeyError: if "k" or one of the fields read from it is missing.

    Errors raised by psycopg2 while connecting or inserting propagate to the
    caller; the connection is closed either way.
    """
    # Parse and validate everything first: malformed input must fail before
    # a database connection is opened.
    kline = json.loads(message)["k"]
    row_id = uuid.uuid4()  # PostgreSQL supports UUID natively
    symbol = kline["s"]
    interval = kline["i"]
    close = float(kline["c"])
    high = float(kline["h"])
    low = float(kline["l"])
    timestamp = int(kline["t"])

    print("log:")
    print("symbol: " + symbol)
    print("interval: " + interval)
    print("close: " + str(close))
    print("high: " + str(high))
    print("low: " + str(low))
    print("time: " + str(timestamp))

    row = (str(row_id), symbol, interval, close, high, low, timestamp)
    cmd = "INSERT INTO MarketData (id, symbol, interval, close, high, low, timestamp) VALUES (%s, %s, %s, %s, %s, %s, %s)"

    con = psycopg2.connect(
        dbname=os.getenv('POSTGRES_DB'),
        user=os.getenv('POSTGRES_USER'),
        password=os.getenv('POSTGRES_PASSWORD'),
        host=os.getenv('POSTGRES_HOST'),
        port=os.getenv('POSTGRES_PORT')
    )
    try:
        with con.cursor() as cur:
            # Values are passed as query parameters, never interpolated.
            cur.execute(cmd, row)
        con.commit()
    finally:
        # Close even when the insert or the commit fails, so the connection
        # is never leaked.
        con.close()
def get_recent_market_data():
    """Return all rows of MarketData ordered by timestamp, oldest first.

    Despite the name, no limit is applied: every stored row is fetched.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()`` for
        ``SELECT *`` (column order follows the table definition).

    Errors raised by psycopg2 while connecting or querying propagate to the
    caller; the connection is closed either way.
    """
    con = psycopg2.connect(
        dbname=os.getenv('POSTGRES_DB'),
        user=os.getenv('POSTGRES_USER'),
        password=os.getenv('POSTGRES_PASSWORD'),
        host=os.getenv('POSTGRES_HOST'),
        port=os.getenv('POSTGRES_PORT')
    )
    try:
        with con.cursor() as cur:
            cur.execute("SELECT * FROM MarketData ORDER BY timestamp ASC")
            return cur.fetchall()
    finally:
        # Close even when the query or the fetch fails, so the connection
        # is never leaked.
        con.close()
def get_recent_market_data_length():
    """Return the number of rows stored in MarketData.

    Returns:
        The row count, or 0 if psycopg2 reports an error while connecting or
        querying (the error is printed, not raised).
    """
    con = None  # stays None when connect() itself fails
    try:
        con = psycopg2.connect(
            dbname=os.getenv('POSTGRES_DB'),
            user=os.getenv('POSTGRES_USER'),
            password=os.getenv('POSTGRES_PASSWORD'),
            host=os.getenv('POSTGRES_HOST'),
            port=os.getenv('POSTGRES_PORT')
        )
        with con.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM MarketData")  # Count the number of rows
            return cur.fetchone()[0]  # Single row, single column: the count
    except psycopg2.Error as e:
        print(f"Error fetching data: {e}")
        return 0  # Return 0 in case of an error
    finally:
        # Runs on both the success and the error path, so a failed query no
        # longer leaves the connection open.
        if con is not None:
            con.close()
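# NOTE: Disabled variant of get_recent_market_data that returns only the newest
# `period` rows (newest first). If it is re-enabled, pass `period` as a query
# parameter ("... LIMIT %s", (period,)) instead of interpolating it with an
# f-string.
# def get_recent_market_data(period):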
# con = psycopg2.connect(
# dbname=os.getenv('POSTGRES_DB'),
# user=os.getenv('POSTGRES_USER'),
# password=os.getenv('POSTGRES_PASSWORD'),
# host=os.getenv('POSTGRES_HOST'),
# port=os.getenv('POSTGRES_PORT')
# )
# cur = con.cursor()
# cur.execute(f"SELECT * FROM MarketData ORDER BY timestamp DESC LIMIT {period}")
# rows = cur.fetchall()
# con.close()
# return rows