
feat: Refactor MongoDB code. Clean code
iusztinpaul committed Jan 16, 2025
1 parent b0b965b commit fecef8e
Showing 20 changed files with 634 additions and 475 deletions.
5 changes: 1 addition & 4 deletions Dockerfile
@@ -50,8 +50,5 @@ ENV PATH="/app/.venv/bin:$PATH"
 # Ensure Python output is not buffered
 ENV PYTHONUNBUFFERED=1
 
-# Expose the application port
-EXPOSE 8000
-
 # Default command to allow debugging
-CMD ["tail", "-f", "/dev/null"]
+CMD ["uv", "run", "zenml", "login", "--local", "--port", "8237", "--ip-address", "0.0.0.0"]
5 changes: 4 additions & 1 deletion Makefile
@@ -20,7 +20,7 @@ help:
 # --- Infrastructure ---
 
 local-docker-infrastructure-up:
-	docker compose up -d
+	docker compose up --build -d
 
 local-docker-infrastructure-down:
 	docker compose stop
@@ -54,6 +54,9 @@ s3-download: # Download from S3 to local folder using AWS
 collect-notion-data-pipeline:
 	uv run python -m tools.run --run-collect-notion-data --no-cache
 
+etl-pipeline:
+	uv run python -m tools.run --run-etl --no-cache
+
 # --- Tests ---
 
 test:
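The new `etl-pipeline` target mirrors `collect-notion-data-pipeline` and funnels through `tools.run`, which is not part of this excerpt. A sketch of what its flag handling plausibly looks like — the `--run-etl` and `--no-cache` flags come from the Makefile, while `click` and the `with_options(enable_cache=...)` wiring are assumptions:

# tools/run.py -- hypothetical sketch of the entrypoint the Makefile targets call
import click

from pipelines import collect_notion_data, etl

@click.command()
@click.option("--run-collect-notion-data", is_flag=True, default=False)
@click.option("--run-etl", is_flag=True, default=False)
@click.option("--no-cache", is_flag=True, default=False, help="Disable ZenML step caching.")
def main(run_collect_notion_data: bool, run_etl: bool, no_cache: bool) -> None:
    pipeline_args = {"enable_cache": not no_cache}
    if run_collect_notion_data:
        collect_notion_data.with_options(**pipeline_args)()
    if run_etl:
        etl.with_options(**pipeline_args)()

if __name__ == "__main__":
    main()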
2 changes: 0 additions & 2 deletions README.md
@@ -3,8 +3,6 @@
   <p class="tagline">Open-source bootcamp by <a href="https://decodingml.substack.com">Decoding ML</a> in collaboration with <a href="...">...</a>.</p>
 </div>
 
----
-
 # ZenML RAG Pipeline: Foundational Stack
 
 This repository offers a **foundational stack** for building and integrating into ZenML Retrieval-Augmented Generation (RAG) pipelines. By leveraging **ZenML** for pipeline orchestration and **Local MongoDB Atlas** for document storage, this stack is designed to run seamlessly in both local and Dockerized environments.
1 change: 1 addition & 0 deletions configs/etl.yaml
@@ -0,0 +1 @@
+parameters:
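The new config file holds nothing but an empty `parameters:` key, a stub for future run configuration. A sketch of how such a file is typically attached to a ZenML pipeline (assuming the standard `with_options(config_path=...)` API):

from pipelines.etl import etl

# Bind the (currently empty) run configuration, then trigger a run.
configured_etl = etl.with_options(config_path="configs/etl.yaml")
configured_etl()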
9 changes: 7 additions & 2 deletions docker-compose.yml
@@ -19,11 +19,16 @@ services:
       retries: 5
       start_period: 20s
 
-  zenml-runner:
+  zenml-server:
     build:
       context: .
       dockerfile: Dockerfile
-    container_name: zenml-runner
+    container_name: zenml-server
+    ports:
+      - "8237:8237"
+    depends_on:
+      mongodb:
+        condition: service_healthy # Add dependency on MongoDB being healthy
     env_file:
       - .env
     environment:
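Renaming `zenml-runner` to `zenml-server` matches the container's new role, and `depends_on` with `condition: service_healthy` keeps it from starting before MongoDB's healthcheck passes. The same readiness test can be reproduced from Python; a sketch, assuming the local URI below matches your `MONGODB_OFFLINE_URI`:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

def mongo_is_healthy(uri: str = "mongodb://localhost:27017") -> bool:
    """Mirror the compose healthcheck: a 'ping' admin command against MongoDB."""
    client = MongoClient(uri, serverSelectionTimeoutMS=2000)
    try:
        client.admin.command("ping")
        return True
    except ConnectionFailure:
        return False
    finally:
        client.close()

print(mongo_is_healthy())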
92 changes: 9 additions & 83 deletions main.py
@@ -19,49 +19,12 @@
 """
 
 import os
-import json
-import logging
-from steps.infrastructure.mongodb.mongodb_data_processing import MongoDBService
-from zenml.logger import get_logger
-from pipelines.mongodb_atlas_pipeline import mongodb_atlas_pipeline
-from src.second_brain.config import Settings
-
-# TODO: Replace all current logger configurations with loguru.
-# Initialize logger
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-# TODO: Replace all current logger configurations with loguru.
-# Flag to enable/disable structured logging
-ENABLE_STRUCTURED_LOGGING = (
-    os.getenv("ENABLE_STRUCTURED_LOGGING", "false").lower() == "true"
-)
-
-
-# TODO: Replace all current logger configurations with loguru.
-def log_section(title: str, content: str = "", level=logging.INFO):
-    """
-    Log a structured section with a title and optional content,
-    based on the structured logging flag.
-    Args:
-        title (str): Title of the section.
-        content (str): Optional content to log.
-        level (int): Log level (default is INFO).
-    """
-    if ENABLE_STRUCTURED_LOGGING:
-        # logger.log(level, "\n" + "=" * 60)
-        logger.log(level, "=" * 60)
-        logger.log(level, f"{title}")
-        if content:
-            logger.log(level, "-" * 60)
-            logger.log(level, content)
-        logger.log(level, "=" * 60)
-        # logger.log(level, "=" * 60 + "\n")
-    else:
-        # Default logging behavior
-        logger.log(level, f"{title}: {content}")
+from loguru import logger
+
+from pipelines.etl import etl
+from second_brain.config import settings
+from steps.infrastructure.fetch_from_mongodb import MongoDBService
 
 def main():
     """
@@ -80,63 +80,26 @@ def main():
         Exception: If any error occurs during pipeline execution.
     """
     try:
-        # Load settings
-        settings = Settings()
-        genre = settings.DEFAULT_GENRE  # Use environment-defined genre
-
-        # Validate input JSON
-        with open(settings.LOCAL_JSON_FILE_PATH, "r") as file:
-            documents = json.load(file)
-        expected_genre_count = sum(
-            1 for doc in documents if "genres" in doc and genre in doc["genres"]
-        )
-        log_section(
-            f"Expected '{genre}' genre count in JSON file: {expected_genre_count}"
-        )
-
         # Run the pipeline
-        mongodb_pipeline_instance = mongodb_atlas_pipeline(
+        etl(
             ingest_json_config={
                 "mongodb_uri": settings.MONGODB_OFFLINE_URI,
                 "database_name": settings.MONGODB_OFFLINE_DATABASE,
                 "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-                "json_file_path": settings.LOCAL_JSON_FILE_PATH,
+                "data_directory": settings.DATA_DIRECTORY,
             },
             fetch_documents_config={
                 "mongodb_uri": settings.MONGODB_OFFLINE_URI,
                 "database_name": settings.MONGODB_OFFLINE_DATABASE,
                 "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-                "limit": settings.MAX_FETCH_LIMIT,
-                "genre": genre,
+                "limit": 100,
            },
        )
-
-        # Post-run validation
-        service = MongoDBService(
-            settings.MONGODB_OFFLINE_URI,
-            settings.MONGODB_OFFLINE_DATABASE,
-            settings.MONGODB_OFFLINE_COLLECTION,
-        )
-        log_section("Verifying document ingestion and genre query...")
-        total_documents = service.verify_collection_count()
-        genre_count = service.verify_genre(genre)
-
-        # Log results
-        log_section(f"Total documents in the collection: {total_documents}")
-        log_section(f"Total documents found for genre '{genre}': {genre_count}")
-        log_section("\nPipeline execution completed.")
-        log_section(f"Pipeline run details:")
-        log_section(f"  ID: {mongodb_pipeline_instance.id}")
-        log_section(f"  Name: {mongodb_pipeline_instance.name}")
-        log_section(f"  Status: {mongodb_pipeline_instance.status}")
-        log_section(f"  Created: {mongodb_pipeline_instance.created}")
-        log_section(f"  Ended: {mongodb_pipeline_instance.end_time}")
-
-
     except Exception as e:
         logger.error(f"An unexpected error occurred during pipeline execution: {e}")
 
 
 if __name__ == "__main__":
     os.makedirs("./logs", exist_ok=True)
-    log_section("Log directory created or already exists.")
     main()
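The rewrite deletes roughly 80 lines of hand-rolled section logging and post-run genre validation in favor of loguru and a single pipeline call. Two things worth flagging: `MongoDBService` is still imported from its new home in `steps/infrastructure/fetch_from_mongodb.py` even though no use of it survives in the visible hunks, and `main()` passes `ingest_json_config`/`fetch_documents_config` to `etl(...)` while the new `pipelines/etl.py` below declares `etl() -> None` and builds both dictionaries internally, so one of the two sides appears to lag the other. The service itself is not shown in this diff; a hypothetical sketch, with every method name beyond the constructor assumed:

# Hypothetical sketch of steps/infrastructure/fetch_from_mongodb.py -- not shown in this diff.
from pymongo import MongoClient

class MongoDBService:
    """Thin wrapper around a single MongoDB collection."""

    def __init__(self, mongodb_uri: str, database_name: str, collection_name: str) -> None:
        self.client = MongoClient(mongodb_uri)
        self.collection = self.client[database_name][collection_name]

    def fetch_documents(self, limit: int = 100) -> list[dict]:
        # Pull at most `limit` raw documents for downstream processing.
        return list(self.collection.find().limit(limit))

    def count_documents(self) -> int:
        return self.collection.count_documents({})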
2 changes: 2 additions & 0 deletions pipelines/__init__.py
@@ -1,5 +1,7 @@
 from .collect_notion_data import collect_notion_data
+from .etl import etl
 
 __all__ = [
     "collect_notion_data",
+    "etl"
 ]
26 changes: 26 additions & 0 deletions pipelines/etl.py
@@ -0,0 +1,26 @@
+from zenml import pipeline
+from steps.infrastructure import (
+    ingest_to_mongodb,
+    fetch_from_mongodb,
+)
+
+from second_brain.config import settings
+
+@pipeline
+def etl() -> None:
+    ingest_json_config= {
+        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
+        "database_name": settings.MONGODB_OFFLINE_DATABASE,
+        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
+        "data_directory": settings.DATA_DIRECTORY,
+    }
+    fetch_documents_config = {
+        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
+        "database_name": settings.MONGODB_OFFLINE_DATABASE,
+        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
+        "limit": 100,
+    }
+
+    ingest_to_mongodb(**ingest_json_config)
+
+    fetch_from_mongodb(**fetch_documents_config)
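The pipeline body is pure wiring: two dictionaries built from `settings`, splatted into two steps. The steps live in `steps/infrastructure` and are not shown in this excerpt; a rough sketch of their shape, with signatures inferred from the keyword arguments above and bodies assumed:

# Hypothetical sketch of the two steps -- only their names and kwargs appear in this diff.
import json
from pathlib import Path

from pymongo import MongoClient
from zenml import step

@step
def ingest_to_mongodb(
    mongodb_uri: str, database_name: str, collection_name: str, data_directory: str
) -> None:
    """Load every JSON file under `data_directory` into the target collection."""
    collection = MongoClient(mongodb_uri)[database_name][collection_name]
    for path in Path(data_directory).glob("*.json"):
        docs = json.loads(path.read_text())
        if docs:
            collection.insert_many(docs if isinstance(docs, list) else [docs])

@step
def fetch_from_mongodb(
    mongodb_uri: str, database_name: str, collection_name: str, limit: int
) -> list[dict]:
    """Read back up to `limit` documents, e.g. to sanity-check the ingestion."""
    collection = MongoClient(mongodb_uri)[database_name][collection_name]
    # Drop the non-serializable ObjectId so ZenML can materialize the artifact.
    return list(collection.find({}, {"_id": 0}).limit(limit))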
67 changes: 0 additions & 67 deletions pipelines/mongodb_atlas_pipeline.py

This file was deleted.

21 changes: 12 additions & 9 deletions pyproject.toml
@@ -1,11 +1,7 @@
-[build-system]
-requires = ["setuptools>=61.0", "wheel", "uv"]
-build-backend = "setuptools.build_meta"
-
 [project]
-name = "zenml-rag-pipeline"
+name = "second_brain_course"
 version = "0.1.0"
-description = "Foundational ZenML-based pipeline stack with Local MongoDB Atlas integration."
+description = "Self-paced course on production LLMs and RAG by teaching you how to build an AI assistant on top of your second brain."
 authors = [
     {name = "Paul Iusztin", email = "[email protected]"},
     {name = "Jesse Moses", email = "[email protected]"},
@@ -23,10 +19,13 @@ dependencies = [
     "python-dotenv>=1.0.0",
     "pymongo>=4.4.0",
     "zenml[server]>=0.72.0",
+    "boto3>=1.36.0",
 ]
 
-[project.optional-dependencies]
-dev = ["ruff"]  # Include only Ruff for linting
+[dependency-groups]
+dev = [
+    "ruff>=0.7.2",
+]
 
 [tool.ruff]
 line-length = 88
@@ -37,7 +36,11 @@ ignore = [
 [tool.pip]
 extra-index-url = "https://download.pytorch.org/whl/cpu/torch_stable.html"
 
+[build-system]
+requires = ["setuptools>=61.0", "wheel", "uv"]
+build-backend = "setuptools.build_meta"
+
 [tool.setuptools.packages.find]
-where = ["."]
+where = ["src/."]
 include = ["configs", "pipelines", "steps"]
 exclude = ["data", "logs", "temp_store", "model_cache"]
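Beyond the rename, three structural moves: `[build-system]` relocates to the bottom of the file unchanged; `[project.optional-dependencies]` becomes a PEP 735 `[dependency-groups]` table, which `uv sync` installs by default for the `dev` group without advertising it as a published extra; and `where = ["src/."]` switches package discovery to the `src/` layout, which is what lets main.py import `second_brain.config` instead of `src.second_brain.config`.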
