video path is enabled in stream #1415

Merged · 6 commits · Jan 6, 2025

Changes from all commits
deepface/DeepFace.py — 138 changes: 72 additions & 66 deletions
@@ -68,18 +68,18 @@ def build_model(model_name: str, task: str = "facial_recognition") -> Any:


def verify(
    img1_path: Union[str, np.ndarray, List[float]],
    img2_path: Union[str, np.ndarray, List[float]],
    model_name: str = "VGG-Face",
    detector_backend: str = "opencv",
    distance_metric: str = "cosine",
    enforce_detection: bool = True,
    align: bool = True,
    expand_percentage: int = 0,
    normalization: str = "base",
    silent: bool = False,
    threshold: Optional[float] = None,
    anti_spoofing: bool = False,
) -> Dict[str, Any]:
"""
Verify if an image pair represents the same person or different persons.
@@ -164,14 +164,14 @@ def verify(

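For orientation, a minimal usage sketch of verify under the signature shown above (the image file names are hypothetical; the "verified" and "distance" keys follow deepface's documented return dictionary):

from deepface import DeepFace

# Compare two face images with the defaults above
# (VGG-Face model, opencv detector, cosine distance).
result = DeepFace.verify(img1_path="img1.jpg", img2_path="img2.jpg")
print(result["verified"], result["distance"])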

def analyze(
    img_path: Union[str, np.ndarray],
    actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
    enforce_detection: bool = True,
    detector_backend: str = "opencv",
    align: bool = True,
    expand_percentage: int = 0,
    silent: bool = False,
    anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Analyze facial attributes such as age, gender, emotion, and race in the provided image.
@@ -263,20 +263,20 @@ def analyze(

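Likewise, a short analyze sketch (hypothetical file name; one dictionary is returned per detected face, with key names assumed from recent deepface versions):

from deepface import DeepFace

# Analyze the requested attributes for every face found in the image
results = DeepFace.analyze(img_path="img.jpg", actions=("age", "gender"))
for face in results:
    print(face["age"], face["dominant_gender"])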

def find(
    img_path: Union[str, np.ndarray],
    db_path: str,
    model_name: str = "VGG-Face",
    distance_metric: str = "cosine",
    enforce_detection: bool = True,
    detector_backend: str = "opencv",
    align: bool = True,
    expand_percentage: int = 0,
    threshold: Optional[float] = None,
    normalization: str = "base",
    silent: bool = False,
    refresh_database: bool = True,
    anti_spoofing: bool = False,
    batched: bool = False,
) -> Union[List[pd.DataFrame], List[List[Dict[str, Any]]]]:
"""
Identify individuals in a database
@@ -369,15 +369,15 @@ def find(

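A minimal find sketch (the database folder is hypothetical; with batched=False one pandas DataFrame is returned per detected face, and the "identity"/"distance" column names are assumed from recent deepface versions):

from deepface import DeepFace

# Search a folder of known faces for the identity in img.jpg
dfs = DeepFace.find(img_path="img.jpg", db_path="my_db")
for df in dfs:
    print(df[["identity", "distance"]].head())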

def represent(
    img_path: Union[str, np.ndarray],
    model_name: str = "VGG-Face",
    enforce_detection: bool = True,
    detector_backend: str = "opencv",
    align: bool = True,
    expand_percentage: int = 0,
    normalization: str = "base",
    anti_spoofing: bool = False,
    max_faces: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Represent facial images as multi-dimensional vector embeddings.
@@ -441,15 +441,16 @@ def represent(

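A minimal represent sketch (hypothetical file name; each returned dictionary carries an "embedding" list and a "facial_area" dict):

from deepface import DeepFace

# Compute one embedding vector per detected face
embedding_objs = DeepFace.represent(img_path="img.jpg")
print(len(embedding_objs[0]["embedding"]))  # e.g. 4096 dimensions for VGG-Face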

def stream(
    db_path: str = "",
    model_name: str = "VGG-Face",
    detector_backend: str = "opencv",
    distance_metric: str = "cosine",
    enable_face_analysis: bool = True,
    source: Any = 0,
    time_threshold: int = 5,
    frame_threshold: int = 5,
    anti_spoofing: bool = False,
    output_path: Optional[str] = None,
) -> None:
"""
Run real time face recognition and facial attribute analysis
@@ -478,6 +479,10 @@ def stream(
        frame_threshold (int): The frame threshold for face recognition (default is 5).

        anti_spoofing (boolean): Flag to enable anti spoofing (default is False).

        output_path (str): Path to save the output video (default is None).
            If None, no video is saved.

    Returns:
        None
    """
@@ -495,19 +500,20 @@ def stream(
        time_threshold=time_threshold,
        frame_threshold=frame_threshold,
        anti_spoofing=anti_spoofing,
        output_path=output_path,
    )

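A minimal sketch of the feature this PR adds (the folder and file names are hypothetical):

from deepface import DeepFace

# Run real-time recognition from the default webcam (source=0) and,
# with this PR, also record the annotated frames to disk.
DeepFace.stream(
    db_path="my_db",
    source=0,
    output_path="recordings/output.mp4",  # None (the default) disables saving
)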

def extract_faces(
    img_path: Union[str, np.ndarray],
    detector_backend: str = "opencv",
    enforce_detection: bool = True,
    align: bool = True,
    expand_percentage: int = 0,
    grayscale: bool = False,
    color_face: str = "rgb",
    normalize_face: bool = True,
    anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Extract faces from a given image
@@ -584,11 +590,11 @@ def cli() -> None:

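A minimal extract_faces sketch (hypothetical file name; each result holds the normalized face crop plus its coordinates and detection confidence):

from deepface import DeepFace

# Detect, align and normalize every face in the image
face_objs = DeepFace.extract_faces(img_path="img.jpg", detector_backend="opencv")
for face_obj in face_objs:
    print(face_obj["facial_area"], face_obj["confidence"])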

def detectFace(
    img_path: Union[str, np.ndarray],
    target_size: tuple = (224, 224),
    detector_backend: str = "opencv",
    enforce_detection: bool = True,
    align: bool = True,
) -> Union[np.ndarray, None]:
"""
Deprecated face detection function. Use extract_faces for same functionality.
deepface/modules/streaming.py — 48 changes: 38 additions & 10 deletions
@@ -22,6 +22,7 @@
IDENTIFIED_IMG_SIZE = 112
TEXT_COLOR = (255, 255, 255)


# pylint: disable=unused-variable
def analysis(
    db_path: str,

@@ -33,6 +34,7 @@ def analysis(
    time_threshold=5,
    frame_threshold=5,
    anti_spoofing: bool = False,
    output_path: Optional[str] = None,
):
"""
Run real time face recognition and facial attribute analysis
Expand Down Expand Up @@ -62,6 +64,8 @@ def analysis(

anti_spoofing (boolean): Flag to enable anti spoofing (default is False).

output_path (str): Path to save the output video. (default is None
If None, no video is saved).
Returns:
None
"""
@@ -77,12 +81,31 @@
        model_name=model_name,
    )

    cap = cv2.VideoCapture(source if isinstance(source, str) else int(source))
    if not cap.isOpened():
        logger.error(f"Cannot open video source: {source}")
        return

    # Get video properties for the optional output writer
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # codec for the output file

    # Ensure the output directory exists if output_path points into one
    if output_path and os.path.dirname(output_path):
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Initialize the video writer only if output_path is provided
    video_writer = (
        cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if output_path
        else None
    )

    freezed_img = None
    freeze = False
    num_frames_with_faces = 0
    tic = time.time()

    while True:
        has_frame, img = cap.read()
        if not has_frame:
@@ -91,17 +114,16 @@ def analysis(
        # we are adding some figures into img such as identified facial image, age, gender
        # that is why, we need raw image itself to make analysis
        raw_img = img.copy()

        faces_coordinates = []
        if not freeze:
            faces_coordinates = grab_facial_areas(
                img=img, detector_backend=detector_backend, anti_spoofing=anti_spoofing
            )

            # we will pass img to analyze modules (identity, demography) and add some illustrations
            # that is why, we will not be able to extract detected face from img clearly
            detected_faces = extract_facial_areas(img=img, faces_coordinates=faces_coordinates)

            img = highlight_facial_areas(img=img, faces_coordinates=faces_coordinates)
            img = countdown_to_freeze(
                img=img,

@@ -111,8 +133,8 @@ def analysis(
            )

            num_frames_with_faces = num_frames_with_faces + 1 if len(faces_coordinates) else 0
            freeze = num_frames_with_faces > 0 and num_frames_with_faces % frame_threshold == 0

            if freeze:
                # add analyze results into img - derive from raw_img
                img = highlight_facial_areas(

@@ -144,22 +166,28 @@ def analysis(
                tic = time.time()
                logger.info("freezed")

        elif freeze and time.time() - tic > time_threshold:
            freeze = False
            freezed_img = None
            # reset counter for freezing
            tic = time.time()
            logger.info("Freeze released")

        freezed_img = countdown_to_release(img=freezed_img, tic=tic, time_threshold=time_threshold)
        display_img = img if freezed_img is None else freezed_img

        # Save the frame to the output video if the writer is initialized
        if video_writer:
            video_writer.write(display_img)

        cv2.imshow("img", display_img)
        if cv2.waitKey(1) & 0xFF == ord("q"):  # press q to quit
            break

    # Release resources
    cap.release()
    if video_writer:
        video_writer.release()
    cv2.destroyAllWindows()


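One caveat worth noting about the writer setup above: many webcams report CAP_PROP_FPS as 0, in which case cv2.VideoWriter can produce an unplayable file. A defensive variant (the 30 fps fallback is an assumption, not part of this PR):

fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:  # webcams often return 0 when fps metadata is unavailable
    fps = 30.0  # assumed fallback; pick a value matching the capture rate
video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))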