Commit e16a3ae8 authored by Kyle Anderson's avatar Kyle Anderson

Merge branch 'interface' into 'master'

Add Interface for React App

See merge request !3
parents 40c01470 9a8ba48e
dataset
\ No newline at end of file
dataset
# Ignore database files
*.pickle
# Created by https://www.gitignore.io/api/python,pycharm
# Edit at https://www.gitignore.io/?templates=python,pycharm
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
.idea/**/sonarlint/
# SonarQube Plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator/
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# End of https://www.gitignore.io/api/python,pycharm
\ No newline at end of file
# Default ignored files
/workspace.xml
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/FLEX vision repo.iml" filepath="$PROJECT_DIR$/.idea/FLEX vision repo.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# FLEX Vision
The vision portion of the FLEX team project, responsible for detecting and authenticating users' faces.
\ No newline at end of file
The vision portion of the FLEX team project, responsible for detecting and authenticating users' faces.
# Installation
1. Install OpenCV
1. Run `pip install -U scikit-learn`
# Installing OpenCV on the Raspberry PI
I followed [these instructions](https://www.pyimagesearch.com/2018/09/26/install-opencv-4-on-your-raspberry-pi/) with modifications in the following places:
- In Step #4, replace `sudo pip install virtualenv virtualenvwrapper` with `sudo pip3 install virtualenv virtualenvwrapper` to install it on python 3.
\ No newline at end of file
......@@ -2,54 +2,42 @@
Methods for authenticating a user.
"""
from imutils.video import VideoStream
import face_recognition
import imutils
import pickle
import time
import cv2
import face_recognition
import imutils
import common
import data_handler
# How long to wait before timing out and saying failed authentication.
TIMEOUT: float = 30.0
# Minimum number of frames in which a user must be recognized in order to be authenticated.
MIN_USER_RECOGNITION_COUNT = 10
image_writer = None
USER_IDS_KEY: str = "names" # TODO change
USER_IDS_KEY: str = "user_ids"
def load_encodings(file_location: str):
"""Loads the encodings for faces from the given file location."""
with open(file_location, "rb") as encodings_file:
encodings = pickle.loads(encodings_file.read())
return encodings
def start_video_stream(camera: int):
"""Starts the video stream and returns the created stream.
Also waits for the video stream to open before returning it."""
video_stream = VideoStream(src=camera).start()
time.sleep(2.0)
return video_stream
return data_handler.load_database(file_location)
def determine_identity(face_encoding, known_faces):
"""Determines the most likely identity of a single face. Returns the user id."""
matches = face_recognition.compare_faces(
known_faces["encodings"], face_encoding)
matched_user = ''
matched_user_id_count = {}
# If there is at least one match to a face in the database, figure out which one it is.
if True in matches:
matched_users = [user_index for (
user_index, is_match) in enumerate(matches) if is_match]
for i in matched_users:
user_id: str = known_faces[USER_IDS_KEY][i]
matched_user_id_count[user_id] = matched_user_id_count.get(user_id, 0) + 1
matched_user: str = max(matched_user_id_count,
key=matched_user_id_count.get)
"""
"Determines the most likely identity of a single face. Returns the user id.
:param face_encoding: The encoding which needs identification.
:param known_faces: The database of known faces to use for searching.
:return: The string user_id of the recognized user.
"""
recognized_users = {}
for (user_id, user_encodings) in known_faces.items():
matches = face_recognition.compare_faces(user_encodings, face_encoding)
# Count the number of occurrences of true.
recognized_users[user_id] = matches.count(True)
matched_user: str = max(recognized_users,
key=recognized_users.get)
return matched_user
......@@ -63,10 +51,10 @@ def check_recognized_users(recognized_user_counts):
return recognized_users
def draw_rectanges_and_user_ids(image_frame, conversion: float, boxes, user_ids: list):
def draw_rectangles_and_user_ids(image_frame, conversion: float, box_user_id_map: dict):
"""Draws the rectangles and user_ids onto the video stream so anyone viewing the stream could see them."""
if boxes and user_ids and len(user_ids) > 0:
for ((top, right, bottom, left), user_id) in zip(boxes, user_ids):
if box_user_id_map and len(box_user_id_map) > 0:
for ((top, right, bottom, left), user_id) in box_user_id_map.items():
top = round(top * conversion)
right = round(right * conversion)
bottom = round(bottom * conversion)
......@@ -77,30 +65,22 @@ def draw_rectanges_and_user_ids(image_frame, conversion: float, boxes, user_ids:
# Find the top so we can put the text there.
y = top - 15 if top - 15 > 15 else top + 15
cv2.putText(image_frame, user_id, (left, y), cv2.FONT_HERSHEY_PLAIN, 0.75, (0, 255, 0), 2)
display_frame(image_frame)
common.display_frame(image_frame)
def display_frame(frame):
"""Displays the frame to the user."""
cv2.imshow("Frame", frame)
def recognize_user(encodings_location: str = "./encodings.pickle", encoding_model: str = "hog", image_flip: int = None,
def recognize_user(known_faces: dict, encoding_model: str = "hog", image_flip: int = None,
draw_rectangles=False):
"""Attempts to recognize a user.
Returns the ID of the user if identified, or None if no users are identified.
Dictionary of the form { "user_id": #frames_recognized } to keep
track of how many times each user was recognized."""
Returns the ID of the user if identified, or None if no users are identified."""
recognized_users_count = {}
recognized_user = None
video_stream = start_video_stream(0)
known_faces = load_encodings(encodings_location)
video_stream = common.start_video_stream(0)
# Determine the time at which we will time out. Equal to current time + timeout.
timeout_time: float = time.time() + TIMEOUT
while time.time() < timeout_time:
# Read a image_frame from the video stream.
image_frame = video_stream.read()
ret, image_frame = video_stream.read()
if image_flip is not None:
image_frame = cv2.flip(image_frame, image_flip)
......@@ -113,25 +93,27 @@ def recognize_user(encodings_location: str = "./encodings.pickle", encoding_mode
# Detect the location of each face and determine the boxes in which they lie
boxes = face_recognition.face_locations(
rgb_image, model=encoding_model)
# Computer the facial embeddings (the encoding) at
# Compute the facial embeddings (the encoding) at
# each of the locations found in the previous line.
encodings = face_recognition.face_encodings(rgb_image, boxes)
for encoding in encodings:
box_user_id_mapping = {}
for (i, encoding) in enumerate(encodings):
user_id: str = determine_identity(encoding, known_faces)
if user_id:
if user_id not in recognized_users_count:
recognized_users_count[user_id] = 0
recognized_users_count[user_id] += 1
box_user_id_mapping[boxes[i]] = user_id
if draw_rectangles:
draw_rectanges_and_user_ids(image_frame, r, boxes, list(known_faces[USER_IDS_KEY]))
draw_rectangles_and_user_ids(image_frame, r, box_user_id_mapping)
# Now check if we have already positively identified a user enough times
recognized_users = check_recognized_users(recognized_users_count)
if len(recognized_users) > 0:
break
cv2.waitKey(1) # Required or else video stream doesn't really render.
cv2.waitKey(20) # Required or else video stream doesn't really render.
if recognized_users_count:
recognized_user = max(recognized_users_count,
......@@ -141,23 +123,46 @@ def recognize_user(encodings_location: str = "./encodings.pickle", encoding_mode
return recognized_user
def recognize_user_from_database(database_loc: str = common.DATABASE_LOC, encoding_model: str = "hog",
image_flip: int = None, draw_rectangles: bool = False):
"""
Recognizes a user
:param database_loc: The database containing the face encodings for users.
:param encoding_model: The encoding model to be used for recognition.
:param image_flip: The type of image flip to be applied to the image, if it will be upside-down or horizontally inverted.
:param draw_rectangles: True to draw the rectangles to the screen, false otherwise.
:return: The recognized user's id, or None if no user was recognized.
"""
return recognize_user(data_handler.load_database(database_loc), encoding_model=encoding_model,
image_flip=image_flip,
draw_rectangles=draw_rectangles)
# If this program is the main program, authenticate the user.
if __name__ == "__main__":
import argparse;
import argparse
parser = argparse.ArgumentParser(description="Facial Identification Options")
parser.add_argument("--encodings", "-e", type=str, help="File location of facial encodings.", required=False,
default="./encodings.pickle")
default=None)
parser.add_argument("--model", "-m", type=str, help="Type of encoding method, either \"hog\" or \"cnn\". HOG is "
"faster, CNN is more accurate.", required=False,
default="hog", choices=["cnn", "hog"])
default=None, choices=["cnn", "hog"])
parser.add_argument("--flip", "-f", type=int,
help="Whether or not to flip the image vertically or horizontally. 0 to flip horizontally, 1 to flip vertically.",
help="Whether or not to flip the image vertically or horizontally. 0 to flip horizontally, "
"1 to flip vertically.",
required=False, default=None, choices=[0, 1])
parser.add_argument("--show", "-s", action="store_true",
help="Include this argument to have the image shown to you.", default=False)
args = parser.parse_args()
user = recognize_user(encoding_model=args.model, encodings_location=args.encodings, image_flip=args.flip,
draw_rectangles=args.show)
args_dict = {}
if args.encodings is not None:
args_dict["encodings_location"] = args.encodings
if args.model is not None:
args_dict["encoding_model"] = args.model
user = recognize_user_from_database(**args_dict, image_flip=args.flip,
draw_rectangles=args.show)
if user:
print(f"Recognized user {user}.")
import os
import cv2
DATA_DIR = "data"
DATASET_DIR = "dataset"
DATABASE_LOC = os.path.join(DATA_DIR, "database.pickle")
RES_DIRECTORY = "./res"
# Directory for the face detection model.
FACE_DETECTION_MODEL_DIR = os.path.join(RES_DIRECTORY, "face_detection_model")
EMBEDDINGS_PROCESSOR_LOC = os.path.join(RES_DIRECTORY, "openface_nn4.small2.v1.t7")
def display_frame(frame):
    """Show the given image frame to the user in an OpenCV window titled "Frame"."""
    window_title = "Frame"
    cv2.imshow(window_title, frame)
def start_video_stream(camera: int):
    """
    Open and return a video capture stream for the given camera.

    :param camera: The index of the camera device to open.
    :return: The opened cv2.VideoCapture stream.
    """
    # Bug fix: the camera index was previously hard-coded to 0,
    # silently ignoring the `camera` argument.
    video_stream = cv2.VideoCapture(camera)
    return video_stream
def load_cascade(cascade_loc: str) -> cv2.CascadeClassifier:
    """
    Open the cascade classifier at the given path.

    :param cascade_loc: The file location of the cascade.
    :return: The CascadeClassifier instance.
    """
    classifier = cv2.CascadeClassifier(cascade_loc)
    return classifier
def load_detector(proto_path: str, model_path: str):
    """
    Load the Caffe face detector from its prototxt and model files.

    :param proto_path: The path location of the prototext file.
    :param model_path: The path to the caffe model.
    :return: The loaded detector network.
    """
    detector = cv2.dnn.readNetFromCaffe(proto_path, model_path)
    return detector
def load_embedding_model(model_path: str):
    """
    Load the Torch facial-embedding model stored at the given location.

    :param model_path: The path to the model file.
    :return: The loaded embedding model.
    """
    embedder = cv2.dnn.readNetFromTorch(model_path)
    return embedder
CAFFE_MODEL_NAME = "res10_300x300_ssd_iter_140000.caffemodel"
PROTOTXT_NAME = "deploy.prototxt"
def load_detector_from_dir(detector_dir: str):
    """Build the Caffe detector from the conventional file names inside detector_dir."""
    proto_file = os.path.join(detector_dir, PROTOTXT_NAME)
    model_file = os.path.join(detector_dir, CAFFE_MODEL_NAME)
    return load_detector(proto_file, model_file)
"""
Creates a facial recognition profile for a new user.
"""
"""
General IO for pickle database operations.
"""
import pickle
def get_user_ids_in_database(database: dict) -> list:
    """
    Get every user_id present in the given database.

    :param database: The database to look through.
    :return: A list of all the user_ids in the database, in insertion order.
    """
    return [user_id for user_id in database]
def get_encodings_in_database(database: dict):
    """
    Collect every encoding from every user in the given database into one flat list.

    :param database: The database dictionary mapping user_id -> list of encodings.
    :return: A single list of all encodings, users in insertion order.
    """
    return [encoding
            for user_encodings in database.values()
            for encoding in user_encodings]
def load_database(file_location: str):
    """
    Attempt to load the pickle database at the given file location.

    :param file_location: String location of file to be loaded.
    :return: The loaded pickle database, or {} when the file is missing or empty.
    """
    try:
        with open(file_location, "rb") as database:
            # SECURITY NOTE: pickle.load can execute arbitrary code; only
            # load database files from trusted locations.
            return pickle.load(database)
    except (FileNotFoundError, EOFError):
        # Missing or empty database file is treated as an empty database.
        # (The original pre-initialized and re-assigned {} here; both
        # assignments were dead code once we return directly.)
        return {}
def write_database(output_file: str, database_content) -> None:
    """
    Write the dictionary database to the given file location.

    Nothing is written when output_file is falsy or database_content is empty,
    preserving the original guard behavior.

    :param output_file: The location of the file to be outputted on.
    :param database_content: The database content to be written to the file.
    :return: None
    """
    # The original condition also tested `database_content is not None`,
    # which is redundant: truthiness already excludes None.
    if output_file and database_content:
        with open(output_file, "wb") as output:
            pickle.dump(database_content, output)
"""
API Interface to the modules in this project. To be used by the UI application.
"""
import os
import authenticate_user as authenticate
import common
import register_user as register
def authenticate_user(base_dir: str = None) -> str:
    """
    Interface for authenticating a user with the given base directory location.

    :param base_dir: The base directory, from which all default paths are relative.
                     Defaults to the current working directory at call time.
    :return: The user_id of the recognized user, or empty string if none was recognized.
    """
    # Bug fix: os.getcwd() as a default was evaluated once at import time;
    # resolve it per-call instead.
    if base_dir is None:
        base_dir = os.getcwd()
    # Bug fix: recognize_user_from_database expects the pickle *file* path
    # (common.DATABASE_LOC); the original passed the data *directory*
    # (common.DATA_DIR), which load_database cannot open.
    recognized = authenticate.recognize_user_from_database(
        database_loc=os.path.join(base_dir, common.DATABASE_LOC))
    return recognized if recognized is not None else ""
def register_user(base_dir: str = None) -> None:
    """
    Register newly added users in the database.

    :param base_dir: The base directory, from which all default paths are relative.
                     Defaults to the current working directory at call time.
    :return: None
    """
    # Bug fix: os.getcwd() as a default was evaluated once at import time;
    # resolve it per-call instead.
    if base_dir is None:
        base_dir = os.getcwd()
    register.register_users_and_save(directory_location=os.path.join(base_dir, common.DATASET_DIR),
                                     delete_images_on_complete=True, overwrite_data=False)
"""
Creates a facial recognition profile for a new user.
"""
import os
import cv2
import face_recognition
from imutils import paths as impaths
import common
import data_handler
def process_image(image, encoding_model: str = "hog"):
    """
    Process a single image, returning the encoded face object.

    :param image: The image containing the face to be processed.
    :param encoding_model: The encoding model, either CNN or HOG.
    :return: The processed facial recognition profile encoding, or [] if no face found.
    """
    # OpenCV loads BGR; face_recognition expects RGB.
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Locate every face in the image.
    face_boxes = face_recognition.face_locations(rgb, model=encoding_model)
    if not face_boxes:
        return []
    # Encode only the first detected face.
    return face_recognition.face_encodings(rgb, [face_boxes[0]])
def delete_file(file_path: str) -> None:
    """
    Delete the file at the given location.

    :param file_path: The path to the file.
    :return: None
    """
    # os.remove is an alias of os.unlink.
    os.remove(file_path)
def register_user(user_id: str, dataset_dir: str, encoding_model="hog",
                  show_output: bool = False, delete_on_processed: bool = False):
    """
    Register a new user from a directory of pictures.

    :param user_id: The user id for the user that is being registered.
    :param dataset_dir: The directory location of pictures for the user.
    :param encoding_model: Either "hog" or "cnn". HOG is faster, CNN is more thorough.
    :param show_output: True to print console output for progress, false otherwise.
    :param delete_on_processed: True to delete each image file after processing it.
    :return: {user_id: encodings} for the detected faces, or None if none were found.
    """
    collected_encodings = []
    for index, image_path in enumerate(impaths.list_images(dataset_dir)):
        # Might want to check file validity here at some point, but won't for now.
        loaded_image = cv2.imread(image_path)
        if loaded_image is None:
            continue
        if show_output:
            print(f"Processing image {index + 1} for user {user_id}")
        encodings = process_image(loaded_image, encoding_model=encoding_model)
        if encodings:
            collected_encodings.extend(encodings)
        # Delete after we're done if we're supposed to.
        if delete_on_processed:
            delete_file(image_path)
    return {user_id: collected_encodings} if collected_encodings else None
def register_users_in_dir(directory_location: str, encoding_model: str = "hog", delete_images_on_complete: bool = False,
                          show_output: bool = False):
    """
    Register every user found as a subdirectory of the given directory.

    :param directory_location: Directory whose immediate subdirectories each hold one user's images.
    :param encoding_model: The type of encoding model to use.
    :param delete_images_on_complete: True to delete the images after processing them, false otherwise.
    :param show_output: True to print progress output, false otherwise.
    :return: The dictionary of registered users in the given directory, or None if empty.
    """
    registered = {}
    # Only the immediate subdirectories; each directory name doubles as the user_id.
    subdirectories = next(os.walk(directory_location))[1]
    for subdir_name in subdirectories:
        user_dir = os.path.join(directory_location, subdir_name)
        user_result = register_user(subdir_name, user_dir, encoding_model=encoding_model,
                                    show_output=show_output,
                                    delete_on_processed=delete_images_on_complete)
        if user_result is not None:
            registered.update(user_result)
    return registered if registered else None
def register_users_and_save(directory_location: str = common.DATASET_DIR,
database_location: str = common.DATABASE_LOC, encoding_model="hog",
delete_images_on_complete: bool = True, show_output: bool = False,
overwrite_data: bool = False):
processed_users = register_users_in_dir(directory_location, encoding_model=encoding_model,
delete_images_on_complete=delete_images_on_complete,
show_output=show_output)
database = data_handler.load_database(database_location) if not overwrite_data else {}
if processed_users is not None: