import math

import numpy as np
import tensorflow as tf
from tensorflow.keras import applications, callbacks, layers, metrics, models, optimizers

batch_size = 64
model_name = 'EfficientNetV2B0'
base_class = applications.EfficientNetV2B0
# Native training resolutions of the EfficientNetV2 variants.
default_size = {
    'EfficientNetV2B0': 224,
    'EfficientNetV2B1': 240,
    'EfficientNetV2B2': 260,
    'EfficientNetV2B3': 300,
    'EfficientNetV2S': 384,
    'EfficientNetV2M': 480,
    'EfficientNetV2L': 480,
}


class MeanAveragePrecision(metrics.Metric):
    """MAP@k for single-label data; equivalent to mean reciprocal rank truncated at k."""

    def __init__(self, name='map', k=5, **kwargs):
        super().__init__(name=name, **kwargs)
        self.k = k
        self.map_sum = self.add_weight(name='map_sum', initializer='zeros')
        self.total_samples = self.add_weight(name='total_samples', initializer='zeros', dtype='int32')

    def update_state(self, y_true, y_pred, sample_weight=None):
        _, indices = tf.nn.top_k(y_pred, self.k)
        # Reshape to (batch, 1) so the comparison broadcasts against the
        # (batch, k) index matrix whether labels arrive as (batch,) or (batch, 1).
        y_true = tf.cast(tf.reshape(y_true, (-1, 1)), tf.int32)
        # The column index of each hit is the 0-based rank of the true label.
        ranks = tf.where(tf.equal(tf.cast(indices, tf.int32), y_true))[:, 1]
        self.map_sum.assign_add(tf.reduce_sum(1.0 / tf.cast(ranks + 1, tf.float32)))
        self.total_samples.assign_add(tf.shape(y_pred)[0])

    def result(self):
        return self.map_sum / tf.cast(self.total_samples, tf.float32)

    def reset_state(self):
        self.map_sum.assign(0.)
        self.total_samples.assign(0)


trnX = np.load('imagenet_trnX.npy')
trnY = np.load('imagenet_trnY.npy')
valX = np.load('imagenet_valX.npy')
valY = np.load('imagenet_valY.npy')
tstX = np.load('imagenet_tstX.npy')

# Unusual for a Keras application: EfficientNetV2's preprocess_input() is a
# no-op, because input rescaling is built into the model itself:
# https://github.com/keras-team/keras/blob/v2.9.0/keras/applications/efficientnet_v2.py#L1272
inputs = layers.Input(shape=trnX.shape[1:])  # renamed from `input` to avoid shadowing the builtin
target = default_size[model_name]
# Resize slightly above the target resolution, then random-crop back down.
resizing = layers.Resizing(int(1.05 * target), int(1.05 * target))(inputs)
crop = layers.RandomCrop(target, target)(resizing)
flip = layers.RandomFlip('horizontal')(crop)
base = base_class(include_top=False, weights='imagenet', input_tensor=flip, pooling='avg')
base.trainable = False  # phase 1: train the classification head only
output = layers.Dropout(0.2, name='pool_dropout')(base.output)
output = layers.Dense(512, activation='relu', name='dense_features')(output)
output = layers.Dropout(0.2, name='dense_dropout')(output)
output = layers.Dense(trnY.max() + 1, activation='softmax', name='classifier')(output)
model = models.Model(inputs=inputs, outputs=output, name='transfer_model')

# Named `fit_callbacks` rather than `callbacks` to avoid shadowing the imported module.
fit_callbacks = [
    callbacks.EarlyStopping(monitor='val_map', mode='max', patience=4, restore_best_weights=True),
]

steps_per_epoch = math.ceil(trnX.shape[0] / batch_size)
# Restart the cosine schedule every epoch, lowering the peak by 5% each restart.
schedule = optimizers.schedules.CosineDecayRestarts(0.001, first_decay_steps=steps_per_epoch, t_mul=1, m_mul=0.95)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=optimizers.Adam(learning_rate=schedule),
              metrics=['accuracy', MeanAveragePrecision()])
model.summary(line_length=135)
model.fit(trnX, trnY, epochs=8, batch_size=batch_size, validation_data=(valX, valY), callbacks=fit_callbacks)

# Phase 2: unfreeze the backbone and fine-tune end to end at a 10x lower learning rate.
base.trainable = True
schedule = optimizers.schedules.CosineDecayRestarts(0.0001, first_decay_steps=steps_per_epoch, t_mul=1, m_mul=0.95)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=optimizers.Adam(learning_rate=schedule),
              metrics=['accuracy', MeanAveragePrecision()])
model.summary(line_length=135)
model.fit(trnX, trnY, epochs=2, batch_size=batch_size, validation_data=(valX, valY), callbacks=fit_callbacks)

probabilities = model.predict(tstX)
# Top-5 class indices per test sample, highest probability first.
indices = probabilities.argsort(axis=-1)[:, -5:][:, ::-1]
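# Optional sanity check (a sketch, not part of the original pipeline):
# tf.math.top_k returns the top-k indices in descending order, so it should
# agree with the argsort slice above. Compare the gathered probabilities
# rather than the raw indices so that tie-breaking differences don't matter.
top5 = tf.math.top_k(probabilities, k=5).indices.numpy()
assert np.allclose(np.take_along_axis(probabilities, indices, axis=-1),
                   np.take_along_axis(probabilities, top5, axis=-1))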
with open('predictions.csv', 'w') as predictions:
    predictions.write('id,label\n')
    for i in range(tstX.shape[0]):
        # One row per test image: zero-padded id, then five class indices separated by spaces.
        predictions.write(str(i).zfill(5) + ',' + ' '.join(str(classIndex) for classIndex in indices[i]) + '\n')
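# Quick self-test of MeanAveragePrecision (an illustrative sketch; the
# 3-class batch below is made up). With a single true label per sample,
# MAP@k reduces to mean reciprocal rank truncated at k: rank 1 scores 1,
# rank 2 scores 1/2, and a label outside the top k scores 0.
m = MeanAveragePrecision(k=2)
m.update_state(tf.constant([[0], [1], [2]]),
               tf.constant([[0.7, 0.2, 0.1],    # true class at rank 1 -> 1
                            [0.6, 0.3, 0.1],    # true class at rank 2 -> 1/2
                            [0.5, 0.4, 0.1]]))  # true class outside top 2 -> 0
print(m.result().numpy())  # (1 + 0.5 + 0) / 3 = 0.5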