Commit d0de0740 authored by Spencer Delcore

Merge branch 'added-ap50-implementation' into 'master'

- Added code for ap50 calculation

See merge request !1
parents 0c48e25d 470e38ff
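The merge adds a voc_ap helper and per-image IoU bookkeeping so that an AP50 score is printed after the evaluation loop. As a quick illustration of the 11-point VOC AP used here, a minimal sketch with made-up precision/recall values (assuming voc_ap from the file below is already defined in the session):

import numpy as np

# Toy, made-up precision/recall curve, sorted by increasing recall.
rec = np.array([0.1, 0.4, 0.7])
prec = np.array([1.0, 0.8, 0.6])

# With use_07_metric=True, voc_ap averages the best precision found at the
# 11 recall thresholds 0.0, 0.1, ..., 1.0 (assumes voc_ap is in scope).
ap = voc_ap(rec, prec, use_07_metric=True)
print(ap)  # ~0.56 for these numbers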
# Copyright 2021 - Valeo Comfort and Driving Assistance - Oriane Siméoni @ valeo.ai
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
import random
import pickle

import torch
import torch.nn as nn
import numpy as np

from tqdm import tqdm
from PIL import Image

from networks import get_model
from datasets import ImageDataset, Dataset, bbox_iou
from visualizations import visualize_fms, visualize_predictions, visualize_seed_expansion
from object_discovery import lost, detect_box, dino_seg

def voc_ap(rec, prec, use_07_metric=False):
    """
    Taken from https://github.com/valeoai/LOST/blob/fcedbecb644f18358a660ce58c739cc6374feda8/tools/evaluate_unsupervised_detection_voc.py#L46

    Compute VOC AP given precision and recall. If use_07_metric is true, uses
    the VOC 07 11-point method (default: False).
    """
    if use_07_metric:
        # 11-point metric
        ap = 0.0
        for t in np.arange(0.0, 1.1, 0.1):
            if np.sum(rec >= t) == 0:
                p = 0
            else:
                p = np.max(prec[rec >= t])
            ap = ap + p / 11.0
    else:
        # correct AP calculation
        # first append sentinel values at the end
        mrec = np.concatenate(([0.0], rec, [1.0]))
        mpre = np.concatenate(([0.0], prec, [0.0]))

        # compute the precision envelope
        for i in range(mpre.size - 1, 0, -1):
            mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i])

        # to calculate area under PR curve, look for points
        # where X axis (recall) changes value
        i = np.where(mrec[1:] != mrec[:-1])[0]

        # and sum (\Delta recall) * prec
        ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])
    return ap
help="Path of the image to load.",
) if __name__ == "__main__":
# Or use a single image parser = argparse.ArgumentParser("Unsupervised object discovery with LOST.")
parser.add_argument( parser.add_argument(
"--image_path", "--arch",
type=str, default="vit_small",
default=None, type=str,
help="If want to apply only on one image, give file path.", choices=[
) "vit_tiny",
"vit_small",
# Folder used to output visualizations and "vit_base",
parser.add_argument( "resnet50",
"--output_dir", type=str, default="outputs", help="Output directory to store predictions and visualizations." "vgg16_imagenet",
) "resnet50_imagenet",
],
# Evaluation setup help="Model architecture.",
parser.add_argument("--no_hard", action="store_true", help="Only used in the case of the VOC_all setup (see the paper).") )
parser.add_argument("--no_evaluation", action="store_true", help="Compute the evaluation.") parser.add_argument(
parser.add_argument("--save_predictions", default=True, type=bool, help="Save predicted bouding boxes.") "--patch_size", default=16, type=int, help="Patch resolution of the model."
parser.add_argument("--num_init_seeds", default=1, type=int, help="Number of initial seeds to expand from.") )
# Visualization # Use a dataset
parser.add_argument( parser.add_argument(
"--visualize", "--dataset",
type=str, default="VOC07",
choices=["fms", "seed_expansion", "pred", None], type=str,
default=None, choices=[None, "VOC07", "VOC12", "COCO20k"],
help="Select the different type of visualizations.", help="Dataset name.",
) )
parser.add_argument(
# For ResNet dilation "--set",
parser.add_argument("--resnet_dilate", type=int, default=2, help="Dilation level of the resnet model.") default="train",
type=str,
# LOST parameters choices=["val", "train", "trainval", "test"],
parser.add_argument( help="Path of the image to load.",
"--which_features", )
type=str, # Or use a single image
default="k", parser.add_argument(
choices=["k", "q", "v"], "--image_path",
help="Which features to use", type=str,
) default=None,
parser.add_argument( help="If want to apply only on one image, give file path.",
"--k_patches", )
type=int,
default=100, # Folder used to output visualizations and
help="Number of patches with the lowest degree considered." parser.add_argument(
) "--output_dir", type=str, default="outputs", help="Output directory to store predictions and visualizations."
)
# Use dino-seg proposed method
parser.add_argument("--dinoseg", action="store_true", help="Apply DINO-seg baseline.") # Evaluation setup
parser.add_argument("--dinoseg_head", type=int, default=4) parser.add_argument("--no_hard", action="store_true", help="Only used in the case of the VOC_all setup (see the paper).")
parser.add_argument("--no_evaluation", action="store_true", help="Compute the evaluation.")
args = parser.parse_args() parser.add_argument("--save_predictions", default=True, type=bool, help="Save predicted bouding boxes.")
parser.add_argument("--num_init_seeds", default=50, type=int, help="Number of initial seeds to expand from.")
if args.image_path is not None:
args.save_predictions = False # Visualization
args.no_evaluation = True parser.add_argument(
args.dataset = None "--visualize",
type=str,
# ------------------------------------------------------------------------------------------------------- choices=["fms", "seed_expansion", "pred", None],
# Dataset default=None,
help="Select the different type of visualizations.",
# If an image_path is given, apply the method only to the image )
if args.image_path is not None:
dataset = ImageDataset(args.image_path) # For ResNet dilation
else: parser.add_argument("--resnet_dilate", type=int, default=2, help="Dilation level of the resnet model.")
dataset = Dataset(args.dataset, args.set, args.no_hard)
# LOST parameters
# ------------------------------------------------------------------------------------------------------- parser.add_argument(
# Model "--which_features",
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") type=str,
print("Running on device:", device) default="k",
model = get_model(args.arch, args.patch_size, args.resnet_dilate, device) choices=["k", "q", "v"],
help="Which features to use",
# ------------------------------------------------------------------------------------------------------- )
# Directories parser.add_argument(
if args.image_path is None: "--k_patches",
args.output_dir = os.path.join(args.output_dir, dataset.name) type=int,
os.makedirs(args.output_dir, exist_ok=True) default=100,
help="Number of patches with the lowest degree considered."
# Naming )
if args.dinoseg:
# Experiment with the baseline DINO-seg # Use dino-seg proposed method
if "vit" not in args.arch: parser.add_argument("--dinoseg", action="store_true", help="Apply DINO-seg baseline.")
raise ValueError("DINO-seg can only be applied to tranformer networks.") parser.add_argument("--dinoseg_head", type=int, default=4)
exp_name = f"{args.arch}-{args.patch_size}_dinoseg-head{args.dinoseg_head}"
else: args = parser.parse_args()
# Experiment with LOST
exp_name = f"LOST-{args.arch}" if args.image_path is not None:
if "resnet" in args.arch: args.save_predictions = False
exp_name += f"dilate{args.resnet_dilate}" args.no_evaluation = False
elif "vit" in args.arch: args.dataset = None
exp_name += f"{args.patch_size}_{args.which_features}"
# -------------------------------------------------------------------------------------------------------
print(f"Running LOST on the dataset {dataset.name} (exp: {exp_name})") # Dataset
# Visualization # If an image_path is given, apply the method only to the image
if args.visualize: if args.image_path is not None:
vis_folder = f"{args.output_dir}/visualizations/{exp_name}" dataset = ImageDataset(args.image_path)
os.makedirs(vis_folder, exist_ok=True) else:
dataset = Dataset(args.dataset, args.set, args.no_hard)
# -------------------------------------------------------------------------------------------------------
# Loop over images # -------------------------------------------------------------------------------------------------------
preds_dict = {} # Model
gt_dict = {} device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
cnt = 0 print("Running on device:", device)
corloc = np.zeros(len(dataset.dataloader)) model = get_model(args.arch, args.patch_size, args.resnet_dilate, device)
pbar = tqdm(dataset.dataloader) # -------------------------------------------------------------------------------------------------------
for im_id, inp in enumerate(pbar): # Directories
torch.cuda.empty_cache() if args.image_path is None:
# ------------ IMAGE PROCESSING ------------------------------------------- args.output_dir = os.path.join(args.output_dir, dataset.name)
img = inp[0] os.makedirs(args.output_dir, exist_ok=True)
init_image_size = img.shape
# Naming
# Get the name of the image if args.dinoseg:
im_name = dataset.get_image_name(inp[1]) # Experiment with the baseline DINO-seg
if "vit" not in args.arch:
# Pass in case of no gt boxes in the image raise ValueError("DINO-seg can only be applied to tranformer networks.")
if im_name is None: exp_name = f"{args.arch}-{args.patch_size}_dinoseg-head{args.dinoseg_head}"
continue else:
# Experiment with LOST
# Padding the image with zeros to fit multiple of patch-size exp_name = f"LOST-{args.arch}"
size_im = ( if "resnet" in args.arch:
img.shape[0], exp_name += f"dilate{args.resnet_dilate}"
int(np.ceil(img.shape[1] / args.patch_size) * args.patch_size), elif "vit" in args.arch:
int(np.ceil(img.shape[2] / args.patch_size) * args.patch_size), exp_name += f"{args.patch_size}_{args.which_features}"
)
paded = torch.zeros(size_im) print(f"Running LOST on the dataset {dataset.name} (exp: {exp_name})")
paded[:, : img.shape[1], : img.shape[2]] = img
img = paded # Visualization
if args.visualize:
# Move to gpu vis_folder = f"{args.output_dir}/visualizations/{exp_name}"
if device == torch.device("cuda"): os.makedirs(vis_folder, exist_ok=True)
img = img.cuda(non_blocking=True)
# -------------------------------------------------------------------------------------------------------
# Size for transformers # Loop over images
w_featmap = img.shape[-2] // args.patch_size preds_dict = {}
h_featmap = img.shape[-1] // args.patch_size gt_dict = {}
cnt = 0
# ------------ GROUND-TRUTH ------------------------------------------- corloc = np.zeros(len(dataset.dataloader))
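
    # Accumulators for the dataset-level AP50: per ground-truth box hit/miss
    # flags gathered over all images, plus the total number of ground-truth boxes.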
    total_true_positives = []
    total_false_positives = []
    total_gt_boxes = 0

    pbar = tqdm(dataset.dataloader)
    for im_id, inp in enumerate(pbar):
        torch.cuda.empty_cache()

        # ------------ IMAGE PROCESSING -------------------------------------------
        img = inp[0]
        init_image_size = img.shape

        # Get the name of the image
        im_name = dataset.get_image_name(inp[1])

        # Pass in case of no gt boxes in the image
        if im_name is None:
            continue

        # Padding the image with zeros to fit multiple of patch-size
        size_im = (
            img.shape[0],
            int(np.ceil(img.shape[1] / args.patch_size) * args.patch_size),
            int(np.ceil(img.shape[2] / args.patch_size) * args.patch_size),
        )
        paded = torch.zeros(size_im)
        paded[:, : img.shape[1], : img.shape[2]] = img
        img = paded

        # Move to gpu
        if device == torch.device("cuda"):
            img = img.cuda(non_blocking=True)

        # Size for transformers
        w_featmap = img.shape[-2] // args.patch_size
        h_featmap = img.shape[-1] // args.patch_size

        # ------------ GROUND-TRUTH -------------------------------------------
        if not args.no_evaluation:
            gt_bbxs, gt_cls = dataset.extract_gt(inp[1], im_name)

            if gt_bbxs is not None:
                # Discard images with no gt annotations
                # Happens only in the case of VOC07 and VOC12
                if gt_bbxs.shape[0] == 0 and args.no_hard:
                    continue

        # ------------ EXTRACT FEATURES -------------------------------------------
        with torch.no_grad():

            # ------------ FORWARD PASS -------------------------------------------
            if "vit" in args.arch:
                # Store the outputs of qkv layer from the last attention layer
                feat_out = {}
                def hook_fn_forward_qkv(module, input, output):
                    feat_out["qkv"] = output
                model._modules["blocks"][-1]._modules["attn"]._modules["qkv"].register_forward_hook(hook_fn_forward_qkv)

                # Forward pass in the model
                attentions = model.get_last_selfattention(img[None, :, :, :])

                # Scaling factor
                scales = [args.patch_size, args.patch_size]

                # Dimensions
                nb_im = attentions.shape[0]  # Batch size
                nh = attentions.shape[1]  # Number of heads
                nb_tokens = attentions.shape[2]  # Number of tokens

                # Baseline: compute DINO segmentation technique proposed in the DINO paper
                # and select the biggest component
                if args.dinoseg:
                    pred = dino_seg(attentions, (w_featmap, h_featmap), args.patch_size, head=args.dinoseg_head)
                    pred = np.asarray(pred)
                else:
                    # Extract the qkv features of the last attention layer
                    qkv = (
                        feat_out["qkv"]
                        .reshape(nb_im, nb_tokens, 3, nh, -1 // nh)
                        .permute(2, 0, 3, 1, 4)
                    )
                    q, k, v = qkv[0], qkv[1], qkv[2]
                    k = k.transpose(1, 2).reshape(nb_im, nb_tokens, -1)
                    q = q.transpose(1, 2).reshape(nb_im, nb_tokens, -1)
                    v = v.transpose(1, 2).reshape(nb_im, nb_tokens, -1)

                    # Modality selection
                    if args.which_features == "k":
                        feats = k[:, 1:, :]
                    elif args.which_features == "q":
                        feats = q[:, 1:, :]
                    elif args.which_features == "v":
                        feats = v[:, 1:, :]

            elif "resnet" in args.arch:
                x = model.forward(img[None, :, :, :])
                d, w_featmap, h_featmap = x.shape[1:]
                feats = x.reshape((1, d, -1)).transpose(2, 1)
                # Apply layernorm
                layernorm = nn.LayerNorm(feats.size()[1:]).to(device)
                feats = layernorm(feats)
                # Scaling factor
                scales = [
                    float(img.shape[1]) / x.shape[2],
                    float(img.shape[2]) / x.shape[3],
                ]
            elif "vgg16" in args.arch:
                x = model.forward(img[None, :, :, :])
                d, w_featmap, h_featmap = x.shape[1:]
                feats = x.reshape((1, d, -1)).transpose(2, 1)
                # Apply layernorm
                layernorm = nn.LayerNorm(feats.size()[1:]).to(device)
                feats = layernorm(feats)
                # Scaling factor
                scales = [
                    float(img.shape[1]) / x.shape[2],
                    float(img.shape[2]) / x.shape[3],
                ]
            else:
                raise ValueError("Unknown model.")

        # ------------ Apply LOST -------------------------------------------
        if not args.dinoseg:
            preds, A, scores, seeds = lost(
                feats,
                [w_featmap, h_featmap],
                scales,
                init_image_size,
                k_patches=args.k_patches,
                num_init_seeds=args.num_init_seeds,
            )

            # ------------ Visualizations -------------------------------------------
            if args.visualize == "fms":
                for i, x in enumerate(zip(preds, seeds)):
                    pred, seed = x
                    visualize_fms(A.clone().cpu().numpy(), seed, scores, [w_featmap, h_featmap], scales, vis_folder, im_name + '_' + str(i))

            elif args.visualize == "seed_expansion":
                for i, x in enumerate(zip(preds, seeds)):
                    pred, seed = x
                    image = dataset.load_image(im_name)

                    # Before expansion
                    pred_seed, _ = detect_box(
                        A[seed, :],
                        seed,
                        [w_featmap, h_featmap],
                        scales=scales,
                        initial_im_size=init_image_size[1:],
                    )
                    visualize_seed_expansion(image, pred, seed, pred_seed, scales, [w_featmap, h_featmap], vis_folder, im_name + '_' + str(i))

            elif args.visualize == "pred":
                image = dataset.load_image(im_name)
                for i, x in enumerate(zip(preds, seeds)):
                    pred, seed = x
                    image_name = None
                    if i == len(preds) - 1:
                        image_name = im_name
                    visualize_predictions(image, pred, seed, scales, [w_featmap, h_featmap], vis_folder, image_name)

        # Save the prediction
        # preds_dict[im_name] = preds

        # Evaluation
        if args.no_evaluation:
            continue
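
        # AP50 bookkeeping for this image: tp[idx] is set as soon as some
        # prediction reaches IoU >= 0.5 with ground-truth box idx; fp[idx] is
        # set for predictions that fall below the threshold before a match is found.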
        nd = len(gt_bbxs)
        total_gt_boxes += nd
        tp = [0] * nd
        fp = [0] * nd
        for idx, gt in enumerate(gt_bbxs):
            for idy, pred in enumerate(preds):
                iou = bbox_iou(torch.from_numpy(pred), torch.from_numpy(gt))
                if iou >= 0.50:
                    tp[idx] = 1
                    break
                else:
                    fp[idx] = 1
        total_true_positives.extend(tp)
        total_false_positives.extend(fp)

    # compute precision recall
    total_false_positives = np.cumsum(total_false_positives)
    total_true_positives = np.cumsum(total_true_positives)
    rec = total_true_positives / float(total_gt_boxes)
    # avoid divide by zero in case the first detection matches a difficult
    # ground truth
    prec = total_true_positives / np.maximum(total_true_positives + total_false_positives, np.finfo(np.float64).eps)
    ap = voc_ap(rec, prec, use_07_metric=True)
    print("AP: %f" % ap)

    # Save predicted bounding boxes
    if args.save_predictions:
        folder = f"{args.output_dir}/{exp_name}"
        os.makedirs(folder, exist_ok=True)
        filename = os.path.join(folder, "preds.pkl")
        with open(filename, "wb") as f:
            pickle.dump(preds_dict, f)
        print("Predictions saved at %s" % filename)

    # Evaluate
    if not args.no_evaluation and cnt > 0:
        # corloc and cnt are not updated in this AP50 version, so guard against
        # dividing by zero when printing the CorLoc summary.
        print(f"corloc: {100 * np.sum(corloc) / cnt:.2f} ({int(np.sum(corloc))}/{cnt})")
        result_file = os.path.join(folder, 'results.txt')
        with open(result_file, 'w') as f:
            f.write('corloc,%.1f,,\n' % (100 * np.sum(corloc) / cnt))
        print('File saved at %s' % result_file)