-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
274 lines (225 loc) · 9 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import sqlite3
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import FileResponse
import shutil
from insightface.app import FaceAnalysis
import insightface
import cv2
import os
import uuid
from gfpgan import GFPGANer
import numpy as np
from PIL import Image
<<<<<<< HEAD
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="onnxruntime")
=======
import logging
from fastapi.middleware.cors import CORSMiddleware
>>>>>>> 29f4956de4b0bfc1afb896b5b0740c099be08a12
# Initialize FastAPI app
app = FastAPI()
<<<<<<< HEAD
=======
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Update with allowed origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize logging
logging.basicConfig(level=logging.INFO)
# Initialize FaceAnalysis
face_app = FaceAnalysis(name='buffalo_l')
face_app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model('inswapper/inswapper_128.onnx', download=False, download_zip=False)
# Initialize GFPGAN for face enhancement
gfpganer = GFPGANer(model_path='models/GFPGANv1.4.pth', upscale=1, arch='clean', channel_multiplier=2)
>>>>>>> 29f4956de4b0bfc1afb896b5b0740c099be08a12
# Directory setup
UPLOAD_FOLDER = 'uploads'
RESULT_FOLDER = 'results'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULT_FOLDER, exist_ok=True)
<<<<<<< HEAD
# Initialize FaceAnalysis
print("Initializing FaceAnalysis...")
try:
face_app = FaceAnalysis(name='buffalo_l')
face_app.prepare(ctx_id=0, det_size=(640, 640))
print("FaceAnalysis initialized successfully.")
except Exception as e:
print(f"Error initializing FaceAnalysis: {e}")
raise
=======
# Initialize SQLite database
DATABASE = "user.db"
def init_db():
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
result_image_path TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
init_db() # Initialize the database on app startup
def simple_face_swap(sourceImage, targetImage, face_app, swapper):
logging.info("Starting face swap...")
facesimg1 = face_app.get(sourceImage)
facesimg2 = face_app.get(targetImage)
logging.info(f"Number of faces detected in source image: {len(facesimg1)}")
logging.info(f"Number of faces detected in target image: {len(facesimg2)}")
>>>>>>> 29f4956de4b0bfc1afb896b5b0740c099be08a12
# Initialize Face Swapper
print("Loading Face Swapper model...")
try:
swapper = insightface.model_zoo.get_model('models/inswapper_128.onnx', download=False, download_zip=False)
print("Face Swapper model loaded successfully.")
except Exception as e:
print(f"Error loading Face Swapper model: {e}")
raise
# Initialize GFPGAN for face enhancement
print("Loading GFPGAN model...")
try:
gfpganer = GFPGANer(
model_path='models/GFPGANv1.4.pth',
upscale=1,
arch='clean',
channel_multiplier=2
)
print("GFPGAN model loaded successfully.")
except Exception as e:
print(f"Error loading GFPGAN model: {e}")
raise
def load_image(file_path):
"""Load an image using OpenCV."""
image = cv2.imread(file_path)
if image is None:
print(f"Failed to load image: {file_path}")
raise HTTPException(status_code=500, detail=f"Failed to load image: {file_path}")
print(f"Image loaded: {file_path} (shape: {image.shape})")
return image
def save_image(image, folder):
"""Save an image and return the file path."""
file_name = f"{uuid.uuid4()}.jpg"
file_path = os.path.join(folder, file_name)
cv2.imwrite(file_path, image)
print(f"Image saved: {file_path}")
return file_path
def single_face_swap(source_img, target_img, face_app, swapper):
"""Perform a single face swap."""
print("Starting single face swap...")
faces_src = face_app.get(source_img)
faces_tgt = face_app.get(target_img)
print(f"Faces detected in source image: {len(faces_src)}")
print(f"Faces detected in target image: {len(faces_tgt)}")
if not faces_src or not faces_tgt:
print("No faces detected in one or both images.")
return None
face_src = faces_src[0]
face_tgt = faces_tgt[0]
swapped_img = swapper.get(source_img, face_src, face_tgt, paste_back=True)
print("Single face swap completed.")
return swapped_img
def two_face_swap(source_img, target_img, face_app, swapper):
"""Perform a two-face swap."""
print("Starting two-face swap...")
faces_src = face_app.get(source_img)
faces_tgt = face_app.get(target_img)
print(f"Faces detected in source image: {len(faces_src)}")
print(f"Faces detected in target image: {len(faces_tgt)}")
if len(faces_src) < 2 or len(faces_tgt) < 2:
print("Less than two faces detected in one or both images.")
return None
# Swap the first face from source with the first face from target
face_src1 = faces_src[0]
face_tgt1 = faces_tgt[1]
img_swapped1 = swapper.get(source_img, face_src1, face_tgt1, paste_back=True)
# Swap the second face from source with the second face from target
face_src2 = faces_src[1]
face_tgt2 = faces_tgt[0]
img_swapped2 = swapper.get(img_swapped1, face_src2, face_tgt2, paste_back=True)
print("Two-face swap completed.")
return img_swapped2
def enhance_face(image):
"""Enhance a face using GFPGAN."""
print("Starting face enhancement...")
_, _, restored_img = gfpganer.enhance(image, has_aligned=False, only_center_face=False, paste_back=True)
if isinstance(restored_img, np.ndarray):
print("Face enhancement completed.")
return restored_img
else:
print("Face enhancement failed.")
raise HTTPException(status_code=500, detail="Face enhancement failed.")
@app.post("/api/swap-face/")
<<<<<<< HEAD
async def swap_faces(sourceImage: UploadFile = File(...), targetImage: UploadFile = File(...)):
"""API endpoint for face swapping."""
try:
# Save uploaded files
src_path = os.path.join(UPLOAD_FOLDER, sourceImage.filename)
tgt_path = os.path.join(UPLOAD_FOLDER, targetImage.filename)
with open(src_path, "wb") as buffer:
shutil.copyfileobj(sourceImage.file, buffer)
with open(tgt_path, "wb") as buffer:
shutil.copyfileobj(targetImage.file, buffer)
=======
async def swap_faces(sourceImage: UploadFile = File(...), targetImage: UploadFile = File(...), name: str = File(...), email: str = File(...)):
img1_path = os.path.join(UPLOAD_FOLDER, sourceImage.filename)
img2_path = os.path.join(UPLOAD_FOLDER, targetImage.filename)
print('userDetails',name,email)
>>>>>>> 29f4956de4b0bfc1afb896b5b0740c099be08a12
# Load images
source_img = load_image(src_path)
target_img = load_image(tgt_path)
# Perform face swap (use single or two-face swap as needed)
swapped_img = single_face_swap(source_img, target_img, face_app, swapper)
# swapped_img = two_face_swap(source_img, target_img, face_app, swapper) # Uncomment if needed
if swapped_img is None:
raise HTTPException(status_code=400, detail="Face swap failed.")
# Enhance the swapped image
enhanced_img = enhance_face(swapped_img)
# Save the enhanced image
result_path = save_image(enhanced_img, RESULT_FOLDER)
return FileResponse(result_path)
except Exception as e:
print(f"Error during face swap: {e}")
raise HTTPException(status_code=500, detail=str(e))
<<<<<<< HEAD
if __name__ == "__main__":
=======
logging.info(f"Enhanced image shape: {enhanced_image.shape}")
result_filename = str(uuid.uuid4()) + '.jpg'
result_path = os.path.join(RESULT_FOLDER, result_filename)
cv2.imwrite(result_path, enhanced_image)
# Save user data in SQLite
try:
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO users (name, email, result_image_path)
VALUES (?, ?, ?)
""",
(name, email, result_path),
)
conn.commit()
conn.close()
except Exception as e:
logging.error(f"Failed to save user data in database: {e}")
raise HTTPException(status_code=500, detail="Failed to save user data")
logging.info(f"User details saved: {name}, {email}, {result_path}")
logging.info(f"Image saved to: {result_path}")
return FileResponse(result_path)
# HTTP server
if __name__ == '__main__':
>>>>>>> 29f4956de4b0bfc1afb896b5b0740c099be08a12
import uvicorn
uvicorn.run(app, host="localhost", port=8000)