Skip to content

Commit

Permalink
feat: indexer orders from EventBuyDirect
Browse files Browse the repository at this point in the history
  • Loading branch information
blushi committed Nov 6, 2024
1 parent 4c2fbdb commit 6ce0f42
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 0 deletions.
152 changes: 152 additions & 0 deletions index_orders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import logging
import os
import textwrap
from psycopg2.extras import Json
import requests
from utils import PollingProcess, events_to_process
from collections import defaultdict

logger = logging.getLogger(__name__)


def fetch_sell_order(height, sell_order_id):
resp = requests.get(
f"{os.environ['REGEN_API']}/regen/ecocredit/marketplace/v1/sell-orders/{sell_order_id}",
headers={"x-cosmos-block-height": str(height)},
)
resp.raise_for_status()
return resp.json()["sell_order"]

def fetch_project_id(batch_denom):
resp = requests.get(
f"{os.environ['REGEN_API']}/regen/ecocredit/v1/batches/{batch_denom}"
)
resp.raise_for_status()
batch = resp.json()["batch"]
return batch["project_id"]


def _index_orders(pg_conn, _client, _chain_num):
with pg_conn.cursor() as cur:
# Dictionary to store events grouped by project_id
for event in events_to_process(
cur,
"orders",
):
events_by_project_and_denom = defaultdict(lambda: defaultdict(list))
(type, block_height, tx_idx, msg_idx, _, _, chain_num, timestamp, tx_hash) = event[0]
logger.info("event")
logger.info(event)
# We need to get the corresponding msg.data
# because EventBuyDirect only stores sell order id currently
sql = textwrap.dedent(
f"""
SELECT data
FROM msg
WHERE chain_num = {chain_num} AND block_height = {block_height} AND tx_idx = {tx_idx} AND msg_idx = {msg_idx}
"""
)
cur.execute(sql)
res = cur.fetchone()
data = res[0]

normalize = {}
normalize["type"] = type
normalize["block_height"] = block_height
normalize["tx_idx"] = tx_idx
normalize["msg_idx"] = msg_idx
normalize["chain_num"] = chain_num
normalize["timestamp"] = timestamp
normalize["tx_hash"] = tx_hash
normalize["buyer_address"] = data["buyer"]

for order in data["orders"]:
# If all credits have been purchased in the sell order, then it's pruned from state,
# so we need to retrieve the sell order info at height - 1 to get the corresponding project_id
sell_order = fetch_sell_order(
normalize["block_height"] - 1, order["sell_order_id"]
)
project_id = fetch_project_id(sell_order["batch_denom"])
ask_denom = sell_order["ask_denom"]
# We group by project_id and ask_denom so we insert a new row in orders table by (project_id, ask_denom)
events_by_project_and_denom[project_id][ask_denom].append(order)

logger.info("events_by_project_and_denom")
logger.info(events_by_project_and_denom)
for project_id, denoms in events_by_project_and_denom.items():
for ask_denom, orders in denoms.items():
normalize["credits_amount"] = 0
normalize["total_price"] = 0

for order in orders:
normalize["credits_amount"] = normalize["credits_amount"] + float(order["quantity"])
normalize["total_price"] = normalize["total_price"] + float(order["bid_price"]["amount"]) * float(order["quantity"])
normalize["ask_denom"] = order["bid_price"]["denom"]
normalize["retired_credits"] = not order["disable_auto_retire"]
row = (
normalize["type"],
normalize["block_height"],
normalize["tx_idx"],
normalize["msg_idx"],
normalize["chain_num"],
normalize["timestamp"],
normalize["tx_hash"],
normalize["buyer_address"],
normalize["credits_amount"],
normalize["total_price"],
normalize["ask_denom"],
normalize["retired_credits"],
order["retirement_reason"],
order["retirement_jurisdiction"],
project_id,
)
insert_text = textwrap.dedent("""
INSERT INTO orders (
type,
block_height,
tx_idx,
msg_idx,
chain_num,
timestamp,
tx_hash,
buyer_address,
credits_amount,
total_price,
ask_denom,
retired_credits,
retirement_reason,
retirement_jurisdiction,
project_id
) VALUES (
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
);""").strip("\n")
with pg_conn.cursor() as _cur:
_cur.execute(
insert_text,
row,
)
logger.debug(_cur.statusmessage)
pg_conn.commit()
logger.info("order inserted...")


def index_orders():
p = PollingProcess(
target=_index_orders,
sleep_secs=1,
)
p.start()
2 changes: 2 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from index_proposals import index_proposals
from index_class_issuers import index_class_issuers
from index_votes import index_votes
from index_orders import index_orders

load_dotenv()

Expand All @@ -34,6 +35,7 @@

if __name__ == "__main__":
index_blocks()
index_orders()
index_retires()
index_proposals()
index_class_issuers()
Expand Down
26 changes: 26 additions & 0 deletions migrations/committed/000005.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
--! Previous: sha1:8aa6f840171d1c0076e70fbee8dd9b56f794a03d
--! Hash: sha1:b0be1153261a470d83023f509af01421f190ea79

DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
"timestamp" timestamp with time zone,
type text NOT NULL,
credits_amount text NOT NULL,
project_id text NOT NULL,
buyer_address text NOT NULL,
total_price text NOT NULL,
ask_denom text NOT NULL,
retired_credits BOOLEAN NOT NULL,
retirement_reason text,
retirement_jurisdiction text,
block_height bigint NOT NULL,
chain_num smallint NOT NULL,
tx_idx smallint NOT NULL,
msg_idx smallint NOT NULL,
tx_hash text NOT NULL,
PRIMARY KEY (chain_num, block_height, tx_idx, msg_idx)
);

DROP INDEX IF EXISTS orders_buyer_address_idx;
CREATE INDEX orders_buyer_address_idx ON orders USING btree (buyer_address);
38 changes: 38 additions & 0 deletions migrations/schema_snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,29 @@ CREATE TABLE public.msg_event (
);


--
-- Name: orders; Type: TABLE; Schema: public; Owner: -
--

CREATE TABLE public.orders (
"timestamp" timestamp with time zone,
type text NOT NULL,
credits_amount text NOT NULL,
project_id text NOT NULL,
buyer_address text NOT NULL,
total_price text NOT NULL,
ask_denom text NOT NULL,
retired_credits boolean NOT NULL,
retirement_reason text,
retirement_jurisdiction text,
block_height bigint NOT NULL,
chain_num smallint NOT NULL,
tx_idx smallint NOT NULL,
msg_idx smallint NOT NULL,
tx_hash text NOT NULL
);


--
-- Name: proposals; Type: TABLE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -518,6 +541,14 @@ ALTER TABLE ONLY public.msg
ADD CONSTRAINT msg_pkey PRIMARY KEY (chain_num, block_height, tx_idx, msg_idx);


--
-- Name: orders orders_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.orders
ADD CONSTRAINT orders_pkey PRIMARY KEY (chain_num, block_height, tx_idx, msg_idx);


--
-- Name: proposals proposals_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -630,6 +661,13 @@ CREATE INDEX msg_event_type_idx ON public.msg_event USING btree (type);
CREATE INDEX msg_expr_idx ON public.msg USING gin (((data -> '@type'::text)));


--
-- Name: orders_buyer_address_idx; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX orders_buyer_address_idx ON public.orders USING btree (buyer_address);


--
-- Name: proposals_group_policy_address_idx; Type: INDEX; Schema: public; Owner: -
--
Expand Down
1 change: 1 addition & 0 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def run(self):
"regen.ecocredit.v1.EventUpdateClassIssuers",
],
"votes": ["cosmos.group.v1.EventVote"],
"orders": ["regen.ecocredit.marketplace.v1.EventBuyDirect"]
}


Expand Down

0 comments on commit 6ce0f42

Please sign in to comment.