feat: Add the necessary structure for the feature pipelines and MongoDB SQL
iusztinpaul committed Jan 17, 2025
1 parent c4d7849 commit f1036a8
Showing 16 changed files with 154 additions and 228 deletions.
4 changes: 3 additions & 1 deletion configs/compute_rag_vector_index.yaml
@@ -1 +1,3 @@
-parameters:
+parameters:
+  limit: 100
+  extract_collection_name: raw_data
4 changes: 3 additions & 1 deletion configs/etl.yaml
@@ -1 +1,3 @@
-parameters:
+parameters:
+  data_directory: data/
+  load_collection_name: raw_data
4 changes: 3 additions & 1 deletion configs/generate_dataset.yaml
@@ -1 +1,3 @@
-parameters:
+parameters:
+  limit: 100
+  extract_collection_name: raw_data
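Taken together, the three config files move hard-coded parameters out of the pipeline code: ZenML forwards each parameters: block to the arguments of the matching pipeline function. A minimal runner sketch, assuming ZenML's standard with_options(config_path=...) API (the actual entrypoint script is not part of this commit):

from pipelines.generate_dataset import generate_dataset

# ZenML reads the `parameters:` block from the YAML file and passes
# limit=100 and extract_collection_name="raw_data" to the pipeline function.
generate_dataset.with_options(config_path="configs/generate_dataset.yaml")()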
12 changes: 2 additions & 10 deletions pipelines/compute_rag_vector_index.py
@@ -1,18 +1,10 @@
 from zenml import pipeline
 
-from second_brain.config import settings
 from steps.infrastructure import (
     fetch_from_mongodb,
 )
 
 
 @pipeline
-def compute_rag_vector_index() -> None:
-    fetch_documents_config = {
-        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
-        "database_name": settings.MONGODB_OFFLINE_DATABASE,
-        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-        "limit": 100,
-    }
-
-    fetch_from_mongodb(**fetch_documents_config)
+def compute_rag_vector_index(limit: int = 100, extract_collection_name: str = "raw_data") -> None:
+    fetch_from_mongodb(limit=limit, collection_name=extract_collection_name)
23 changes: 5 additions & 18 deletions pipelines/etl.py
@@ -1,27 +1,14 @@
 from zenml import pipeline
 
-from second_brain.config import settings
+from steps.etl import crawl, read_pages_from_disk
 from steps.infrastructure import (
-    fetch_from_mongodb,
     ingest_to_mongodb,
 )
 
 
 @pipeline
-def etl() -> None:
-    ingest_json_config = {
-        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
-        "database_name": settings.MONGODB_OFFLINE_DATABASE,
-        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-        "data_directory": settings.DATA_DIRECTORY,
-    }
-    fetch_documents_config = {
-        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
-        "database_name": settings.MONGODB_OFFLINE_DATABASE,
-        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-        "limit": 100,
-    }
+def etl(data_directory: str, load_collection_name: str) -> None:
+    pages = read_pages_from_disk(data_directory=data_directory)
+    documents = crawl(pages=pages)
+    ingest_to_mongodb(documents=documents, collection_name=load_collection_name)
 
-    ingest_to_mongodb(**ingest_json_config)
-
-    fetch_from_mongodb(**fetch_documents_config)
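The refactored etl pipeline swaps the dict-based step configs for an explicit dataflow: each step's return value is an artifact that feeds the next step, and that wiring is what defines the DAG. A self-contained sketch of the pattern, using toy steps rather than the ones in this repository:

from zenml import pipeline, step


@step
def make_greeting(name: str) -> str:
    # A step's return value is tracked as a versioned artifact.
    return f"Hello, {name}!"


@step
def shout(text: str) -> str:
    return text.upper()


@pipeline
def demo(name: str = "world") -> None:
    # Chaining outputs into inputs defines the execution graph, just as
    # read_pages_from_disk -> crawl -> ingest_to_mongodb does above.
    shout(text=make_greeting(name=name))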
12 changes: 2 additions & 10 deletions pipelines/generate_dataset.py
@@ -1,18 +1,10 @@
 from zenml import pipeline
 
-from second_brain.config import settings
 from steps.infrastructure import (
     fetch_from_mongodb,
 )
 
 
 @pipeline
-def generate_dataset() -> None:
-    fetch_documents_config = {
-        "mongodb_uri": settings.MONGODB_OFFLINE_URI,
-        "database_name": settings.MONGODB_OFFLINE_DATABASE,
-        "collection_name": settings.MONGODB_OFFLINE_COLLECTION,
-        "limit": 100,
-    }
-
-    fetch_from_mongodb(**fetch_documents_config)
+def generate_dataset(limit: int = 100, extract_collection_name: str = "raw_data") -> None:
+    fetch_from_mongodb(limit=limit, collection_name=extract_collection_name)
74 changes: 14 additions & 60 deletions src/second_brain/config.py
@@ -33,14 +33,6 @@ class Settings(BaseSettings):
     COMET_API_KEY: Optional[str] = None  # API key for CometML integration.
     COMET_PROJECT_NAME: str = "twin"  # CometML project name for tracking experiments.
 
-    # --- Default Genre ---
-    DEFAULT_GENRE: str = Field("Western", description="Default genre for querying.")
-
-    # --- Docker and Network Configuration ---
-    DOCKER_NETWORK_NAME: str = Field(
-        "zenml-network", description="Docker network for the application."
-    )
-
     # --- Enable Flags ---
     ENABLE_OFFLINE_MODE: bool = Field(
         True, description="Flag to enable offline mode (disables online ingestion)."
@@ -56,32 +48,17 @@ class Settings(BaseSettings):
     # --- Hugging Face Configuration ---
     HUGGINGFACE_ACCESS_TOKEN: Optional[str] = None  # Token for Hugging Face API.
 
-    # --- Local Data File Path ---
-    DATA_DIRECTORY: str = Field(
-        "./data",
-        description="Path to the local JSON file for offline processing.",
-    )
-
-    # --- MongoDB Atlas Local Configuration ---
-    MONGODB_OFFLINE_COLLECTION: str = (
-        "offline_documents"  # Name of the collection in the offline database.
-    )
-    MONGODB_OFFLINE_DATABASE: str = "rag_pipeline"  # Name of the offline database.
+    # --- MongoDB Atlas Configuration ---
+    MONGODB_DATABASE_NAME: str = "second_brain"
     MONGODB_OFFLINE_URI: str = Field(
         default_factory=lambda: os.getenv(
             "MONGODB_OFFLINE_URI", "mongodb://127.0.0.1:27017"
         ),
-        description="Connection URI for local MongoDB Atlas instance.",
+        description="Connection URI for the local MongoDB Atlas instance.",
     )
-
-    # --- MongoDB Atlas Cloud Configuration ---
-    MONGODB_ONLINE_COLLECTION: str = (
-        "movies"  # Name of the collection in the online database.
-    )
-    MONGODB_ONLINE_DATABASE: str = "sample_mflix"  # Name of the online database.
     MONGODB_ONLINE_URI: str | None = Field(
         default=None,
-        description="Connection URI for cloud MongoDB Atlas instance.",
+        description="Connection URI for the Cloud MongoDB Atlas instance.",
    )
 
     # --- Notion API Configuration ---
@@ -93,9 +70,8 @@ class Settings(BaseSettings):
 
     # --- Docker Runtime ---
     IS_RUNNING_IN_DOCKER: bool = Field(
-        default_factory=lambda: os.getenv("IS_RUNNING_IN_DOCKER", "false").lower()
-        in ["true", "1"],
+        default=False,
         description="Flag to indicate if the application is running inside a Docker container.",
     )
 
@@ -104,45 +80,23 @@ def __init__(self, **kwargs):
         """
         super().__init__(**kwargs)
 
-        # Adjust MongoDB URI based on runtime conditions, but respect .env
-        if os.getenv("IS_RUNNING_IN_DOCKER", "false").lower() == "true":
-            self.MONGODB_OFFLINE_URI = os.getenv(
-                "MONGODB_OFFLINE_URI", "mongodb://mongodb-atlas-local:27017"
-            )
+        if self.IS_RUNNING_IN_DOCKER is True:
+            self.MONGODB_OFFLINE_URI = "mongodb://mongodb-atlas-local:27017"
 
     @property
     def MONGODB_URI(self) -> str:
         """
         Returns the appropriate MongoDB URI based on ENABLE_OFFLINE_MODE.
         """
 
-        return (
-            self.MONGODB_OFFLINE_URI
-            if self.ENABLE_OFFLINE_MODE
-            else self.MONGODB_ONLINE_URI
-        )
+        if self.ENABLE_OFFLINE_MODE is True:
+            return self.MONGODB_OFFLINE_URI
 
-    @property
-    def DATABASE_NAME(self) -> str:
-        """
-        Returns the appropriate database name based on ENABLE_OFFLINE_MODE.
-        """
-        return (
-            self.MONGODB_OFFLINE_DATABASE
-            if self.ENABLE_OFFLINE_MODE
-            else self.MONGODB_ONLINE_DATABASE
-        )
+        assert (
+            self.MONGODB_ONLINE_URI is not None
+        ), "MONGODB_ONLINE_URI is not set, while ENABLE_OFFLINE_MODE is False."
 
-    @property
-    def COLLECTION_NAME(self) -> str:
-        """
-        Returns the appropriate collection name based on ENABLE_OFFLINE_MODE.
-        """
-        return (
-            self.MONGODB_OFFLINE_COLLECTION
-            if self.ENABLE_OFFLINE_MODE
-            else self.MONGODB_ONLINE_COLLECTION
-        )
+        return self.MONGODB_ONLINE_URI
 
     @property
     def OPENAI_MAX_TOKEN_WINDOW(self) -> int:
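With the property rewritten, MONGODB_URI now fails fast when online mode is enabled but no cloud URI is configured, instead of silently returning None. A usage sketch, assuming the Settings class is importable alongside the settings instance and that any required secrets are available in the environment or .env file:

from second_brain.config import Settings

# Offline mode (the default) resolves to the local Atlas URI.
offline = Settings(ENABLE_OFFLINE_MODE=True)
assert offline.MONGODB_URI == offline.MONGODB_OFFLINE_URI

# Online mode without MONGODB_ONLINE_URI set raises immediately rather
# than handing a None URI to the MongoDB client.
online = Settings(ENABLE_OFFLINE_MODE=False)
try:
    _ = online.MONGODB_URI
except AssertionError as err:
    print(err)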
29 changes: 24 additions & 5 deletions src/second_brain/entities/page.py
@@ -14,10 +14,29 @@ class PageMetadata(BaseModel):
 
 
 class Page(BaseModel):
-    page_metadata: PageMetadata
+    metadata: PageMetadata
     content: str
     urls: list[str]
 
+    @classmethod
+    def from_file(cls, file_path: Path) -> "Page":
+        """Read a Page object from a JSON file.
+
+        Args:
+            file_path: Path to the JSON file containing page data.
+
+        Returns:
+            Page: A new Page instance constructed from the file data.
+
+        Raises:
+            FileNotFoundError: If the specified file doesn't exist.
+            ValidationError: If the JSON data doesn't match the expected model structure.
+        """
+
+        json_data = file_path.read_text(encoding="utf-8")
+
+        return cls.model_validate_json(json_data)
+
     def write(
         self, file_path: Path, obfuscate: bool = False, also_save_as_txt: bool = False
     ) -> None:
@@ -44,18 +63,18 @@ def write(
     def _obfuscate_data(self, data: dict) -> dict:
         """Obfuscate sensitive IDs in the page data."""
 
-        original_id = data["page_metadata"]["id"]
+        original_id = data["metadata"]["id"]
         fake_id = self._generate_random_hex(32)
 
         obfuscated_data = data.copy()
 
         # Obfuscate the page ID (32-char hex)
-        obfuscated_data["page_metadata"]["id"] = fake_id
+        obfuscated_data["metadata"]["id"] = fake_id
 
         # Obfuscate UUID in URL if present
-        url = data["page_metadata"]["url"]
+        url = data["metadata"]["url"]
         flattened_original_id = original_id.replace("-", "")
-        obfuscated_data["page_metadata"]["url"] = url.replace(
+        obfuscated_data["metadata"]["url"] = url.replace(
             flattened_original_id, fake_id
         )