Commit b84e731a authored by Kyle Anderson's avatar Kyle Anderson

Re-work old register-user

The old register-user and authentication systems have been reworked a little bit, working towards finalizing the authentication.
Big problem now is that it seems to recognize anyone as anyone, so not great authentication.
parent acf62927
"""
Methods for authenticating a user.
"""
import os
import time
import cv2
......@@ -9,7 +9,7 @@ import face_recognition
import imutils
import common
from data_handler import load_database
import data_handler
# How long to wait before timing out and saying failed authentication.
TIMEOUT: float = 30.0
......@@ -20,23 +20,7 @@ USER_IDS_KEY: str = "user_ids"
def load_encodings(file_location: str):
"""Loads the encodings for faces from the given file location."""
return load_database(file_location)
def find_faces(grey, face_cascade: cv2.CascadeClassifier):
"""
Finds the faces in the given image frame.
:param grey: The greyscale image.
:param face_cascade: The face cascade to be used for recognition.
:return: The face cascade classifier.
"""
return face_cascade.detectMultiScale(
grey,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE
)
return data_handler.load_database(file_location)
def determine_identity(face_encoding, known_faces):
......@@ -87,41 +71,43 @@ def draw_rectangles_and_user_ids(image_frame, conversion: float, box_user_id_map
def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: int = None,
draw_rectangles=False):
"""Attempts to recognize a user.
Returns the ID of the user if identified, or None if no users are identified.
Dictionary of the form { "user_id": #frames_recognized } to keep
track of how many times each user was recognized."""
Returns the ID of the user if identified, or None if no users are identified."""
recognized_users_count = {}
recognized_user = None
video_stream = common.start_video_stream(0)
# TODO get this
face_cascade = common.load_cascade(os.path.join(common.CASCADE_DIR, "haarcascade_frontalface_default.xml"))
# Determine the time at which we will time out. Equal to current time + timeout.
timeout_time: float = time.time() + TIMEOUT
while time.time() < timeout_time:
# Step 1: Image processing before we even get started with facial recognition.
grey, image_frame, r = process_next_image(video_stream, image_flip)
# Step 2: Detect locations of images.
boxes = find_faces(grey, face_cascade)
# Read a image_frame from the video stream.
ret, image_frame = video_stream.read()
if image_flip is not None:
image_frame = cv2.flip(image_frame, image_flip)
# Convert input from BGR to RGB
cv2.cvtColor(image_frame, cv2.COLOR_BGR2RGB)
# Resize image to width of 750 PX to speed up processing.
rgb_image = imutils.resize(image_frame, width=750)
r = image_frame.shape[1] / float(rgb_image.shape[1])
# Detect the location of each face and determine the boxes in which they lie
boxes = face_recognition.face_locations(
rgb_image, model=encoding_model)
# Compute the facial embeddings (the encoding) at
# each of the locations found in the previous line.
# encodings = face_recognition.face_encodings(grey, boxes)
#
# box_user_id_mapping = {}
# for (i, encoding) in enumerate(encodings):
# user_id: str = determine_identity(encoding, known_faces)
# if user_id:
# if user_id not in recognized_users_count:
# recognized_users_count[user_id] = 0
# recognized_users_count[user_id] += 1
# box_user_id_mapping[boxes[i]] = user_id
encodings = face_recognition.face_encodings(rgb_image, boxes)
box_user_id_mapping = {}
for (i, encoding) in enumerate(encodings):
user_id: str = determine_identity(encoding, known_faces)
if user_id:
if user_id not in recognized_users_count:
recognized_users_count[user_id] = 0
recognized_users_count[user_id] += 1
box_user_id_mapping[boxes[i]] = user_id
if draw_rectangles:
# draw_rectangles_and_user_ids(image_frame, r, box_user_id_mapping)
draw_rectangles_and_user_ids(image_frame, r, {box: "unknown" for box in boxes})
draw_rectangles_and_user_ids(image_frame, r, box_user_id_mapping)
# Now check if we have already positively identified a user enough times
recognized_users = check_recognized_users(recognized_users_count)
......@@ -137,42 +123,37 @@ def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: i
return recognized_user
def process_next_image(video_stream, image_flip: int = None):
"""
Processes the next image on the given video stream.
:param video_stream: The video stream.
:param image_flip: The integer way in which to flip the image if it will need flipping.
:return: A tuple of three elements: the processed greyscale image, the original read image and the r ratio.
"""
# Read a image_frame from the video stream.
ret, image_frame = video_stream.read()
if image_flip is not None:
image_frame = cv2.flip(image_frame, image_flip)
# Convert input from BGR to RGB
grey = cv2.cvtColor(image_frame, cv2.COLOR_BGR2RGB)
# Resize image to width of 750 PX to speed up processing.
grey = imutils.resize(grey, width=750)
r = image_frame.shape[1] / float(grey.shape[1])
return grey, image_frame, r
def recognize_user_from_database(database_loc: str = common.EMBEDDINGS_LOC, encoding_model: str = "hog",
image_flip: int = None, draw_rectangles: bool = False):
return recognize_user(data_handler.load_database(database_loc), encoding_model=encoding_model, image_flip=image_flip,
draw_rectangles=draw_rectangles)
# If this program is the main program, authenticate the user.
if __name__ == "__main__":
import argparse;
import argparse
parser = argparse.ArgumentParser(description="Facial Identification Options")
parser.add_argument("--encodings", "-e", type=str, help="File location of facial encodings.", required=False,
default="./encodings.pickle")
default=None)
parser.add_argument("--model", "-m", type=str, help="Type of encoding method, either \"hog\" or \"cnn\". HOG is "
"faster, CNN is more accurate.", required=False,
default="hog", choices=["cnn", "hog"])
default=None, choices=["cnn", "hog"])
parser.add_argument("--flip", "-f", type=int,
help="Whether or not to flip the image vertically or horizontally. 0 to flip horizontally, 1 to flip vertically.",
help="Whether or not to flip the image vertically or horizontally. 0 to flip horizontally, "
"1 to flip vertically.",
required=False, default=None, choices=[0, 1])
parser.add_argument("--show", "-s", action="store_true",
help="Include this argument to have the image shown to you.", default=False)
args = parser.parse_args()
user = recognize_user(known_faces=load_encodings(args.encodings), encoding_model=args.model, image_flip=args.flip,
draw_rectangles=args.show)
args_dict = {}
if args.encodings is not None:
args_dict["encodings_location"] = args.encodings
if args.model is not None:
args_dict["encoding_model"] = args.model
user = recognize_user_from_database(**args_dict, image_flip=args.flip,
draw_rectangles=args.show)
if user:
print(f"Recognized user {user}.")
......@@ -5,6 +5,7 @@ import os
import cv2
import face_recognition
from imutils import paths as impaths
import common
import data_handler
......@@ -26,48 +27,103 @@ def process_image(image, encoding_model: str = "hog"):
return face_recognition.face_encodings(image_rgb, [boxes[0]]) if boxes and len(boxes) > 0 else []
def register_user(user_id: str, dataset_dir: str, encoding_model="hog", database_loc: str = common.EMBEDDINGS_LOC,
show_output: bool = False):
def delete_file(file_path: str) -> None:
"""
Deletes the file at the given location.
:param file_path: The path to the file.
:return: None
"""
os.unlink(file_path)
def register_user(user_id: str, dataset_dir: str, encoding_model="hog",
show_output: bool = False, delete_on_processed: bool = False):
"""
Function for registering a new user using the given video source. If video source isn't provided, then the camera
on id 0 is used.
:param user_id: The user id for the user that is being registered.
:param dataset_dir: The directory location of pictures for the user.
:param encoding_model: The type of encoding model. Must be either "hog" or "cnn". HOG is faster, CNN is more thorough.
:param database_loc: Location of the pickle file database.
:param show_output: True to print console output for progress, false otherwise.
:param delete_on_processed: True to delete the image file after processing it, false otherwise.
:return: Encoded face that was detected, or None if no face was detected or if there was another error.
"""
processed_images = []
for (i, filename) in enumerate(os.listdir(dataset_dir)):
full_path = os.path.join(dataset_dir, filename)
for (i, filename) in enumerate(impaths.list_images(dataset_dir)):
# Might want to check file validity here at some point, but won't for now.
image = cv2.imread(full_path)
image = cv2.imread(filename)
if image is not None:
if show_output:
print(f"Processing image {i + 1}")
print(f"Processing image {i + 1} for user {user_id}")
processed = process_image(image, encoding_model=encoding_model)
if processed:
processed_images.extend(processed)
if len(processed_images) > 0: # Only do things if we actually have stuff to add.
user_info = data_handler.load_database(database_loc)
if user_id not in user_info.keys():
user_info[user_id] = []
user_info[user_id].extend(processed_images)
data_handler.write_database(database_loc, user_info)
# Delete after we're done if we're supposed to.
if delete_on_processed:
delete_file(filename)
return {user_id: processed_images} if len(processed_images) > 0 else None
def register_users_in_dir(directory_location: str, encoding_model: str = "hog", delete_images_on_complete: bool = False,
show_output: bool = False):
"""
Registers all the users in a directory.
:param directory_location:
:param encoding_model: The type of encoding model to use.
:param delete_images_on_complete: True to delete the images after processing them, false otherwise.
:param show_output: True to print progress output, false otherwise.
:return: The dictionary of registered users in the given directory.
"""
total_dict = {}
for directory in next(os.walk(directory_location))[1]:
total_directory = os.path.join(directory_location, directory)
# Using the directory name as the user_id as well.
user_dict = register_user(directory, total_directory, encoding_model=encoding_model, show_output=show_output,
delete_on_processed=delete_images_on_complete)
if user_dict is not None:
total_dict.update(user_dict)
return total_dict if len(total_dict) > 0 else None
def register_users_and_save(directory_location: str = common.DATASET_DIR,
database_location: str = common.EMBEDDINGS_LOC, encoding_model="hog",
delete_images_on_complete: bool = True, show_output: bool = False,
overwrite_data: bool = False):
processed_users = register_users_in_dir(directory_location, encoding_model=encoding_model,
delete_images_on_complete=delete_images_on_complete,
show_output=show_output)
database = data_handler.load_database(database_location) if not overwrite_data else {}
if processed_users is not None:
for user_id, encodings in processed_users.items():
if user_id not in database:
database[user_id] = []
database[user_id].extend(encodings)
data_handler.write_database(database_location, database)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Facial Registration Options")
parser.add_argument("user_id", type=str, help="User ID to be used")
parser.add_argument("--encodings", "-e", type=str, help="File location to output encodings.", required=True,
default="./encodings.pickle")
parser.add_argument("--dataset", "-d", type=str, help="Directory location of the dataset images.", required=True)
parser.add_argument("--encodings", "-e", type=str, help="File location to output encodings.", default=None)
parser.add_argument("--dataset", "-d", type=str, help="Directory location of the dataset images.", default=None)
parser.add_argument("--model", "-m", type=str, help="Type of encoding method, either \"hog\" or \"cnn\". HOG is "
"faster, CNN is more accurate.", required=False,
default="hog", choices=["cnn", "hog"])
"faster, CNN is more accurate.", required=False, default=None,
choices=["cnn", "hog"])
parser.add_argument("--overwrite", default=False, action="store_true",
help="Include this flag to overwrite the database, replacing its content.", required=False)
args = parser.parse_args()
register_user(user_id=args.user_id, dataset_dir=args.dataset, encoding_model=args.model,
database_loc=args.encodings, show_output=True)
args_dict = {}
if args.encodings is not None:
args_dict["database_location"] = args.encodings
if args.dataset is not None:
args_dict["directory_location"] = args.dataset
if args.model is not None:
args_dict["encoding_model"] = args.model
register_users_and_save(**args_dict, show_output=True, delete_images_on_complete=False,
overwrite_data=args.overwrite)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment