-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcampaign_tracking.py
More file actions
294 lines (231 loc) · 10.4 KB
/
campaign_tracking.py
File metadata and controls
294 lines (231 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Campaign Tracking - Multi-message follow-up tracking logic"""
import pandas as pd
import re
from typing import Dict, List, Tuple, Optional
def get_tracking_status_columns(df: pd.DataFrame) -> List[str]:
"""
Find all tracking status columns in the DataFrame
Args:
df: DataFrame with contact data
Returns:
List of tracking status column names, sorted by number (e.g., ['Tracking Status 1', 'Tracking Status 2'])
"""
tracking_cols = []
pattern = r'^tracking\s+status\s+(\d+)$'
for col in df.columns:
col_lower = str(col).lower().strip()
match = re.match(pattern, col_lower)
if match:
tracking_cols.append((int(match.group(1)), col)) # (number, original column name)
# Sort by number and return column names
tracking_cols.sort(key=lambda x: x[0])
return [col for _, col in tracking_cols]
def get_next_message_number(contact_row: pd.Series, tracking_cols: List[str]) -> int:
"""
Determine which message number to send next for a contact
Args:
contact_row: Single row from DataFrame representing one contact
tracking_cols: List of tracking status column names
Returns:
Message number to send (1-indexed). Returns 1 if no tracking statuses exist.
"""
if not tracking_cols:
return 1
# Count how many tracking statuses have values (not empty/NaN)
filled_count = 0
for col in tracking_cols:
value = str(contact_row.get(col, '')).strip()
if value and value.lower() not in ['', 'nan', 'none']:
filled_count += 1
else:
break # Stop at first empty tracking status
# Next message is filled_count + 1
return filled_count + 1
def should_skip_contact(contact_row: pd.Series, tracking_cols: List[str]) -> Tuple[bool, Optional[str]]:
"""
Determine if a contact should be skipped based on previous tracking statuses
Args:
contact_row: Single row from DataFrame representing one contact
tracking_cols: List of tracking status column names
Returns:
(should_skip, reason) - True if contact should be skipped, with reason
"""
skip_statuses = ['replied', 'not delivered', 'do not contact', 'not available']
for col in tracking_cols:
value = str(contact_row.get(col, '')).strip().lower()
if value in skip_statuses:
return True, f"Previous status: {value}"
return False, None
def get_contact_outreach_plan(df: pd.DataFrame, campaign_message_count: int) -> pd.DataFrame:
"""
Create an outreach plan for all contacts showing which message to send
Args:
df: DataFrame with contact data
campaign_message_count: Total number of messages in the campaign
Returns:
DataFrame with added columns: 'next_message_number', 'should_skip', 'skip_reason'
"""
tracking_cols = get_tracking_status_columns(df)
df = df.copy()
df['next_message_number'] = 0
df['should_skip'] = False
df['skip_reason'] = ''
for idx, row in df.iterrows():
# Determine which message to send
next_msg = get_next_message_number(row, tracking_cols)
# Check if should skip
should_skip, skip_reason = should_skip_contact(row, tracking_cols)
# Also skip if next message exceeds campaign message count
if next_msg > campaign_message_count:
should_skip = True
skip_reason = f'All {campaign_message_count} messages already sent'
df.at[idx, 'next_message_number'] = next_msg
df.at[idx, 'should_skip'] = should_skip
df.at[idx, 'skip_reason'] = skip_reason if skip_reason else ''
return df
def create_tracking_status_column_name(tracking_cols: List[str]) -> str:
"""
Create the name for the next tracking status column
Args:
tracking_cols: List of existing tracking status column names
Returns:
New tracking status column name (e.g., "Tracking Status 3")
"""
if not tracking_cols:
return "Tracking Status 1"
# Get the highest number
max_num = len(tracking_cols)
return f"Tracking Status {max_num + 1}"
def update_sheet_with_tracking_status(sheet_url_or_id: str,
tracking_results: List[Dict],
is_file_id: bool = False) -> bool:
"""
Update Google Sheet by updating the last tracking status column with results
Args:
sheet_url_or_id: Google Sheets URL or file_id
tracking_results: List of dicts with 'username', 'message_number', 'status'
is_file_id: Whether sheet_url_or_id is a file_id
Returns:
True if successful
"""
import gspread
from google.oauth2.service_account import Credentials
import os
import re
try:
# Get credentials
creds_file = 'google_credentials.json'
if not os.path.exists(creds_file):
raise Exception(f'Google credentials file not found: {creds_file}')
scope = [
'https://spreadsheets.google.com/feeds',
'https://www.googleapis.com/auth/drive'
]
credentials = Credentials.from_service_account_file(creds_file, scopes=scope)
client = gspread.authorize(credentials)
# Get sheet ID
if is_file_id:
sheet_id = sheet_url_or_id
else:
# Extract from URL
match = re.search(r'/spreadsheets/d/([a-zA-Z0-9-_]+)', sheet_url_or_id)
if not match:
raise Exception('Invalid Google Sheets URL')
sheet_id = match.group(1)
# Open spreadsheet - try to handle Excel files by converting them
try:
spreadsheet = client.open_by_key(sheet_id)
except Exception as e:
error_msg = str(e)
if 'FAILED_PRECONDITION' in error_msg or 'not supported for this document' in error_msg:
# Try to convert Excel file to Google Sheets
try:
from googleapiclient.discovery import build
drive_service = build('drive', 'v3', credentials=credentials)
# Copy the file as a Google Sheets document
copied_file = drive_service.files().copy(
fileId=sheet_id,
body={
'name': 'Converted Spreadsheet (from InstagramOutreach)',
'mimeType': 'application/vnd.google-apps.spreadsheet'
}
).execute()
new_sheet_id = copied_file['id']
spreadsheet = client.open_by_key(new_sheet_id)
raise Exception(f'Original file was Excel format. Created Google Sheets copy with ID: {new_sheet_id}. Please use this new URL: https://docs.google.com/spreadsheets/d/{new_sheet_id}/edit')
except Exception as convert_error:
raise Exception(f'The file ID "{sheet_id}" is not a Google Spreadsheet. Please open it in Google Drive and select File → Save as Google Sheets. Error: {str(convert_error)}')
raise
worksheet = spreadsheet.get_worksheet(0)
# Get all data
records = worksheet.get_all_records()
df = pd.DataFrame(records)
if len(df) == 0:
raise Exception('Sheet is empty')
# Find instagram column
instagram_col = None
for col in df.columns:
if 'instagram' in str(col).lower():
instagram_col = col
break
if not instagram_col:
raise Exception('Sheet must have an "instagram" column')
# Get existing tracking status columns
tracking_cols = get_tracking_status_columns(df)
# Determine which tracking status column to update
# Find the highest message number from tracking results
max_msg_num = max([r.get('message_number', 1) for r in tracking_results])
target_col_name = f"Tracking Status {max_msg_num}"
# Check if column exists in the sheet
column_exists = target_col_name in df.columns
# Create results map: username -> status
results_map = {}
for result in tracking_results:
username = result.get('username', '').lower()
status = result.get('status', '')
results_map[username] = status
# Update values for contacts in the tracking results
from app import normalize_username
# Find the column index for target_col_name
if column_exists:
col_index = df.columns.tolist().index(target_col_name) + 1 # 1-indexed for gspread
else:
# Need to add the column - get next available column
col_index = len(df.columns) + 1
# Get current headers from sheet
headers = worksheet.row_values(1)
# Add new column header
headers.append(target_col_name)
worksheet.update('1:1', [headers])
# Helper function to convert column index to letter (1=A, 26=Z, 27=AA, etc.)
def col_index_to_letter(n):
result = ''
while n > 0:
n -= 1
result = chr(65 + (n % 26)) + result
n //= 26
return result
# Define statuses that should not be overwritten by automated tracking
protected_statuses = ['replied', 'not delivered', 'do not contact', 'not available']
# Individual cell updates (works with all file types)
for idx, row in df.iterrows():
username = normalize_username(row[instagram_col])
if username in results_map:
# Row number is idx + 2 (1 for header, 1 for 0-indexing)
row_num = idx + 2
col_letter = col_index_to_letter(col_index)
# Read current value directly from sheet to check if it's protected
try:
current_value = str(worksheet.acell(f'{col_letter}{row_num}').value or '').strip().lower()
if current_value in protected_statuses:
# Skip update - preserve manually set status
continue
except:
pass # If read fails, proceed with update
# Update individual cell
worksheet.update_acell(f'{col_letter}{row_num}', results_map[username])
return True
except Exception as e:
raise Exception(f'Error updating tracking status: {str(e)}')