"""Train a ResNet v2 image classifier and write test-set predictions to CSV.

Reads ``trnX.npy``/``trnY.npy`` (training), ``valX.npy``/``valY.npy``
(validation) and ``tstX.npy`` (test) from the working directory, trains a
bottleneck ResNet v2 with shift/flip augmentation, and writes the predicted
class of every test sample to ``predictions.csv``.
"""

import numpy as np
from tensorflow.keras import callbacks, layers, models, regularizers
from tensorflow.keras.preprocessing import image

# Scale pixel values by 1/255 (presumably 8-bit images -> [0, 1]; confirm
# against the .npy files).
trnX = np.load("trnX.npy") / 255.0
trnY = np.load("trnY.npy")
valX = np.load("valX.npy") / 255.0
valY = np.load("valY.npy")
tstX = np.load("tstX.npy") / 255.0

# Centre every split with the mean over the *training* samples (axis 0) so
# that no validation/test statistics leak into the preprocessing.
trnX_mean = np.mean(trnX, axis=0)
trnX = trnX - trnX_mean
valX = valX - trnX_mean
tstX = tstX - trnX_mean

# Training-time augmentation: shifts of up to 12.5% of the image size in each
# direction and random horizontal flips.
datagen = image.ImageDataGenerator(
    width_shift_range=0.125,
    height_shift_range=0.125,
    fill_mode="nearest",
    horizontal_flip=True,
    vertical_flip=False,
)


def resnet_layer(input, filters=16, kernel_size=3, strides=1, activation="relu",
                 batch_normalization=True, conv_first=True):
    """Apply one convolution / batch-normalization / activation unit.

    Args:
        input: Tensor to transform. The name shadows the ``input`` builtin but
            is kept because callers pass it by keyword.
        filters: Number of convolution filters.
        kernel_size: Side length of the square convolution kernel.
        strides: Convolution stride.
        activation: Activation name, or ``None`` to skip the activation.
        batch_normalization: Whether to include a batch-normalization layer.
        conv_first: ``True`` for conv -> BN -> activation; ``False`` for
            BN -> activation -> conv (pre-activation ordering).

    Returns:
        The transformed tensor.
    """
    conv = layers.Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding="same",
        kernel_initializer="he_normal",
        kernel_regularizer=regularizers.l2(0.0001),
    )

    x = input
    if conv_first:
        x = conv(x)
    # BN and activation are applied in the same relative order in both modes;
    # only the position of the convolution changes.
    if batch_normalization:
        x = layers.BatchNormalization()(x)
    if activation is not None:
        x = layers.Activation(activation)(x)
    if not conv_first:
        x = conv(x)
    return x


def resnet_v2(input_shape, res_blocks, classes):
    """Build a three-stage bottleneck ResNet v2 classifier.

    Args:
        input_shape: Shape of one input sample (without the batch axis).
        res_blocks: Number of residual blocks per stage; must be at least 1.
        classes: Number of output classes (units of the softmax layer).

    Returns:
        An uncompiled ``models.Model`` mapping inputs to class probabilities.

    Raises:
        ValueError: If ``res_blocks`` is smaller than 1.
    """
    if res_blocks < 1:
        # Without this check the stage loop would fail later with an obscure
        # unbound-variable error on ``filters_out``.
        raise ValueError("res_blocks must be at least 1")

    inputs = layers.Input(shape=input_shape)
    filters_in = 16
    current = resnet_layer(input=inputs, filters=filters_in, conv_first=True)

    for stage in range(3):  # 3 stages; each consisting of res_blocks
        for res_block in range(res_blocks):
            # configure block
            activation = "relu"
            batch_normalization = True
            strides = 1
            if stage == 0:  # first stage
                filters_out = 4 * filters_in
                if res_block == 0:  # first stage; first block
                    # The stem layer already ended with BN + ReLU.
                    activation = None
                    batch_normalization = False
            else:
                filters_out = 2 * filters_in
                if res_block == 0:  # not first stage; but first block
                    strides = 2  # downsample

            # create bottleneck residual unit: 1x1; 3x3; 1x1
            residual = resnet_layer(
                input=current,
                filters=filters_in,
                kernel_size=1,
                strides=strides,
                activation=activation,
                batch_normalization=batch_normalization,
                conv_first=False,
            )
            residual = resnet_layer(input=residual, filters=filters_in, conv_first=False)
            residual = resnet_layer(
                input=residual, filters=filters_out, kernel_size=1, conv_first=False
            )

            # add residual to current
            if res_block == 0:
                # Project the shortcut with a 1x1 convolution so its channel
                # count and spatial size match the residual branch.
                current = resnet_layer(
                    input=current,
                    filters=filters_out,
                    kernel_size=1,
                    strides=strides,
                    activation=None,
                    batch_normalization=False,
                )
            current = layers.Add()([current, residual])

        filters_in = filters_out

    current = layers.BatchNormalization()(current)
    current = layers.Activation("relu")(current)
    # NOTE(review): a fixed 8x8 pool assumes the final feature map is at least
    # 8x8 (e.g. 32x32 inputs after the two stride-2 stages) -- confirm for
    # other input sizes.
    current = layers.AveragePooling2D(pool_size=8)(current)
    current = layers.Flatten()(current)
    output = layers.Dense(classes, activation="softmax", kernel_initializer="he_normal")(current)

    return models.Model(inputs=inputs, outputs=output)


def schedule(epoch: int) -> float:
    """Return the learning rate for ``epoch`` and print it.

    The rate starts at 1e-3 and steps down at epochs 80, 120, 160 and 180.
    With the ``epochs = 128`` used for training below, only the first three
    levels are ever reached.
    """
    if epoch >= 180:
        learning_rate = 0.0000005
    elif epoch >= 160:
        learning_rate = 0.000001
    elif epoch >= 120:
        learning_rate = 0.00001
    elif epoch >= 80:
        learning_rate = 0.0001
    else:
        learning_rate = 0.001
    print("learning rate: ", learning_rate)
    return learning_rate


# Labels are sparse class indices, so the class count is the largest label + 1.
# ``int(...)`` turns the NumPy scalar into a plain Python int for the layer size.
model = resnet_v2(
    input_shape=trnX.shape[1:],
    res_blocks=3,  # consider trying res_blocks = 9
    classes=int(trnY.max()) + 1,
)
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary(line_length=135)
model.fit(
    datagen.flow(trnX, trnY, batch_size=128),
    validation_data=(valX, valY),
    epochs=128,
    callbacks=[callbacks.LearningRateScheduler(schedule)],
)

probabilities = model.predict(tstX)
classes = probabilities.argmax(axis=-1)

# The context manager guarantees the file is closed even if a write fails.
with open("predictions.csv", "w") as predictions:
    predictions.write("id,label\n")
    for i, label in enumerate(classes):
        # ids are zero-padded to five digits, e.g. "00042,7"
        predictions.write(f"{i:05d},{label}\n")