import math import numpy as np from io import TextIOWrapper from PIL import Image from zipfile import ZipFile trnX = np.zeros((60000, 28, 28), dtype = "float32") trnY = np.zeros((60000), dtype = "int32") tstX = np.zeros((10000, 28, 28), dtype = "float32") with ZipFile("ml530-2021-sp-mnist.zip", "r") as archive: index = 0 for i in range(trnX.shape[0]): with archive.open("mnist_trn_images/mnist_trn_" + str(i).zfill(5) + ".png") as file: img = Image.open(file) trnX[i] = np.asarray(img) index = index + 1 with TextIOWrapper(archive.open("mnist_trn.csv", "r")) as file: header = file.readline() for i in range(trnY.shape[0]): trnY[i] = np.int32(file.readline().strip("\r\n").split(",")[1]) index = 0 for i in range(tstX.shape[0]): with archive.open("mnist_tst_images/mnist_tst_" + str(i).zfill(5) + ".png") as file: img = Image.open(file) tstX[i] = np.asarray(img) index = index + 1 trnX = trnX.reshape(trnX.shape[0], 28, 28, 1) / 255.0 tstX = tstX.reshape(tstX.shape[0], 28, 28, 1) / 255.0 valX = trnX[54000:] valY = trnY[54000:] trnX = trnX[:54000] trnY = trnY[:54000] from scipy.ndimage.filters import gaussian_filter from scipy.ndimage.interpolation import map_coordinates from tensorflow import keras from tensorflow.keras import activations, callbacks, layers, optimizers def elastic_transform(image): random_state = np.random.RandomState(None) if (random_state.rand() < 0.9): shape = image.shape sigma = 3 alpha = 8 dx = gaussian_filter((random_state.rand(*shape) * 2 - 1), sigma) * alpha dy = gaussian_filter((random_state.rand(*shape) * 2 - 1), sigma) * alpha x, y, z = np.meshgrid(np.arange(shape[0]), np.arange(shape[1]), np.arange(shape[2]), indexing = "ij") indices = np.reshape(x + dx, (-1, 1)), np.reshape(y + dy, (-1, 1)), np.reshape(z, (-1, 1)) return map_coordinates(image, indices, order = 1).reshape(shape) else: return image datagen = keras.preprocessing.image.ImageDataGenerator(preprocessing_function = lambda x: elastic_transform(x)) model = keras.Sequential() model.add(layers.Reshape((784,), input_shape = (28,28,1))) model.add(layers.Dense(units = 512, activation = "relu")) model.add(layers.Dropout(0.2)) model.add(layers.Dense(units = 512, activation = "relu")) model.add(layers.Dropout(0.2)) model.add(layers.Dense(trnY.max() + 1, activation = "softmax")) learning_rate = optimizers.schedules.ExponentialDecay(initial_learning_rate = 0.001, decay_steps = math.ceil(54000.0 / 512), decay_rate = 0.95) model.compile(optimizer = optimizers.RMSprop(learning_rate = learning_rate), loss = "sparse_categorical_crossentropy", metrics = [ "accuracy" ]) model.summary() callbacks = [ callbacks.EarlyStopping(monitor = "val_accuracy", patience = 32, restore_best_weights = True) ] model.fit(datagen.flow(trnX, trnY, batch_size = 512), epochs = 32, validation_data = (valX, valY), callbacks = callbacks) probabilities = model.predict(tstX) classes = probabilities.argmax(axis = -1) predictions = open("predictions-elastic.csv", "w") predictions.write("id,label\n") for i in range(tstX.shape[0]): predictions.write(str(i).zfill(5) + "," + str(classes[i]) + "\n") predictions.close()