Commit b1959c1

committed Feb 15, 2025
refactor: Enhance email processing configuration and connection management
1 parent 61aa8c8 commit b1959c1

9 files changed, +212 -111 lines changed


‎.appLogic/emailModule.md

+66 -69
Original file line number | Diff line number | Diff line change
@@ -1,91 +1,88 @@
11
# Email Processing Module
22

33
## Overview
4-
Handles IMAP email retrieval and routing for two inboxes:
5-
1. `pacenotefoo@caf-gpt.com` - Routes to Pace Notes system
6-
2. `policyfoo@caf-gpt.com` - Routes to Policy Foo system
4+
Handles IMAP email retrieval and routing for two specific folders in ProtonMail:
5+
1. `CAF-GPT/PaceNoteFoo` - Routes to Pace Notes system
6+
2. `CAF-GPT/PolicyFoo` - Routes to Policy Foo system
77

88
## Architecture
99

1010
### Data Flow
11-
0. Initialization
11+
1. Initialization
1212
- On startup: Connect to IMAP
13-
- Reload all unread messages into queue
14-
- Maintain original received order
13+
- Select specific mailboxes for processing
14+
- Load configuration from environment
15+
- Setup logging based on development mode
1516

16-
1. Queue Management
17-
- Thread-safe in-memory storage using Python's deque
18-
- Cold start protection via IMAP reload
19-
- No persistent storage of queue state
20-
- Lock-based concurrency control
17+
2. Connection Management
18+
- Single IMAP connection with health monitoring
19+
- Automatic reconnection with exponential backoff
20+
- Connection status tracking
21+
- Clean error handling
2122

22-
2. Processing Flow
23-
- Emails added to queue as received
24-
- **Routing Logic**:
25-
- Add `system` metadata key based on `To:` field:
26-
- `pacenotefoo@caf-gpt.com` → `pace_notes`
27-
- `policyfoo@caf-gpt.com` → `policy_foo`
28-
- Unknown recipients are logged and skipped
29-
- Async processing loop with health checks
30-
- Success confirmation required before next item
31-
- Mark email as read after processing
32-
- Failed emails are retried with exponential backoff
23+
3. Email Processing
24+
- Continuous processing loop
25+
- Mailbox-specific routing (pace_notes/policy_foo)
26+
- Mark-as-read confirmation
27+
- Error handling with logging
3328

34-
## Technical Implementation
29+
### Implementation Details
3530

36-
### Environment Variables
37-
```
38-
EMAIL_HOST=127.0.0.1
31+
#### Configuration
32+
```python
33+
# Environment Variables
34+
EMAIL_HOST=100.99.136.75
3935
EMAIL_PASSWORD=****
4036
IMAP_PORT=1143
4137
SMTP_PORT=1025
42-
```
43-
44-
### System Design
45-
- **Queue Characteristics**:
46-
- Pure Python deque with maxlen=100
47-
- Thread-safe operations
48-
- Messages stored as EmailMessage objects
49-
- Order preserved from IMAP UID sequence
5038

51-
- **Connection Management**:
52-
- Single IMAP connection with health monitoring
53-
- Automatic reconnection with exponential backoff
54-
- Connection status exposed via health check
55-
56-
- **Health Monitoring**:
57-
- Queue statistics (size, processing state)
58-
- Connection health (status, retry count)
59-
- Integrated with FastAPI health check endpoint
39+
# Hardcoded Mailboxes
40+
MAILBOXES = {
41+
"pace_notes": "CAF-GPT/PaceNoteFoo",
42+
"policy_foo": "CAF-GPT/PolicyFoo"
43+
}
44+
```
6045

61-
### Message Parsing
62-
- **Headers Extracted**:
63-
- `From`: Sender's email address
64-
- `To`: Recipient address(es)
65-
- `Subject`: Email subject line
66-
- `Date`: Received timestamp
67-
- **Body Handling**:
68-
- Only process `text/plain` content
69-
- Ignore HTML and attachments
46+
### Mailbox Handling
47+
- **Folder Selection**: Explicitly select each mailbox before processing
48+
- **Folder Switching**: Switch between mailboxes during processing loop
49+
- **Folder Monitoring**: Track last processed message for each folder
50+
- **Error Handling**: Handle folder access errors gracefully
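
The folder handling described in this list can be sketched with the standard `imaplib` calls. This is only an illustration: `fetch_unseen_uids` and `poll_all` are hypothetical names that do not appear in the commit, and the sketch assumes the server accepts the folder paths exactly as documented.

```python
import imaplib
from typing import Dict, List

# Folder names as documented in the Configuration block of this file.
MAILBOXES: Dict[str, str] = {
    "pace_notes": "CAF-GPT/PaceNoteFoo",
    "policy_foo": "CAF-GPT/PolicyFoo",
}


def fetch_unseen_uids(conn: imaplib.IMAP4, folder: str) -> List[bytes]:
    """Select one folder and return the UIDs of its unread messages."""
    # Quote the name so a folder containing spaces would still parse.
    status, _ = conn.select(f'"{folder}"')
    if status != "OK":
        raise RuntimeError(f"Cannot select folder {folder!r}")
    status, data = conn.uid("SEARCH", None, "UNSEEN")
    if status != "OK":
        raise RuntimeError(f"Search failed in folder {folder!r}")
    # The server answers with one space-separated bytes string of UIDs.
    return data[0].split() if data and data[0] else []


def poll_all(conn: imaplib.IMAP4) -> Dict[str, List[bytes]]:
    """Visit every configured folder once and group unread UIDs by system."""
    return {
        system: fetch_unseen_uids(conn, folder)
        for system, folder in MAILBOXES.items()
    }
```

Because the target system is derived from the folder rather than from the `To:` header, a message needs no further routing metadata once it has been fetched.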
7051

7152
### Error Handling
72-
- **IMAP Connection Failures**:
73-
- Exponential backoff (1s to 1 hour)
74-
- Health status monitoring
75-
- Automatic reconnection attempts
53+
- Connection failures with backoff
54+
- Folder access errors with logging
55+
- Graceful shutdown on interrupts
56+
- Development mode detailed logging
7657

77-
- **Processing Errors**:
78-
- Failed messages marked for retry
79-
- Retry count tracking
80-
- Error reason logging
58+
### Health Monitoring
59+
- **Connection status**:
60+
- Last successful connection time
61+
- Connection error count
62+
- Current connection state
63+
- **Queue statistics**:
64+
- Current queue size
65+
- Messages processed
66+
- Messages failed
67+
- Average processing time
68+
- **System metrics**:
69+
- CPU/memory usage
70+
- Thread count
71+
- Active connections
72+
- **Alerting**:
73+
- Email processing failures
74+
- Queue capacity warnings
75+
- Connection errors
8176

82-
- **Queue Management**:
83-
- Full queue handling (drop new messages)
84-
- Thread-safe operations
77+
### Queue Implementation
78+
- **Thread-safe in-memory storage** using Python's deque
79+
- **Max capacity**: 100 messages
80+
- **Message ordering**: Preserved from IMAP UID sequence
81+
- **Retry mechanism**:
82+
- Failed messages are requeued
83+
- Exponential backoff between retries
84+
- Max retry attempts: 5
85+
- **Message tracking**:
86+
- UID-based message identification
8587
- Processing state tracking
86-
87-
### Integration
88-
- Async startup/shutdown methods
89-
- Health check status reporting
90-
- Background processing loop
91-
- Clean process lifecycle management
88+
- Error history for failed messages
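
A minimal sketch of the queue described under "Queue Implementation", under stated assumptions: `QueuedEmail` and `RetryQueue` are hypothetical names, and the one-second base delay is illustrative. One detail is worth noting: `deque(maxlen=100)` silently discards the oldest item when a new one is appended to a full queue, so the sketch checks the length explicitly and rejects the new message instead.

```python
from collections import deque
from dataclasses import dataclass, field
from threading import Lock
from typing import Deque, List, Optional

MAX_QUEUE = 100   # "Max capacity: 100 messages"
MAX_RETRIES = 5   # "Max retry attempts: 5"


@dataclass
class QueuedEmail:
    uid: int                                          # UID-based identification
    attempts: int = 0                                 # processing state
    errors: List[str] = field(default_factory=list)   # error history


class RetryQueue:
    def __init__(self) -> None:
        self._items: Deque[QueuedEmail] = deque()
        self._lock = Lock()

    def put(self, item: QueuedEmail) -> bool:
        """Append in arrival order; return False when the queue is full."""
        with self._lock:
            if len(self._items) >= MAX_QUEUE:
                return False
            self._items.append(item)
            return True

    def get(self) -> Optional[QueuedEmail]:
        """Pop the oldest item, or None when the queue is empty."""
        with self._lock:
            return self._items.popleft() if self._items else None

    def fail(self, item: QueuedEmail, reason: str) -> Optional[float]:
        """Record a failure and requeue the item.

        Returns the backoff delay in seconds, or None when the item was
        dropped (retries exhausted or queue full).
        """
        item.attempts += 1
        item.errors.append(reason)
        if item.attempts >= MAX_RETRIES:
            return None
        if not self.put(item):
            return None
        return float(2 ** (item.attempts - 1))  # 1s, 2s, 4s, 8s
```

Requeuing at the tail means a retried message is processed after newer ones, which trades strict UID ordering for not blocking the rest of the queue.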

‎.appLogic/overview.md

+37 -2
@@ -1,7 +1,22 @@
11
# CAF-GPT Application Plan
22

33
## Overview
4-
A collection of AI tools and agents for army personnel, packaged as a Python Docker container. Triggers on news emails via IMAP. Uses LLMs to formulate a response to the email.
4+
A collection of AI tools and agents for army personnel, packaged as a Python application. Processes emails via IMAP and routes them to appropriate systems.
5+
6+
## Core Components
7+
8+
### Email Processing
9+
1. **IMAPConnection**
10+
- Manages IMAP server connection
11+
- Handles authentication
12+
- Retrieves unread messages
13+
- Tracks connection health
14+
15+
2. **EmailProcessor**
16+
- Manages processing workflow
17+
- Routes messages to appropriate systems
18+
- Handles errors and retries
19+
- Maintains processing queue
520

621
## Core Principles
722
- Keep user messages browser-side only
@@ -41,4 +56,24 @@ A collection of AI tools and agents for army personnel, packaged as a Python Doc
4156
- Read-only access configuration
4257

4358
## Project Structure
44-
See [dirStructure.yaml](dirStructure.yaml) for details.
59+
See [dirStructure.yaml](dirStructure.yaml) for details.
60+
61+
## Development vs Production Mode
62+
63+
### Development Mode
64+
- **Enabled by**: DEVELOPMENT=true
65+
- **Features**:
66+
- Detailed debug logging
67+
- Automatic application restart on changes
68+
- Reduced rate limiting
69+
- Mock services for testing
70+
- Verbose error messages
71+
72+
### Production Mode
73+
- **Enabled by**: DEVELOPMENT=false
74+
- **Features**:
75+
- Info-level logging only
76+
- Strict rate limiting
77+
- Real service connections
78+
- Generic error messages
79+
- Performance optimizations

‎.cursor/rules/base.mdc

+2 -1
@@ -6,14 +6,15 @@ globs: src/*
66
- **Primary Goal**: Simplify and streamline app development while maintaining functionality.
77
- **Core Philosophy**: Simplicity is key. Prioritize built-in libraries and frameworks first.
88
- **User-Centric**: Always wait for user confirmation before making changes to the code base.
9+
- **Code-Docs**: Use single line comment for code level documentation.
910

1011
## Best Practices
1112
1. **Clarity Over Complexity**:
1213
- Always explain reasoning behind decisions.
1314
- Use simple language and examples when possible.
1415

1516
2. **Transparency**:
16-
- Document all changes and thought processes in `notePad/`.
17+
- Document all changes and thought processes in `notepad/`.
1718
- Maintain a clear audit trail of decisions.
1819

1920
3. **User Control**:

‎.env.example

+8 -7
@@ -2,13 +2,14 @@
22
PORT=8080
33
DEVELOPMENT=true
44

5-
# LLM
6-
OLLAMA_HOST=http://localhost:11434 # Default Ollama port
7-
PACE_NOTE_MODEL=meta-llama/llama-3.3-70b-instruct
8-
DOAD_FINDER_MODEL=amazon/nova-pro-v1
9-
DOAD_CHAT_MODEL=anthropic/claude-3.5-sonnet:beta
5+
# Email
6+
EMAIL_HOST=100.99.136.75
7+
EMAIL_PASSWORD=****
8+
IMAP_PORT=1143
9+
SMTP_PORT=1025
10+
1011

1112
# S3
12-
S3_BUCKET_NAME=your-bucket-name
13+
S3_BUCKET_NAME=your-bucket
1314
S3_ACCESS_KEY=your-access-key
14-
S3_SECRET_KEY=your-secret-key
15+
S3_SECRET_KEY=your-secret-key

‎src/emails/connection.py

+16 -6
@@ -15,6 +15,7 @@ class IMAPConnection:
1515
def __init__(self):
1616
self.host = EMAIL_CONFIG['host']
1717
self.port = EMAIL_CONFIG['imap_port']
18+
self.username = EMAIL_CONFIG['username']
1819
self.password = EMAIL_CONFIG['password']
1920
self.connection: Optional[imaplib.IMAP4] = None
2021
self.is_healthy = False
@@ -31,8 +32,11 @@ def connect(self) -> bool:
3132
except:
3233
self.connection = None
3334

35+
logger.debug(f"Attempting IMAP connection to {self.host}:{self.port}")
3436
self.connection = imaplib.IMAP4(self.host, self.port)
35-
self.connection.login('user', self.password)
37+
38+
logger.debug(f"Attempting login with username: {self.username}")
39+
self.connection.login(self.username, self.password)
3640

3741
self.is_healthy = True
3842
self.retry_count = 0
@@ -41,11 +45,11 @@ def connect(self) -> bool:
4145
logger.info("IMAP connection established")
4246
return True
4347

48+
except imaplib.IMAP4.error as e:
49+
logger.error(f"IMAP login failed: {str(e)}")
50+
logger.debug(f"Login details - Host: {self.host}:{self.port}, User: {self.username}")
51+
return False
4452
except Exception as e:
45-
self.is_healthy = False
46-
self.retry_count += 1
47-
self.last_error = str(e)
48-
4953
logger.error(f"IMAP connection failed: {str(e)}")
5054
return False
5155

@@ -125,4 +129,10 @@ def get_health_check(self) -> dict:
125129
"is_healthy": self.is_healthy,
126130
"retry_count": self.retry_count,
127131
"last_error": self.last_error
128-
}
132+
}
133+
134+
def is_connected(self) -> bool:
135+
"""Check if the IMAP connection is active and healthy."""
136+
return self.connection is not None and self.is_healthy
137+
138+
# Add this method to the IMAPConnection class
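
Review note: as far as this hunk shows, the new `imaplib.IMAP4.error` branch does not update `is_healthy`, `retry_count`, or `last_error`, and the same three assignments were removed from the generic `except` branch. If nothing else sets them, `get_health_check()` could report stale values after a failed connect. One possible shape for keeping the bookkeeping in a single place is sketched below. `ConnectionState`, `_record_failure`, and the `factory` parameter are hypothetical stand-ins, not part of the commit.

```python
import imaplib
from typing import Callable, Optional


class ConnectionState:
    """Stand-in for the health fields on IMAPConnection (hypothetical class)."""

    def __init__(self) -> None:
        self.connection: Optional[imaplib.IMAP4] = None
        self.is_healthy = False
        self.retry_count = 0
        self.last_error: Optional[str] = None

    def _record_failure(self, exc: Exception) -> None:
        # Shared by every except branch so the health fields cannot drift.
        self.connection = None
        self.is_healthy = False
        self.retry_count += 1
        self.last_error = str(exc)

    def connect(self, factory: Callable[[], imaplib.IMAP4]) -> bool:
        """Open a connection via factory(); return True on success."""
        try:
            self.connection = factory()
        except (imaplib.IMAP4.error, OSError) as exc:
            self._record_failure(exc)
            return False
        self.is_healthy = True
        self.retry_count = 0
        self.last_error = None  # assumption: clear the old error on success
        return True
```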

‎src/emails/processor.py

+8
@@ -1,6 +1,7 @@
11
import asyncio
22
from typing import Optional, Callable
33
from datetime import datetime
4+
import time
45

56
from src.utils.logger import logger
67
from src.types import EmailMessage, EmailHealthCheck
@@ -40,7 +41,14 @@ async def stop(self) -> None:
4041

4142
async def _processing_loop(self) -> None:
4243
# Main processing loop.
44+
retry_delay = 5 # seconds
4345
while self.running:
46+
if not self.connection.is_connected():
47+
logger.debug(f"Waiting {retry_delay}s before connection retry")
48+
time.sleep(retry_delay)
49+
retry_delay = min(retry_delay * 2, 300) # Cap at 5 minutes
50+
continue
51+
4452
try:
4553
# Check connection health
4654
if not self.connection.is_healthy:
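
Review note: `time.sleep()` inside an `async def` blocks the whole event loop for the duration of the delay, so no other coroutine can run while the processor waits. The usual alternative is `await asyncio.sleep()`. The sketch below is an assumption-laden illustration: `wait_until_connected` is a hypothetical helper, and the 5 s and 300 s defaults are copied from the hunk. The hunk also does not show where a reconnect is attempted. If that happens in the `try` block further down, the `continue` would skip it on every iteration.

```python
import asyncio
from typing import Callable


async def wait_until_connected(
    is_connected: Callable[[], bool],
    initial_delay: float = 5.0,
    max_delay: float = 300.0,
) -> int:
    """Poll is_connected() with exponential backoff; return the number of waits."""
    delay = initial_delay
    waits = 0
    while not is_connected():
        # Yields control to the event loop, unlike time.sleep().
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)  # cap at max_delay
        waits += 1
    return waits
```

Because `delay` is local to the call, each new outage starts again from `initial_delay` instead of inheriting the last, possibly capped, value.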

‎src/main.py

+5 -3
@@ -18,9 +18,6 @@
1818
)
1919
logger = logging.getLogger(__name__)
2020

21-
# Initialize email processor
22-
email_processor = EmailProcessor()
23-
2421
async def main():
2522
"""Main application entry point."""
2623
try:
@@ -35,6 +32,11 @@ async def main():
3532
except KeyboardInterrupt:
3633
logger.info("Shutting down...")
3734
await email_processor.stop()
35+
except Exception as e:
36+
logger.error(f"Unexpected error: {str(e)}")
37+
await email_processor.stop()
38+
raise
3839

3940
if __name__ == "__main__":
41+
email_processor = EmailProcessor()
4042
asyncio.run(main())

‎src/utils/config.py

+33 -13
@@ -1,5 +1,6 @@
11
import os
22
from dotenv import load_dotenv
3+
from typing import Dict, Any
34

45
# Load environment variables
56
load_dotenv()
@@ -15,8 +16,11 @@
1516
raise ValueError(f"Missing required environment variable: {key}")
1617

1718
# Export environment helpers
18-
IS_DEV = os.getenv('DEVELOPMENT', 'false').lower() == 'true'
19-
PORT = int(os.getenv('PORT', '8080'))
19+
def _get_env(key: str, default: Any = None) -> str:
20+
value = os.getenv(key, default)
21+
if value is None:
22+
raise ValueError(f"Missing required environment variable: {key}")
23+
return value
2024

2125
# Export model configurations
2226
MODELS = {
@@ -27,18 +31,34 @@
2731
'paceNote': os.getenv('PACE_NOTE_MODEL')
2832
}
2933

30-
# Email configuration
31-
EMAIL_INBOXES = {
32-
'pace_notes': 'pacenotefoo@caf-gpt.com',
33-
'policy_foo': 'policyfoo@caf-gpt.com'
34+
# Email Configuration
35+
EMAIL_CONFIG: Dict[str, Any] = {
36+
# From environment
37+
"host": _get_env("EMAIL_HOST"),
38+
"password": _get_env("EMAIL_PASSWORD"),
39+
"imap_port": int(_get_env("IMAP_PORT")),
40+
"smtp_port": int(_get_env("SMTP_PORT")),
41+
42+
# Hardcoded values
43+
"username": "pacenotefoo@caf-gpt.com", # Default inbox
44+
45+
# System inboxes mapping
46+
"inboxes": {
47+
"pace_notes": "pacenotefoo@caf-gpt.com",
48+
"policy_foo": "policyfoo@caf-gpt.com"
49+
}
50+
}
51+
52+
# Server Configuration
53+
SERVER_CONFIG = {
54+
"port": int(_get_env("PORT", "8080")),
55+
"development": _get_env("DEVELOPMENT", "false").lower() == "true"
3456
}
3557

36-
# Add email configuration section
37-
EMAIL_CONFIG = {
38-
'host': os.getenv('EMAIL_HOST', '127.0.0.1'),
39-
'password': os.getenv('EMAIL_PASSWORD'),
40-
'imap_port': int(os.getenv('IMAP_PORT', '1143')),
41-
'smtp_port': int(os.getenv('SMTP_PORT', '1025')),
42-
'inboxes': EMAIL_INBOXES
58+
# S3 Configuration
59+
S3_CONFIG = {
60+
"bucket_name": _get_env("S3_BUCKET_NAME"),
61+
"access_key": _get_env("S3_ACCESS_KEY"),
62+
"secret_key": _get_env("S3_SECRET_KEY")
4363
}
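
Review note: with this change `EMAIL_HOST`, `IMAP_PORT`, and `SMTP_PORT` lose their defaults, so importing the module raises `ValueError` when any of them is unset, and a non-numeric port fails inside `int()` with a message that does not name the variable. A hedged sketch of typed helpers follows. `get_env` and `get_env_int` are hypothetical names, not the commit's `_get_env`.

```python
import os
from typing import Optional


def get_env(key: str, default: Optional[str] = None) -> str:
    """Return the variable's value, or the default, or raise if neither exists."""
    value = os.environ.get(key, default)
    if value is None:
        raise ValueError(f"Missing required environment variable: {key}")
    return value


def get_env_int(key: str, default: Optional[str] = None) -> int:
    """Like get_env, but parse an integer and name the variable on failure."""
    raw = get_env(key, default)
    try:
        return int(raw)
    except ValueError:
        raise ValueError(
            f"Environment variable {key} must be an integer, got {raw!r}"
        ) from None
```

For example, `get_env_int("PORT", "8080")` yields `8080` when `PORT` is unset.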
4464

‎src/utils/logger.py

+37 -10
@@ -4,8 +4,7 @@
44
from enum import Enum
55
from typing import Any, Dict, Optional
66
from uuid import uuid4
7-
8-
from .config import IS_DEV
7+
from .config import SERVER_CONFIG
98

109
class LogLevel(Enum):
1110
DEBUG = 0
@@ -15,15 +14,27 @@ class LogLevel(Enum):
1514

1615
class Logger:
1716
def __init__(self):
18-
self.current_level = LogLevel.DEBUG if IS_DEV else LogLevel.INFO
17+
self.current_level = LogLevel.DEBUG if SERVER_CONFIG['development'] else LogLevel.INFO
1918
self.llm_requests: Dict[str, float] = {} # Track request start times
2019

2120
# Configure logging
22-
logging.basicConfig(
23-
level=logging.DEBUG if IS_DEV else logging.INFO,
24-
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
21+
self.logger = logging.getLogger('caf-gpt')
22+
self.logger.setLevel(logging.DEBUG if SERVER_CONFIG['development'] else logging.INFO)
23+
24+
# Create console handler
25+
ch = logging.StreamHandler()
26+
ch.setLevel(logging.DEBUG if SERVER_CONFIG['development'] else logging.INFO)
27+
28+
# Create formatter
29+
formatter = logging.Formatter(
30+
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
2531
)
26-
self.logger = logging.getLogger(__name__)
32+
33+
# Add formatter to ch
34+
ch.setFormatter(formatter)
35+
36+
# Add ch to logger
37+
self.logger.addHandler(ch)
2738

2839
def _trim_system_message(self, content: str) -> str:
2940
max_length = 200
@@ -80,7 +91,7 @@ def _format_llm_response(self, data: Dict[str, Any], request_id: str, duration_m
8091
return json.dumps({**response, **extra_metadata}, indent=2)
8192

8293
async def log_llm_interaction(self, data: Dict[str, Any]) -> None:
83-
if not IS_DEV:
94+
if not SERVER_CONFIG['development']:
8495
return
8596

8697
request_id = data.get('metadata', {}).get('requestId')
@@ -123,7 +134,7 @@ def error(self, message: str, metadata: Optional[Dict[str, Any]] = None) -> None
123134
self.logger.error(self._format_message(message, metadata))
124135

125136
def log_request(self, method: str, url: str, status_code: int, metadata: Optional[Dict[str, Any]] = None) -> None:
126-
if not IS_DEV:
137+
if not SERVER_CONFIG['development']:
127138
return
128139

129140
path = url.split('?')[0]
@@ -144,7 +155,23 @@ def _format_message(self, message: str, metadata: Optional[Dict[str, Any]] = Non
144155
return f"{message} {json.dumps(metadata)}"
145156

146157
def _should_log(self, level: LogLevel) -> bool:
147-
return IS_DEV or level.value >= self.current_level.value
158+
return SERVER_CONFIG['development'] or level.value >= self.current_level.value
148159

149160
# Export singleton instance
150161
logger = Logger()
162+
163+
def log_error(message: str, **kwargs: Any) -> None:
164+
"""Log error with additional context"""
165+
logger.error(message, extra=kwargs)
166+
167+
def log_warning(message: str, **kwargs: Any) -> None:
168+
"""Log warning with additional context"""
169+
logger.warning(message, extra=kwargs)
170+
171+
def log_info(message: str, **kwargs: Any) -> None:
172+
"""Log info with additional context"""
173+
logger.info(message, extra=kwargs)
174+
175+
def log_debug(message: str, **kwargs: Any) -> None:
176+
"""Log debug with additional context"""
177+
logger.debug(message, extra=kwargs)
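
Review note: the hunk header above shows `Logger.error(self, message, metadata=None)`. If the other level methods share that signature, the new helpers would raise `TypeError`, because they pass `extra=` and the `logger` they call is the custom singleton, not a `logging.Logger`. A minimal sketch of a helper that forwards context as `metadata` follows. `StubLogger` is a hypothetical stand-in used only to make the example self-contained.

```python
import json
from typing import Any, Dict, List, Optional


class StubLogger:
    """Stand-in with the same error() signature as the hunk header shows."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def error(self, message: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        # Mirrors _format_message: append the metadata as JSON when present.
        self.lines.append(f"{message} {json.dumps(metadata)}" if metadata else message)


logger = StubLogger()


def log_error(message: str, **kwargs: Any) -> None:
    """Log an error, forwarding keyword context as the metadata dict."""
    logger.error(message, metadata=kwargs or None)
```

The same pattern would apply to `log_warning`, `log_info`, and `log_debug`, assuming the corresponding methods accept `metadata`.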
