Commit ebfcaf08 authored by Jack Hu's avatar Jack Hu
parents eb415628 e296995d
......@@ -15,7 +15,8 @@ import data_handler
# Seconds before the recognition loop gives up (used as time.time() + TIMEOUT).
TIMEOUT: float = 30.0
# Minimum number of frames in which a user must be recognized in order to be authenticated.
MIN_USER_RECOGNITION_COUNT = 10
# Presumably the dict/JSON key under which recognized user ids are stored -- verify against callers.
USER_IDS_KEY: str = "user_ids"
# Default camera index passed to the video stream.
DEFAULT_CAMERA: int = 0
# Default width (pixels) to which frames are resized before recognition.
DEFAULT_WIDTH: int = 275
def load_encodings(file_location: str):
......@@ -33,8 +34,10 @@ def determine_identity(face_encoding, known_faces) -> str:
recognized_users = {}
for (user_id, user_encodings) in known_faces.items():
matches = face_recognition.compare_faces(user_encodings, face_encoding)
# Count the number of occurrences of true.
recognized_users[user_id] = matches.count(True)
count = matches.count(True)
if count > 0:
# Count the number of occurrences of true.
recognized_users[user_id] = count
matched_user = ""
if len(recognized_users) > 0:
......@@ -70,13 +73,54 @@ def draw_rectangles_and_user_ids(image_frame, conversion: float, box_user_id_map
common.display_frame(image_frame)
def run_face_recognition(frame, known_faces: dict, encoding_model: str = "hog", draw_rectangles: bool = False,
                         image_width: int = DEFAULT_WIDTH) -> list:
    """
    Runs face recognition on a single image frame and returns the ids of the users found in it.

    :param frame: The frame to use in recognition.
    :param known_faces: The known faces data.
    :param encoding_model: The type of encoding model to use. CNN is more reliable but much slower, HOG is faster.
    :param draw_rectangles: True to draw the rectangles and user ids around identified faces, false otherwise.
    :param image_width: The width to which images should be resized in order to speed up processing.
        Lower quality means faster but less reliable recognition.
    :return: The list of recognized user ids.
    """
    original_frame = frame
    # face_recognition expects RGB while OpenCV delivers BGR.
    rgb_full = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Shrink the image so detection runs faster; remember the scale factor
    # so rectangles can be drawn back onto the full-size frame.
    rgb_small = imutils.resize(rgb_full, width=image_width)
    scale = rgb_full.shape[1] / float(rgb_small.shape[1])

    # Locate each face, then compute the facial embedding at each location.
    face_boxes = face_recognition.face_locations(rgb_small, model=encoding_model)
    face_encodings = face_recognition.face_encodings(rgb_small, face_boxes)

    recognized_user_ids: list = []
    box_user_id_mapping = {}
    for box, face_encoding in zip(face_boxes, face_encodings):
        user_id: str = determine_identity(face_encoding, known_faces)
        if user_id:
            box_user_id_mapping[box] = user_id
            recognized_user_ids.append(user_id)

    if draw_rectangles:
        draw_rectangles_and_user_ids(original_frame, scale, box_user_id_mapping)

    return recognized_user_ids
def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: int = None,
draw_rectangles=False):
draw_rectangles: bool = False, camera: int = DEFAULT_CAMERA, image_width: int = DEFAULT_WIDTH):
"""Attempts to recognize a user.
Returns the ID of the user if identified, or None if no users are identified."""
recognized_users_count = {}
recognized_user = None
video_stream = common.start_video_stream(0)
video_stream = common.start_video_stream(camera)
# Determine the time at which we will time out. Equal to current time + timeout.
timeout_time: float = time.time() + TIMEOUT
......@@ -86,30 +130,13 @@ def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: i
if image_flip is not None:
image_frame = cv2.flip(image_frame, image_flip)
# Convert input from BGR to RGB
cv2.cvtColor(image_frame, cv2.COLOR_BGR2RGB)
# Resize image to width of 750 PX to speed up processing.
rgb_image = imutils.resize(image_frame, width=750)
r = image_frame.shape[1] / float(rgb_image.shape[1])
# Detect the location of each face and determine the boxes in which they lie
boxes = face_recognition.face_locations(
rgb_image, model=encoding_model)
# Compute the facial embeddings (the encoding) at
# each of the locations found in the previous line.
encodings = face_recognition.face_encodings(rgb_image, boxes)
box_user_id_mapping = {}
for (i, encoding) in enumerate(encodings):
user_id: str = determine_identity(encoding, known_faces)
if user_id:
if user_id not in recognized_users_count:
recognized_users_count[user_id] = 0
recognized_users_count[user_id] += 1
box_user_id_mapping[boxes[i]] = user_id
if draw_rectangles:
draw_rectangles_and_user_ids(image_frame, r, box_user_id_mapping)
recognized_user_ids = run_face_recognition(image_frame, known_faces, encoding_model=encoding_model,
draw_rectangles=draw_rectangles, image_width=image_width)
for user_id in recognized_user_ids:
if user_id not in recognized_users_count:
recognized_users_count[user_id] = 0
recognized_users_count[user_id] += 1
# Now check if we have already positively identified a user enough times
recognized_users = check_recognized_users(recognized_users_count)
......@@ -126,18 +153,22 @@ def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: i
def recognize_user_from_database(database_loc: str = common.DATABASE_LOC, encoding_model: str = "hog",
                                 image_flip: int = None, draw_rectangles: bool = False, camera: int = DEFAULT_CAMERA,
                                 image_width: int = DEFAULT_WIDTH):
    """
    Recognizes a user from the encodings stored in the given database file.

    :param database_loc: The database containing the face encodings for users.
    :param encoding_model: The encoding model to be used for recognition.
    :param image_flip: The type of image flip to be applied to the image, if it will be upside-down or horizontally inverted.
    :param draw_rectangles: True to draw the rectangles to the screen, false otherwise.
    :param camera: Which camera to use.
    :param image_width: The width to which images should be resized in order to speed up processing.
        Lower quality means faster but less reliable recognition.
    :return: The recognized user's id, or None if no user was recognized.
    """
    # NOTE(review): the scraped diff contained both the pre-merge and post-merge
    # signature/call lines; this is the reconstructed post-merge version.
    return recognize_user(data_handler.load_database(database_loc), encoding_model=encoding_model,
                          image_flip=image_flip, draw_rectangles=draw_rectangles, camera=camera,
                          image_width=image_width)
# If this program is the main program, authenticate the user.
......@@ -156,15 +187,19 @@ if __name__ == "__main__":
required=False, default=None, choices=[0, 1])
parser.add_argument("--show", "-s", action="store_true",
help="Include this argument to have the image shown to you.", default=False)
parser.add_argument("--camera", "-c", type=int, required=False,
help="Which camera to be used during authentication.", default=DEFAULT_CAMERA)
parser.add_argument("--width", "-w", type=int, required=False,
help=f"The image width to use, in pixels. Default is {DEFAULT_WIDTH} pixels.")
args = parser.parse_args()
args_dict = {}
if args.encodings is not None:
args_dict["encodings_location"] = args.encodings
args_dict["database_loc"] = args.encodings
if args.model is not None:
args_dict["encoding_model"] = args.model
user = recognize_user_from_database(**args_dict, image_flip=args.flip,
draw_rectangles=args.show)
draw_rectangles=args.show, camera=args.camera, image_width=args.width)
if user:
print(f"Recognized user {user}.")
......@@ -5,6 +5,8 @@ import cv2
# Directory holding generated data files (the pickle database lives here).
DATA_DIR = "data"
# Directory crawled for user image datasets during registration.
DATASET_DIR = "dataset"
DATABASE_LOC = os.path.join(DATA_DIR, "database.pickle")
# Name for the display frame.
FRAME_NAME = "Frame"
# Directory for bundled resource files (models, etc.).
RES_DIRECTORY = "res"
# Directory for the face detection model.
......@@ -14,13 +16,13 @@ EMBEDDINGS_PROCESSOR_LOC = os.path.join(RES_DIRECTORY, "openface_nn4.small2.v1.t
def display_frame(frame):
    """Displays the given frame to the user in the shared named window (FRAME_NAME)."""
    # Post-merge version: the window name constant replaces the old hardcoded "Frame".
    cv2.imshow(FRAME_NAME, frame)
def start_video_stream(camera: int):
    """
    Starts the video stream on the given camera index and returns the created stream.

    :param camera: Index of the camera device to open.
    :return: The opened cv2.VideoCapture stream.

    NOTE(review): the original docstring claimed this waits for the stream to
    open, but no wait is performed here -- callers should check
    video_stream.isOpened() if they need that guarantee.
    """
    video_stream = cv2.VideoCapture(camera)
    return video_stream
......
......@@ -26,8 +26,8 @@ def register_user(base_dir: str = os.getcwd()) -> None:
:param base_dir: The base directory, from which all default paths are relative. Defaults to current working directory.
:return: None
"""
register.register_users_and_save(directory_location=os.path.join(base_dir, common.DATABASE_LOC),
delete_images_on_complete=True, overwrite_data=False)
register.register_users_from_dir_and_save(directory_location=os.path.join(base_dir, common.DATABASE_LOC),
delete_images_on_complete=True, overwrite_data=False)
if __name__ == "__main__":
......
......@@ -36,22 +36,20 @@ def delete_file(file_path: str) -> None:
os.unlink(file_path)
def register_user(user_id: str, dataset_dir: str, encoding_model="hog",
show_output: bool = False, delete_on_processed: bool = False):
def register_user(user_id: str, images: list, encoding_model="hog",
show_output: bool = False):
"""
Function for registering a new user using the given video source. If video source isn't provided, then the camera
on id 0 is used.
Function for registering a new user using the given images of that user.
:param user_id: The user id for the user that is being registered.
:param dataset_dir: The directory location of pictures for the user.
:param images: Images of the user. Should only contain the user, and no other faces.
:param encoding_model: The type of encoding model. Must be either "hog" or "cnn". HOG is faster, CNN is more thorough.
:param show_output: True to print console output for progress, false otherwise.
:param delete_on_processed: True to delete the image file after processing it, false otherwise.
:return: Encoded face that was detected, or None if no face was detected or if there was another error.
"""
processed_images = []
for (i, filename) in enumerate(impaths.list_images(dataset_dir)):
for (i, image) in enumerate(images):
# Might want to check file validity here at some point, but won't for now.
image = cv2.imread(filename)
if image is not None:
if show_output:
print(f"Processing image {i + 1} for user {user_id}")
......@@ -59,13 +57,52 @@ def register_user(user_id: str, dataset_dir: str, encoding_model="hog",
if processed:
processed_images.extend(processed)
# Delete after we're done if we're supposed to.
if delete_on_processed:
delete_file(filename)
return {user_id: processed_images} if len(processed_images) > 0 else None
def register_user_from_directory(user_id: str, dataset_dir: str, encoding_model="hog",
                                 show_output: bool = False, delete_on_processed: bool = False):
    """
    Registers a new user from a directory of images of that user.

    :param user_id: The user id for the user that is being registered.
    :param dataset_dir: The directory location of pictures for the user.
    :param encoding_model: The type of encoding model. Must be either "hog" or "cnn". HOG is faster, CNN is more thorough.
    :param show_output: True to print console output for progress, false otherwise.
    :param delete_on_processed: True to delete the image file after processing it, false otherwise.
    :return: Encoded face that was detected, or None if no face was detected or if there was another error.
    """
    file_paths = list(impaths.list_images(dataset_dir))
    loaded_images = [cv2.imread(path) for path in file_paths]
    result = register_user(user_id, loaded_images, encoding_model=encoding_model, show_output=show_output)
    # Only remove the source files once every image has been processed.
    if delete_on_processed:
        for path in file_paths:
            delete_file(path)
    return result
def register_users(user_dataset: dict, encoding_model: str = "hog", show_output: bool = False):
    """
    Registers all the given users from their dataset images.

    :param user_dataset: Dictionary of { "user_id" : [image1, image2, ...] , ... } of images of each user.
    :param encoding_model: The type of encoding model to use.
    :param show_output: True to print progress output, false otherwise.
    :return: The dictionary of registered users in the given directory.
    """
    registered = {}
    for user_id, images in user_dataset.items():
        encoded = register_user(user_id, images, encoding_model=encoding_model, show_output=show_output)
        if encoded is not None:
            registered.update(encoded)
    # Preserve the original contract: an empty result is reported as None.
    return registered if registered else None
def register_users_in_dir(directory_location: str, encoding_model: str = "hog", delete_images_on_complete: bool = False,
show_output: bool = False):
"""
......@@ -80,21 +117,18 @@ def register_users_in_dir(directory_location: str, encoding_model: str = "hog",
for directory in next(os.walk(directory_location))[1]:
total_directory = os.path.join(directory_location, directory)
# Using the directory name as the user_id as well.
user_dict = register_user(directory, total_directory, encoding_model=encoding_model, show_output=show_output,
delete_on_processed=delete_images_on_complete)
user_dict = register_user_from_directory(directory, total_directory, encoding_model=encoding_model,
show_output=show_output,
delete_on_processed=delete_images_on_complete)
if user_dict is not None:
total_dict.update(user_dict)
return total_dict if len(total_dict) > 0 else None
def register_users_and_save(directory_location: str = common.DATASET_DIR,
database_location: str = common.DATABASE_LOC, encoding_model="hog",
delete_images_on_complete: bool = True, show_output: bool = False,
overwrite_data: bool = False):
processed_users = register_users_in_dir(directory_location, encoding_model=encoding_model,
delete_images_on_complete=delete_images_on_complete,
show_output=show_output)
def save_processed_users(processed_users,
database_location: str = common.DATABASE_LOC,
overwrite_data: bool = False) -> None:
database = data_handler.load_database(database_location) if not overwrite_data else {}
if processed_users is not None:
for user_id, encodings in processed_users.items():
......@@ -104,6 +138,26 @@ def register_users_and_save(directory_location: str = common.DATASET_DIR,
data_handler.write_database(database_location, database)
def register_users_from_dir_and_save(directory_location: str = common.DATASET_DIR,
                                     database_location: str = common.DATABASE_LOC, encoding_model="hog",
                                     delete_images_on_complete: bool = True, show_output: bool = False,
                                     overwrite_data: bool = False) -> None:
    """
    Registers the users found in the given directory and persists their encodings to the database.

    :param directory_location: The location of the directory to crawl for dataset images.
    :param database_location: The location of the database pickle file.
    :param encoding_model: The encoding model which should be used. Hog is faster, cnn more reliable.
    :param delete_images_on_complete: True to delete the images that are processed on completion, false otherwise.
    :param show_output: True to show progress output to the console, false otherwise.
    :param overwrite_data: True to overwrite the data in the database, false to append to it.
    :return: None
    """
    users = register_users_in_dir(directory_location,
                                  encoding_model=encoding_model,
                                  delete_images_on_complete=delete_images_on_complete,
                                  show_output=show_output)
    save_processed_users(users, database_location=database_location, overwrite_data=overwrite_data)
if __name__ == "__main__":
import argparse
......@@ -125,5 +179,5 @@ if __name__ == "__main__":
if args.model is not None:
args_dict["encoding_model"] = args.model
register_users_and_save(**args_dict, show_output=True, delete_images_on_complete=False,
overwrite_data=args.overwrite)
register_users_from_dir_and_save(**args_dict, show_output=True, delete_images_on_complete=False,
overwrite_data=args.overwrite)
import argparse
import cv2
import imutils
import authenticate_user
import common
import data_handler
# Manual test harness: run face recognition on a single image file and
# display the annotated result until the user quits.
parser = argparse.ArgumentParser()
parser.add_argument("--image", type=str, help="Image on which to run recognition.", required=True)
args = parser.parse_args()
# Load the image and shrink it to 750 px wide before recognition.
frame = cv2.imread(args.image)
frame = imutils.resize(frame, width=750)
# draw_rectangles=True makes run_face_recognition render boxes and user ids onto the frame.
authenticate_user.run_face_recognition(frame, data_handler.load_database(common.DATABASE_LOC), draw_rectangles=True)
# Keep the window open until 'q' is pressed or the display window is closed.
while cv2.waitKey(1) & 0xFF != ord('q') and cv2.getWindowProperty(common.FRAME_NAME, 0) >= 0:
    pass
cv2.destroyAllWindows()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.