-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathcelery_worker.py
178 lines (149 loc) · 6.95 KB
/
celery_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from celery import Celery
import os
from dotenv import load_dotenv
import requests
import time
import random
import logging
import uuid
from fake_useragent import UserAgent
import re
# Load environment variables (python-dotenv) before any os.getenv() lookups below
load_dotenv()

# Configure logging: INFO level, timestamped "name - level - message" lines
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared User-Agent generator used when building request headers
ua = UserAgent()

# Celery application; broker and result backend come from the environment
# and both fall back to the same local Redis database.
celery = Celery(
    'ttsfm',
    backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
    broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
)

# Celery configuration: JSON-only serialization, UTC timestamps, and
# worker limits that can be overridden through environment variables.
celery.conf.update(
    # Serialization
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    # Time handling
    enable_utc=True,
    timezone='UTC',
    # Report the "started" state for tasks
    task_track_started=True,
    # Hard task time limit in seconds (default 300 = 5 minutes)
    task_time_limit=int(os.getenv('CELERY_TASK_TIME_LIMIT', '300')),
    # Prefetch multiplier (default 1: process one task at a time)
    worker_prefetch_multiplier=int(os.getenv('CELERY_WORKER_PREFETCH_MULTIPLIER', '1')),
    # Restart a worker child after this many tasks (default 1000)
    worker_max_tasks_per_child=int(os.getenv('CELERY_WORKER_MAX_TASKS_PER_CHILD', '1000')),
)
def _get_headers():
"""Generate realistic browser headers with rotation"""
# Get a random User-Agent
user_agent = ua.random
# Base headers common to most browsers
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": random.choice(["en-US,en;q=0.9", "en-GB,en;q=0.8", "en-CA,en;q=0.7"]),
"Cache-Control": "no-cache",
"Dnt": "1", # Do Not Track
"Pragma": "no-cache",
"Referer": "https://www.openai.fm/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": user_agent,
"X-Requested-With": "XMLHttpRequest", # Often used in AJAX requests
}
# Add browser-specific headers (Sec-CH-UA) if applicable
# These are primarily for Chromium-based browsers (Chrome, Edge, Opera, etc.)
if 'chrome' in user_agent.lower() or 'edge' in user_agent.lower() or 'chromium' in user_agent.lower():
# Extract major version number (handle cases where it might not be present)
version_match = re.search(r'(?:Chrome|Edge|Chromium)/(\d+)', user_agent)
major_version = version_match.group(1) if version_match else "121" # Default if not found
brands = []
if 'google chrome' in user_agent.lower():
brands.append(f'"Google Chrome";v="{major_version}"')
brands.append(f'"Chromium";v="{major_version}"')
brands.append('"Not A(Brand";v="99"')
elif 'microsoft edge' in user_agent.lower():
brands.append(f'"Microsoft Edge";v="{major_version}"')
brands.append(f'"Chromium";v="{major_version}"')
brands.append('"Not A(Brand";v="99"')
else: # Generic Chromium or others
brands.append(f'"Chromium";v="{major_version}"')
brands.append('"Not A(Brand";v="8"')
headers["Sec-Ch-Ua"] = ", ".join(brands)
headers["Sec-Ch-Ua-Mobile"] = "?0" # Assuming desktop
headers["Sec-Ch-Ua-Platform"] = random.choice(['"Windows"', '"macOS"', '"Linux"'])
# Add Upgrade-Insecure-Requests sometimes (common for initial navigation)
if random.random() < 0.5:
headers["Upgrade-Insecure-Requests"] = "1"
# Use Authority or Host - Authority is more common with HTTP/2
headers["Authority"] = "www.openai.fm" # Prefer Authority for HTTP/2
return headers
def _get_random_delay():
"""Get random delay time (1-5 seconds) with jitter"""
base_delay = random.uniform(1, 5)
jitter = random.uniform(0.1, 0.5)
return base_delay + jitter
@celery.task(bind=True, name='tasks.process_tts_request')
def process_tts_request(self, task_data):
    """Process a TTS request and return the audio data.

    Posts ``task_data['data']`` to the OpenAI.fm generate endpoint, making
    up to three in-task attempts with exponential backoff on 403/503
    responses, timeouts, network errors, and unexpected exceptions.

    Args:
        self: The bound Celery task instance (``bind=True``); used to
            reschedule the task when the upstream rate-limits.
        task_data: Mapping whose ``'data'`` entry is the form payload.
            A fresh ``generation`` UUID is written into that payload on
            every attempt (the mapping is mutated in place).

    Returns:
        tuple: ``(content, error_message, status_code)``. On success this is
        ``(response.content, None, 200)``; on failure ``content`` is ``None``
        and ``error_message`` describes the problem.

    Raises:
        celery.exceptions.Retry: On an upstream 429, so that Celery
            reschedules the task after the ``Retry-After`` delay
            (60 seconds when the header is missing or not an integer).
    """
    # Local import keeps this block self-contained; celery is already a
    # dependency of this module.
    from celery.exceptions import Retry

    max_retries = 3
    retry_count = 0
    base_delay = 1
    # SSL verification stays on unless VERIFY_SSL is explicitly "false"
    verify_ssl = os.getenv("VERIFY_SSL", "true").lower() != "false"

    while retry_count < max_retries:
        try:
            # Add random delay between requests for more natural behavior
            time.sleep(_get_random_delay())

            logger.info(f"Sending request to OpenAI.fm with data: {task_data['data']}")

            # Add generation ID to request data
            task_data['data']['generation'] = str(uuid.uuid4())

            # Check format setting
            if 'format' in task_data['data']:
                logger.info(f"Requesting audio in format: {task_data['data']['format']}")

            response = requests.post(
                "https://www.openai.fm/api/generate",
                data=task_data['data'],
                headers=_get_headers(),
                timeout=30,
                verify=verify_ssl
            )

            if response.status_code == 403:
                logger.warning("Received 403 Forbidden from OpenAI.fm")
                retry_count += 1
                time.sleep(base_delay * (2 ** retry_count))  # Exponential backoff
                continue

            if response.status_code == 429:
                logger.warning("Rate limited by OpenAI.fm")
                try:
                    retry_after = int(response.headers.get('Retry-After', 60))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; fall back to
                    # the default delay instead of failing the attempt.
                    retry_after = 60
                # self.retry() signals rescheduling by raising Retry
                raise self.retry(countdown=retry_after)

            if response.status_code == 503:
                logger.warning("Service unavailable from OpenAI.fm")
                retry_count += 1
                time.sleep(base_delay * (2 ** retry_count))
                continue

            if response.status_code != 200:
                logger.error(f"Error from OpenAI.fm: {response.status_code}")
                error_msg = f"Error from upstream service: {response.status_code}"
                return None, error_msg, response.status_code

            # Return the audio data, no error message, and status code
            return response.content, None, 200

        except Retry:
            # Retry is an Exception subclass; it must propagate to Celery
            # rather than be swallowed by the generic handler below,
            # otherwise the task is never rescheduled.
            raise
        except requests.exceptions.Timeout:
            logger.error("Request timeout")
            retry_count += 1
            time.sleep(base_delay * (2 ** retry_count))
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error: {str(e)}")
            retry_count += 1
            time.sleep(base_delay * (2 ** retry_count))
        except Exception as e:
            logger.error(f"Error processing TTS request: {str(e)}")
            retry_count += 1
            time.sleep(base_delay * (2 ** retry_count))
            if retry_count >= max_retries:
                return None, str(e), 500

    # If we've exhausted retries
    logger.error("Exhausted retries for TTS request")
    return None, "Failed to process request after multiple retries", 500