Releases: betfund/betfund-event-broker
Pre-Release - [0.0.7]
NOTE
This has effectively cut the runtime of both of our flows in HALF
(the mongo operations were wildly inefficient, ESPECIALLY locally)
The core enhancement is migrating from individual pymongo
operations to a batched .bulk_write(...) call
The enhancement summed up:
from pymongo import UpdateOne

# We create a list of "operators"...
# Through the pymongo client we can "bulk_write" these operators
operator = UpdateOne(
    filter={"_id": event.fi},
    update={
        "$set": {
            "data.odds": odds
        }
    },
    upsert=True
)
In 0b6049f:
- Removed some unused code
- Fixed a full install dependency.
In b3e2e75:
- Integrated/tested use of Prefect Depth First Execution (using bleeding edge install)
- There is no major change in terms of codebase to enable it, since we already use Dask
In 767ea24-4a1f66b:
- Two bug fixes
- Resolve unused import (test fail)
- Return odds as dict instead of list
0.0.3: Bugfix/logging (#7)
* fix logging issue
* bump version
* fix flake error
Pre-Release - [0.0.2]
BUGFIX
- Log the number of events rather than the number of keys
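A minimal sketch of the kind of mistake this fix addresses, assuming the API response is a dict whose events sit in a list under a "results" key (the response shape, the logger name, and the `count_events` helper are assumptions for illustration, not taken from the codebase):

```python
import logging

logger = logging.getLogger("event_broker_example")  # hypothetical logger name


def count_events(response: dict) -> int:
    """Count the events in a response rather than its top-level keys."""
    # Assumed shape: events are stored as a list under "results".
    return len(response.get("results", []))


response = {
    "success": 1,
    "pager": {"page": 1},
    "results": [{"id": "1"}, {"id": "2"}],
}

# len(response) would report the number of top-level keys,
# which says nothing about how many events were fetched.
logger.info("Fetched %d events", count_events(response))
```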
Pre-Release - [0.0.1]
betfund-event-broker
Intermediary between Bet365 and Downstream Data Processing
Installation
From source
$ git clone https://github.com/betfund/betfund-event-broker.git
$ cd betfund-event-broker
$ python3.7 -m venv venv
$ source venv/bin/activate
$ pip install -e .
Design/Usage
Implementation of Prefect Task for betfund-bet365
# Using a Bet365 Base Class
from betfund_event_broker.tasks.bet365 import Bet365Task

class Bet365UpcomingEvents(Bet365Task):
    """Executes GET request to `Test` endpoint."""

    def run(self, test_arg: str):
        """
        Executes API Request to `test_endpoint(...)` endpoint.

        Args:
            test_arg (str): A test argument

        Returns:
            tuple: contains API response object and kafka topic
        """
        bet365_client = self._build_client()
        response = bet365_client.upcoming_events(test_arg=test_arg)

        return response
Implementation of Prefect Flow for a betfund-event
from betfund_event_broker.flows.base_flow import EventBrokerFlow
from prefect import Flow

# PlusOneTask and RunMeFirst are placeholder Prefect tasks assumed to be defined elsewhere.
class TestFlow(EventBrokerFlow):
    """The Test Flow for EventBrokerFlow"""

    def build(self, *args, **kwargs):
        """Builds a flow via imperative API."""
        plus_one = PlusOneTask()

        with Flow("my-Imperative-flow") as flow:
            flow.set_dependencies(
                task=plus_one,
                upstream_tasks=[RunMeFirst()],
                keyword_tasks=dict(x=10),
                mapped=False,
            )

        return flow
Flows
UpcomingEventsFlow
Workflow Diagram:
- Fetch upcoming events and insert into NoSQL Datastore
Testing
pip install -e ".[testing]"
# Test with pytest
make tests
# Lint with flake8
make flake
# Lint with pylint
make lint