feat: Add boilerplate code for feature pipelines
iusztinpaul committed Jan 17, 2025
1 parent 70ea182 commit c4d7849
Showing 21 changed files with 191 additions and 55 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,5 +1,6 @@
# ruff
# Ruff
.ruff_cache

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
3 changes: 1 addition & 2 deletions Dockerfile
@@ -5,7 +5,6 @@ FROM python:3.12-slim
ENV DEBIAN_FRONTEND=noninteractive
ENV TERM=xterm


# Update and install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
apt-utils \
@@ -36,7 +35,7 @@ COPY pyproject.toml uv.lock README.md ./

# Create virtual environment and install dependencies
RUN python3 -m venv /app/.venv && \
/app/.venv/bin/pip install --no-cache-dir --upgrade pip uv zenml[server] && \
/app/.venv/bin/pip install --no-cache-dir --upgrade pip uv && \
/app/.venv/bin/uv sync --python /app/.venv/bin/python && \
ls -la /app/.venv/bin

1 change: 1 addition & 0 deletions INSTALL_AND_USAGE.md
@@ -29,6 +29,7 @@ make_me_laugh
make local-infrastructure-up
```


### Run ZenML pipelines

```bash
15 changes: 12 additions & 3 deletions Makefile
@@ -8,7 +8,7 @@ export PYTHONPATH = .

# --- Default Values ---

CHECK_DIRS := src
CHECK_DIRS := .
LOCAL_DATA_PATH := data


@@ -22,6 +22,9 @@ help:
local-docker-infrastructure-up:
docker compose up --build -d

local-docker-infrastructure-up-development:
docker compose watch

local-docker-infrastructure-stop:
docker compose stop

@@ -52,10 +55,16 @@ s3-download: # Download from S3 to local folder using AWS
# --- Pipelines ---

collect-notion-data-pipeline:
uv run python -m tools.run --run-collect-notion-data --no-cache
uv run python -m tools.run --run-collect-notion-data-pipeline --no-cache

etl-pipeline:
uv run python -m tools.run --run-etl --no-cache
uv run python -m tools.run --run-etl-pipeline --no-cache

generate-dataset-pipeline:
uv run python -m tools.run --run-generate-dataset-pipeline --no-cache

compute-rag-vector-index-pipeline:
uv run python -m tools.run --run-compute-rag-vector-index-pipeline --no-cache

# --- Tests ---

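The renamed flags above imply a matching dispatcher in `tools/run.py`, which this commit does not touch. A minimal sketch of what that runner could look like, assuming `argparse` and ZenML's `with_options()` for cache control — the flag names are taken from the Makefile targets, everything else is an assumption:

```python
# Hypothetical sketch of tools/run.py (not part of this diff).
import argparse

from pipelines import (
    collect_notion_data,
    compute_rag_vector_index,
    etl,
    generate_dataset,
)

# Map CLI flag destinations to pipelines; names mirror the Makefile targets.
PIPELINES = {
    "run_collect_notion_data_pipeline": collect_notion_data,
    "run_etl_pipeline": etl,
    "run_generate_dataset_pipeline": generate_dataset,
    "run_compute_rag_vector_index_pipeline": compute_rag_vector_index,
}


def main() -> None:
    parser = argparse.ArgumentParser(description="Run ZenML pipelines.")
    for dest in PIPELINES:
        parser.add_argument(f"--{dest.replace('_', '-')}", action="store_true")
    parser.add_argument("--no-cache", action="store_true")
    args = parser.parse_args()

    for dest, pipeline_fn in PIPELINES.items():
        if getattr(args, dest):
            # with_options() is ZenML's hook for run-level settings.
            pipeline_fn.with_options(enable_cache=not args.no_cache)()


if __name__ == "__main__":
    main()
```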
1 change: 1 addition & 0 deletions configs/compute_rag_vector_index.yaml
@@ -0,0 +1 @@
parameters:
1 change: 1 addition & 0 deletions configs/generate_dataset.yaml
@@ -0,0 +1 @@
parameters:
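Both new config files start life as an empty `parameters:` key. Presumably they are meant to be applied as ZenML run configuration; a hedged one-liner of how that wiring usually looks (`config_path` is standard ZenML API, but the call site is not shown in this commit):

```python
from pipelines import generate_dataset

# Run the pipeline with the (currently empty) YAML configuration applied.
generate_dataset.with_options(config_path="configs/generate_dataset.yaml")()
```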
15 changes: 14 additions & 1 deletion docker-compose.yml
@@ -30,7 +30,7 @@ services:
- IS_RUNNING_IN_DOCKER=true
volumes:
- ./data:/app/data
- zenml-config:/root/.zenml
- zenml-config:/root/.config/zenml
networks:
- zenml-network
ports:
@@ -46,6 +46,19 @@
zenml status &&
zenml server list &&
tail -f /dev/null"
develop:
watch:
# Sync the working directory with the `/app` directory in the container
- action: sync
path: .
target: /app
# Exclude the project virtual environment
ignore:
- .venv/

# Rebuild the image on changes to the `pyproject.toml`
- action: rebuild
path: ./pyproject.toml

networks:
zenml-network:
7 changes: 3 additions & 4 deletions pipelines/__init__.py
@@ -1,7 +1,6 @@
from .collect_notion_data import collect_notion_data
from .compute_rag_vector_index import compute_rag_vector_index
from .etl import etl
from .generate_dataset import generate_dataset

__all__ = [
"collect_notion_data",
"etl"
]
__all__ = ["collect_notion_data", "etl", "generate_dataset", "compute_rag_vector_index"]
18 changes: 18 additions & 0 deletions pipelines/compute_rag_vector_index.py
@@ -0,0 +1,18 @@
from zenml import pipeline

from second_brain.config import settings
from steps.infrastructure import (
fetch_from_mongodb,
)


@pipeline
def compute_rag_vector_index() -> None:
fetch_documents_config = {
"mongodb_uri": settings.MONGODB_OFFLINE_URI,
"database_name": settings.MONGODB_OFFLINE_DATABASE,
"collection_name": settings.MONGODB_OFFLINE_COLLECTION,
"limit": 100,
}

fetch_from_mongodb(**fetch_documents_config)
19 changes: 10 additions & 9 deletions pipelines/etl.py
@@ -1,26 +1,27 @@
from zenml import pipeline

from second_brain.config import settings
from steps.infrastructure import (
ingest_to_mongodb,
fetch_from_mongodb,
ingest_to_mongodb,
)

from second_brain.config import settings

@pipeline
def etl() -> None:
ingest_json_config= {
"mongodb_uri": settings.MONGODB_OFFLINE_URI,
"database_name": settings.MONGODB_OFFLINE_DATABASE,
"collection_name": settings.MONGODB_OFFLINE_COLLECTION,
"data_directory": settings.DATA_DIRECTORY,
}
ingest_json_config = {
"mongodb_uri": settings.MONGODB_OFFLINE_URI,
"database_name": settings.MONGODB_OFFLINE_DATABASE,
"collection_name": settings.MONGODB_OFFLINE_COLLECTION,
"data_directory": settings.DATA_DIRECTORY,
}
fetch_documents_config = {
"mongodb_uri": settings.MONGODB_OFFLINE_URI,
"database_name": settings.MONGODB_OFFLINE_DATABASE,
"collection_name": settings.MONGODB_OFFLINE_COLLECTION,
"limit": 100,
}

ingest_to_mongodb(**ingest_json_config)

fetch_from_mongodb(**fetch_documents_config)
18 changes: 18 additions & 0 deletions pipelines/generate_dataset.py
@@ -0,0 +1,18 @@
from zenml import pipeline

from second_brain.config import settings
from steps.infrastructure import (
fetch_from_mongodb,
)


@pipeline
def generate_dataset() -> None:
fetch_documents_config = {
"mongodb_uri": settings.MONGODB_OFFLINE_URI,
"database_name": settings.MONGODB_OFFLINE_DATABASE,
"collection_name": settings.MONGODB_OFFLINE_COLLECTION,
"limit": 100,
}

fetch_from_mongodb(**fetch_documents_config)
5 changes: 1 addition & 4 deletions pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [

[dependency-groups]
dev = [
"pytest>=8.3.4",
"ruff>=0.7.2",
]

@@ -39,7 +40,3 @@ packages = ["src/second_brain"]

[tool.ruff]
target-version = "py312"
line-length = 120
ignore = [
"E501", # Disable Ruff's line length checks
]
7 changes: 3 additions & 4 deletions src/second_brain/config.py
@@ -1,11 +1,11 @@
import os
from pathlib import Path
from typing import Optional
from loguru import logger

from loguru import logger
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
"""
A Pydantic-based settings class for managing application configurations.
@@ -28,7 +28,6 @@ class Settings(BaseSettings):
"decodingml-public-data" # Name of the S3 bucket for storing application data.
)
AWS_S3_PREFIX: str = "second_brain_course/notion"


# --- CometML Configuration ---
COMET_API_KEY: Optional[str] = None # API key for CometML integration.
@@ -46,7 +45,7 @@ ENABLE_OFFLINE_MODE: bool = Field(
ENABLE_OFFLINE_MODE: bool = Field(
True, description="Flag to enable offline mode (disables online ingestion)."
)

# --- GROQ Configuration ---
GROQ_API_KEY: Optional[str] = None # API key for accessing GROQ services.

7 changes: 4 additions & 3 deletions src/second_brain/infrastructure/mongo/service.py
@@ -1,7 +1,8 @@
from typing import List, Dict
from pymongo import MongoClient, errors
from typing import Dict, List

from bson import ObjectId
from loguru import logger
from pymongo import MongoClient, errors


class MongoDBService:
@@ -155,4 +156,4 @@ def verify_genre(self, genre: str) -> int:
return count
except errors.PyMongoError as e:
logger.error(f"Error verifying genre '{genre}': {e}")
raise
raise
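For orientation, a usage sketch of `MongoDBService`: the constructor arguments match the calls made in the steps further down this diff, while `fetch_documents` is a hypothetical method name, since the fetch API itself is outside the visible hunks.

```python
from second_brain.config import settings
from second_brain.infrastructure.mongo.service import MongoDBService

# Constructor signature confirmed by steps/infrastructure/fetch_from_mongodb.py.
service = MongoDBService(
    settings.MONGODB_OFFLINE_URI,
    settings.MONGODB_OFFLINE_DATABASE,
    settings.MONGODB_OFFLINE_COLLECTION,
)
documents = service.fetch_documents(limit=100)  # hypothetical method name
```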
2 changes: 1 addition & 1 deletion src/second_brain/infrastructure/notion/database.py
@@ -38,7 +38,7 @@ def query_notion_database(
Returns:
A list of dictionaries containing the query results.
"""

url = f"https://api.notion.com/v1/databases/{database_id}/query"
headers = {
"Authorization": f"Bearer {self.api_key}",
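The visible hunk only builds the request URL and headers. A self-contained sketch of how the query might be completed with `requests` — the URL and the Bearer header come from the hunk above; the `Notion-Version` value and the POST call are assumptions about the rest of the method:

```python
import requests


def query_notion_database(api_key: str, database_id: str) -> list[dict]:
    """Query a Notion database and return its result entries."""
    url = f"https://api.notion.com/v1/databases/{database_id}/query"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Notion-Version": "2022-06-28",  # assumed API version
    }
    response = requests.post(url, headers=headers, json={}, timeout=10)
    response.raise_for_status()
    return response.json()["results"]
```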
2 changes: 1 addition & 1 deletion steps/infrastructure/__init__.py
@@ -1,5 +1,5 @@
from .upload_to_s3 import upload_to_s3
from .fetch_from_mongodb import fetch_from_mongodb
from .ingest_to_mongodb import ingest_to_mongodb
from .upload_to_s3 import upload_to_s3

__all__ = ["upload_to_s3", "fetch_from_mongodb", "ingest_to_mongodb"]
6 changes: 2 additions & 4 deletions steps/infrastructure/fetch_from_mongodb.py
@@ -1,5 +1,5 @@
from zenml.steps import step
from loguru import logger
from zenml.steps import step

from second_brain.infrastructure.mongo import MongoDBService

@@ -24,7 +24,7 @@ def fetch_from_mongodb(
Raises:
Exception: If the fetch process fails.
"""

try:
service = MongoDBService(mongodb_uri, database_name, collection_name)

@@ -39,5 +39,3 @@ def fetch_from_mongodb(
except Exception as e:
logger.error(f"Failed to fetch documents: {e}")
raise


12 changes: 5 additions & 7 deletions steps/infrastructure/ingest_to_mongodb.py
@@ -1,17 +1,15 @@
import json
from pathlib import Path
from zenml.steps import step

from loguru import logger
from zenml.steps import step

from second_brain.infrastructure.mongo.service import MongoDBService


@step
def ingest_to_mongodb(
mongodb_uri: str,
database_name: str,
collection_name: str,
data_directory: str
mongodb_uri: str, database_name: str, collection_name: str, data_directory: str
) -> None:
"""ZenML step to ingest JSON data from multiple files into MongoDB.
@@ -65,10 +63,10 @@ def read_all_json_files(data_directory: str) -> list[dict]:
if database_dir.is_dir():
# Find all .json files in the database directory
json_files = database_dir.glob("*.json")

for json_file in json_files:
try:
with open(json_file, 'r', encoding='utf-8') as f:
with open(json_file, "r", encoding="utf-8") as f:
document = json.load(f)
all_documents.append(document)
except json.JSONDecodeError as e:
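The hunk above shows the per-database JSON scan inside `read_all_json_files`. As a hedged, self-contained restatement of the same idea (not the commit's code), the whole ingestion collapses to a recursive glob plus a pymongo bulk insert:

```python
import json
from pathlib import Path

from pymongo import MongoClient


def ingest(
    mongodb_uri: str, database_name: str, collection_name: str, data_directory: str
) -> None:
    # Collect every JSON document under the data directory.
    documents = []
    for json_file in Path(data_directory).rglob("*.json"):
        with open(json_file, "r", encoding="utf-8") as f:
            documents.append(json.load(f))
    # Bulk-insert everything in one round trip.
    if documents:
        client = MongoClient(mongodb_uri)
        client[database_name][collection_name].insert_many(documents)
```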
1 change: 1 addition & 0 deletions steps/infrastructure/upload_to_s3.py
@@ -1,4 +1,5 @@
from pathlib import Path

from zenml import step

from second_brain.config import settings
