RetinaNet Object Detection in Python

I am implementing RetinaNet for object detection in this tutorial. RetinaNet is a single stage object detection model that uses Feature Pyramid Networks (FPN) and Focal Loss (FL) to improve its performance in relation to older R-CNN models. RetinaNet works well with dense and small objects.

Object detection is a computer vision and image processing technique that is used to locate instances of objects of a certain class (car, human or cat for example) in images and videos. Object detection involves both classification and regression: the model needs to predict the class of each object and the bounding box surrounding it.

RetinaNet is a Convolutional Neural Network (CNN) with Feature Pyramid Networks (FPN) and Focal Loss. CNNs are designed to perform well with images; the architecture is inspired by the organization of the visual cortex in the human brain. A CNN reduces images into a form which is easier to process, without sacrificing predictive ability.

A Feature Pyramid Network (FPN) is a feature extractor that generates multiple feature map layers of an image in a pyramidal hierarchical structure. Focal Loss (FL) is an enhancement over Cross-Entropy Loss (CE): it puts more weight on objects that are hard to classify and less weight on objects that are easy to classify. In other words, FL focuses on the hard examples.

The code in this tutorial is written in Python and adapted from Keras RetinaNet by Fizyr. I have downloaded a sample of images from Open Images V5 (you can use the latest version), and the Keras Functional API is used to build models.

Download Images

You will need to download three .csv files to be able to download a sample of images from the Open Images project. Browse to Open Images Dataset and download class-descriptions-boxable.csv and train-annotations-bbox.csv, then download train-images-boxable.csv from Figure Eight. I chose to use only three categories (beer, laptop and goat) and 200 images for each class; 80 % of the images are used for training and 20 % for testing/validation.

# Import libraries
import numpy as np
import random
import pandas as pd
from skimage import io

# The main entry point for this module
def main():
    """Download a sample of Beer, Laptop and Goat images from Open Images.

    Reads the class descriptions, bounding-box annotations and image list
    from the local Open Images V5 folder, prints a few statistics, and then
    downloads the first ``n`` images of every class. The first 80 % of each
    class is saved to the ``train`` folder and the rest to the ``test`` folder.
    """

    # Look up the Open Images label id for each class name
    classes = pd.read_csv('C:\\DATA\\Python-data\\open-images-v5\\class-descriptions-boxable.csv', names=['label', 'name'])
    lbl_beer, lbl_laptop, lbl_goat = [
        classes.loc[classes['name']==class_name, 'label'].iloc[0]
        for class_name in ('Beer', 'Laptop', 'Goat')
    ]
    print('Beer: {0}, Laptop: {1}, Goat: {2}'.format(lbl_beer, lbl_laptop, lbl_goat))

    # Select the bounding boxes that belong to each class
    annotations = pd.read_csv('C:\\DATA\\Python-data\\open-images-v5\\train-annotations-bbox.csv')
    label_column = annotations['LabelName']
    beer_bbox = annotations[label_column==lbl_beer]
    laptop_bbox = annotations[label_column==lbl_laptop]
    goat_bbox = annotations[label_column==lbl_goat]

    print('There is {0} beer in the dataset.'.format(len(beer_bbox)))
    print('There is {0} laptops in the dataset.'.format(len(laptop_bbox)))
    print('There is {0} goats in the dataset.'.format(len(goat_bbox)))

    # One image can hold several boxes, so reduce to unique image ids
    beer_images = np.unique(beer_bbox['ImageID'])
    laptop_images = np.unique(laptop_bbox['ImageID'])
    goat_images = np.unique(goat_bbox['ImageID'])

    print('There are {0} images which contain beer'.format(len(beer_images)))
    print('There are {0} images which contain laptops'.format(len(laptop_images)))
    print('There are {0} images which contain goats'.format(len(goat_images)))

    # Sample size per class, train/test split point and output folder
    n = 200
    train_size = int(n*0.8)
    base_path = 'C:\\DATA\\Python-data\\open-images-v5\\imgs\\'

    # Image list with the name and download url of every image
    images = pd.read_csv('C:\\DATA\\Python-data\\open-images-v5\\train-images-boxable.csv')

    # For each class: the image-list rows of its first n image ids
    subsets = [
        [images[images['image_name']==image_id+'.jpg'] for image_id in image_ids[:n]]
        for image_ids in (beer_images, laptop_images, goat_images)
    ]

    # Download one image per class at a time; all three are fetched before any is saved
    for i in range(n):
        downloaded = [io.imread(subset[i]['image_url'].values[0]) for subset in subsets]
        folder = 'train\\' if i < train_size else 'test\\'
        for subset, picture in zip(subsets, downloaded):
            io.imsave(base_path + folder + subset[i]['image_name'].values[0], picture)


# Run main only when this module is executed as a script
if __name__ == "__main__":
    main()

Create Annotations

I have downloaded images and now it is time to create annotation files, one for training and one for testing. Annotation files tell us about the location of objects in images. These files list the filename, bounding box and class name for each object in each image. One image can include many objects.

# Import libraries
import os
import cv2
import pandas as pd

# Create an annotations file
def create_annotation_file(base_path, annotations, classes, labels, mode='train'):
    """Create a headerless annotations csv for the images in one folder.

    Every image found in ``base_path + mode`` is matched against the
    annotations by image id (the file name up to the first dot). For each
    bounding box whose label is in ``labels``, one row is written with the
    columns filename, xmin, ymin, xmax, ymax and class. Box coordinates in
    the annotations are multiplied by the image width/height and truncated
    to integers.

    Parameters:
        base_path: folder prefix (including trailing separator) that
            contains the ``mode`` sub folder.
        annotations: DataFrame with the columns ImageID, LabelName, XMin,
            XMax, YMin and YMax.
        classes: class names, in the same order as ``labels``.
        labels: label ids to include.
        mode: name of the image sub folder, also used as file name prefix.

    The result is saved to ``base_path + mode + '_annotations.csv'``.
    Images that cannot be read are reported and skipped.
    """

    image_dir = base_path + mode

    # Resolve label id -> class name with one lookup instead of a loop per box
    label_to_class = dict(zip(labels, classes))

    # Filter once on the wanted labels so the per-image lookup scans far fewer rows
    relevant = annotations[annotations['LabelName'].isin(labels)]

    # Collect plain dicts and build the frame once at the end;
    # DataFrame.append copied the whole frame per row and no longer exists in pandas 2
    rows = []
    for img_name in os.listdir(image_dir):
        img_id = img_name.split('.')[0]
        img_boxes = relevant[relevant['ImageID']==img_id]
        if img_boxes.empty:
            continue

        # cv2.imread returns None instead of raising when a file can not be decoded
        img = cv2.imread(os.path.join(image_dir, img_name))
        if img is None:
            print('Skipping unreadable image: {0}'.format(img_name))
            continue
        height, width = img.shape[:2]

        for _, row in img_boxes.iterrows():
            rows.append({
                'filename': mode + '\\' + img_name,
                'xmin': int(row['XMin'] * width),
                'ymin': int(row['YMin'] * height),
                'xmax': int(row['XMax'] * width),
                'ymax': int(row['YMax'] * height),
                'class': label_to_class[row['LabelName']]})

    # Save annotations file
    df = pd.DataFrame(rows, columns=['filename', 'xmin', 'ymin', 'xmax', 'ymax', 'class'])
    df.to_csv(base_path + mode + '_annotations.csv', index=None, header=False)

# The main entry point for this module
def main():
    """Create the train and test annotation files for the downloaded images."""

    # Folder that holds the train and test image folders
    base_path = 'C:\\DATA\\Python-data\\open-images-v5\\imgs\\'

    # Class names and their Open Images label ids, in matching order
    classes = ['Beer', 'Laptop', 'Goat']
    labels = ['/m/01599', '/m/01c648', '/m/03fwl']

    # Bounding boxes for all images in the Open Images train set
    annotations = pd.read_csv('C:\\DATA\\Python-data\\open-images-v5\\train-annotations-bbox.csv')

    # One annotation file per image folder, train first and then test
    for mode in ('train', 'test'):
        create_annotation_file(base_path, annotations, classes, labels, mode=mode)


# Run main only when this module is executed as a script
if __name__ == "__main__":
    main()

Classes

I have created a .csv file (classes.csv) with information about the classes that are used in this tutorial. This file is read by the generator.

Laptop,0
Beer,1
Goat,2

Common Methods

This module includes common methods that are used by many other modules. The methods in this module are used to set up the GPU, to process images and to make transformations.

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Import libraries
from __future__ import division
import cv2
import warnings
import keras
import numpy as np
import tensorflow as tf
from PIL import Image
from annytab.retinanet.anchors import AnchorParameters

# The pseudo-random number generator to use.
# This is the numpy.random module itself, so every function that falls back to
# this default draws from NumPy's global random state.
DEFAULT_PRNG = np.random

# Anchor parameters
# Module-level anchor configuration, created once when the module is imported.
# NOTE(review): sizes and strides have five entries each, presumably one per
# feature pyramid level - confirm against AnchorParameters.
# ratios and scales are stored as arrays in the Keras backend float type.
anchor_parameters = AnchorParameters(
    sizes   = [32, 64, 128, 256, 512],
    strides = [8, 16, 32, 64, 128],
    ratios  = np.array([0.125, 0.25, 0.5, 1, 2], keras.backend.floatx()),
    scales  = np.array([2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)], keras.backend.floatx()))

# Colors
# Table of 80 predefined three-component colors, indexed by label in label_color.
# NOTE(review): the channel order is presumably the one expected by the cv2
# drawing functions that receive these values - confirm before reusing elsewhere.
colors = [
    [31,0,255],[0,159,255],[255,95,0],[255,19,0],[255,0,0],[255,38,0],[0,255,25],[255,0,133],[255,172,0],[108,0,255],[0,82,255],
    [0,255,6],[255,0,152],[223,0,255],[12,0,255],[0,255,178],[108,255,0],[184,0,255],[255,0,76],[146,255,0],[51,0,255],[0,197,255],
    [255,248,0],[255,0,19],[255,0,38],[89,255,0],[127,255,0],[255,153,0],[0,255,255],[0,255,216],[0,255,121],[255,0,248],[70,0,255],
    [0,255,159],[0,216,255],[0,6,255],[0,63,255],[31,255,0],[255,57,0],[255,0,210],[0,255,102],[242,255,0],[255,191,0],[0,255,63],
    [255,0,95],[146,0,255],[184,255,0],[255,114,0],[0,255,235],[255,229,0],[0,178,255],[255,0,114],[255,0,57],[0,140,255],[0,121,255],
    [12,255,0],[255,210,0],[0,255,44],[165,255,0],[0,25,255],[0,255,140],[0,101,255],[0,255,82],[223,255,0],[242,0,255],[89,0,255],
    [165,0,255],[70,255,0],[255,0,172],[255,76,0],[203,255,0],[204,0,255],[255,0,229],[255,133,0],[127,0,255],[0,235,255],[0,255,197],
    [255,0,191],[0,44,255],[50,255,0]
]

# Minimum supported tensorflow version as a (major, minor, patch) tuple
MINIMUM_TF_VERSION = (1, 12, 0)

# Restrict TensorFlow to only use the one GPU
def setup_gpu(gpu_id):
    """Make TensorFlow use only the selected GPU, or no GPU at all.

    Parameters:
        gpu_id: 'cpu' or -1 to hide all GPUs, otherwise the GPU to use.
            On TensorFlow 2 the value is used as an index into the list of
            physical GPUs; on older versions it is written to the
            CUDA_VISIBLE_DEVICES environment variable.
    """
    # TensorFlow 2 and later: configure devices through tf.config
    if tf_version_ok((2, 0, 0)):
        if gpu_id == 'cpu' or gpu_id == -1:
            # An empty device list hides every GPU from TensorFlow
            tf.config.experimental.set_visible_devices([], 'GPU')
            return

        # Get all gpus
        gpus = tf.config.experimental.list_physical_devices('GPU')
        if gpus:
            # Restrict TensorFlow to only use the GPU selected by gpu_id.
            try:
                # Currently, memory growth needs to be the same across GPUs.
                for gpu in gpus:
                    tf.config.experimental.set_memory_growth(gpu, True)

                # Use only the selected gpu.
                tf.config.experimental.set_visible_devices(gpus[gpu_id], 'GPU')
            except RuntimeError as e:
                # Visible devices must be set before GPUs have been initialized.
                print(e)

            # Report how many GPUs exist and how many TensorFlow can use
            logical_gpus = tf.config.experimental.list_logical_devices('GPU')
            print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    else:
        # TensorFlow 1: select devices through the environment and a session config
        import os
        if gpu_id == 'cpu' or gpu_id == -1:
            os.environ['CUDA_VISIBLE_DEVICES'] = ""
            return

        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
        config = tf.ConfigProto()
        # Cap the process at half of the GPU memory and allocate it on demand
        config.gpu_options.per_process_gpu_memory_fraction = 0.5
        config.gpu_options.allow_growth = True
        tf.keras.backend.set_session(tf.Session(config=config))

# Get the Tensorflow version
def tf_version():
    """Return the TensorFlow version as a tuple of integers."""
    # Everything after the first '-' in the version string is dropped before parsing
    release = tf.version.VERSION.split('-')[0]
    return tuple(int(number) for number in release.split('.'))

# Check if the current Tensorflow version is at least the minimum version
def tf_version_ok(minimum_tf_version=MINIMUM_TF_VERSION):
    """Return True when the installed TensorFlow version is equal to or newer
    than minimum_tf_version (a tuple of integers)."""
    installed = tf_version()
    return installed >= minimum_tf_version

# Return a color from a set of predefined colors. Contains 80 colors in total
def label_color(label):
    """Return the predefined color for a label.

    Labels beyond the end of the color table produce a warning and the
    default color (0, 255, 0).
    """
    if not label < len(colors):
        warnings.warn('Label {} has no color, returning default.'.format(label))
        return (0, 255, 0)

    return colors[label]

# Draws a box on an image with a given color
def draw_box(image, box, color, thickness=2):
    """Draw a rectangle on the image (in place).

    box holds the coordinates (x1, y1, x2, y2); they are converted to
    integers before drawing.
    """
    coords = np.array(box).astype(int)
    first_corner = (coords[0], coords[1])
    second_corner = (coords[2], coords[3])
    cv2.rectangle(image, first_corner, second_corner, color, thickness, cv2.LINE_AA)

# Draws a caption next to the first corner of a box in an image
def draw_caption(image, box, caption):
    """Write caption on the image (in place), offset a few pixels from the
    first corner (x1, y1) of box.

    The text is drawn twice: first thick and black, then thin and white on
    top of it with a 2 pixel offset.
    """
    coords = np.array(box).astype(int)
    x, y = coords[0], coords[1]
    font = cv2.FONT_HERSHEY_DUPLEX
    cv2.putText(image, caption, (x + 2, y + 22), font, 0.8, (0, 0, 0), 2)
    cv2.putText(image, caption, (x + 4, y + 24), font, 0.8, (255, 255, 255), 1)

# Draws boxes on an image with a given color
def draw_boxes(image, boxes, color, thickness=2):
    """Draw every box in boxes on the image with the same color and thickness."""
    for single_box in boxes:
        draw_box(image, single_box, color, thickness=thickness)

# Draws detections in an image
def draw_detections(image, boxes, scores, labels, color=None, label_to_name=None, score_threshold=0.5):
    """Draw the detections that score above the threshold on the image (in place).

    Parameters:
        image: image to draw on.
        boxes: array of boxes, indexed as boxes[i, :].
        scores: array with one score per box.
        labels: array with one label per box.
        color: color for all boxes; when None the color is picked per label.
        label_to_name: optional function that maps a label to a display name;
            when omitted the label itself is used in the caption.
        score_threshold: only detections with a score strictly above this
            value are drawn.
    """
    selection = np.where(scores > score_threshold)[0]

    for i in selection:
        c = color if color is not None else label_color(labels[i])
        draw_box(image, boxes[i, :], color=c)

        # Format the name instead of concatenating it, a numeric label
        # (no label_to_name given) can not be added to a string
        name = label_to_name(labels[i]) if label_to_name else labels[i]
        caption = '{0}: {1:.2f}'.format(name, scores[i])
        draw_caption(image, boxes[i, :], caption)

# Draws annotations in an image
def draw_annotations(image, annotations, color=(0, 255, 0), label_to_name=None):
    """Draw annotated boxes with their captions on the image (in place).

    annotations is either a dict with the keys 'bboxes' and 'labels', or an
    array where the first four columns are the box and the fifth column is
    the label. When color is None the color is picked per label.
    label_to_name is an optional function that maps a label to a caption.
    """
    # Arrays are split into the dict layout used below
    if isinstance(annotations, np.ndarray):
        annotations = {'bboxes': annotations[:, :4], 'labels': annotations[:, 4]}

    assert('bboxes' in annotations)
    assert('labels' in annotations)
    bboxes = annotations['bboxes']
    labels = annotations['labels']
    assert(bboxes.shape[0] == labels.shape[0])

    for index in range(bboxes.shape[0]):
        label = labels[index]
        box_color = label_color(label) if color is None else color
        name = label_to_name(label) if label_to_name else label
        draw_caption(image, bboxes[index], '{}'.format(name))
        draw_box(image, bboxes[index], color=box_color)

# Create a numpy array representing a column vector
def colvec(*args):
    """Return the arguments as a column vector (one row per argument)."""
    row = np.array([args])
    return np.transpose(row)

# Apply a transformation to an axis aligned bounding box
def transform_aabb(transform, aabb):
    """Apply a 3x3 homogeneous transform to an axis aligned bounding box.

    aabb is (x1, y1, x2, y2). All four corners are transformed and the
    axis aligned box around the transformed corners is returned as
    [x1, y1, x2, y2].
    """
    x1, y1, x2, y2 = aabb

    # One homogeneous column (x, y, 1) per corner of the box
    corners = [
        [x1, x2, x1, x2],
        [y1, y2, y2, y1],
        [1,  1,  1,  1 ],
    ]
    moved = transform.dot(corners)

    # Smallest and largest coordinate per row give the new corners
    lows = moved.min(axis=1)
    highs = moved.max(axis=1)

    return [lows[0], lows[1], highs[0], highs[1]]

# Construct a random vector between min and max
def _random_vector(min, max, prng=DEFAULT_PRNG):
    """Draw a vector with every element uniformly sampled between the
    matching elements of min and max.

    min and max must be one dimensional and have the same shape.
    """
    lower = np.array(min)
    upper = np.array(max)
    assert lower.shape == upper.shape
    assert len(lower.shape) == 1

    return prng.uniform(lower, upper)

# Construct a homogeneous 2D rotation matrix
def rotation(angle):
    """Return the 3x3 homogeneous matrix of a 2D rotation by angle (radians)."""
    cos_a = np.cos(angle)
    sin_a = np.sin(angle)

    return np.array([
        [cos_a, -sin_a, 0],
        [sin_a,  cos_a, 0],
        [0, 0, 1]
    ])

# Construct a random rotation between min and max
def random_rotation(min, max, prng=DEFAULT_PRNG):
    """Return a rotation matrix for an angle (radians) drawn uniformly
    between min and max from prng."""
    angle = prng.uniform(min, max)
    return rotation(angle)

# Construct a homogeneous 2D translation matrix
def translation(translation):
    """Return the 3x3 homogeneous matrix of a 2D translation.

    translation holds the shift along x at index 0 and along y at index 1.
    """
    shift_x = translation[0]
    shift_y = translation[1]

    return np.array([
        [1, 0, shift_x],
        [0, 1, shift_y],
        [0, 0, 1]
    ])

# Construct a random 2D translation between min and max
def random_translation(min, max, prng=DEFAULT_PRNG):
    """Return a translation matrix for a shift drawn uniformly between the
    2D vectors min and max from prng."""
    shift = _random_vector(min, max, prng)
    return translation(shift)

# Construct a homogeneous 2D shear matrix
def shear(angle):
    """Return the 3x3 homogeneous matrix of a 2D shear by angle (radians).

    The second column of the matrix is (-sin(angle), cos(angle), 0).
    """
    sin_a = np.sin(angle)
    cos_a = np.cos(angle)

    return np.array([
        [1, -sin_a, 0],
        [0,  cos_a, 0],
        [0, 0, 1]
    ])

# Construct a random 2D shear matrix with shear angle between min and max
def random_shear(min, max, prng=DEFAULT_PRNG):
    """Return a shear matrix for an angle (radians) drawn uniformly
    between min and max from prng."""
    angle = prng.uniform(min, max)
    return shear(angle)

# Construct a homogeneous 2D scaling matrix
def scaling(factor):
    """Return the 3x3 homogeneous matrix of a 2D scaling.

    factor holds the scale along x at index 0 and along y at index 1.
    """
    scale_x = factor[0]
    scale_y = factor[1]

    return np.array([
        [scale_x, 0, 0],
        [0, scale_y, 0],
        [0, 0, 1]
    ])

# Construct a random 2D scale matrix between min and max
def random_scaling(min, max, prng=DEFAULT_PRNG):
    """Return a scaling matrix for factors drawn uniformly between the
    2D vectors min and max from prng."""
    factor = _random_vector(min, max, prng)
    return scaling(factor)

# Construct a transformation randomly containing X/Y flips (or not)
def random_flip(flip_x_chance, flip_y_chance, prng=DEFAULT_PRNG):
    """Return a scaling matrix that flips x and/or y at random.

    Each axis is flipped when a uniform draw from [0, 1) is below its
    chance. The draw for x is made before the draw for y.
    """
    flip_x = prng.uniform(0, 1) < flip_x_chance
    flip_y = prng.uniform(0, 1) < flip_y_chance

    # A boolean counts as 0 or 1, so the factor is 1 (keep) or -1 (flip)
    factor_x = 1 - 2 * flip_x
    factor_y = 1 - 2 * flip_y
    return scaling((factor_x, factor_y))

# Create a new transform representing the same transformation, only with the origin of the linear part changed
def change_transform_origin(transform, center):
    """Return transform expressed relative to a new origin.

    The result is translation(center) * transform * translation(-center).
    """
    center = np.array(center)
    shift_forward = translation(center)
    shift_back = translation(-center)

    return np.linalg.multi_dot([shift_forward, transform, shift_back])

# Create a random transformation.
def random_transform(
    min_rotation=0,
    max_rotation=0,
    min_translation=(0, 0),
    max_translation=(0, 0),
    min_shear=0,
    max_shear=0,
    min_scaling=(1, 1),
    max_scaling=(1, 1),
    flip_x_chance=0,
    flip_y_chance=0,
    prng=DEFAULT_PRNG
):
    """Return a random 3x3 homogeneous transformation matrix.

    The result is the product rotation * translation * shear * scaling *
    flip, where every factor is sampled from prng within the given limits.
    With the default limits every factor is the identity.
    """
    # The factors are sampled in the same order as they are multiplied
    rotate = random_rotation(min_rotation, max_rotation, prng)
    translate = random_translation(min_translation, max_translation, prng)
    skew = random_shear(min_shear, max_shear, prng)
    scale = random_scaling(min_scaling, max_scaling, prng)
    flip = random_flip(flip_x_chance, flip_y_chance, prng)

    return np.linalg.multi_dot([rotate, translate, skew, scale, flip])

# Create a random transform generator
def random_transform_generator(prng=None, **kwargs):
    """Yield an endless stream of random transformation matrices.

    prng is the random number generator to sample from; when None a new
    np.random.RandomState is created. All other keyword arguments are
    passed on to random_transform.
    """
    # RandomState without a seed picks its own seed
    generator = np.random.RandomState() if prng is None else prng

    while True:
        yield random_transform(prng=generator, **kwargs)

# Applies deltas (usually regression results) to boxes (usually anchors)
def bbox_transform_inv(boxes, deltas, mean=None, std=None):
    """Apply deltas (usually regression results) to boxes (usually anchors).

    Each coordinate of a box is moved by (delta * std + mean) times the box
    width (for x1 and x2) or the box height (for y1 and y2). mean defaults
    to [0, 0, 0, 0] and std to [0.2, 0.2, 0.2, 0.2]. boxes and deltas are
    indexed as [:, :, k] with k = 0..3 for x1, y1, x2, y2, and the result is
    stacked along axis 2 in the same order.
    """
    mean = [0, 0, 0, 0] if mean is None else mean
    std = [0.2, 0.2, 0.2, 0.2] if std is None else std

    width  = boxes[:, :, 2] - boxes[:, :, 0]
    height = boxes[:, :, 3] - boxes[:, :, 1]

    # x coordinates scale with the width, y coordinates with the height
    extents = (width, height, width, height)
    coordinates = [
        boxes[:, :, k] + (deltas[:, :, k] * std[k] + mean[k]) * extents[k]
        for k in range(4)
    ]

    return keras.backend.stack(coordinates, axis=2)

# Read an image in BGR format
def read_image_bgr(path):
    """Read the image at path and return it as an array in BGR channel order.

    The image is converted to RGB by PIL first and the channel axis is then
    reversed. The returned array is a copy, so it can be modified freely.
    """
    # We deliberately don't use cv2.imread here, since it gives no feedback on errors while reading the image.
    # The context manager closes the image again once the pixels are copied into an array.
    with Image.open(path) as source:
        image = np.asarray(source.convert('RGB'))

    return image[:, :, ::-1].copy()

# Preprocess an image by subtracting the ImageNet mean
def preprocess_image(x, mode='caffe'):
    """Return a preprocessed float32 copy of the image array x.

    mode 'tf' scales the values to the range -1 to 1 (x / 127.5 - 1).
    mode 'caffe' subtracts a fixed mean from each of the first three
    channels (the ImageNet mean). Any other mode only converts to float32.
    """
    # Always float32, to keep compatibility with opencv
    x = x.astype(np.float32)

    if mode == 'caffe':
        # Per channel mean for channel 0, 1 and 2
        for channel, channel_mean in enumerate((103.939, 116.779, 123.68)):
            x[..., channel] -= channel_mean
    elif mode == 'tf':
        x /= 127.5
        x -= 1.

    return x

# Adjust a transformation for a specific image
def adjust_transform_for_image(transform, image, relative_translation):
    """Adjust a transformation matrix for a specific image.

    Parameters:
        transform: 3x3 homogeneous transformation matrix. It is not modified.
        image: image array with the shape (height, width, channels).
        relative_translation: when true, the translation in transform is
            taken as a fraction of the image size and multiplied by the
            image width (x) and height (y).

    Returns a new matrix where the origin of the transformation is moved to
    the center of the image.
    """
    height, width, channels = image.shape

    # Work on a copy, the in-place scaling below would otherwise change the
    # caller's matrix and compound if the same transform is used again
    result = np.array(transform)

    # Scale the translation with the image size if specified.
    if relative_translation:
        result[0:2, 2] *= [width, height]

    # Move the origin of transformation.
    return change_transform_origin(result, (0.5 * width, 0.5 * height))

# Struct holding parameters determining how to apply a transformation to an image
class TransformParameters:
    """Parameters that determine how a transformation is applied to an image.

    fill_mode: how pixels outside the image are filled, one of 'constant',
        'nearest', 'reflect' or 'wrap'.
    interpolation: one of 'nearest', 'linear', 'cubic', 'area' or 'lanczos4'.
    cval: fill value, stored as given.
    relative_translation: flag, stored as given.
    """

    def __init__(
        self,
        fill_mode            = 'nearest',
        interpolation        = 'linear',
        cval                 = 0,
        relative_translation = True,
    ):
        self.fill_mode            = fill_mode
        self.cval                 = cval
        self.interpolation        = interpolation
        self.relative_translation = relative_translation

    def cvBorderMode(self):
        """Return the cv2 border mode for fill_mode, or None if it is unknown."""
        border_modes = {
            'constant': cv2.BORDER_CONSTANT,
            'nearest': cv2.BORDER_REPLICATE,
            'reflect': cv2.BORDER_REFLECT_101,
            'wrap': cv2.BORDER_WRAP,
        }
        return border_modes.get(self.fill_mode)

    def cvInterpolation(self):
        """Return the cv2 interpolation flag for interpolation, or None if it is unknown."""
        interpolations = {
            'nearest': cv2.INTER_NEAREST,
            'linear': cv2.INTER_LINEAR,
            'cubic': cv2.INTER_CUBIC,
            'area': cv2.INTER_AREA,
            'lanczos4': cv2.INTER_LANCZOS4,
        }
        return interpolations.get(self.interpolation)

# Apply a transformation to an image
def apply_transform(matrix, image, params):
    """Warp the image with a transformation matrix and return the result.

    Only the first two rows of matrix are used (the affine part). The output
    has the same width and height as the input. Interpolation, border mode
    and border value are taken from params.
    """
    affine = matrix[:2, :]
    height, width = image.shape[0], image.shape[1]

    return cv2.warpAffine(
        image,
        affine,
        dsize       = (width, height),
        flags       = params.cvInterpolation(),
        borderMode  = params.cvBorderMode(),
        borderValue = params.cval,
    )

# Compute an image scale such that the image size is constrained to min_side and max_side
def compute_resize_scale(image_shape, min_side=800, max_side=1333):
    """Return the scale that constrains an image to min_side and max_side.

    image_shape is (rows, cols, channels). The scale makes the smallest side
    equal to min_side, unless the largest side would then exceed max_side;
    in that case the scale makes the largest side equal to max_side.
    """
    (rows, cols, _) = image_shape
    shortest = min(rows, cols)
    longest = max(rows, cols)

    # Start from the scale that fits the shortest side
    scale = min_side / shortest

    # Images with a large aspect ratio are limited by their longest side instead
    if longest * scale > max_side:
        scale = max_side / longest

    return scale

# Resize an image such that the size is constrained to min_side and max_side
def resize_image(img, min_side=800, max_side=1333):
    """Resize an image so that its size is constrained to min_side and max_side.

    Returns the resized image together with the scale that was applied to
    both axes.
    """
    factor = compute_resize_scale(img.shape, min_side=min_side, max_side=max_side)
    resized = cv2.resize(img, None, fx=factor, fy=factor)

    return resized, factor

# Uniformly sample from the given range
def _uniform(val_range):

    return np.random.uniform(val_range[0], val_range[1])

# Check whether the range is a valid range
def _check_range(val_range, min_val=None, max_val=None):

    if val_range[0] > val_range[1]:
        raise ValueError('interval lower bound > upper bound')
    if min_val is not None and val_range[0] < min_val:
        raise ValueError('invalid interval lower bound')
    if max_val is not None and val_range[1] > max_val:
        raise ValueError('invalid interval upper bound')

# Clip and convert an image to np.uint8
def _clip(image):

    return np.clip(image, 0, 255).astype(np.uint8)

# Struct holding parameters and applying image color transformation
class VisualEffect:
    """Parameters of a color transformation that can be applied to an image.

    contrast_factor: factor passed to adjust_contrast.
    brightness_delta: delta passed to adjust_brightness.
    hue_delta: delta passed to adjust_hue.
    saturation_factor: factor passed to adjust_saturation.

    A parameter that is falsy (0 or None for example) disables its adjustment.
    """

    def __init__(
        self,
        contrast_factor,
        brightness_delta,
        hue_delta,
        saturation_factor,
    ):
        self.contrast_factor = contrast_factor
        self.brightness_delta = brightness_delta
        self.hue_delta = hue_delta
        self.saturation_factor = saturation_factor

    def __call__(self, image):
        """Apply the visual effect to an image and return the result.

        Contrast and brightness are adjusted first. Hue and saturation are
        adjusted on a version of the image converted from BGR to HSV, which
        is converted back to BGR afterwards.
        """
        if self.contrast_factor:
            image = adjust_contrast(image, self.contrast_factor)
        if self.brightness_delta:
            image = adjust_brightness(image, self.brightness_delta)

        # Skip the color space round trip when neither hue nor saturation changes
        if not (self.hue_delta or self.saturation_factor):
            return image

        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        if self.hue_delta:
            hsv = adjust_hue(hsv, self.hue_delta)
        if self.saturation_factor:
            hsv = adjust_saturation(hsv, self.saturation_factor)

        return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

# Generate visual effect parameters uniformly sampled from the given intervals
def random_visual_effect_generator(
    contrast_range=(0.9, 1.1),
    brightness_range=(-.1, .1),
    hue_range=(-0.05, 0.05),
    saturation_range=(0.95, 1.05)
):
    """Return an endless generator of VisualEffect objects.

    The parameters of every effect are sampled uniformly from the given
    (lower, upper) intervals. The intervals are validated immediately when
    this function is called: contrast and saturation may not be below 0,
    brightness and hue must lie between -1 and 1. An invalid interval
    raises ValueError.
    """
    # Validate up front, before the generator is created
    _check_range(contrast_range, 0)
    _check_range(brightness_range, -1, 1)
    _check_range(hue_range, -1, 1)
    _check_range(saturation_range, 0)

    def _effects():
        while True:
            yield VisualEffect(
                contrast_factor=_uniform(contrast_range),
                brightness_delta=_uniform(brightness_range),
                hue_delta=_uniform(hue_range),
                saturation_factor=_uniform(saturation_range),
            )

    return _effects()

# Adjust contrast of an image
def adjust_contrast(image, factor):
    """Scale the distance of every pixel to the mean by factor.

    The mean is taken over the first two axes of the image. The result is
    clipped and converted by _clip.
    """
    mean = image.mean(axis=0).mean(axis=0)
    contrasted = (image - mean) * factor + mean
    return _clip(contrasted)

# Adjust brightness of an image
def adjust_brightness(image, delta):
    """Add delta * 255 to every value of the image.

    The result is clipped and converted by _clip.
    """
    brightened = image + delta * 255
    return _clip(brightened)

# Adjust hue of an image
def adjust_hue(image, delta):
    """Shift channel 0 of the image by delta * 180, wrapping around at 180.

    The image is modified in place and also returned.
    """
    shifted = image[..., 0] + delta * 180
    image[..., 0] = np.mod(shifted, 180)
    return image

# Adjust saturation of an image
def adjust_saturation(image, factor):
    """Multiply channel 1 of the image by factor, limited to the range 0 to 255.

    The image is modified in place and also returned.
    """
    scaled = image[..., 1] * factor
    image[..., 1] = np.clip(scaled, 0, 255)
    return image

Generator

This is an abstract base class for a generator, specialized generators can inherit properties and methods from this base generator. A generator is used to generate input data for training and evaluation.

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Import libraries
import numpy as np
import random
import warnings
import keras
from annytab.retinanet.anchors import (
    anchor_targets_bbox,
    anchors_for_shape,
    guess_shapes
)
from annytab.retinanet.common import (
    TransformParameters,
    adjust_transform_for_image,
    apply_transform,
    preprocess_image,
    resize_image,
)
from annytab.retinanet.common import transform_aabb
import annytab.retinanet.common as common

# Abstract generator class
class Generator(keras.utils.Sequence):
    """Abstract Keras sequence producing (inputs, targets) batches.

    Subclasses supply the dataset by overriding the methods that raise
    NotImplementedError (size, num_classes, load_image, load_annotations, ...).
    This base class groups image indices into batches, applies optional visual
    effects and random transforms, preprocesses and resizes the images, and
    builds the anchor targets for every batch.
    """

    def __init__(
        self,
        transform_generator = None,
        visual_effect_generator=None,
        batch_size=1,
        group_method='ratio',  # one of 'none', 'random', 'ratio'
        shuffle_groups=True,
        image_min_side=800,
        image_max_side=1333,
        no_resize=False,
        transform_parameters=None,
        compute_anchor_targets=anchor_targets_bbox,
        compute_shapes=guess_shapes,
        preprocess_image=preprocess_image,
        config=None
    ):
        """Store the configuration and build the initial batch groups.

        group_images() is called from here and uses size() (and
        image_aspect_ratio() when group_method is 'ratio'), so a subclass must
        be able to answer those before it calls this constructor.

        Args:
            transform_generator: Iterator yielding transforms, consumed with
                next(); a falsy value disables random transforms.
            visual_effect_generator: Iterator yielding callables that are
                applied to an image; None disables visual effects.
            batch_size: Number of images per group (converted with int()).
            group_method: 'random' shuffles the image order, 'ratio' sorts it
                by aspect ratio, any other value keeps the natural order.
            shuffle_groups: Shuffle the groups now and on every epoch end.
            image_min_side: Forwarded to resize_image as min_side.
            image_max_side: Forwarded to resize_image as max_side.
            no_resize: When true, images are left at their original size.
            transform_parameters: Defaults to TransformParameters() when falsy.
            compute_anchor_targets: Callable invoked as
                f(anchors, image_group, annotations_group, num_classes).
            compute_shapes: Passed to anchors_for_shape as shapes_callback.
            preprocess_image: Callable applied to every image before resizing.
            config: Optional mapping; only 'anchor_parameters' is looked up.
        """

        self.transform_generator    = transform_generator
        self.visual_effect_generator = visual_effect_generator
        self.batch_size             = int(batch_size)
        self.group_method           = group_method
        self.shuffle_groups         = shuffle_groups
        self.image_min_side         = image_min_side
        self.image_max_side         = image_max_side
        self.no_resize              = no_resize
        self.transform_parameters   = transform_parameters or TransformParameters()
        self.compute_anchor_targets = compute_anchor_targets
        self.compute_shapes         = compute_shapes
        self.preprocess_image       = preprocess_image
        self.config                 = config

        # Define groups
        self.group_images()

        # Shuffle when initializing
        if self.shuffle_groups:
            self.on_epoch_end()

    def on_epoch_end(self):
        """Reshuffle the batch groups in place when shuffling is enabled."""
        if self.shuffle_groups:
            random.shuffle(self.groups)

    # Size of the dataset
    def size(self):
        """Return the number of images in the dataset (abstract)."""
        raise NotImplementedError('size method not implemented')

    # Number of classes in the dataset
    def num_classes(self):
        """Return the number of classes in the dataset (abstract)."""
        raise NotImplementedError('num_classes method not implemented')

    # Returns True if label is a known label
    def has_label(self, label):
        """Return True if label is a known label (abstract)."""
        raise NotImplementedError('has_label method not implemented')

    # Returns True if name is a known class
    def has_name(self, name):
        """Return True if name is a known class name (abstract)."""
        raise NotImplementedError('has_name method not implemented')

    # Map name to label
    def name_to_label(self, name):
        """Map a class name to its label (abstract)."""
        raise NotImplementedError('name_to_label method not implemented')

    # Map label to name
    def label_to_name(self, label):
        """Map a label to its class name (abstract)."""
        raise NotImplementedError('label_to_name method not implemented')

    # Compute the aspect ratio for an image with image_index
    def image_aspect_ratio(self, image_index):
        """Return the aspect ratio of the image at image_index (abstract)."""
        raise NotImplementedError('image_aspect_ratio method not implemented')

    # Load an image at the image_index
    def load_image(self, image_index):
        """Load and return the image at image_index (abstract)."""
        raise NotImplementedError('load_image method not implemented')

    # Load annotations for an image_index
    def load_annotations(self, image_index):
        """Return a dict with 'labels' and 'bboxes' for image_index (abstract)."""
        raise NotImplementedError('load_annotations method not implemented')

    # Load annotations for all images in group
    def load_annotations_group(self, group):
        """Load the annotations of every image index in group.

        Each entry must be a dict containing the keys 'labels' and 'bboxes';
        this is checked with assertions.
        """

        annotations_group = [self.load_annotations(image_index) for image_index in group]
        for annotations in annotations_group:
            assert(isinstance(annotations, dict)), '\'load_annotations\' should return a list of dictionaries, received: {}'.format(type(annotations))
            assert('labels' in annotations), '\'load_annotations\' should return a list of dictionaries that contain \'labels\' and \'bboxes\'.'
            assert('bboxes' in annotations), '\'load_annotations\' should return a list of dictionaries that contain \'labels\' and \'bboxes\'.'

        return annotations_group

    # Filter annotations by removing boxes that are empty, inverted or outside of the image bounds
    def filter_annotations(self, image_group, annotations_group, group):
        """Drop invalid boxes (and their sibling entries) from every annotation.

        A box is invalid when x2 <= x1, y2 <= y1, x1 < 0, y1 < 0,
        x2 > image width or y2 > image height. A warning listing the removed
        boxes is emitted for each affected image. annotations_group is
        updated in place and returned together with image_group.
        """

        # test all annotations
        for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
            # test x2 <= x1 | y2 <= y1 | x1 < 0 | y1 < 0 | x2 > image.shape[1] | y2 > image.shape[0]
            invalid_indices = np.where(
                (annotations['bboxes'][:, 2] <= annotations['bboxes'][:, 0]) |
                (annotations['bboxes'][:, 3] <= annotations['bboxes'][:, 1]) |
                (annotations['bboxes'][:, 0] < 0) |
                (annotations['bboxes'][:, 1] < 0) |
                (annotations['bboxes'][:, 2] > image.shape[1]) |
                (annotations['bboxes'][:, 3] > image.shape[0])
            )[0]

            # delete invalid indices
            if len(invalid_indices):
                warnings.warn('Image with id {} (shape {}) contains the following invalid boxes: {}.'.format(
                    group[index],
                    image.shape,
                    annotations['bboxes'][invalid_indices, :]
                ))
                # Remove the same rows from every array so labels stay aligned with boxes.
                for k in annotations_group[index].keys():
                    annotations_group[index][k] = np.delete(annotations[k], invalid_indices, axis=0)

        return image_group, annotations_group

    # Load images for all images in a group
    def load_image_group(self, group):
        """Load the image for every image index in group."""
        return [self.load_image(image_index) for image_index in group]

    # Randomly applies a visual effect to an image
    def random_visual_effect_group_entry(self, image, annotations):
        """Apply the next visual effect to image; annotations pass through unchanged."""
        visual_effect = next(self.visual_effect_generator)
        # apply visual effect
        image = visual_effect(image)
        return image, annotations

    # Randomly apply visual effect on each image
    def random_visual_effect_group(self, image_group, annotations_group):
        """Apply a visual effect to every image when a generator is configured."""

        assert(len(image_group) == len(annotations_group))

        if self.visual_effect_generator is None:
            # do nothing
            return image_group, annotations_group

        for index in range(len(image_group)):
            # apply effect on a single group entry
            image_group[index], annotations_group[index] = self.random_visual_effect_group_entry(
                image_group[index], annotations_group[index]
            )

        return image_group, annotations_group

    # Randomly transforms image and annotation
    def random_transform_group_entry(self, image, annotations, transform=None):
        """Transform one image and its boxes.

        Uses the given transform, or draws one from transform_generator when
        transform is None. Nothing happens when neither is available. The
        'bboxes' array is copied before it is rewritten.
        """

        # randomly transform both image and annotations
        if transform is not None or self.transform_generator:
            if transform is None:
                transform = adjust_transform_for_image(next(self.transform_generator), image, self.transform_parameters.relative_translation)

            # apply transformation to image
            image = apply_transform(transform, image, self.transform_parameters)

            # Transform the bounding boxes in the annotations.
            annotations['bboxes'] = annotations['bboxes'].copy()
            for index in range(annotations['bboxes'].shape[0]):
                annotations['bboxes'][index, :] = transform_aabb(transform, annotations['bboxes'][index, :])

        return image, annotations

    # Randomly transforms each image and its annotations
    def random_transform_group(self, image_group, annotations_group):
        """Apply random_transform_group_entry to every image/annotation pair."""

        assert(len(image_group) == len(annotations_group))

        for index in range(len(image_group)):
            # transform a single group entry
            image_group[index], annotations_group[index] = self.random_transform_group_entry(image_group[index], annotations_group[index])

        return image_group, annotations_group

    # Resize an image using image_min_side and image_max_side
    def resize_image(self, image):
        """Return (image, scale); the image is untouched with scale 1 when no_resize is set."""

        if self.no_resize:
            return image, 1
        else:
            return resize_image(image, min_side=self.image_min_side, max_side=self.image_max_side)

    # Preprocess image and its annotations
    def preprocess_group_entry(self, image, annotations):
        """Preprocess and resize one image and scale its boxes accordingly."""

        # preprocess the image
        image = self.preprocess_image(image)

        # resize image
        image, image_scale = self.resize_image(image)

        # apply resizing to annotations too; rebind instead of using "*=" so the
        # array handed out by load_annotations is never modified in place
        annotations['bboxes'] = annotations['bboxes'] * image_scale

        # convert to the wanted keras floatx
        image = keras.backend.cast_to_floatx(image)

        return image, annotations

    # Preprocess each image and its annotations in its group
    def preprocess_group(self, image_group, annotations_group):
        """Apply preprocess_group_entry to every image/annotation pair."""

        assert(len(image_group) == len(annotations_group))

        for index in range(len(image_group)):
            # preprocess a single group entry
            image_group[index], annotations_group[index] = self.preprocess_group_entry(image_group[index], annotations_group[index])

        return image_group, annotations_group

    # Order the images according to self.group_method and make groups of self.batch_size
    def group_images(self):
        """Build self.groups, a list of image-index lists of length batch_size.

        The last group wraps around to the start of the order so that every
        group holds exactly batch_size indices.
        """

        # determine the order of the images
        order = list(range(self.size()))
        if self.group_method == 'random':
            random.shuffle(order)
        elif self.group_method == 'ratio':
            order.sort(key=lambda x: self.image_aspect_ratio(x))

        # divide into groups, one group = one batch
        self.groups = [[order[x % len(order)] for x in range(i, i + self.batch_size)] for i in range(0, len(order), self.batch_size)]

    # Compute inputs for the network using an image_group
    def compute_inputs(self, image_group):
        """Stack the images into one zero-padded batch array.

        Every image is copied into the upper-left corner of a slot that is as
        large as the largest image in the group. The batch is transposed to
        channels-first when the Keras backend is configured that way.
        """

        # get the max image shape
        max_shape = tuple(max(image.shape[x] for image in image_group) for x in range(3))

        # construct an image batch object; sized by the group itself so the batch
        # dimension always matches the targets built by compute_targets
        image_batch = np.zeros((len(image_group),) + max_shape, dtype=keras.backend.floatx())

        # copy all images to the upper left part of the image batch object
        for image_index, image in enumerate(image_group):
            image_batch[image_index, :image.shape[0], :image.shape[1], :image.shape[2]] = image

        if keras.backend.image_data_format() == 'channels_first':
            image_batch = image_batch.transpose((0, 3, 1, 2))

        return image_batch

    # Generate anchors
    def generate_anchors(self, image_shape):
        """Return the anchors for image_shape using the configured parameters."""
        anchor_params = common.anchor_parameters
        if self.config and 'anchor_parameters' in self.config:
            # NOTE(review): parse_anchor_parameters is not imported by this module's
            # import block, so this branch looks like it raises NameError - confirm
            # where the helper lives and import it.
            anchor_params = parse_anchor_parameters(self.config)
        return anchors_for_shape(image_shape, anchor_params=anchor_params, shapes_callback=self.compute_shapes)

    # Compute target outputs for the network using images and their annotations
    def compute_targets(self, image_group, annotations_group):
        """Build the target batches for a group via compute_anchor_targets."""

        # get the max image shape
        max_shape = tuple(max(image.shape[x] for image in image_group) for x in range(3))
        anchors   = self.generate_anchors(max_shape)

        batches = self.compute_anchor_targets(
            anchors,
            image_group,
            annotations_group,
            self.num_classes()
        )

        return list(batches)

    # Compute inputs and target outputs for the network
    def compute_input_output(self, group):
        """Run the full pipeline for one group and return (inputs, targets)."""

        # load images and annotations
        image_group       = self.load_image_group(group)
        annotations_group = self.load_annotations_group(group)

        # check validity of annotations
        image_group, annotations_group = self.filter_annotations(image_group, annotations_group, group)

        # randomly apply visual effect
        image_group, annotations_group = self.random_visual_effect_group(image_group, annotations_group)

        # randomly transform data
        image_group, annotations_group = self.random_transform_group(image_group, annotations_group)

        # perform preprocessing steps
        image_group, annotations_group = self.preprocess_group(image_group, annotations_group)

        # compute network inputs
        inputs = self.compute_inputs(image_group)

        # compute network targets
        targets = self.compute_targets(image_group, annotations_group)

        return inputs, targets

    # Number of batches for generator
    def __len__(self):
        """Return the number of groups (batches)."""
        return len(self.groups)

    # Keras sequence method for generating batches
    def __getitem__(self, index):
        """Return (inputs, targets) for the group at position index."""

        group = self.groups[index]
        inputs, targets = self.compute_input_output(group)

        return inputs, targets

CSV Generator

This is a specialized generator that generates data from .csv files. The Pascal VOC format is more efficient if the dataset is large, but .csv data works well for the dataset in this tutorial.

"""
Copyright 2017-2018 yhenon (https://github.com/yhenon/)
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Import libraries
import numpy as np
from PIL import Image
from six import raise_from
import csv
import sys
import os.path
from collections import OrderedDict
from annytab.retinanet.generator import Generator
from annytab.retinanet.common import read_image_bgr

# Parse a string into a value, and format a nice ValueError if it fails
def _parse(value, function, fmt):

    try:
        return function(value)
    except ValueError as e:
        raise_from(ValueError(fmt.format(e)), None)

# Parse the classes file given by csv_reader
def _read_classes(csv_reader):
    """Read the class mapping from an iterable of CSV rows.

    Args:
        csv_reader: Iterable yielding rows of the form [class_name, class_id].

    Returns:
        An OrderedDict mapping class name to integer class id, in file order.

    Raises:
        ValueError: When a row does not have exactly two fields, the id is not
            an integer, or a class name occurs twice. Messages carry the
            1-based line number.
    """
    classes = OrderedDict()

    # Line numbers are reported 1-based.
    for line_number, row in enumerate(csv_reader, 1):
        try:
            class_name, raw_id = row
        except ValueError:
            raise_from(ValueError('line {}: format should be \'class_name, class_id\''.format(line_number)), None)

        id_error_format = 'line {}: malformed class ID: {{}}'.format(line_number)
        class_id = _parse(raw_id, int, id_error_format)

        if class_name in classes:
            raise ValueError('line {}: duplicate class name: \'{}\''.format(line_number, class_name))

        classes[class_name] = class_id

    return classes

# Read annotations from the csv_reader
def _read_annotations(csv_reader, classes):
    """Read bounding-box annotations from an iterable of CSV rows.

    Args:
        csv_reader: Iterable yielding rows whose first six fields are
            img_file, x1, y1, x2, y2, class_name. A row with the five fields
            after img_file empty registers an image without annotations.
        classes: Container of known class names.

    Returns:
        An OrderedDict mapping image file to a list of dicts with the keys
        'x1', 'x2', 'y1', 'y2' and 'class'.

    Raises:
        ValueError: On malformed rows, non-integer coordinates, empty or
            inverted boxes, or unknown class names. Messages carry the
            1-based line number.
    """
    annotations = OrderedDict()

    for line_number, row in enumerate(csv_reader, 1):
        try:
            img_file, x1, y1, x2, y2, class_name = row[:6]
        except ValueError:
            raise_from(ValueError('line {}: format should be \'img_file,x1,y1,x2,y2,class_name\' or \'img_file,,,,,\''.format(line_number)), None)

        # Register the image even when the row carries no box.
        boxes = annotations.setdefault(img_file, [])

        # If a row contains only an image path, it's an image without annotations.
        if (x1, y1, x2, y2, class_name) == ('', '', '', '', ''):
            continue

        # Parse the coordinates in the order x1, y1, x2, y2.
        coords = {}
        for key, raw in (('x1', x1), ('y1', y1), ('x2', x2), ('y2', y2)):
            coords[key] = _parse(raw, int, 'line {}: malformed {}: {{}}'.format(line_number, key))

        # Check that the bounding box is valid.
        if coords['x2'] <= coords['x1']:
            raise ValueError('line {}: x2 ({}) must be higher than x1 ({})'.format(line_number, coords['x2'], coords['x1']))
        if coords['y2'] <= coords['y1']:
            raise ValueError('line {}: y2 ({}) must be higher than y1 ({})'.format(line_number, coords['y2'], coords['y1']))

        # Check that the class name is known.
        if class_name not in classes:
            raise ValueError('line {}: unknown class name: \'{}\' (classes: {})'.format(line_number, class_name, classes))

        boxes.append({
            'x1': coords['x1'],
            'x2': coords['x2'],
            'y1': coords['y1'],
            'y2': coords['y2'],
            'class': class_name,
        })

    return annotations

# Open a file with flags suitable for csv.reader
def _open_for_csv(path):

    if sys.version_info[0] < 3:
        return open(path, 'rb')
    else:
        return open(path, 'r', newline='')

# Generate data for a custom CSV dataset
class CSVGenerator(Generator):
    """Generator that reads its images and annotations from CSV files.

    The class file maps class names to integer ids and the data file lists
    one bounding box per row (img_file, x1, y1, x2, y2, class_name).
    """

    def __init__(
        self,
        csv_data_file,
        csv_class_file,
        base_dir=None,
        **kwargs
    ):
        """Parse both CSV files and initialise the base generator.

        Args:
            csv_data_file: Path to the annotations CSV file.
            csv_class_file: Path to the class-mapping CSV file.
            base_dir: Directory that image paths are joined onto; defaults to
                the directory of csv_data_file.
            **kwargs: Forwarded to Generator.__init__.

        Raises:
            ValueError: When either CSV file is malformed; the message names
                the offending file.
        """

        self.image_names = []
        self.image_data  = {}
        self.base_dir    = base_dir

        # Take base_dir from annotations file if not explicitly specified.
        if self.base_dir is None:
            self.base_dir = os.path.dirname(csv_data_file)

        # parse the provided class file
        try:
            with _open_for_csv(csv_class_file) as file:
                self.classes = _read_classes(csv.reader(file, delimiter=','))
        except ValueError as e:
            raise_from(ValueError('invalid CSV class file: {}: {}'.format(csv_class_file, e)), None)

        # Reverse mapping: label (class id) -> class name.
        self.labels = {}
        for key, value in self.classes.items():
            self.labels[value] = key

        # csv with img_path, x1, y1, x2, y2, class_name
        try:
            with _open_for_csv(csv_data_file) as file:
                self.image_data = _read_annotations(csv.reader(file, delimiter=','), self.classes)
        except ValueError as e:
            raise_from(ValueError('invalid CSV annotations file: {}: {}'.format(csv_data_file, e)), None)
        self.image_names = list(self.image_data.keys())

        # The base constructor groups the images, so all data must be loaded first.
        super(CSVGenerator, self).__init__(**kwargs)

    # Size of the dataset
    def size(self):
        """Return the number of images listed in the annotations file."""
        return len(self.image_names)

    # Number of classes in the dataset
    def num_classes(self):
        """Return the highest class id plus one."""
        return max(self.classes.values()) + 1

    # Return True if label is a known label
    def has_label(self, label):
        """Return True if label is a known class id."""
        return label in self.labels

    # Returns True if name is a known class
    def has_name(self, name):
        """Return True if name is a known class name."""
        return name in self.classes

    # Map name to label
    def name_to_label(self, name):
        """Return the class id for a class name."""
        return self.classes[name]

    # Map label to name
    def label_to_name(self, label):
        """Return the class name for a class id."""
        return self.labels[label]

    # Returns the image path for image_index
    def image_path(self, image_index):
        """Return base_dir joined with the image name at image_index."""
        return os.path.join(self.base_dir, self.image_names[image_index])

    # Compute the aspect ratio for an image with image_index
    def image_aspect_ratio(self, image_index):
        """Return width / height of the image at image_index."""

        # PIL is fast for metadata; the context manager closes the file handle,
        # which matters because this runs once per image when grouping by ratio.
        with Image.open(self.image_path(image_index)) as image:
            return float(image.width) / float(image.height)

    # Load an image at the image_index
    def load_image(self, image_index):
        """Read the image at image_index with read_image_bgr."""
        return read_image_bgr(self.image_path(image_index))

    # Load annotations for an image_index
    def load_annotations(self, image_index):
        """Return the annotations of one image as freshly built arrays.

        Returns:
            A dict with 'labels', a float64 array of shape (n,), and 'bboxes',
            a float64 array of shape (n, 4) holding x1, y1, x2, y2 per row.
        """

        path    = self.image_names[image_index]
        entries = self.image_data[path]

        # Build each array in one go instead of concatenating once per box.
        labels = np.array(
            [self.name_to_label(annot['class']) for annot in entries],
            dtype=np.float64
        )
        bboxes = np.array(
            [
                [float(annot['x1']), float(annot['y1']), float(annot['x2']), float(annot['y2'])]
                for annot in entries
            ],
            dtype=np.float64
        ).reshape(-1, 4)  # keeps the (0, 4) shape for images without boxes

        return {'labels': labels, 'bboxes': bboxes}

Anchors

This module is used to generate anchors for objects. Multiple anchor boxes are generated for objects in order to find the bounding box with the best fit.

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Import libraries
import numpy as np
import keras

# The parameters that define how anchors are generated
class AnchorParameters:
    """Container for the settings that control anchor generation.

    Attributes are stored exactly as given: sizes, strides, ratios, scales.
    """

    def __init__(self, sizes, strides, ratios, scales):
        """Store the four parameter collections on the instance."""
        self.sizes, self.strides = sizes, strides
        self.ratios, self.scales = ratios, scales

    def num_anchors(self):
        """Return the number of ratio/scale combinations."""
        ratio_count = len(self.ratios)
        scale_count = len(self.scales)
        return ratio_count * scale_count

# Default anchor parameters
# Default parameters: five size/stride pairs, three aspect ratios and three
# scales (2^0, 2^(1/3), 2^(2/3)), stored in the Keras float type.
AnchorParameters.default = AnchorParameters(
    sizes=[32, 64, 128, 256, 512],
    strides=[8, 16, 32, 64, 128],
    ratios=np.array([0.5, 1, 2], dtype=keras.backend.floatx()),
    scales=np.array(
        [2.0 ** (step / 3.0) for step in range(3)],
        dtype=keras.backend.floatx()
    ),
)

# Compute overlap
def compute_overlap(boxes, query_boxes):
    """Compute the pairwise intersection-over-union of two sets of boxes.

    Boxes are (x1, y1, x2, y2) with inclusive pixel coordinates, which is why
    one is added to every width and height.

    The previous implementation looped over every (box, query box) pair in
    pure Python. This version performs the same arithmetic, in the same
    order, with NumPy broadcasting.

    Args:
        boxes: Array-like of shape (N, 4).
        query_boxes: Array-like of shape (K, 4).

    Returns:
        A float64 array of shape (N, K); entry [n, k] is the IoU of boxes[n]
        and query_boxes[k], and 0 where the two boxes do not intersect.
    """
    boxes = np.asarray(boxes, dtype=np.float64)
    query_boxes = np.asarray(query_boxes, dtype=np.float64)

    overlaps = np.zeros((boxes.shape[0], query_boxes.shape[0]), dtype=np.float64)
    if overlaps.size == 0:
        return overlaps

    box_areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    query_areas = (
        (query_boxes[:, 2] - query_boxes[:, 0] + 1) *
        (query_boxes[:, 3] - query_boxes[:, 1] + 1)
    )

    # Broadcast (N, 1) against (1, K) to get every pairwise intersection extent.
    iw = (
        np.minimum(boxes[:, None, 2], query_boxes[None, :, 2]) -
        np.maximum(boxes[:, None, 0], query_boxes[None, :, 0]) + 1
    )
    ih = (
        np.minimum(boxes[:, None, 3], query_boxes[None, :, 3]) -
        np.maximum(boxes[:, None, 1], query_boxes[None, :, 1]) + 1
    )

    # Only pairs with a positive extent on both axes actually intersect.
    intersects = (iw > 0) & (ih > 0)
    intersection = iw * ih
    union = box_areas[:, None] + query_areas[None, :] - intersection

    overlaps[intersects] = intersection[intersects] / union[intersects]
    return overlaps

# Generate anchor targets for bbox detection
def anchor_targets_bbox(
    anchors,
    image_group,
    annotations_group,
    num_classes,
    negative_overlap=0.4,
    positive_overlap=0.5
):
    """Build regression and classification targets for a batch of images.

    The last column of both outputs is the anchor state: 1 for positive
    anchors, -1 for ignored anchors and 0 otherwise.

    Args:
        anchors: Array of shape (N, 4) with anchors as (x1, y1, x2, y2).
        image_group: List of images; only their shapes are used.
        annotations_group: List of dicts with 'bboxes' and 'labels'.
        num_classes: Number of classes.
        negative_overlap: Overlaps above this but below positive_overlap are ignored.
        positive_overlap: Overlaps at or above this make an anchor positive.

    Returns:
        A tuple (regression_batch, labels_batch) with shapes
        (batch, N, 4 + 1) and (batch, N, num_classes + 1).
    """

    assert(len(image_group) == len(annotations_group)), "The length of the images and annotations need to be equal."
    assert(len(annotations_group) > 0), "No data received to compute anchor targets for."
    for annotations in annotations_group:
        assert('bboxes' in annotations), "Annotations should contain bboxes."
        assert('labels' in annotations), "Annotations should contain labels."

    float_type  = keras.backend.floatx()
    num_images  = len(image_group)
    num_anchors = anchors.shape[0]

    regression_batch = np.zeros((num_images, num_anchors, 4 + 1), dtype=float_type)
    labels_batch     = np.zeros((num_images, num_anchors, num_classes + 1), dtype=float_type)

    for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
        gt_boxes = annotations['bboxes']

        if gt_boxes.shape[0]:
            # For every anchor: is it positive, is it ignored, and which box fits best.
            positive, ignored, best_box = compute_gt_annotations(anchors, gt_boxes, negative_overlap, positive_overlap)

            # Write the anchor state into the last column of both target arrays.
            for batch in (labels_batch, regression_batch):
                batch[index, ignored, -1]  = -1
                batch[index, positive, -1] = 1

            # One-hot class target for the positive anchors.
            class_ids = annotations['labels'][best_box[positive]].astype(int)
            labels_batch[index, positive, class_ids] = 1

            regression_batch[index, :, :-1] = bbox_transform(anchors, gt_boxes[best_box, :])

        # Ignore anchors whose centre lies outside of the (unpadded) image.
        if image.shape:
            center_x = (anchors[:, 0] + anchors[:, 2]) / 2
            center_y = (anchors[:, 1] + anchors[:, 3]) / 2
            outside  = np.logical_or(center_x >= image.shape[1], center_y >= image.shape[0])

            labels_batch[index, outside, -1]     = -1
            regression_batch[index, outside, -1] = -1

    return regression_batch, labels_batch

# Obtain indices of gt annotations with the greatest overlap
def compute_gt_annotations(
    anchors,
    annotations,
    negative_overlap=0.4,
    positive_overlap=0.5
):
    """Match every anchor with its best-overlapping annotation box.

    Args:
        anchors: Array of shape (N, 4).
        annotations: Array of shape (K, 4) with the annotated boxes.
        negative_overlap: Lower bound (exclusive) of the ignore band.
        positive_overlap: Minimum overlap for an anchor to be positive.

    Returns:
        A tuple (positive_indices, ignore_indices, argmax_overlaps_inds): two
        boolean masks of length N and, per anchor, the index of the
        annotation with the greatest overlap.
    """

    iou = compute_overlap(anchors.astype(np.float64), annotations.astype(np.float64))

    best_annotation = iou.argmax(axis=1)
    anchor_rows     = np.arange(iou.shape[0])
    best_iou        = iou[anchor_rows, best_annotation]

    # Anchors in the band between the two thresholds get a "dont care" state.
    is_positive = best_iou >= positive_overlap
    is_ignored  = np.logical_and(best_iou > negative_overlap, np.logical_not(is_positive))

    return is_positive, is_ignored, best_annotation

# Compute layer shapes given input image shape and the model
def layer_shapes(image_shape, model):
    """Compute the output shape of every layer for a given image shape.

    Args:
        image_shape: Tuple with the image shape, without the batch dimension.
        model: Object exposing ``layers``; the first layer is treated as the input.

    Returns:
        A dict mapping layer name to output shape. Layers whose inbound nodes
        have no inbound layers are left out.
    """

    input_layer = model.layers[0]
    remaining   = model.layers[1:]

    # The input layer gets the image shape with an unknown batch dimension.
    shapes = {input_layer.name: (None,) + image_shape}

    for layer in remaining:
        for node in layer._inbound_nodes:
            input_shapes = [shapes[parent.name] for parent in node.inbound_layers]
            if input_shapes:
                # Single-input layers get a bare shape, multi-input layers a list.
                argument = input_shapes[0] if len(input_shapes) == 1 else input_shapes
                shapes[layer.name] = layer.compute_output_shape(argument)

    return shapes

# Make a function for getting the shape of the pyramid levels
def make_shapes_callback(model):
    """Create a callback that reads pyramid-level shapes from a model.

    Args:
        model: Model whose layers named "P<level>" hold the pyramid outputs.

    Returns:
        A function ``get_shapes(image_shape, pyramid_levels)`` returning, for
        each level, elements 1 and 2 of the output shape of layer "P<level>".
    """

    def get_shapes(image_shape, pyramid_levels):
        all_shapes = layer_shapes(image_shape, model)

        level_shapes = []
        for level in pyramid_levels:
            layer_name = "P{}".format(level)
            level_shapes.append(all_shapes[layer_name][1:3])
        return level_shapes

    return get_shapes

# Guess shapes based on pyramid levels
def guess_shapes(image_shape, pyramid_levels):
    """Estimate the feature-map shape of each pyramid level.

    Level x is assumed to downsample the image by 2**x, rounding up.

    Args:
        image_shape: Image shape; only the first two entries are used.
        pyramid_levels: Iterable of integer pyramid levels.

    Returns:
        A list with one 2-element array per pyramid level.
    """

    spatial = np.array(image_shape[:2])

    shapes = []
    for level in pyramid_levels:
        stride = 2 ** level
        # Ceiling division by the stride.
        shapes.append((spatial + stride - 1) // stride)
    return shapes

# Generators anchors for a given shape
def anchors_for_shape(
    image_shape,
    pyramid_levels=None,
    anchor_params=None,
    shapes_callback=None,
):
    """Generate the anchors of all pyramid levels for an image shape.

    Args:
        image_shape: Shape of the image.
        pyramid_levels: Levels to use; defaults to [3, 4, 5, 6, 7].
        anchor_params: Anchor settings; defaults to AnchorParameters.default.
        shapes_callback: Function returning the feature-map shape per level;
            defaults to guess_shapes.

    Returns:
        An array of shape (N, 4) with the anchors of every level, in level order.
    """

    levels   = [3, 4, 5, 6, 7] if pyramid_levels is None else pyramid_levels
    params   = AnchorParameters.default if anchor_params is None else anchor_params
    callback = guess_shapes if shapes_callback is None else shapes_callback

    level_shapes = callback(image_shape, levels)

    # Start from an empty (0, 4) array so the result is well-formed without levels.
    per_level = [np.zeros((0, 4))]
    for position, _level in enumerate(levels):
        base_anchors = generate_anchors(
            base_size=params.sizes[position],
            ratios=params.ratios,
            scales=params.scales
        )
        per_level.append(shift(level_shapes[position], params.strides[position], base_anchors))

    return np.concatenate(per_level, axis=0)

# Produce shifted anchors based on shape of the map and stride size
def shift(shape, stride, anchors):
    """Replicate the base anchors at the centre of every feature-map cell.

    Args:
        shape: Feature-map shape as (rows, columns).
        stride: Distance in image pixels between neighbouring cells.
        anchors: Array of shape (A, 4) with the base anchors.

    Returns:
        An array of shape (K * A, 4), K being the number of cells, ordered
        cell by cell (row-major) with the A anchors of a cell kept together.
    """

    # Cell centres: half a stride in from the top-left corner.
    centers_x = (np.arange(0, shape[1]) + 0.5) * stride
    centers_y = (np.arange(0, shape[0]) + 0.5) * stride

    grid_x, grid_y = np.meshgrid(centers_x, centers_y)
    flat_x = grid_x.ravel()
    flat_y = grid_y.ravel()

    # One (x, y, x, y) offset per cell so both box corners move together.
    offsets = np.stack((flat_x, flat_y, flat_x, flat_y), axis=1)

    num_anchors = anchors.shape[0]
    num_cells   = offsets.shape[0]

    # (K, 1, 4) + (1, A, 4) broadcasts to (K, A, 4).
    shifted = offsets.reshape((num_cells, 1, 4)) + anchors.reshape((1, num_anchors, 4))
    return shifted.reshape((num_cells * num_anchors, 4))

# Generate anchor (reference) windows by enumerating aspect ratios X scales w.r.t. a reference window
def generate_anchors(base_size=16, ratios=None, scales=None):
    """Generate reference anchors centred on the origin.

    One anchor is produced per (ratio, scale) combination, ordered by ratio
    first and scale second.

    Args:
        base_size: Side length of the reference window before scaling.
        ratios: Height/width ratios; defaults to AnchorParameters.default.ratios.
        scales: Scale factors; defaults to AnchorParameters.default.scales.

    Returns:
        An array of shape (len(ratios) * len(scales), 4) holding the anchors
        as (x1, y1, x2, y2).
    """

    if ratios is None:
        ratios = AnchorParameters.default.ratios
    if scales is None:
        scales = AnchorParameters.default.scales

    ratio_count = len(ratios)
    scale_count = len(scales)

    # Columns 0/1 start as the centre (0, 0); columns 2/3 hold width and height.
    anchors = np.zeros((ratio_count * scale_count, 4))
    anchors[:, 2:] = base_size * np.tile(scales, (2, ratio_count)).T

    # Keep the area of each scaled square while applying its aspect ratio.
    ratio_per_anchor = np.repeat(ratios, scale_count)
    areas = anchors[:, 2] * anchors[:, 3]
    anchors[:, 2] = np.sqrt(areas / ratio_per_anchor)
    anchors[:, 3] = anchors[:, 2] * ratio_per_anchor

    # Convert (x_ctr, y_ctr, w, h) into (x1, y1, x2, y2).
    half_width  = anchors[:, 2] * 0.5
    half_height = anchors[:, 3] * 0.5
    anchors[:, 0] -= half_width
    anchors[:, 2] -= half_width
    anchors[:, 1] -= half_height
    anchors[:, 3] -= half_height

    return anchors

# Compute bounding-box regression targets for an image
def bbox_transform(anchors, gt_boxes, mean=None, std=None):
    """Compute normalised regression targets from anchors to ground-truth boxes.

    Each target is the corner offset (gt - anchor) divided by the anchor
    width (x coordinates) or height (y coordinates), then shifted by mean and
    divided by std.

    Args:
        anchors: Array of shape (N, 4) as (x1, y1, x2, y2).
        gt_boxes: Array with one ground-truth box per anchor; the first four
            columns are used.
        mean: np.ndarray, list or tuple; defaults to zeros.
        std: np.ndarray, list or tuple; defaults to 0.2 for every coordinate.

    Returns:
        An array of shape (N, 4) with (dx1, dy1, dx2, dy2) per anchor.

    Raises:
        ValueError: When mean or std is not a np.ndarray, list or tuple.
    """

    def _as_array(value, label):
        # Lists and tuples are converted; anything else must already be an array.
        if isinstance(value, (list, tuple)):
            return np.array(value)
        if not isinstance(value, np.ndarray):
            raise ValueError('Expected {} to be a np.ndarray, list or tuple. Received: {}'.format(label, type(value)))
        return value

    mean = _as_array(np.array([0, 0, 0, 0]) if mean is None else mean, 'mean')
    std  = _as_array(np.array([0.2, 0.2, 0.2, 0.2]) if std is None else std, 'std')

    widths  = anchors[:, 2] - anchors[:, 0]
    heights = anchors[:, 3] - anchors[:, 1]

    # x offsets are relative to the anchor width, y offsets to the anchor height.
    extents = np.stack((widths, heights, widths, heights), axis=1)
    offsets = gt_boxes[:, :4] - anchors[:, :4]

    return (offsets / extents - mean) / std

Layers

This module includes custom layers that are used by the model builder to create a RetinaNet model.

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Import libraries
import numpy as np
import keras
import math
import tensorflow
import annytab.retinanet.anchors as utils_anchors
from annytab.retinanet.common import random_transform_generator, bbox_transform_inv
import annytab.retinanet.common

# Keras layer for upsampling a Tensor to be the same shape as another Tensor.
class UpsampleLike(keras.layers.Layer):
    """Layer that resizes a source tensor to the spatial size of a target tensor.

    Resizing uses nearest-neighbour interpolation. Inputs are given as the
    pair (source, target).
    """

    def call(self, inputs, **kwargs):
        """Resize inputs[0] to the height and width of inputs[1]."""
        source, target = inputs
        target_shape = keras.backend.shape(target)

        resize  = tensorflow.compat.v1.image.resize_images
        nearest = tensorflow.image.ResizeMethod.NEAREST_NEIGHBOR

        if keras.backend.image_data_format() != 'channels_first':
            return resize(source, (target_shape[1], target_shape[2]), nearest, False)

        # The resize call is fed channels-last data, so transpose around it.
        channels_last = tensorflow.transpose(source, (0, 2, 3, 1))
        resized = resize(channels_last, (target_shape[2], target_shape[3]), nearest, False)
        return tensorflow.transpose(resized, (0, 3, 1, 2))

    def compute_output_shape(self, input_shape):
        """Combine batch and channels of the source with the target's spatial size."""
        source_shape = input_shape[0]
        target_shape = input_shape[1]

        if keras.backend.image_data_format() == 'channels_first':
            return (source_shape[0], source_shape[1]) + target_shape[2:4]

        return (source_shape[0],) + target_shape[1:3] + (source_shape[-1],)

# Apply a prior probability to the weights
class PriorProbability(keras.initializers.Initializer):
    """Initializer that fills a tensor with the bias of a prior probability.

    Every element is set to -log((1 - p) / p), with p the configured probability.
    """

    def __init__(self, probability=0.01):
        """Store the prior probability."""
        self.probability = probability

    def get_config(self):
        """Return the configuration needed to recreate this initializer."""
        return dict(probability=self.probability)

    def __call__(self, shape, dtype=None):
        """Return an array of the given shape filled with the prior bias."""
        prior = self.probability

        # set bias to -log((1 - p)/p) for foreground
        bias = -math.log((1 - prior) / prior)

        return np.ones(shape, dtype=dtype) * bias

# Keras layer for applying regression values to boxes
class RegressBoxes(keras.layers.Layer):
    """Apply regression values to anchors to obtain boxes.

    The layer is called with ``[anchors, regression]`` and delegates to
    ``bbox_transform_inv`` with the configured mean and std.
    """

    def __init__(self, mean=None, std=None, *args, **kwargs):
        # Fall back to the default normalization values
        mean = np.array([0, 0, 0, 0]) if mean is None else mean
        std = np.array([0.2, 0.2, 0.2, 0.2]) if std is None else std

        # Lists and tuples are converted, everything else must already be an array
        if isinstance(mean, (list, tuple)):
            mean = np.array(mean)
        if not isinstance(mean, np.ndarray):
            raise ValueError('Expected mean to be a np.ndarray, list or tuple. Received: {}'.format(type(mean)))

        if isinstance(std, (list, tuple)):
            std = np.array(std)
        if not isinstance(std, np.ndarray):
            raise ValueError('Expected std to be a np.ndarray, list or tuple. Received: {}'.format(type(std)))

        self.mean = mean
        self.std = std
        super(RegressBoxes, self).__init__(*args, **kwargs)

    def call(self, inputs, **kwargs):
        anchors, regression = inputs
        return bbox_transform_inv(anchors, regression, mean=self.mean, std=self.std)

    def compute_output_shape(self, input_shape):
        # Same shape as the anchors input
        return input_shape[0]

    def get_config(self):
        config = super(RegressBoxes, self).get_config()
        # Arrays are stored as plain lists in the configuration
        config['mean'] = self.mean.tolist()
        config['std'] = self.std.tolist()
        return config

# Keras layer for filtering detections using score threshold and NMS
class FilterDetections(keras.layers.Layer):
    """Keras layer that runs filter_detections on every item in a batch.

    The layer is called with ``[boxes, classification, other[0], other[1], ...]``.
    """

    def __init__(
        self,
        nms=True,
        class_specific_filter=True,
        nms_threshold=0.5,
        score_threshold=0.05,
        max_detections=300,
        parallel_iterations=32,
        **kwargs
    ):
        # Store the filter settings, they are forwarded to filter_detections
        self.nms = nms
        self.class_specific_filter = class_specific_filter
        self.nms_threshold = nms_threshold
        self.score_threshold = score_threshold
        self.max_detections = max_detections
        self.parallel_iterations = parallel_iterations
        super(FilterDetections, self).__init__(**kwargs)

    # Constructs the NMS graph
    def call(self, inputs, **kwargs):
        boxes, classification = inputs[0], inputs[1]
        other = inputs[2:]

        # Filter the detections of a single batch item with the layer settings
        def _filter_single(args):
            return filter_detections(
                args[0],
                args[1],
                args[2],
                nms=self.nms,
                class_specific_filter=self.class_specific_filter,
                score_threshold=self.score_threshold,
                max_detections=self.max_detections,
                nms_threshold=self.nms_threshold,
            )

        # Output types: boxes, scores, labels and one entry per additional input
        float_type = keras.backend.floatx()
        output_types = [float_type, float_type, 'int32'] + [o.dtype for o in other]

        return tensorflow.map_fn(
            _filter_single,
            elems=[boxes, classification, other],
            dtype=output_types,
            parallel_iterations=self.parallel_iterations
        )

    # Computes the output shapes given the input shapes
    def compute_output_shape(self, input_shape):
        limit = self.max_detections

        shapes = [
            (input_shape[0][0], limit, 4),
            (input_shape[1][0], limit),
            (input_shape[1][0], limit),
        ]

        # Additional inputs keep their trailing dimensions
        for i in range(2, len(input_shape)):
            shapes.append(tuple([input_shape[i][0], limit] + list(input_shape[i][2:])))

        return shapes

    # This is required in Keras when there is more than 1 output
    def compute_mask(self, inputs, mask=None):
        return [None] * (len(inputs) + 1)

    # Gets the configuration of this layer
    def get_config(self):
        config = super(FilterDetections, self).get_config()
        config['nms'] = self.nms
        config['class_specific_filter'] = self.class_specific_filter
        config['nms_threshold'] = self.nms_threshold
        config['score_threshold'] = self.score_threshold
        config['max_detections'] = self.max_detections
        config['parallel_iterations'] = self.parallel_iterations
        return config

# Keras layer for generating anchors for a given shape
class Anchors(keras.layers.Layer):
    """Keras layer that generates anchors for the shape of a feature map.

    Args:
        size: Base size passed to ``generate_anchors``.
        stride: Stride passed to ``shift`` when the anchors are placed.
        ratios: Anchor ratios as a np.ndarray, list or tuple. Defaults to
            ``anchor_parameters.ratios`` from the common module.
        scales: Anchor scales as a np.ndarray, list or tuple. Defaults to
            ``anchor_parameters.scales`` from the common module.
    """

    def __init__(self, size, stride, ratios=None, scales=None, *args, **kwargs):

        self.size   = size
        self.stride = stride

        # Resolve defaults first. The common module is imported in this file as
        # annytab.retinanet.common, there is no bare "common" name in scope.
        if ratios is None:
            ratios = annytab.retinanet.common.anchor_parameters.ratios
        elif isinstance(ratios, (list, tuple)):
            ratios = np.array(ratios)
        if scales is None:
            scales = annytab.retinanet.common.anchor_parameters.scales
        elif isinstance(scales, (list, tuple)):
            scales = np.array(scales)

        self.ratios = ratios
        self.scales = scales

        # Use the resolved values, the raw arguments may be None
        self.num_anchors = len(self.ratios) * len(self.scales)
        self.anchors     = keras.backend.variable(utils_anchors.generate_anchors(
            base_size=size,
            ratios=self.ratios,
            scales=self.scales,
        ))

        super(Anchors, self).__init__(*args, **kwargs)

    def call(self, inputs, **kwargs):
        features = inputs
        features_shape = keras.backend.shape(features)

        # Shift the base anchors over the spatial dimensions of the feature map
        if keras.backend.image_data_format() == 'channels_first':
            anchors = shift(features_shape[2:4], self.stride, self.anchors)
        else:
            anchors = shift(features_shape[1:3], self.stride, self.anchors)

        # Repeat the anchors for every item in the batch
        anchors = keras.backend.tile(keras.backend.expand_dims(anchors, axis=0), (features_shape[0], 1, 1))

        return anchors

    def compute_output_shape(self, input_shape):
        # The number of anchors is only known when the spatial dimensions are known
        if None not in input_shape[1:]:
            if keras.backend.image_data_format() == 'channels_first':
                total = np.prod(input_shape[2:4]) * self.num_anchors
            else:
                total = np.prod(input_shape[1:3]) * self.num_anchors

            return (input_shape[0], total, 4)
        else:
            return (input_shape[0], None, 4)

    def get_config(self):
        config = super(Anchors, self).get_config()
        config.update({
            'size'   : self.size,
            'stride' : self.stride,
            'ratios' : self.ratios.tolist(),
            'scales' : self.scales.tolist(),
        })

        return config

# Clip boxes
class ClipBoxes(keras.layers.Layer):
    """Clip box coordinates to the size of an image.

    The layer is called with ``[image, boxes]``, boxes hold (x1, y1, x2, y2)
    in the last dimension.
    """

    def call(self, inputs, **kwargs):
        image, boxes = inputs
        image_shape = keras.backend.cast(keras.backend.shape(image), keras.backend.floatx())

        # Pick height and width according to the data format
        if keras.backend.image_data_format() == 'channels_first':
            _, _, height, width = tensorflow.unstack(image_shape, axis=0)
        else:
            _, height, width, _ = tensorflow.unstack(image_shape, axis=0)

        x1, y1, x2, y2 = tensorflow.unstack(boxes, axis=-1)
        clipped = [
            tensorflow.clip_by_value(x1, 0, width),
            tensorflow.clip_by_value(y1, 0, height),
            tensorflow.clip_by_value(x2, 0, width),
            tensorflow.clip_by_value(y2, 0, height),
        ]

        return keras.backend.stack(clipped, axis=2)

    def compute_output_shape(self, input_shape):
        # Same shape as the boxes input
        return input_shape[1]

# Identical to keras.layers.BatchNormalization, but adds the option to freeze parameters.
class BatchNormalization(keras.layers.BatchNormalization):
    """keras.layers.BatchNormalization with an option to freeze parameters.

    Args:
        freeze: When true the layer is set to non-trainable and always runs
            in inference mode. When false the layer behaves like a regular
            keras.layers.BatchNormalization layer.
    """

    def __init__(self, freeze, *args, **kwargs):
        self.freeze = freeze
        super(BatchNormalization, self).__init__(*args, **kwargs)

        # set to non-trainable if freeze is true
        self.trainable = not self.freeze

    def call(self, inputs, training=None, **kwargs):
        # A frozen layer must never run in training mode. A layer that is not
        # frozen keeps the training flag it was given, forcing training=True
        # would make the layer normalize with batch statistics at inference too.
        if self.freeze:
            training = False

        return super(BatchNormalization, self).call(inputs, training=training, **kwargs)

    def get_config(self):
        config = super(BatchNormalization, self).get_config()
        config.update({'freeze': self.freeze})
        return config

# Filter detections using the boxes and classification values
def filter_detections(boxes,classification,other= [],class_specific_filter = True,nms= True,score_threshold= 0.05,max_detections= 300,nms_threshold= 0.5):
    """Filter detections with a score threshold, optional NMS and a top-k selection.

    Args:
        boxes: Tensor with box coordinates, the output is set to 4 columns.
        classification: Tensor indexed as [box, class] with classification scores.
        other: List of additional tensors that are gathered with the same indices as the boxes.
        class_specific_filter: Filter every class separately when true, otherwise only the best scoring class per box.
        nms: Apply non maximum suppression when true.
        score_threshold: Only scores greater than this value are kept.
        max_detections: Maximum number of detections, outputs are padded to this length.
        nms_threshold: IoU threshold passed to non maximum suppression.

    Returns:
        A list [boxes, scores, labels] + other, every entry has max_detections
        rows and unused rows are filled with -1. Labels are cast to int32.
    """
    
    # Threshold based on score
    def _filter_detections(scores, labels):
        # threshold based on score
        indices = tensorflow.where(keras.backend.greater(scores, score_threshold))

        if nms:
            filtered_boxes  = tensorflow.gather_nd(boxes, indices)
            filtered_scores = keras.backend.gather(scores, indices)[:, 0]

            # perform NMS
            nms_indices = tensorflow.image.non_max_suppression(filtered_boxes, filtered_scores, max_output_size=max_detections, iou_threshold=nms_threshold)

            # filter indices based on NMS
            indices = keras.backend.gather(indices, nms_indices)

        # add indices to list of all indices, each row is (box index, label)
        labels = tensorflow.gather_nd(labels, indices)
        indices = keras.backend.stack([indices[:, 0], labels], axis=1)

        return indices

    if class_specific_filter:
        all_indices = []
        # perform per class filtering
        for c in range(int(classification.shape[1])):
            scores = classification[:, c]
            # every box gets the label of the class that is being filtered
            labels = c * tensorflow.ones((keras.backend.shape(scores)[0],), dtype='int64')
            all_indices.append(_filter_detections(scores, labels))

        # concatenate indices to single tensor
        indices = keras.backend.concatenate(all_indices, axis=0)
    else:
        # only consider the best scoring class of every box
        scores  = keras.backend.max(classification, axis    = 1)
        labels  = keras.backend.argmax(classification, axis = 1)
        indices = _filter_detections(scores, labels)

    # select top k
    scores              = tensorflow.gather_nd(classification, indices)
    labels              = indices[:, 1]
    scores, top_indices = tensorflow.nn.top_k(scores, k=keras.backend.minimum(max_detections, keras.backend.shape(scores)[0]))

    # filter input using the final set of indices
    indices             = keras.backend.gather(indices[:, 0], top_indices)
    boxes               = keras.backend.gather(boxes, indices)
    labels              = keras.backend.gather(labels, top_indices)
    other_              = [keras.backend.gather(o, indices) for o in other]

    # pad the outputs with -1 up to max_detections rows
    pad_size = keras.backend.maximum(0, max_detections - keras.backend.shape(scores)[0])
    boxes    = tensorflow.pad(boxes, [[0, pad_size], [0, 0]], constant_values=-1)
    scores   = tensorflow.pad(scores, [[0, pad_size]], constant_values=-1)
    labels   = tensorflow.pad(labels, [[0, pad_size]], constant_values=-1)
    labels   = keras.backend.cast(labels, 'int32')
    other_   = [tensorflow.pad(o, [[0, pad_size]] + [[0, 0] for _ in range(1, len(o.shape))], constant_values=-1) for o in other_]

    # set shapes, since we know what they are
    boxes.set_shape([max_detections, 4])
    scores.set_shape([max_detections])
    labels.set_shape([max_detections])
    for o, s in zip(other_, [list(keras.backend.int_shape(o)) for o in other]):
        o.set_shape([max_detections] + s[1:])

    return [boxes, scores, labels] + other_

# Create a resnet 50 model
def resnet50(inputs, include_top=True, classes=1000, freeze_bn=True, numerical_names=None, *args, **kwargs):
    """Create a ResNet50 model from bottleneck blocks.

    Args:
        inputs: Input tensor of the model, for example keras.layers.Input(shape=(None, None, 3)).
        include_top: Add global average pooling and a softmax classifier when true,
            otherwise the model outputs the features of every stage.
        classes: Number of classes for the classifier, only used if include_top is true.
        freeze_bn: Freeze the batch normalization layers when true.
        numerical_names: One boolean per stage that selects numbered block names
            ("b1", "b2", ...) instead of letters for all blocks after the first one.
            Defaults to letters for every stage.
        *args, **kwargs: Forwarded to keras.models.Model.

    Returns:
        A keras.models.Model.

    Raises:
        ValueError: If numerical_names does not have one entry per stage.
    """

    # Number of blocks in each of the four stages
    blocks = [3, 4, 6, 3]

    # Respect a value from the caller, the default keeps letter based names
    if numerical_names is None:
        numerical_names = [False] * len(blocks)
    elif len(numerical_names) != len(blocks):
        raise ValueError('Expected numerical_names to have {} entries. Received: {}'.format(len(blocks), len(numerical_names)))

    block = bottleneck_2d

    if keras.backend.image_data_format() == "channels_last":
        axis = 3
    else:
        axis = 1

    # Create layers
    x = keras.layers.ZeroPadding2D(padding=3, name="padding_conv1")(inputs)
    x = keras.layers.Conv2D(64, (7, 7), strides=(2, 2), use_bias=False, name="conv1")(x)
    x = BatchNormalization(axis=axis, epsilon=1e-5, freeze=freeze_bn, name="bn_conv1")(x)
    x = keras.layers.Activation("relu", name="conv1_relu")(x)
    x = keras.layers.MaxPooling2D((3, 3), strides=(2, 2), padding="same", name="pool1")(x)

    features = 64

    outputs = []

    # Loop blocks, the number of filters is doubled after every stage
    for stage_id, iterations in enumerate(blocks):
        for block_id in range(iterations):
            x = block(features, stage_id, block_id, numerical_name=(block_id > 0 and numerical_names[stage_id]), freeze_bn=freeze_bn)(x)

        features *= 2

        # Keep the output of every stage
        outputs.append(x)

    if include_top:
        assert classes > 0

        x = keras.layers.GlobalAveragePooling2D(name="pool5")(x)
        x = keras.layers.Dense(classes, activation="softmax", name="fc1000")(x)

        return keras.models.Model(inputs=inputs, outputs=x, *args, **kwargs)
    else:
        # Else output each stages features
        return keras.models.Model(inputs=inputs, outputs=outputs, *args, **kwargs)


# Create a functor for computing the focal loss
def focal(alpha=0.25, gamma=2.0):
    """Create a focal loss function.

    Args:
        alpha: Weight for positive labels, negative labels get 1 - alpha.
        gamma: Exponent applied to the focal weight.

    Returns:
        A function that takes (y_true, y_pred) and returns the focal loss.
    """

    # Compute the focal loss given the target tensor and the predicted tensor
    def _focal(y_true, y_pred):

        # The last value of y_true is the anchor state: -1 for ignore, 0 for background, 1 for object
        anchor_state = y_true[:, :, -1]
        targets      = y_true[:, :, :-1]

        # Only keep anchors that should not be ignored
        keep        = tensorflow.where(keras.backend.not_equal(anchor_state, -1))
        targets     = tensorflow.gather_nd(targets, keep)
        predictions = tensorflow.gather_nd(y_pred, keep)

        # Weight each entry with alpha and with how wrong the prediction is
        alpha_weight = keras.backend.ones_like(targets) * alpha
        alpha_weight = tensorflow.where(keras.backend.equal(targets, 1), alpha_weight, 1 - alpha_weight)
        error_weight = tensorflow.where(keras.backend.equal(targets, 1), 1 - predictions, predictions)
        weight       = alpha_weight * error_weight ** gamma

        loss = weight * keras.backend.binary_crossentropy(targets, predictions)

        # Normalize with the number of positive anchors, at least 1
        positives     = tensorflow.where(keras.backend.equal(anchor_state, 1))
        num_positives = keras.backend.cast(keras.backend.shape(positives)[0], keras.backend.floatx())
        num_positives = keras.backend.maximum(keras.backend.cast_to_floatx(1.0), num_positives)

        return keras.backend.sum(loss) / num_positives

    return _focal

# Create a smooth L1 loss functor
def smooth_l1(sigma=3.0):
    """Create a smooth L1 loss function.

    Args:
        sigma: Defines the point where the loss changes from L2 to L1.

    Returns:
        A function that takes (y_true, y_pred) and returns the smooth L1 loss.
    """

    sigma_squared = sigma ** 2

    # Compute the smooth L1 loss of y_pred w.r.t. y_true.
    def _smooth_l1(y_true, y_pred):

        # The last value of y_true is the anchor state, the rest is the regression target
        anchor_state = y_true[:, :, -1]
        targets      = y_true[:, :, :-1]

        # Only positive anchors contribute to the loss
        positives   = tensorflow.where(keras.backend.equal(anchor_state, 1))
        predictions = tensorflow.gather_nd(y_pred, positives)
        targets     = tensorflow.gather_nd(targets, positives)

        # f(x) = 0.5 * (sigma * x)^2          if |x| < 1 / sigma / sigma
        #        |x| - 0.5 / sigma / sigma    otherwise
        abs_diff  = keras.backend.abs(predictions - targets)
        quadratic = 0.5 * sigma_squared * keras.backend.pow(abs_diff, 2)
        linear    = abs_diff - 0.5 / sigma_squared
        loss      = tensorflow.where(keras.backend.less(abs_diff, 1.0 / sigma_squared), quadratic, linear)

        # Normalize with the number of positive anchors, at least 1
        num_positives = keras.backend.maximum(1, keras.backend.shape(positives)[0])
        num_positives = keras.backend.cast(num_positives, dtype=keras.backend.floatx())

        return keras.backend.sum(loss) / num_positives

    return _smooth_l1

# A two-dimensional bottleneck block
def bottleneck_2d(filters, stage=0, block=0, kernel_size=3, numerical_name=False, stride=None, freeze_bn=False):
    """Create a two-dimensional bottleneck block.

    Args:
        filters: Number of filters in the first two convolutions, the last convolution uses filters * 4.
        stage: Zero-based stage index, layer names use stage + 2.
        block: Zero-based block index within the stage.
        kernel_size: Kernel size of the middle convolution.
        numerical_name: Name blocks after the first one "b1", "b2", ... instead of letters.
        stride: Stride of the first convolution and of the shortcut convolution, see below for the default.
        freeze_bn: Freeze the batch normalization layers when true.

    Returns:
        A function that applies the block to a tensor.
    """
   
    # Default stride: 2 for the first block in every stage except the first stage, otherwise 1
    if stride is None:
        if block != 0 or stage == 0:
            stride = 1
        else:
            stride = 2

    # Set parameters
    parameters = { "kernel_initializer": "he_normal" }

    # Axis of the channels, used by batch normalization
    if keras.backend.image_data_format() == "channels_last":
        axis = 3
    else:
        axis = 1

    # Block name: "b1", "b2", ... or a letter ("a", "b", ...)
    if block > 0 and numerical_name:
        block_char = "b{}".format(block)
    else:
        block_char = chr(ord('a') + block)

    stage_char = str(stage + 2)

    def f(x):
        # 1x1 convolution, the stride is applied here
        y = keras.layers.Conv2D(filters, (1, 1), strides=stride, use_bias=False, name="res{}{}_branch2a".format(stage_char, block_char), **parameters)(x)
        y = BatchNormalization(axis=axis, epsilon=1e-5, freeze=freeze_bn, name="bn{}{}_branch2a".format(stage_char, block_char))(y)
        y = keras.layers.Activation("relu", name="res{}{}_branch2a_relu".format(stage_char, block_char))(y)

        # Convolution with kernel_size after an explicit zero padding of 1
        y = keras.layers.ZeroPadding2D(padding=1, name="padding{}{}_branch2b".format(stage_char, block_char))(y)
        y = keras.layers.Conv2D(filters, kernel_size, use_bias=False, name="res{}{}_branch2b".format(stage_char, block_char), **parameters)(y)
        y = BatchNormalization(axis=axis, epsilon=1e-5, freeze=freeze_bn, name="bn{}{}_branch2b".format(stage_char, block_char))(y)
        y = keras.layers.Activation("relu", name="res{}{}_branch2b_relu".format(stage_char, block_char))(y)

        # 1x1 convolution that expands to filters * 4
        y = keras.layers.Conv2D(filters * 4, (1, 1), use_bias=False, name="res{}{}_branch2c".format(stage_char, block_char), **parameters)(y)
        y = BatchNormalization(axis=axis, epsilon=1e-5, freeze=freeze_bn, name="bn{}{}_branch2c".format(stage_char, block_char))(y)

        # The first block of a stage projects the shortcut, other blocks use the input as it is
        if block == 0:
            shortcut = keras.layers.Conv2D(filters * 4, (1, 1), strides=stride, use_bias=False, name="res{}{}_branch1".format(stage_char, block_char), **parameters)(x)
            shortcut = BatchNormalization(axis=axis, epsilon=1e-5, freeze=freeze_bn, name="bn{}{}_branch1".format(stage_char, block_char))(shortcut)
        else:
            shortcut = x

        # Add the shortcut and apply the final activation
        y = keras.layers.Add(name="res{}{}".format(stage_char, block_char))([y, shortcut])
        y = keras.layers.Activation("relu", name="res{}{}_relu".format(stage_char, block_char))(y)

        return y

    return f

# Produce shifted anchors based on shape of the map and stride size
def shift(shape, stride, anchors):
    """Produce shifted anchors based on the shape of a map and a stride.

    Args:
        shape: Shape of the map, indexed as (rows, columns).
        stride: Distance between two neighbouring positions.
        anchors: Base anchors with 4 values per anchor.

    Returns:
        A tensor with 4 columns that holds every base anchor at every position.
    """
    float_type = keras.backend.floatx()

    # Centers of the positions along x (columns) and y (rows)
    centers_x = (keras.backend.arange(0, shape[1], dtype=float_type) + keras.backend.constant(0.5, dtype=float_type)) * stride
    centers_y = (keras.backend.arange(0, shape[0], dtype=float_type) + keras.backend.constant(0.5, dtype=float_type)) * stride

    grid_x, grid_y = tensorflow.meshgrid(centers_x, centers_y)
    flat_x = keras.backend.reshape(grid_x, [-1])
    flat_y = keras.backend.reshape(grid_y, [-1])

    # One (x, y, x, y) offset per position
    offsets = keras.backend.transpose(keras.backend.stack([flat_x, flat_y, flat_x, flat_y], axis=0))

    num_anchors   = keras.backend.shape(anchors)[0]
    num_positions = keras.backend.shape(offsets)[0]  # number of base points = feat_h * feat_w

    # Broadcast the anchors against the offsets and flatten the result
    base    = keras.backend.reshape(anchors, [1, num_anchors, 4])
    offsets = keras.backend.cast(keras.backend.reshape(offsets, [num_positions, 1, 4]), keras.backend.floatx())

    return keras.backend.reshape(base + offsets, [num_positions * num_anchors, 4])

# Builds anchors for the shape of the features from FPN
def build_anchors(anchor_parameters, features):
    """Build anchors for the shape of the features from FPN.

    Args:
        anchor_parameters: Object with sizes, strides, ratios and scales.
        features: List of feature tensors, one per pyramid level.

    Returns:
        A tensor with the anchors of all levels concatenated along axis 1.
    """
    level_anchors = []

    # One Anchors layer per pyramid level
    for level, feature in enumerate(features):
        anchors_layer = Anchors(
            size=anchor_parameters.sizes[level],
            stride=anchor_parameters.strides[level],
            ratios=anchor_parameters.ratios,
            scales=anchor_parameters.scales,
            name='anchors_{}'.format(level)
        )
        level_anchors.append(anchors_layer(feature))

    return keras.layers.Concatenate(axis=1, name='anchors')(level_anchors)

Model Builder

This module is used to create training models and inference models. A ResNet50 model is used as the backbone model for image classification; weights are preloaded from a pretrained model to reduce training time.

# Import libraries
import numpy as np
import keras
import math
import tensorflow
import annytab.retinanet.anchors as utils_anchors
import annytab.retinanet.layers as layers
import annytab.retinanet.common as common

# Get a classification model
def get_classification_model(num_classes, num_anchors, pyramid_feature_size=256, prior_probability=0.01, classification_feature_size=256):
    """Create the classification submodel that is applied to every pyramid level.

    Args:
        num_classes: Number of classes to predict a score for.
        num_anchors: Number of anchors per position.
        pyramid_feature_size: Number of channels of the input features.
        prior_probability: Prior probability used to initialize the bias of the last convolution.
        classification_feature_size: Number of filters in the intermediate convolutions.

    Returns:
        A keras.models.Model named "classification_submodel".
    """

    # Settings shared by all convolution layers
    conv_settings = dict(kernel_size=3, strides=1, padding='same')

    # Create an input layer (Tensorflow)
    inputs = keras.layers.Input(shape=(None, None, pyramid_feature_size))

    # Four intermediate convolution layers
    x = inputs
    for index in range(4):
        tower_layer = keras.layers.Conv2D(
            filters=classification_feature_size,
            activation='relu',
            name='pyramid_classification_{0}'.format(index),
            kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
            bias_initializer='zeros',
            **conv_settings)
        x = tower_layer(x)

    # One output per class and anchor
    head_layer = keras.layers.Conv2D(
        filters=num_classes * num_anchors,
        kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
        bias_initializer=layers.PriorProbability(probability=prior_probability),
        name='pyramid_classification',
        **conv_settings)
    x = head_layer(x)

    # Reshape output and apply sigmoid
    if keras.backend.image_data_format() == 'channels_first':
        x = keras.layers.Permute((2, 3, 1), name='pyramid_classification_permute')(x)
    x = keras.layers.Reshape((-1, num_classes), name='pyramid_classification_reshape')(x)
    x = keras.layers.Activation('sigmoid', name='pyramid_classification_sigmoid')(x)

    # Return the classification model
    return keras.models.Model(inputs=inputs, outputs=x, name='classification_submodel')

# Get a regression model
def get_regression_model(num_values, num_anchors, pyramid_feature_size=256, regression_feature_size=256):
    """Create the regression submodel that is applied to every pyramid level.

    Args:
        num_values: Number of values to regress per anchor.
        num_anchors: Number of anchors per position.
        pyramid_feature_size: Number of channels of the input features.
        regression_feature_size: Number of filters in the intermediate convolutions.

    Returns:
        A keras.models.Model named "regression_submodel".
    """

    # Settings shared by all convolution layers
    conv_settings = dict(
        kernel_size=3,
        strides=1,
        padding='same',
        kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
        bias_initializer='zeros',
    )

    # Create an input layer (Tensorflow)
    inputs = keras.layers.Input(shape=(None, None, pyramid_feature_size))

    # Four intermediate convolution layers
    x = inputs
    for index in range(4):
        tower_layer = keras.layers.Conv2D(
            filters=regression_feature_size,
            activation='relu',
            name='pyramid_regression_{}'.format(index),
            **conv_settings)
        x = tower_layer(x)

    # num_values outputs per anchor
    x = keras.layers.Conv2D(num_anchors * num_values, name='pyramid_regression', **conv_settings)(x)

    if keras.backend.image_data_format() == 'channels_first':
        x = keras.layers.Permute((2, 3, 1), name='pyramid_regression_permute')(x)
    x = keras.layers.Reshape((-1, num_values), name='pyramid_regression_reshape')(x)

    # Return a regression model
    return keras.models.Model(inputs=inputs, outputs=x, name='regression_submodel')

# Create pyramid features
def create_pyramid_features(C3, C4, C5, feature_size=256):
    """Create the pyramid features P3 to P7 from the backbone features.

    Args:
        C3: Feature tensor from the backbone, P4 is upsampled to its size.
        C4: Feature tensor from the backbone, P5 is upsampled to its size.
        C5: Feature tensor from the backbone, P5 and P6 are computed from it.
        feature_size: Number of filters in every pyramid convolution.

    Returns:
        A list with the tensors [P3, P4, P5, P6, P7].
    """

    # upsample C5 to get P5 from the FPN paper
    P5           = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C5_reduced')(C5)
    P5_upsampled = layers.UpsampleLike(name='P5_upsampled')([P5, C4])
    P5           = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P5')(P5)

    # add P5 elementwise to C4
    P4           = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C4_reduced')(C4)
    P4           = keras.layers.Add(name='P4_merged')([P5_upsampled, P4])
    P4_upsampled = layers.UpsampleLike(name='P4_upsampled')([P4, C3])
    P4           = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P4')(P4)

    # add P4 elementwise to C3
    P3 = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C3_reduced')(C3)
    P3 = keras.layers.Add(name='P3_merged')([P4_upsampled, P3])
    P3 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P3')(P3)

    # "P6 is obtained via a 3x3 stride-2 conv on C5"
    P6 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=2, padding='same', name='P6')(C5)

    # "P7 is computed by applying ReLU followed by a 3x3 stride-2 conv on P6"
    # Note: the activation layer is named 'C6_relu' although it is applied to P6
    P7 = keras.layers.Activation('relu', name='C6_relu')(P6)
    P7 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=2, padding='same', name='P7')(P7)

    return [P3, P4, P5, P6, P7]

# Build model pyramid
def build_model_pyramid(name, model, features):
    """Apply a submodel to every feature level and concatenate the results along axis 1.

    Args:
        name: Name of the concatenation layer.
        model: Submodel that is called on every feature tensor.
        features: List of feature tensors.
    """
    level_outputs = []
    for feature in features:
        level_outputs.append(model(feature))

    return keras.layers.Concatenate(axis=1, name=name)(level_outputs)

# Build a pyramid
def build_pyramid(models, features):
    """Apply every (name, model) pair in models to all feature levels.

    Returns:
        A list with one concatenated output per submodel, in the order of models.
    """
    pyramid = []
    for submodel_name, submodel in models:
        pyramid.append(build_model_pyramid(submodel_name, submodel, features))

    return pyramid

# Get training model
def get_training_model(num_classes, freeze_backbone=False):
    """Create a RetinaNet training model with a ResNet50 backbone.

    Args:
        num_classes: Number of classes to classify.
        freeze_backbone: Freeze the batch normalization layers of the backbone when true.

    Returns:
        A keras.models.Model named "retinanet" that outputs regression and classification.
    """

    # Get the number of anchors
    anchor_count = common.anchor_parameters.num_anchors()

    # Create an inputs layer to read image data [height, width, colors] (Tensorflow)
    inputs = keras.layers.Input(shape=(None, None, 3))

    # Get submodels, regression first and classification second
    regression_model = get_regression_model(4, anchor_count)
    classification_model = get_classification_model(num_classes, anchor_count)
    submodels = [('regression', regression_model), ('classification', classification_model)]

    # Get backbone layers, the output of the first stage is not used
    backbone = layers.resnet50(inputs, include_top=False, freeze_bn=freeze_backbone)
    C3, C4, C5 = backbone.outputs[1:]

    # Compute pyramid features and run the submodels on all pyramid levels
    pyramid_features = create_pyramid_features(C3, C4, C5)
    model_outputs = build_pyramid(submodels, pyramid_features)

    # Return the model
    return keras.models.Model(inputs=inputs, outputs=model_outputs, name='retinanet')

# Get inference model
def get_inference_model(model=None, nms=True, class_specific_filter=True, **kwargs):
    """Create an inference model from a training model.

    Args:
        model: Training model with layers named P3 to P7 and with regression and
            classification as the first two outputs. This argument is required.
        nms: Apply non maximum suppression when true.
        class_specific_filter: Filter detections for every class separately when true.
        **kwargs: Not used, accepted so that existing calls keep working.

    Returns:
        A keras.models.Model named "retinanet-bbox" that outputs filtered detections.

    Raises:
        ValueError: If no model is supplied.
    """

    # Fail early with a clear message, a model can not be built without the number of classes
    if model is None:
        raise ValueError('A training model is required to create an inference model.')

    # Get anchor parameters
    anchor_params = common.anchor_parameters

    # Compute the anchors
    features = [model.get_layer(p_name).output for p_name in ['P3', 'P4', 'P5', 'P6', 'P7']]
    anchors  = layers.build_anchors(anchor_params, features)

    # We expect regression and classification values as the first two outputs
    regression     = model.outputs[0]
    classification = model.outputs[1]

    # Any additional output from custom submodels, by default this will be []
    other = model.outputs[2:]

    # Apply predicted regression to anchors
    boxes = layers.RegressBoxes(name='boxes')([anchors, regression])
    boxes = layers.ClipBoxes(name='clipped_boxes')([model.inputs[0], boxes])

    # Filter detections (apply NMS / score threshold / select top-k)
    detections = layers.FilterDetections(nms=nms, class_specific_filter=class_specific_filter, name='filtered_detections')([boxes, classification] + other)

    # Return the model
    return keras.models.Model(inputs=model.inputs, outputs=detections, name='retinanet-bbox')

Debugging

The purpose of debugging is to make sure that annotations have been specified correctly; it is important to check this before training is started. A generated image from debugging is shown below the code.

# Import libraries
import argparse
import os
import sys
import cv2
import random
import annytab.retinanet.csv_generator
import annytab.retinanet.anchors
import annytab.retinanet.common as common

# The main entry point for this module
def main():
    """Write 20 debug images with anchors and annotations drawn on them.

    Images are read with a CSVGenerator, starting at a random index and
    wrapping around at the end. Raises a ValueError if the generator is empty.
    """

    # Create a transform generator
    transform_generator = common.random_transform_generator(
        min_rotation=-0.1,
        max_rotation=0.1,
        min_translation=(-0.1, -0.1),
        max_translation=(0.1, 0.1),
        min_shear=-0.1,
        max_shear=0.1,
        min_scaling=(0.9, 0.9),
        max_scaling=(1.1, 1.1),
        flip_x_chance=0.5,
        flip_y_chance=0.5)

    # Create a visual effect generator
    visual_effect_generator = common.random_visual_effect_generator(
        contrast_range=(0.9, 1.1),
        brightness_range=(-.1, .1),
        hue_range=(-0.05, 0.05),
        saturation_range=(0.95, 1.05))

    # Create a csv generator
    generator = annytab.retinanet.csv_generator.CSVGenerator(
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\train_annotations.csv',
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\classes.csv',
        transform_generator=transform_generator,
        visual_effect_generator=visual_effect_generator,
        no_resize = True)

    # Get anchor parameters
    anchor_params = common.anchor_parameters

    # Assumes that generator.size() is the number of images and that valid
    # indices are 0 to size - 1 (TODO: confirm against CSVGenerator)
    size = generator.size()
    if size < 1:
        raise ValueError('The generator does not contain any images.')

    # Start at a random image, random.randint includes both end points
    i = random.randint(0, size - 1)

    # Loop images, limit debugging to 20 images
    for count in range(20):

        # Get the image
        image = generator.load_image(i)

        # Get annotations
        annotations = generator.load_annotations(i)

        # Make sure that there are labels
        if len(annotations['labels']) > 0:

            # Apply random transformations (uncomment to debug augmented images)
            #image, annotations = generator.random_transform_group_entry(image, annotations)
            #image, annotations = generator.random_visual_effect_group_entry(image, annotations)

            # Get anchors
            anchors = annytab.retinanet.anchors.anchors_for_shape(image.shape, anchor_params=anchor_params)
            positive_indices, _, max_indices = annytab.retinanet.anchors.compute_gt_annotations(anchors, annotations['bboxes'])

            # Draw anchors on the image
            common.draw_boxes(image, anchors[positive_indices], (255, 255, 0), thickness=1)

            # Draw annotations
            common.draw_annotations(image, annotations, color=(0, 0, 255), label_to_name=generator.label_to_name)

            # Draw regressed anchors in green to override most red annotations
            # result is that annotations without anchors are red, with anchors are green
            common.draw_boxes(image, annotations['bboxes'][max_indices[positive_indices], :], (0, 255, 0))

        # Write to a file
        cv2.imwrite('C:\\DATA\\Python-data\\open-images-v5\\retinanet\\debug\\' + str(count) + '.jpg', image)

        # Move to the next image, wrap around after the last one
        i = (i + 1) % size

# Tell python to run main method
if __name__ == "__main__":
    main()
RetinaNet Debugging

Training

A training model is created if no training has been done before; weights can be loaded from a pretrained model. If training has been conducted before, the training model is loaded with its saved weights, which enables the model to continue training (transfer learning). The result from a run is shown below the code.

# Import libraries
import os
import keras
import numpy as np
import annytab.retinanet.csv_generator
import annytab.retinanet.model_builder as mb
import annytab.retinanet.layers as layers
import annytab.retinanet.common as common

# Train
def train():
    """Run one training session and store the resulting models on disk.

    A previously saved training model is loaded and trained further when its
    file exists; otherwise a new training model is built and compiled. After
    fitting, the training model is saved, and an inference model is derived
    from it and saved as well.
    """

    # Files used to persist the models between runs
    training_model_path = 'C:\\DATA\\Python-data\\open-images-v5\\retinanet\\training_model.h5'
    inference_model_path = 'C:\\DATA\\Python-data\\open-images-v5\\retinanet\\inference_model.h5'

    # Set the gpu to use
    common.setup_gpu('cpu')

    # Custom objects, handed to keras when a saved model is loaded
    custom_layers = {
        'UpsampleLike': layers.UpsampleLike,
        'PriorProbability': layers.PriorProbability,
        'RegressBoxes': layers.RegressBoxes,
        'FilterDetections': layers.FilterDetections,
        'Anchors': layers.Anchors,
        'ClipBoxes': layers.ClipBoxes,
        '_smooth_l1': layers.smooth_l1(),
        '_focal': layers.focal(),
        'BatchNormalization': layers.BatchNormalization,
    }

    # Create a transform generator
    # NOTE: it is not handed to the train generator below (None is passed instead)
    transform_settings = {
        'min_rotation': -0.1,
        'max_rotation': 0.1,
        'min_translation': (-0.1, -0.1),
        'max_translation': (0.1, 0.1),
        'min_shear': -0.1,
        'max_shear': 0.1,
        'min_scaling': (0.9, 0.9),
        'max_scaling': (1.1, 1.1),
        'flip_x_chance': 0.5,
        'flip_y_chance': 0.5,
    }
    transform_generator = common.random_transform_generator(**transform_settings)

    # Create a visual effect generator
    # NOTE: it is not handed to the train generator below either
    visual_effect_settings = {
        'contrast_range': (0.9, 1.1),
        'brightness_range': (-0.1, 0.1),
        'hue_range': (-0.05, 0.05),
        'saturation_range': (0.95, 1.05),
    }
    visual_effect_generator = common.random_visual_effect_generator(**visual_effect_settings)

    # Set options
    common_options = dict(
        batch_size=1,
        shuffle_groups=True,
        image_min_side=300,
        image_max_side=800,
        no_resize=False,
        preprocess_image=common.preprocess_image,
    )

    # Create a train generator
    train_generator = annytab.retinanet.csv_generator.CSVGenerator(
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\train_annotations.csv',
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\classes.csv',
        transform_generator=None,
        visual_effect_generator=None,
        **common_options
    )

    # Get a training model: build a fresh one on the first run, otherwise resume
    if not os.path.isfile(training_model_path):
        training_model = mb.get_training_model(train_generator.num_classes(), True)
        losses = {'regression': layers.smooth_l1(), 'classification': layers.focal()}
        optimizer = keras.optimizers.adam(lr=1e-5, clipnorm=0.001)
        training_model.compile(loss=losses, optimizer=optimizer)
    else:
        training_model = keras.models.load_model(training_model_path, custom_objects=custom_layers)

    # Plot training model
    #keras.utils.plot_model(training_model, show_shapes=True, to_file='C:\\DATA\\Python-data\\open-images-v5\\retinanet\\plots\\training_model.png')

    # Start training
    fit_options = {
        'generator': train_generator,
        'steps_per_epoch': 480,  # 480 images (epoch_length)
        'epochs': 4,  # 40 so far
        'verbose': 1,
        'max_queue_size': 2,
    }
    training_model.fit_generator(**fit_options)

    # Save the training model
    training_model.save(training_model_path)

    # Get inference model
    inference_model = mb.get_inference_model(model=training_model, nms=True, class_specific_filter=True)

    # Save the inference model
    inference_model.save(inference_model_path)

# The main entry point for this module
def main():
    """Entry point of the training script: runs a single training session."""
    train()


# Tell python to run main method
if __name__ == "__main__":
    main()
351/480 [====================>.........] - ETA: 3:04 - loss: 0.8706 - regression_loss: 0.6824 - classification_loss: 0.1882
352/480 [=====================>........] - ETA: 3:03 - loss: 0.8690 - regression_loss: 0.6812 - classification_loss: 0.1878
353/480 [=====================>........] - ETA: 3:01 - loss: 0.8681 - regression_loss: 0.6806 - classification_loss: 0.1875
354/480 [=====================>........] - ETA: 3:00 - loss: 0.8685 - regression_loss: 0.6811 - classification_loss: 0.1874
355/480 [=====================>........] - ETA: 2:59 - loss: 0.8677 - regression_loss: 0.6806 - classification_loss: 0.1871
356/480 [=====================>........] - ETA: 2:57 - loss: 0.8685 - regression_loss: 0.6812 - classification_loss: 0.1873
357/480 [=====================>........] - ETA: 2:56 - loss: 0.8677 - regression_loss: 0.6804 - classification_loss: 0.1873
358/480 [=====================>........] - ETA: 2:54 - loss: 0.8673 - regression_loss: 0.6798 - classification_loss: 0.1875
359/480 [=====================>........] - ETA: 2:53 - loss: 0.8668 - regression_loss: 0.6795 - classification_loss: 0.1873
360/480 [=====================>........] - ETA: 2:52 - loss: 0.8674 - regression_loss: 0.6798 - classification_loss: 0.1876
361/480 [=====================>........] - ETA: 2:50 - loss: 0.8678 - regression_loss: 0.6802 - classification_loss: 0.1876
362/480 [=====================>........] - ETA: 2:49 - loss: 0.8694 - regression_loss: 0.6813 - classification_loss: 0.1881
363/480 [=====================>........] - ETA: 2:47 - loss: 0.8697 - regression_loss: 0.6815 - classification_loss: 0.1882
364/480 [=====================>........] - ETA: 2:46 - loss: 0.8689 - regression_loss: 0.6809 - classification_loss: 0.1880
365/480 [=====================>........] - ETA: 2:45 - loss: 0.8682 - regression_loss: 0.6805 - classification_loss: 0.1878
366/480 [=====================>........] - ETA: 2:43 - loss: 0.8688 - regression_loss: 0.6808 - classification_loss: 0.1880
367/480 [=====================>........] - ETA: 2:42 - loss: 0.8678 - regression_loss: 0.6801 - classification_loss: 0.1877
368/480 [======================>.......] - ETA: 2:40 - loss: 0.8675 - regression_loss: 0.6798 - classification_loss: 0.1876
369/480 [======================>.......] - ETA: 2:39 - loss: 0.8687 - regression_loss: 0.6807 - classification_loss: 0.1879
370/480 [======================>.......] - ETA: 2:38 - loss: 0.8682 - regression_loss: 0.6805 - classification_loss: 0.1877
371/480 [======================>.......] - ETA: 2:36 - loss: 0.8664 - regression_loss: 0.6791 - classification_loss: 0.1873
372/480 [======================>.......] - ETA: 2:35 - loss: 0.8660 - regression_loss: 0.6789 - classification_loss: 0.1871
373/480 [======================>.......] - ETA: 2:33 - loss: 0.8643 - regression_loss: 0.6771 - classification_loss: 0.1872
374/480 [======================>.......] - ETA: 2:32 - loss: 0.8639 - regression_loss: 0.6769 - classification_loss: 0.1870
375/480 [======================>.......] - ETA: 2:30 - loss: 0.8636 - regression_loss: 0.6764 - classification_loss: 0.1872
376/480 [======================>.......] - ETA: 2:29 - loss: 0.8646 - regression_loss: 0.6769 - classification_loss: 0.1877
377/480 [======================>.......] - ETA: 2:28 - loss: 0.8631 - regression_loss: 0.6757 - classification_loss: 0.1874
378/480 [======================>.......] - ETA: 2:26 - loss: 0.8646 - regression_loss: 0.6767 - classification_loss: 0.1879
379/480 [======================>.......] - ETA: 2:25 - loss: 0.8661 - regression_loss: 0.6780 - classification_loss: 0.1881
380/480 [======================>.......] - ETA: 2:24 - loss: 0.8679 - regression_loss: 0.6793 - classification_loss: 0.1887
381/480 [======================>.......] - ETA: 2:22 - loss: 0.8674 - regression_loss: 0.6791 - classification_loss: 0.1884
382/480 [======================>.......] - ETA: 2:21 - loss: 0.8684 - regression_loss: 0.6795 - classification_loss: 0.1890
383/480 [======================>.......] - ETA: 2:19 - loss: 0.8688 - regression_loss: 0.6798 - classification_loss: 0.1891
384/480 [=======================>......] - ETA: 2:18 - loss: 0.8692 - regression_loss: 0.6802 - classification_loss: 0.1890
385/480 [=======================>......] - ETA: 2:17 - loss: 0.8704 - regression_loss: 0.6812 - classification_loss: 0.1892
386/480 [=======================>......] - ETA: 2:15 - loss: 0.8702 - regression_loss: 0.6809 - classification_loss: 0.1893
387/480 [=======================>......] - ETA: 2:14 - loss: 0.8699 - regression_loss: 0.6808 - classification_loss: 0.1892
388/480 [=======================>......] - ETA: 2:12 - loss: 0.8716 - regression_loss: 0.6823 - classification_loss: 0.1893
389/480 [=======================>......] - ETA: 2:11 - loss: 0.8725 - regression_loss: 0.6829 - classification_loss: 0.1897
390/480 [=======================>......] - ETA: 2:09 - loss: 0.8718 - regression_loss: 0.6822 - classification_loss: 0.1896
391/480 [=======================>......] - ETA: 2:08 - loss: 0.8747 - regression_loss: 0.6842 - classification_loss: 0.1905
392/480 [=======================>......] - ETA: 2:07 - loss: 0.8742 - regression_loss: 0.6839 - classification_loss: 0.1902
393/480 [=======================>......] - ETA: 2:05 - loss: 0.8751 - regression_loss: 0.6842 - classification_loss: 0.1910
394/480 [=======================>......] - ETA: 2:04 - loss: 0.8761 - regression_loss: 0.6851 - classification_loss: 0.1911
395/480 [=======================>......] - ETA: 2:02 - loss: 0.8757 - regression_loss: 0.6849 - classification_loss: 0.1908
396/480 [=======================>......] - ETA: 2:01 - loss: 0.8765 - regression_loss: 0.6855 - classification_loss: 0.1910
397/480 [=======================>......] - ETA: 2:00 - loss: 0.8766 - regression_loss: 0.6857 - classification_loss: 0.1909
398/480 [=======================>......] - ETA: 1:58 - loss: 0.8765 - regression_loss: 0.6856 - classification_loss: 0.1909
399/480 [=======================>......] - ETA: 1:57 - loss: 0.8761 - regression_loss: 0.6854 - classification_loss: 0.1907
400/480 [========================>.....] - ETA: 1:55 - loss: 0.8756 - regression_loss: 0.6851 - classification_loss: 0.1905
401/480 [========================>.....] - ETA: 1:54 - loss: 0.8752 - regression_loss: 0.6848 - classification_loss: 0.1904
402/480 [========================>.....] - ETA: 1:52 - loss: 0.8743 - regression_loss: 0.6842 - classification_loss: 0.1901
403/480 [========================>.....] - ETA: 1:51 - loss: 0.8745 - regression_loss: 0.6845 - classification_loss: 0.1900
404/480 [========================>.....] - ETA: 1:50 - loss: 0.8741 - regression_loss: 0.6844 - classification_loss: 0.1897
405/480 [========================>.....] - ETA: 1:48 - loss: 0.8736 - regression_loss: 0.6841 - classification_loss: 0.1894
406/480 [========================>.....] - ETA: 1:47 - loss: 0.8738 - regression_loss: 0.6844 - classification_loss: 0.1894
407/480 [========================>.....] - ETA: 1:45 - loss: 0.8747 - regression_loss: 0.6851 - classification_loss: 0.1896
408/480 [========================>.....] - ETA: 1:44 - loss: 0.8743 - regression_loss: 0.6849 - classification_loss: 0.1893
409/480 [========================>.....] - ETA: 1:42 - loss: 0.8751 - regression_loss: 0.6857 - classification_loss: 0.1894
410/480 [========================>.....] - ETA: 1:41 - loss: 0.8744 - regression_loss: 0.6850 - classification_loss: 0.1893
411/480 [========================>.....] - ETA: 1:40 - loss: 0.8738 - regression_loss: 0.6847 - classification_loss: 0.1891
412/480 [========================>.....] - ETA: 1:38 - loss: 0.8736 - regression_loss: 0.6846 - classification_loss: 0.1890
413/480 [========================>.....] - ETA: 1:37 - loss: 0.8742 - regression_loss: 0.6852 - classification_loss: 0.1890
414/480 [========================>.....] - ETA: 1:35 - loss: 0.8737 - regression_loss: 0.6850 - classification_loss: 0.1888
415/480 [========================>.....] - ETA: 1:34 - loss: 0.8740 - regression_loss: 0.6852 - classification_loss: 0.1888
416/480 [=========================>....] - ETA: 1:32 - loss: 0.8755 - regression_loss: 0.6864 - classification_loss: 0.1890
417/480 [=========================>....] - ETA: 1:31 - loss: 0.8756 - regression_loss: 0.6866 - classification_loss: 0.1891
418/480 [=========================>....] - ETA: 1:29 - loss: 0.8760 - regression_loss: 0.6868 - classification_loss: 0.1892
419/480 [=========================>....] - ETA: 1:28 - loss: 0.8750 - regression_loss: 0.6861 - classification_loss: 0.1889
420/480 [=========================>....] - ETA: 1:27 - loss: 0.8762 - regression_loss: 0.6868 - classification_loss: 0.1893
421/480 [=========================>....] - ETA: 1:25 - loss: 0.8768 - regression_loss: 0.6874 - classification_loss: 0.1894
422/480 [=========================>....] - ETA: 1:24 - loss: 0.8767 - regression_loss: 0.6870 - classification_loss: 0.1896
423/480 [=========================>....] - ETA: 1:22 - loss: 0.8768 - regression_loss: 0.6871 - classification_loss: 0.1896
424/480 [=========================>....] - ETA: 1:21 - loss: 0.8763 - regression_loss: 0.6867 - classification_loss: 0.1896
425/480 [=========================>....] - ETA: 1:19 - loss: 0.8765 - regression_loss: 0.6869 - classification_loss: 0.1896
426/480 [=========================>....] - ETA: 1:18 - loss: 0.8776 - regression_loss: 0.6879 - classification_loss: 0.1897
427/480 [=========================>....] - ETA: 1:17 - loss: 0.8767 - regression_loss: 0.6873 - classification_loss: 0.1894
428/480 [=========================>....] - ETA: 1:15 - loss: 0.8760 - regression_loss: 0.6867 - classification_loss: 0.1893
429/480 [=========================>....] - ETA: 1:14 - loss: 0.8762 - regression_loss: 0.6868 - classification_loss: 0.1894
430/480 [=========================>....] - ETA: 1:12 - loss: 0.8774 - regression_loss: 0.6878 - classification_loss: 0.1896
431/480 [=========================>....] - ETA: 1:11 - loss: 0.8774 - regression_loss: 0.6879 - classification_loss: 0.1895
432/480 [==========================>...] - ETA: 1:09 - loss: 0.8772 - regression_loss: 0.6877 - classification_loss: 0.1895
433/480 [==========================>...] - ETA: 1:08 - loss: 0.8767 - regression_loss: 0.6875 - classification_loss: 0.1892
434/480 [==========================>...] - ETA: 1:06 - loss: 0.8774 - regression_loss: 0.6879 - classification_loss: 0.1895
435/480 [==========================>...] - ETA: 1:05 - loss: 0.8778 - regression_loss: 0.6884 - classification_loss: 0.1894
436/480 [==========================>...] - ETA: 1:03 - loss: 0.8778 - regression_loss: 0.6886 - classification_loss: 0.1892
437/480 [==========================>...] - ETA: 1:02 - loss: 0.8784 - regression_loss: 0.6893 - classification_loss: 0.1891
438/480 [==========================>...] - ETA: 1:01 - loss: 0.8775 - regression_loss: 0.6887 - classification_loss: 0.1888
439/480 [==========================>...] - ETA: 59s - loss: 0.8773 - regression_loss: 0.6886 - classification_loss: 0.1886
440/480 [==========================>...] - ETA: 58s - loss: 0.8772 - regression_loss: 0.6886 - classification_loss: 0.1886
441/480 [==========================>...] - ETA: 56s - loss: 0.8778 - regression_loss: 0.6892 - classification_loss: 0.1886
442/480 [==========================>...] - ETA: 55s - loss: 0.8779 - regression_loss: 0.6894 - classification_loss: 0.1885
443/480 [==========================>...] - ETA: 53s - loss: 0.8787 - regression_loss: 0.6905 - classification_loss: 0.1882
444/480 [==========================>...] - ETA: 52s - loss: 0.8800 - regression_loss: 0.6913 - classification_loss: 0.1887
445/480 [==========================>...] - ETA: 50s - loss: 0.8802 - regression_loss: 0.6914 - classification_loss: 0.1888
446/480 [==========================>...] - ETA: 49s - loss: 0.8812 - regression_loss: 0.6924 - classification_loss: 0.1888
447/480 [==========================>...] - ETA: 47s - loss: 0.8816 - regression_loss: 0.6928 - classification_loss: 0.1888
448/480 [===========================>..] - ETA: 46s - loss: 0.8829 - regression_loss: 0.6939 - classification_loss: 0.1890
449/480 [===========================>..] - ETA: 45s - loss: 0.8843 - regression_loss: 0.6950 - classification_loss: 0.1893
450/480 [===========================>..] - ETA: 43s - loss: 0.8844 - regression_loss: 0.6951 - classification_loss: 0.1893
451/480 [===========================>..] - ETA: 42s - loss: 0.8852 - regression_loss: 0.6959 - classification_loss: 0.1892
452/480 [===========================>..] - ETA: 40s - loss: 0.8853 - regression_loss: 0.6961 - classification_loss: 0.1892
453/480 [===========================>..] - ETA: 39s - loss: 0.8855 - regression_loss: 0.6961 - classification_loss: 0.1894
454/480 [===========================>..] - ETA: 37s - loss: 0.8853 - regression_loss: 0.6958 - classification_loss: 0.1895
455/480 [===========================>..] - ETA: 36s - loss: 0.8854 - regression_loss: 0.6958 - classification_loss: 0.1896
456/480 [===========================>..] - ETA: 34s - loss: 0.8846 - regression_loss: 0.6952 - classification_loss: 0.1895
457/480 [===========================>..] - ETA: 33s - loss: 0.8837 - regression_loss: 0.6945 - classification_loss: 0.1892
458/480 [===========================>..] - ETA: 32s - loss: 0.8841 - regression_loss: 0.6946 - classification_loss: 0.1895
459/480 [===========================>..] - ETA: 30s - loss: 0.8846 - regression_loss: 0.6943 - classification_loss: 0.1903
460/480 [===========================>..] - ETA: 29s - loss: 0.8840 - regression_loss: 0.6938 - classification_loss: 0.1901
461/480 [===========================>..] - ETA: 27s - loss: 0.8837 - regression_loss: 0.6937 - classification_loss: 0.1899
462/480 [===========================>..] - ETA: 26s - loss: 0.8826 - regression_loss: 0.6930 - classification_loss: 0.1896
463/480 [===========================>..] - ETA: 24s - loss: 0.8824 - regression_loss: 0.6928 - classification_loss: 0.1896
464/480 [============================>.] - ETA: 23s - loss: 0.8821 - regression_loss: 0.6925 - classification_loss: 0.1896
465/480 [============================>.] - ETA: 21s - loss: 0.8808 - regression_loss: 0.6915 - classification_loss: 0.1893
466/480 [============================>.] - ETA: 20s - loss: 0.8818 - regression_loss: 0.6924 - classification_loss: 0.1894
467/480 [============================>.] - ETA: 18s - loss: 0.8818 - regression_loss: 0.6926 - classification_loss: 0.1892
468/480 [============================>.] - ETA: 17s - loss: 0.8811 - regression_loss: 0.6920 - classification_loss: 0.1890
469/480 [============================>.] - ETA: 16s - loss: 0.8820 - regression_loss: 0.6928 - classification_loss: 0.1891
470/480 [============================>.] - ETA: 14s - loss: 0.8824 - regression_loss: 0.6931 - classification_loss: 0.1893
471/480 [============================>.] - ETA: 13s - loss: 0.8837 - regression_loss: 0.6943 - classification_loss: 0.1894
472/480 [============================>.] - ETA: 11s - loss: 0.8844 - regression_loss: 0.6949 - classification_loss: 0.1895
473/480 [============================>.] - ETA: 10s - loss: 0.8849 - regression_loss: 0.6952 - classification_loss: 0.1898
474/480 [============================>.] - ETA: 8s - loss: 0.8857 - regression_loss: 0.6956 - classification_loss: 0.1901
475/480 [============================>.] - ETA: 7s - loss: 0.8851 - regression_loss: 0.6951 - classification_loss: 0.1900
476/480 [============================>.] - ETA: 5s - loss: 0.8845 - regression_loss: 0.6947 - classification_loss: 0.1898
477/480 [============================>.] - ETA: 4s - loss: 0.8839 - regression_loss: 0.6942 - classification_loss: 0.1897
478/480 [============================>.] - ETA: 2s - loss: 0.8836 - regression_loss: 0.6939 - classification_loss: 0.1897
479/480 [============================>.] - ETA: 1s - loss: 0.8839 - regression_loss: 0.6942 - classification_loss: 0.1897
480/480 [==============================] - 702s 1s/step - loss: 0.8850 - regression_loss: 0.6950 - classification_loss: 0.1900

Evaluation

This module is used to perform a visual evaluation of the model. I have only trained the model for 40 epochs; this type of model needs a lot of training to perform well. I have set the score threshold to 40 % and the maximum number of detections to 10. The result from an evaluation is shown below the code.

# Import libraries
import os
import sys
import cv2
import keras
import numpy as np
import progressbar
assert(callable(progressbar.progressbar)), "Using wrong progressbar module, install 'progressbar2' instead."
import annytab.retinanet.csv_generator
import annytab.retinanet.anchors
import annytab.retinanet.model_builder as mb
import annytab.retinanet.layers as layers
import annytab.retinanet.common as common

# Compute the average precision, given the recall and precision curves
def _compute_ap(recall, precision):
    """Return the average precision for a recall curve and a precision curve.

    Both curves are padded with sentinel values, precision is replaced by its
    envelope (every entry becomes the maximum of itself and all later
    entries), and the envelope is summed over the steps where recall changes.
    """

    # Pad the curves: recall runs from 0 to 1, precision is 0 at both ends
    padded_recall = np.concatenate(([0.], recall, [1.]))
    padded_precision = np.concatenate(([0.], precision, [0.]))

    # Precision envelope: running maximum taken from the end towards the start
    envelope = np.maximum.accumulate(padded_precision[::-1])[::-1]

    # Positions where the recall (X axis) changes value
    steps = np.where(padded_recall[1:] != padded_recall[:-1])[0]

    # Area under the curve: sum of (delta recall) * precision
    recall_deltas = padded_recall[steps + 1] - padded_recall[steps]
    return np.sum(recall_deltas * envelope[steps + 1])

# Get the detections from the model using the generator
def _get_detections(generator, model, score_threshold=0.05, max_detections=100, save_path=None):
    """Run the model on every image of the generator and collect the detections.

    Args:
        generator: Data source. This function uses its num_classes(), size(),
            has_label(), load_image(), preprocess_image(), resize_image() and
            label_to_name members.
        model: Model whose predict_on_batch() output starts with boxes, scores
            and labels (only the first three outputs are used).
        score_threshold: A detection is kept only when its score is strictly
            above this value.
        max_detections: Maximum number of detections kept per image, highest
            scores first.
        save_path: Optional directory. When given, every image is written
            there as '<index>.png' with its detections drawn on it.

    Returns:
        A list indexed as [image_index][label]. For labels the generator has,
        the entry is an array with one row per detection (box columns
        followed by the score); entries of other labels stay None.
    """

    # One slot per label index for every image. The slots are addressed by the
    # raw label further down, so a label the generator lacks must keep its
    # (None) slot: filtering such labels out would shorten the list and shift
    # or overflow the index of every later label.
    all_detections = [[None for _ in range(generator.num_classes())] for _ in range(generator.size())]

    for i in progressbar.progressbar(range(generator.size()), prefix='Running network: '):
        raw_image    = generator.load_image(i)
        image        = generator.preprocess_image(raw_image.copy())
        image, scale = generator.resize_image(image)

        if keras.backend.image_data_format() == 'channels_first':
            image = image.transpose((2, 0, 1))

        # Run network on a batch that holds this single image
        boxes, scores, labels = model.predict_on_batch(np.expand_dims(image, axis=0))[:3]

        # Correct boxes for image scale (back to the size of the raw image)
        boxes /= scale

        # Select indices which have a score above the threshold
        indices = np.where(scores[0, :] > score_threshold)[0]

        # Select those scores
        scores = scores[0][indices]

        # Find the order with which to sort the scores (descending), capped at max_detections
        scores_sort = np.argsort(-scores)[:max_detections]

        # Select detections
        image_boxes      = boxes[0, indices[scores_sort], :]
        image_scores     = scores[scores_sort]
        image_labels     = labels[0, indices[scores_sort]]
        image_detections = np.concatenate([image_boxes, np.expand_dims(image_scores, axis=1), np.expand_dims(image_labels, axis=1)], axis=1)

        # Save image to disk
        if save_path is not None:
            #common.draw_annotations(raw_image, generator.load_annotations(i), label_to_name=generator.label_to_name)
            common.draw_detections(raw_image, image_boxes, image_scores, image_labels, label_to_name=generator.label_to_name, score_threshold=score_threshold)
            cv2.imwrite(os.path.join(save_path, '{}.png'.format(i)), raw_image)

        # Copy detections to all_detections, dropping the trailing label column
        for label in range(generator.num_classes()):
            if not generator.has_label(label):
                continue

            all_detections[i][label] = image_detections[image_detections[:, -1] == label, :-1]

    return all_detections

# Get the ground truth annotations from the generator
def _get_annotations(generator):
    """Collect the ground truth boxes of every image, grouped by label.

    Returns a list indexed as [image_index][label]. For labels the generator
    has, the entry is a copy of the rows of the image's 'bboxes' whose
    'labels' value equals that label; entries of other labels are None.
    """

    all_annotations = []

    for index in progressbar.progressbar(range(generator.size()), prefix='Parsing annotations: '):
        # Load the annotations of this image
        loaded = generator.load_annotations(index)

        # One entry per label index: the matching boxes, or None for a label the generator lacks
        boxes_per_label = [
            loaded['bboxes'][loaded['labels'] == label, :].copy() if generator.has_label(label) else None
            for label in range(generator.num_classes())
        ]
        all_annotations.append(boxes_per_label)

    return all_annotations

# Evaluate a given dataset using a given model
def evaluate(generator, model, iou_threshold=0.5, score_threshold=0.05, max_detections=100, save_path=None):
    """Compute the average precision of the model for every label of the generator.

    Args:
        generator: Data source that is evaluated.
        model: Model used to produce the detections.
        iou_threshold: Minimum overlap between a detection and an annotation
            for the detection to count as a true positive.
        score_threshold: Passed on to the detection step.
        max_detections: Passed on to the detection step.
        save_path: Passed on to the detection step.

    Returns:
        A dict that maps each label the generator has to a tuple
        (average_precision, num_annotations). A label without any annotation
        maps to (0, 0).
    """

    # Gather all detections and annotations
    all_detections     = _get_detections(generator, model, score_threshold=score_threshold, max_detections=max_detections, save_path=save_path)
    all_annotations    = _get_annotations(generator)
    average_precisions = {}

    # Process detections and annotations
    for label in range(generator.num_classes()):
        if not generator.has_label(label):
            continue

        # Results are gathered in plain lists and converted to arrays once:
        # np.append copies the whole array on every call, which makes
        # per-detection appends quadratic in the number of detections.
        scores           = []
        is_true_positive = []
        num_annotations  = 0.0

        for i in range(generator.size()):
            detections           = all_detections[i][label]
            annotations          = all_annotations[i][label]
            num_annotations     += annotations.shape[0]
            detected_annotations = set()

            for d in detections:
                scores.append(d[4])

                # Nothing to match against: every detection is a false positive
                if annotations.shape[0] == 0:
                    is_true_positive.append(False)
                    continue

                # A single detection is passed in, so only row 0 of the overlaps is used
                overlaps            = annytab.retinanet.anchors.compute_overlap(np.expand_dims(d, axis=0), annotations)
                assigned_annotation = int(np.argmax(overlaps, axis=1)[0])
                max_overlap         = overlaps[0, assigned_annotation]

                # An annotation can be claimed by one detection only
                if max_overlap >= iou_threshold and assigned_annotation not in detected_annotations:
                    detected_annotations.add(assigned_annotation)
                    is_true_positive.append(True)
                else:
                    is_true_positive.append(False)

        # No annotations -> AP for this class is reported as 0
        if num_annotations == 0:
            average_precisions[label] = 0, 0
            continue

        scores          = np.array(scores, dtype=np.float64)
        true_positives  = np.array(is_true_positive, dtype=np.float64)
        false_positives = 1.0 - true_positives

        # Sort by score, highest first
        indices         = np.argsort(-scores)
        false_positives = false_positives[indices]
        true_positives  = true_positives[indices]

        # Running totals of false positives and true positives
        false_positives = np.cumsum(false_positives)
        true_positives  = np.cumsum(true_positives)

        # Compute recall and precision (eps guards against a division by zero)
        recall    = true_positives / num_annotations
        precision = true_positives / np.maximum(true_positives + false_positives, np.finfo(np.float64).eps)

        # Compute average precision
        average_precision  = _compute_ap(recall, precision)
        average_precisions[label] = average_precision, num_annotations

    return average_precisions

# The main entry point for this module
def main():
    """Evaluate the saved inference model on the test set and print the results.

    Prints the average precision of every class, followed by the mAP weighted
    by the number of instances per class and the mAP over the classes that
    have at least one instance.
    """

    # Set the gpu to use
    common.setup_gpu('cpu')

    # Create a test generator
    test_generator = annytab.retinanet.csv_generator.CSVGenerator(
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\test_annotations.csv',
        'C:\\DATA\\Python-data\\open-images-v5\\imgs\\classes.csv',
        image_min_side=300,
        image_max_side=800,
        shuffle_groups=True
    )

    # Load a model
    print('Loading model, this may take a second...')

    # Custom objects, handed to keras when the saved model is loaded
    custom_layers = {
        'UpsampleLike': layers.UpsampleLike,
        'PriorProbability': layers.PriorProbability,
        'RegressBoxes': layers.RegressBoxes,
        'FilterDetections': layers.FilterDetections,
        'Anchors': layers.Anchors,
        'ClipBoxes': layers.ClipBoxes,
        '_smooth_l1': layers.smooth_l1(),
        '_focal': layers.focal(),
        'BatchNormalization': layers.BatchNormalization,
    }

    # Get a model
    model = keras.models.load_model(
        'C:\\DATA\\Python-data\\open-images-v5\\retinanet\\inference_model.h5',
        custom_objects=custom_layers
    )

    # Start evaluations
    average_precisions = evaluate(
        test_generator,
        model,
        iou_threshold=0.5,
        score_threshold=0.4,
        max_detections=10,
        save_path='C:\\DATA\\Python-data\\open-images-v5\\retinanet\\evaluation\\'
    )

    # Print evaluation, one line per class
    total_instances, precisions = [], []
    for label, result in average_precisions.items():
        average_precision, num_annotations = result
        count_text = '{:.0f} instances of class'.format(num_annotations)
        class_name = test_generator.label_to_name(label)
        precision_text = 'with average precision: {0:.4f}'.format(average_precision)
        print(count_text, class_name, precision_text)
        total_instances.append(num_annotations)
        precisions.append(average_precision)

    # Without any instance there is nothing to average
    if sum(total_instances) == 0:
        print('No test instances found.')
        return

    # Mean weighted by the number of instances of each class
    weighted_sum = sum([a * b for a, b in zip(total_instances, precisions)])
    print('mAP using the weighted average of precisions among classes: {0:.4f}'.format(weighted_sum / sum(total_instances)))

    # Plain mean over the classes that have at least one instance
    classes_with_instances = sum(x > 0 for x in total_instances)
    print('mAP: {0:.4f}'.format(sum(precisions) / classes_with_instances))

# Tell python to run main method
if __name__ == "__main__": main()
RetinaNet Evaluation

Leave a Reply

Your email address will not be published. Required fields are marked *