feat(machinelearning): add files for project 5.
601 machinelearning/autograder.py Normal file
@@ -0,0 +1,601 @@
# A custom autograder for this project

################################################################################
# A mini-framework for autograding
################################################################################

import optparse
import sys
import traceback


class WritableNull:
    def write(self, string):
        pass

    def flush(self):
        pass

class Tracker(object):
    def __init__(self, questions, maxes, prereqs, mute_output):
        self.questions = questions
        self.maxes = maxes
        self.prereqs = prereqs

        self.points = {q: 0 for q in self.questions}

        self.current_question = None

        self.current_test = None
        self.points_at_test_start = None
        self.possible_points_remaining = None

        self.mute_output = mute_output
        self.original_stdout = None
        self.muted = False

    def mute(self):
        if self.muted:
            return

        self.muted = True
        self.original_stdout = sys.stdout
        sys.stdout = WritableNull()

    def unmute(self):
        if not self.muted:
            return

        self.muted = False
        sys.stdout = self.original_stdout

    def begin_q(self, q):
        assert q in self.questions
        text = 'Question {}'.format(q)
        print('\n' + text)
        print('=' * len(text))

        for prereq in sorted(self.prereqs[q]):
            if self.points[prereq] < self.maxes[prereq]:
                print("""*** NOTE: Make sure to complete Question {} before working on Question {},
*** because Question {} builds upon your answer for Question {}.
""".format(prereq, q, q, prereq))
                return False

        self.current_question = q
        self.possible_points_remaining = self.maxes[q]
        return True

    def begin_test(self, test_name):
        self.current_test = test_name
        self.points_at_test_start = self.points[self.current_question]
        print("*** {}) {}".format(self.current_question, self.current_test))
        if self.mute_output:
            self.mute()

    def end_test(self, pts):
        if self.mute_output:
            self.unmute()
        self.possible_points_remaining -= pts
        if self.points[self.current_question] == self.points_at_test_start + pts:
            print("*** PASS: {}".format(self.current_test))
        elif self.points[self.current_question] == self.points_at_test_start:
            print("*** FAIL")

        self.current_test = None
        self.points_at_test_start = None

    def end_q(self):
        assert self.current_question is not None
        assert self.possible_points_remaining == 0
        print('\n### Question {}: {}/{} ###'.format(
            self.current_question,
            self.points[self.current_question],
            self.maxes[self.current_question]))

        self.current_question = None
        self.possible_points_remaining = None

    def finalize(self):
        import time
        print('\nFinished at %d:%02d:%02d' % time.localtime()[3:6])
        print("\nProvisional grades\n==================")

        for q in self.questions:
            print('Question %s: %d/%d' % (q, self.points[q], self.maxes[q]))
        print('------------------')
        print('Total: %d/%d' % (sum(self.points.values()),
                                sum(self.maxes[q] for q in self.questions)))

        print("""
Your grades are NOT yet registered. To register your grades, make sure
to follow your instructor's guidelines to receive credit on your project.
""")

    def add_points(self, pts):
        self.points[self.current_question] += pts

TESTS = []
PREREQS = {}

def add_prereq(q, pre):
    if isinstance(pre, str):
        pre = [pre]

    if q not in PREREQS:
        PREREQS[q] = set()
    PREREQS[q] |= set(pre)

def test(q, points):
    def deco(fn):
        TESTS.append((q, points, fn))
        return fn
    return deco
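
# Example (illustrative only): how the mini-framework above is used. A test
# function is registered with the @test decorator and receives the tracker;
# prerequisites are declared with add_prereq. The names below are hypothetical.
#
#     @test('q1', points=6)
#     def check_something(tracker):
#         tracker.add_points(6)
#
#     add_prereq('q2', 'q1')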

def parse_options(argv):
    parser = optparse.OptionParser(description='Run public tests on student code')
    parser.set_defaults(
        edx_output=False,
        gs_output=False,
        no_graphics=False,
        mute_output=False,
        check_dependencies=False,
    )
    parser.add_option('--edx-output',
                      dest='edx_output',
                      action='store_true',
                      help='Ignored, present for compatibility only')
    parser.add_option('--gradescope-output',
                      dest='gs_output',
                      action='store_true',
                      help='Ignored, present for compatibility only')
    parser.add_option('--question', '-q',
                      dest='grade_question',
                      default=None,
                      help='Grade only one question (e.g. `-q q1`)')
    parser.add_option('--no-graphics',
                      dest='no_graphics',
                      action='store_true',
                      help='Do not display graphics (visualizing your implementation is highly recommended for debugging).')
    parser.add_option('--mute',
                      dest='mute_output',
                      action='store_true',
                      help='Mute output from executing tests')
    parser.add_option('--check-dependencies',
                      dest='check_dependencies',
                      action='store_true',
                      help='Check that numpy and matplotlib are installed')
    (options, args) = parser.parse_args(argv)
    return options

def main():
    options = parse_options(sys.argv)
    if options.check_dependencies:
        check_dependencies()
        return

    if options.no_graphics:
        disable_graphics()

    questions = set()
    maxes = {}
    for q, points, fn in TESTS:
        questions.add(q)
        maxes[q] = maxes.get(q, 0) + points
        if q not in PREREQS:
            PREREQS[q] = set()

    questions = list(sorted(questions))
    if options.grade_question:
        if options.grade_question not in questions:
            print("ERROR: question {} does not exist".format(options.grade_question))
            sys.exit(1)
        else:
            questions = [options.grade_question]
            PREREQS[options.grade_question] = set()

    tracker = Tracker(questions, maxes, PREREQS, options.mute_output)
    for q in questions:
        started = tracker.begin_q(q)
        if not started:
            continue

        for testq, points, fn in TESTS:
            if testq != q:
                continue
            tracker.begin_test(fn.__name__)
            try:
                fn(tracker)
            except KeyboardInterrupt:
                tracker.unmute()
                print("\n\nCaught KeyboardInterrupt: aborting autograder")
                tracker.finalize()
                print("\n[autograder was interrupted before finishing]")
                sys.exit(1)
            except:
                tracker.unmute()
                print(traceback.format_exc())
            tracker.end_test(points)
        tracker.end_q()
    tracker.finalize()

################################################################################
# Tests begin here
################################################################################

import numpy as np
import matplotlib
import contextlib

from torch import nn, Tensor
import torch
import backend

def check_dependencies():
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots(1, 1)
    ax.set_xlim([-1, 1])
    ax.set_ylim([-1, 1])
    line, = ax.plot([], [], color="black")
    plt.show(block=False)

    for t in range(400):
        angle = t * 0.05
        x = np.sin(angle)
        y = np.cos(angle)
        line.set_data([x, -x], [y, -y])
        fig.canvas.draw_idle()
        fig.canvas.start_event_loop(1e-3)

def disable_graphics():
    backend.use_graphics = False

@contextlib.contextmanager
def no_graphics():
    old_use_graphics = backend.use_graphics
    backend.use_graphics = False
    yield
    backend.use_graphics = old_use_graphics

def verify_node(node, expected_type, expected_shape, method_name):
    if expected_type == 'parameter':
        assert node is not None, (
            "{} should return an instance of nn.Parameter, not None".format(method_name))
        assert isinstance(node, nn.Parameter), (
            "{} should return an instance of nn.Parameter, instead got type {!r}".format(
                method_name, type(node).__name__))
    elif expected_type == 'loss':
        assert node is not None, (
            "{} should return an instance of a loss node, not None".format(method_name))
        assert isinstance(node, (nn.modules.loss._Loss)), (
            "{} should return a loss node, instead got type {!r}".format(
                method_name, type(node).__name__))
    elif expected_type == 'tensor':
        assert node is not None, (
            "{} should return a node object, not None".format(method_name))
        assert isinstance(node, Tensor), (
            "{} should return a node object, instead got type {!r}".format(
                method_name, type(node).__name__))
    else:
        assert False, "If you see this message, please report a bug in the autograder"

    if expected_type != 'loss':
        # A '?' entry in expected_shape marks a dimension whose size is not
        # checked (compared with ==, not identity).
        assert all((expected == '?' or actual == expected)
                   for (actual, expected) in zip(node.detach().numpy().shape, expected_shape)), (
            "{} should return an object with shape {}, got {}".format(
                method_name, expected_shape, node.shape))

@test('q1', points=6)
def check_perceptron(tracker):
    import models

    print("Sanity checking perceptron...")
    np_random = np.random.RandomState(0)

    # Check that the perceptron weights are initialized to a single vector
    # with `dimensions` entries.
    for dimensions in range(1, 10):
        p = models.PerceptronModel(dimensions)
        p_weights = p.get_weights()

        number_of_parameters = 0

        for param in p.parameters():
            number_of_parameters += 1
            verify_node(param, 'parameter', (1, dimensions), 'PerceptronModel.parameters()')

        assert number_of_parameters == 1, 'PerceptronModel should only have 1 parameter'

    # Check that run() returns a Tensor, and that the score in the node is correct
    for dimensions in range(1, 10):
        p = models.PerceptronModel(dimensions)
        point = np_random.uniform(-10, 10, (1, dimensions))
        score = p.run(Tensor(point))
        verify_node(score, 'tensor', (1,), "PerceptronModel.run()")
        calculated_score = score.item()

        # Compare run() output to the actual value
        for param in p.parameters():
            expected_score = float(np.dot(point.flatten(), param.detach().numpy().flatten()))

        assert np.isclose(calculated_score, expected_score), (
            "The score computed by PerceptronModel.run() ({:.4f}) does not match the expected score ({:.4f})".format(
                calculated_score, expected_score))

    # Check that get_prediction() returns the correct values, including the
    # case when a point lies exactly on the decision boundary
    for dimensions in range(1, 10):
        p = models.PerceptronModel(dimensions)
        random_point = np_random.uniform(-10, 10, (1, dimensions))
        for point in (random_point, np.zeros_like(random_point)):
            prediction = p.get_prediction(Tensor(point))
            assert prediction == 1 or prediction == -1, (
                "PerceptronModel.get_prediction() should return 1 or -1, not {}".format(
                    prediction))

            expected_prediction = np.where(np.dot(point, p.get_weights().data.T) >= 0, 1, -1).item()
            assert prediction == expected_prediction, (
                "PerceptronModel.get_prediction() returned {}; expected {}".format(
                    prediction, expected_prediction))

    tracker.add_points(2)  # Partial credit for passing sanity checks

    print("Sanity checking perceptron weight updates...")

    # Test weight updates. This involves constructing a dataset that
    # requires 0 or 1 updates before convergence, and testing that weight
    # values change as expected. Note that (multiplier < -1 or multiplier > 1)
    # must be true for the testing code to be correct.
    dimensions = 2
    for multiplier in (-5, -2, 2, 5):
        p = models.PerceptronModel(dimensions)
        orig_weights = p.get_weights().data.reshape((1, dimensions)).detach().numpy().copy()
        if np.abs(orig_weights).sum() == 0.0:
            # This autograder test doesn't work when weights are exactly zero
            continue

        point = multiplier * orig_weights

        sanity_dataset = backend.Custom_Dataset(
            x=np.tile(point, (500, 1)),
            y=np.ones((500, 1)) * -1.0
        )

        p.train(sanity_dataset)
        new_weights = p.get_weights().data.reshape((1, dimensions)).detach().numpy()

        if multiplier < 0:
            expected_weights = orig_weights
        else:
            expected_weights = orig_weights - point

        if not np.all(new_weights == expected_weights):
            print()
            print("Initial perceptron weights were: [{:.4f}, {:.4f}]".format(
                orig_weights[0, 0], orig_weights[0, 1]))
            print("All data points in the dataset were identical and had:")
            print("    x = [{:.4f}, {:.4f}]".format(
                point[0, 0], point[0, 1]))
            print("    y = -1")
            print("Your trained weights were: [{:.4f}, {:.4f}]".format(
                new_weights[0, 0], new_weights[0, 1]))
            print("Expected weights after training: [{:.4f}, {:.4f}]".format(
                expected_weights[0, 0], expected_weights[0, 1]))
            print()
            assert False, "Weight update sanity check failed"

    print("Sanity checking complete. Now training perceptron")
    model = models.PerceptronModel(3)
    dataset = backend.PerceptronDataset(model)

    model.train(dataset)
    backend.maybe_sleep_and_close(1)

    assert dataset.epoch != 0, "Perceptron code never iterated over the training data"

    accuracy = np.mean(np.where(np.dot(dataset.x, model.get_weights().data.T) >= 0.0, 1.0, -1.0) == dataset.y)
    if accuracy < 1.0:
        print("The weights learned by your perceptron correctly classified {:.2%} of training examples".format(accuracy))
        print("To receive full points for this question, your perceptron must converge to 100% accuracy")
        return

    tracker.add_points(4)

@test('q2', points=6)
def check_regression(tracker):
    import models
    model = models.RegressionModel()
    dataset = backend.RegressionDataset(model=model)
    detected_parameters = None

    for batch_size in (1, 2, 4):
        inp_x = torch.tensor(dataset.x[:batch_size], dtype=torch.float, requires_grad=True)
        inp_y = torch.tensor(dataset.y[:batch_size], dtype=torch.float, requires_grad=True)

        loss = model.get_loss(inp_x, inp_y)

        verify_node(loss, 'tensor', (1,), "RegressionModel.get_loss()")

        grad_x = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
        grad_y = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)

        assert grad_x[0] is not None, "Node returned from RegressionModel.get_loss() does not depend on the provided input (x)"
        assert grad_y[0] is not None, "Node returned from RegressionModel.get_loss() does not depend on the provided labels (y)"

    tracker.add_points(2)  # Partial credit for passing sanity checks

    model.train(dataset)
    backend.maybe_sleep_and_close(1)

    data_x = torch.tensor(dataset.x, dtype=torch.float32)
    labels = torch.tensor(dataset.y, dtype=torch.float32)
    train_loss = model.get_loss(data_x, labels)
    verify_node(train_loss, 'tensor', (1,), "RegressionModel.get_loss()")
    train_loss = train_loss.item()

    # Re-compute the loss ourselves: otherwise get_loss() could be hard-coded
    # to always return zero
    train_predicted = model(data_x)

    verify_node(train_predicted, 'tensor', (dataset.x.shape[0], 1), "RegressionModel()")
    error = labels - train_predicted
    sanity_loss = torch.mean((error.detach())**2)

    assert np.isclose(train_loss, sanity_loss), (
        "RegressionModel.get_loss() returned a loss of {:.4f}, "
        "but the autograder computed a loss of {:.4f} "
        "based on the output of RegressionModel()".format(
            train_loss, sanity_loss))

    loss_threshold = 0.02

    if train_loss <= loss_threshold:
        print("Your final loss is: {:f}".format(train_loss))
        tracker.add_points(4)
    else:
        print("Your final loss ({:f}) must be no more than {:.4f} to receive full points for this question".format(train_loss, loss_threshold))

@test('q3', points=6)
def check_digit_classification(tracker):
    import models
    model = models.DigitClassificationModel()
    dataset = backend.DigitClassificationDataset(model)

    detected_parameters = None

    for batch_size in (1, 2, 4):
        inp_x = torch.tensor(dataset.x[:batch_size], dtype=torch.float, requires_grad=True)
        inp_y = torch.tensor(dataset.y[:batch_size], dtype=torch.float, requires_grad=True)

        loss = model.get_loss(inp_x, inp_y)

        verify_node(loss, 'tensor', (1,), "DigitClassificationModel.get_loss()")

        grad_x = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
        grad_y = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)

        assert grad_x[0] is not None, "Node returned from DigitClassificationModel.get_loss() does not depend on the provided input (x)"
        assert grad_y[0] is not None, "Node returned from DigitClassificationModel.get_loss() does not depend on the provided labels (y)"

    tracker.add_points(2)  # Partial credit for passing sanity checks

    model.train(dataset)

    test_logits = model.run(torch.tensor(dataset.test_images)).data
    test_predicted = np.argmax(test_logits, axis=1).detach().numpy()
    test_accuracy = np.mean(test_predicted == dataset.test_labels)

    accuracy_threshold = 0.97
    if test_accuracy >= accuracy_threshold:
        print("Your final test set accuracy is: {:%}".format(test_accuracy))
        tracker.add_points(4)
    else:
        print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))

@test('q4', points=7)
def check_lang_id(tracker):
    import models
    model = models.LanguageIDModel()
    dataset = backend.LanguageIDDataset(model)

    detected_parameters = None
    for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
        start = dataset.dev_buckets[-1, 0]
        end = start + batch_size
        inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
        inp_xs = torch.tensor(inp_xs[:word_length], requires_grad=True)

        output_node = model.run(inp_xs)
        verify_node(output_node, 'tensor', (batch_size, len(dataset.language_names)), "LanguageIDModel.run()")

        grad = torch.autograd.grad(torch.sum(output_node), inp_xs, allow_unused=True, retain_graph=True)
        for gradient in grad:
            assert gradient is not None, "Output returned from LanguageIDModel.run() does not depend on all of the provided inputs (xs)"

    # Word length 1 does not use parameters related to transferring the
    # hidden state across timesteps, so initial parameter detection is only
    # run for longer words

    for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
        start = dataset.dev_buckets[-1, 0]
        end = start + batch_size
        inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
        inp_xs = torch.tensor(inp_xs[:word_length], requires_grad=True)
        loss_node = model.get_loss(inp_xs, inp_y)
        grad = torch.autograd.grad(loss_node, inp_xs, allow_unused=True, retain_graph=True)
        for gradient in grad:
            assert gradient is not None, "Loss returned from LanguageIDModel.get_loss() does not depend on all of the provided inputs (xs)"

    tracker.add_points(2)  # Partial credit for passing sanity checks

    model.train(dataset)

    accuracy_threshold = 0.81
    test_accuracy = dataset.get_validation_accuracy()
    if test_accuracy >= accuracy_threshold:
        print("Your final test set accuracy is: {:%}".format(test_accuracy))
        tracker.add_points(5)
    else:
        print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))

@test('q5', points=0)
def check_convolution(tracker):
    import models

    model = models.DigitConvolutionalModel()
    dataset = backend.DigitClassificationDataset2(model)

    def conv2d(a, f):
        # Reference "valid" 2D cross-correlation via numpy stride tricks
        s = f.shape + tuple(np.subtract(a.shape, f.shape) + 1)
        strd = np.lib.stride_tricks.as_strided
        subM = strd(a, shape=s, strides=a.strides * 2)
        return np.einsum('ij,ijkl->kl', f, subM)
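    # Illustrative note on conv2d above: for a 2x2 filter f on a 4x4 array a,
    # s == (2, 2, 3, 3), so subM holds every 2x2 window of a, and the einsum
    # contracts each window against f, producing the 3x3 "valid" output.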

    detected_parameters = None

    for batch_size in (1, 2, 4):
        inp_x = torch.tensor(dataset[:batch_size]['x'], dtype=torch.float, requires_grad=True)
        inp_y = torch.tensor(dataset[:batch_size]['label'], dtype=torch.float, requires_grad=True)
        loss = model.get_loss(inp_x, inp_y)

        verify_node(loss, 'tensor', (1,), "DigitConvolutionalModel.get_loss()")

        grad_x = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
        grad_y = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)

        assert grad_x[0] is not None, "Node returned from DigitConvolutionalModel.get_loss() does not depend on the provided input (x)"
        assert grad_y[0] is not None, "Node returned from DigitConvolutionalModel.get_loss() does not depend on the provided labels (y)"

    for matrix_size in (2, 4, 6):  # Test 3 random convolutions to check the Convolve() function
        weights = np.random.rand(2, 2)
        input = np.random.rand(matrix_size, matrix_size)
        student_output = models.Convolve(torch.Tensor(input), torch.Tensor(weights))
        actual_output = conv2d(input, weights)
        assert np.isclose(student_output, actual_output).all(), "The convolution returned by Convolve() does not match expected output"

    tracker.add_points(0.5)  # Partial credit for a working convolution function

    model.train(dataset)

    test_logits = model.run(torch.tensor(dataset.test_images)).data
    test_predicted = np.argmax(test_logits, axis=1).detach().numpy()
    test_accuracy = np.mean(test_predicted == dataset.test_labels)

    accuracy_threshold = 0.80
    if test_accuracy >= accuracy_threshold:
        print("Your final test set accuracy is: {:%}".format(test_accuracy))
        tracker.add_points(0.5)
    else:
        print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))

if __name__ == '__main__':
    main()
601 machinelearning/backend.py Normal file
@@ -0,0 +1,601 @@
import collections
import os
import time

import matplotlib.pyplot as plt
import numpy as np

from torch import nn
import torch
from torch.utils.data import Dataset, DataLoader


use_graphics = True

def maybe_sleep_and_close(seconds):
    if use_graphics and plt.get_fignums():
        time.sleep(seconds)
        for fignum in plt.get_fignums():
            fig = plt.figure(fignum)
            plt.close(fig)
            try:
                # This raises a TclError on some Windows machines
                fig.canvas.start_event_loop(1e-3)
            except:
                pass

def get_data_path(filename):
    path = os.path.join(
        os.path.dirname(__file__), os.pardir, "data", filename)
    if not os.path.exists(path):
        path = os.path.join(
            os.path.dirname(__file__), "data", filename)
    if not os.path.exists(path):
        path = os.path.join(
            os.path.dirname(__file__), filename)
    if not os.path.exists(path):
        raise Exception("Could not find data file: {}".format(filename))
    return path

class Custom_Dataset(Dataset):
    def __init__(self, x, y, transform=None):
        assert isinstance(x, np.ndarray)
        assert isinstance(y, np.ndarray)
        assert np.issubdtype(x.dtype, np.floating)
        assert np.issubdtype(y.dtype, np.floating)
        assert x.ndim == 2
        assert y.ndim == 2
        assert x.shape[0] == y.shape[0]
        self.x = x
        self.y = y
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        label = self.y[idx]
        x = self.x[idx]

        sample = {'x': torch.Tensor(x), 'label': torch.Tensor(label)}

        if self.transform:
            sample = self.transform(sample)

        return sample

    def get_validation_accuracy(self):
        raise NotImplementedError(
            "No validation data is available for this dataset. "
            "In this assignment, only the Digit Classification and Language "
            "Identification datasets have validation data.")

class PerceptronDataset(Custom_Dataset):
    def __init__(self, model):
        points = 500
        x = np.hstack([np.random.randn(points, 2), np.ones((points, 1))])
        y = np.where(x[:, 0] + 2 * x[:, 1] - 1 >= 0, 1.0, -1.0)
        super().__init__(x, np.expand_dims(y, axis=1))

        self.model = model
        self.epoch = 0

        if use_graphics:
            fig, ax = plt.subplots(1, 1)
            limits = np.array([-3.0, 3.0])
            ax.set_xlim(limits)
            ax.set_ylim(limits)
            positive = ax.scatter(*x[y == 1, :-1].T, color="red", marker="+")
            negative = ax.scatter(*x[y == -1, :-1].T, color="blue", marker="_")
            line, = ax.plot([], [], color="black")
            text = ax.text(0.03, 0.97, "", transform=ax.transAxes, va="top")
            ax.legend([positive, negative], [1, -1])
            plt.show(block=False)

            self.fig = fig
            self.limits = limits
            self.line = line
            self.text = text
            self.last_update = time.time()

    def __getitem__(self, idx):
        self.epoch += 1

        if torch.is_tensor(idx):
            idx = idx.tolist()

        x = self.x[idx]
        y = self.y[idx]

        if use_graphics and time.time() - self.last_update > 0.01:
            w = self.model.get_weights().data.flatten()
            limits = self.limits
            if w[1] != 0:
                self.line.set_data(limits, (-w[0] * limits - w[2]) / w[1])
            elif w[0] != 0:
                self.line.set_data(np.full(2, -w[2] / w[0]), limits)
            else:
                self.line.set_data([], [])
            self.text.set_text(
                "epoch: {:,}\npoint: {:,}/{:,}\nweights: {}".format(
                    self.epoch, idx + 1, len(self.x), w))
            self.fig.canvas.draw_idle()
            self.fig.canvas.start_event_loop(1e-3)
            self.last_update = time.time()

        return {'x': torch.tensor(x, dtype=torch.float32), 'label': torch.tensor(y, dtype=torch.float32)}

class RegressionDataset(Custom_Dataset):
    def __init__(self, model):
        x = np.expand_dims(np.linspace(-2 * np.pi, 2 * np.pi, num=200), axis=1)
        np.random.RandomState(0).shuffle(x)
        self.argsort_x = np.argsort(x.flatten())
        y = np.sin(x)
        super().__init__(x, y)

        self.model = model
        self.processed = 0

        if use_graphics:
            fig, ax = plt.subplots(1, 1)
            ax.set_xlim(-2 * np.pi, 2 * np.pi)
            ax.set_ylim(-1.4, 1.4)
            real, = ax.plot(x[self.argsort_x], y[self.argsort_x], color="blue")
            learned, = ax.plot([], [], color="red")
            text = ax.text(0.03, 0.97, "", transform=ax.transAxes, va="top")
            ax.legend([real, learned], ["real", "learned"])
            plt.show(block=False)

            self.fig = fig
            self.learned = learned
            self.text = text
            self.last_update = time.time()

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        data = super().__getitem__(idx)

        x = data['x']
        y = data['label']

        self.processed += 1

        if use_graphics and time.time() - self.last_update > 0.1:
            predicted = self.model(torch.tensor(self.x, dtype=torch.float32)).data
            loss = self.model.get_loss(x, y).data
            self.learned.set_data(self.x[self.argsort_x], predicted[self.argsort_x])
            self.text.set_text("processed: {:,}\nloss: {:.6f}".format(
                self.processed, loss))
            self.fig.canvas.draw_idle()
            self.fig.canvas.start_event_loop(1e-3)
            self.last_update = time.time()

        return {'x': x, 'label': y}

class DigitClassificationDataset(Custom_Dataset):
    def __init__(self, model):
        mnist_path = get_data_path("mnist.npz")

        with np.load(mnist_path) as data:
            train_images = data["train_images"]
            train_labels = data["train_labels"]
            test_images = data["test_images"]
            test_labels = data["test_labels"]
            assert len(train_images) == len(train_labels) == 60000
            assert len(test_images) == len(test_labels) == 10000
            self.dev_images = test_images[0::2]
            self.dev_labels = test_labels[0::2]
            self.test_images = test_images[1::2]
            self.test_labels = test_labels[1::2]

        train_labels_one_hot = np.zeros((len(train_images), 10))
        train_labels_one_hot[range(len(train_images)), train_labels] = 1

        super().__init__(train_images, train_labels_one_hot)

        self.model = model
        self.epoch = 0
        self.num_items = 0
        self.num_right_items = 0  # referenced by the epoch bookkeeping below

        if use_graphics:
            self.current_accuracy = None
            width = 20  # Width of each row expressed as a multiple of image width
            samples = 100  # Number of images to display per label
            fig = plt.figure()
            ax = {}
            images = collections.defaultdict(list)
            texts = collections.defaultdict(list)
            for i in reversed(range(10)):
                ax[i] = plt.subplot2grid((30, 1), (3 * i, 0), 2, 1,
                                         sharex=ax.get(9))
                plt.setp(ax[i].get_xticklabels(), visible=i == 9)
                ax[i].set_yticks([])
                ax[i].text(-0.03, 0.5, i, transform=ax[i].transAxes,
                           va="center")
                ax[i].set_xlim(0, 28 * width)
                ax[i].set_ylim(0, 28)
                for j in range(samples):
                    images[i].append(ax[i].imshow(
                        np.zeros((28, 28)), vmin=0, vmax=1, cmap="Greens",
                        alpha=0.3))
                    texts[i].append(ax[i].text(
                        0, 0, "", ha="center", va="top", fontsize="smaller"))
            ax[9].set_xticks(np.linspace(0, 28 * width, 11))
            ax[9].set_xticklabels(
                ["{:.1f}".format(num) for num in np.linspace(0, 1, 11)])
            ax[9].tick_params(axis="x", pad=16)
            ax[9].set_xlabel("Probability of Correct Label")
            status = ax[0].text(
                0.5, 1.5, "", transform=ax[0].transAxes, ha="center",
                va="bottom")
            plt.show(block=False)

            self.width = width
            self.samples = samples
            self.fig = fig
            self.images = images
            self.texts = texts
            self.status = status
            self.last_update = time.time()

    def __getitem__(self, idx):
        data = super().__getitem__(idx)

        x = data['x']
        y = data['label']

        if use_graphics and time.time() - self.last_update > 1:
            dev_logits = self.model.run(torch.tensor(self.dev_images)).data
            dev_predicted = np.argmax(dev_logits, axis=1).detach().numpy()
            dev_probs = np.exp(nn.functional.log_softmax(dev_logits, dim=1))

            dev_accuracy = np.mean(dev_predicted == self.dev_labels)
            self.status.set_text(
                "validation accuracy: {:.2%}".format(dev_accuracy))
            for i in range(10):
                predicted = dev_predicted[self.dev_labels == i]
                probs = dev_probs[self.dev_labels == i][:, i]
                linspace = np.linspace(
                    0, len(probs) - 1, self.samples).astype(int)
                indices = probs.argsort()[linspace]
                for j, (prob, image) in enumerate(zip(
                        probs[indices],
                        self.dev_images[self.dev_labels == i][indices])):
                    self.images[i][j].set_data(image.reshape((28, 28)))
                    left = prob * (self.width - 1) * 28
                    if predicted[indices[j]] == i:
                        self.images[i][j].set_cmap("Greens")
                        self.texts[i][j].set_text("")
                    else:
                        self.images[i][j].set_cmap("Reds")
                        self.texts[i][j].set_text(predicted[indices[j]])
                    self.texts[i][j].set_x(left + 14)
                    self.images[i][j].set_extent([left, left + 28, 0, 28])
            self.fig.canvas.draw_idle()
            self.fig.canvas.start_event_loop(1e-3)
            self.last_update = time.time()

        if self.num_items == len(self.x):
            self.current_accuracy = self.num_right_items / len(self.x)
            self.num_right_items = 0
            self.epoch += 1

        return {'x': x, 'label': y}

    def get_validation_accuracy(self):
        dev_logits = self.model.run(torch.tensor(self.dev_images)).data
        dev_predicted = np.argmax(dev_logits, axis=1).detach().numpy()
        dev_probs = np.exp(nn.functional.log_softmax(dev_logits, dim=1))

        dev_accuracy = np.mean(dev_predicted == self.dev_labels)
        return dev_accuracy

class LanguageIDDataset(Custom_Dataset):
    def __init__(self, model):
        self.model = model

        data_path = get_data_path("lang_id.npz")

        with np.load(data_path) as data:
            self.chars = data['chars']
            self.language_codes = data['language_codes']
            self.language_names = data['language_names']
            self.train_x = data['train_x']
            self.train_y = data['train_y']
            self.train_buckets = data['train_buckets']
            self.dev_x = data['dev_x']
            self.dev_y = data['dev_y']
            self.dev_buckets = data['dev_buckets']
            self.test_x = data['test_x']
            self.test_y = data['test_y']
            self.test_buckets = data['test_buckets']

        self.epoch = 0
        self.bucket_weights = self.train_buckets[:, 1] - self.train_buckets[:, 0]
        self.bucket_weights = self.bucket_weights / float(self.bucket_weights.sum())

        self.chars_print = self.chars
        try:
            print(u"Alphabet: {}".format(u"".join(self.chars)))
        except UnicodeEncodeError:
            self.chars_print = "abcdefghijklmnopqrstuvwxyzaaeeeeiinoouuacelnszz"
            print("Alphabet: " + self.chars_print)
            self.chars_print = list(self.chars_print)
            print("""
NOTE: Your terminal does not appear to support printing Unicode characters.
For the purposes of printing to the terminal, some of the letters in the
alphabet above have been substituted with ASCII symbols.""".strip())
            print("")

        # Select some examples to spotlight in the monitoring phase (3 per language)
        spotlight_idxs = []
        for i in range(len(self.language_names)):
            idxs_lang_i = np.nonzero(self.dev_y == i)[0]
            idxs_lang_i = np.random.choice(idxs_lang_i, size=3, replace=False)
            spotlight_idxs.extend(list(idxs_lang_i))
        self.spotlight_idxs = np.array(spotlight_idxs, dtype=int)

        # Templates for printing updates as training progresses
        max_word_len = self.dev_x.shape[1]
        max_lang_len = max([len(x) for x in self.language_names])

        self.predicted_template = u"Pred: {:<NUM}".replace('NUM',
                                                           str(max_lang_len))

        self.word_template = u"  "
        self.word_template += u"{:<NUM} ".replace('NUM', str(max_word_len))
        self.word_template += u"{:<NUM} ({:6.1%})".replace('NUM', str(max_lang_len))
        self.word_template += u" {:<NUM} ".replace('NUM',
                                                   str(max_lang_len + len('Pred: ')))
        for i in range(len(self.language_names)):
            self.word_template += u"|{}".format(self.language_codes[i])
            self.word_template += "{probs[" + str(i) + "]:4.0%}"

        self.last_update = time.time()

    def __len__(self):
        return len(self.train_x)

    def _encode(self, inp_x, inp_y):
        xs = []
        for i in range(inp_x.shape[1]):
            if np.all(np.array(inp_x[:, i]) == -1):
                break
            assert not np.any(np.array(inp_x[:, i]) == -1), (
                "Please report this error in the project: batching by length was done incorrectly in the provided code")
            x = np.eye(len(self.chars))[np.array(inp_x[:, i], dtype=int)]
            xs.append(x)
        y = np.eye(len(self.language_names))[inp_y]

        if len(inp_x) == 1:
            return torch.nn.functional.pad(torch.tensor(xs, dtype=torch.float), (0, 0, 0, 0, 0, 10 - len(xs))), torch.tensor(y, dtype=torch.float)

        return torch.tensor(xs, dtype=torch.float), torch.tensor(y, dtype=torch.float)

    def _softmax(self, x):
        exp = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp / np.sum(exp, axis=-1, keepdims=True)

    def _predict(self, split='test'):
        if split == 'dev':
            data_x = self.dev_x
            data_y = self.dev_y
            buckets = self.dev_buckets
        else:
            data_x = self.test_x
            data_y = self.test_y
            buckets = self.test_buckets

        all_predicted = []
        all_correct = []
        for bucket_id in range(buckets.shape[0]):
            start, end = buckets[bucket_id]
            xs, y = self._encode(data_x[start:end], data_y[start:end])
            predicted = self.model.run(xs)

            all_predicted.extend(list(predicted.data))
            all_correct.extend(list(data_y[start:end]))
        sftmax = nn.Softmax(dim=-1)
        all_predicted_probs = [sftmax(torch.tensor(i)) for i in all_predicted]

        all_predicted = [i.argmax() for i in all_predicted_probs]
        all_correct = np.asarray(all_correct)

        return all_predicted_probs, all_predicted, all_correct

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        ret = self._encode(self.train_x[idx:idx + 1], self.train_y[idx:idx + 1])
        return {'x': torch.squeeze(ret[0]), 'label': torch.squeeze(ret[1])}

    def get_validation_accuracy(self):
        dev_predicted_probs, dev_predicted, dev_correct = self._predict()
        dev_accuracy = np.mean(dev_predicted == dev_correct)
        return dev_accuracy

    def collate(self, batch):
        '''
        Pads a batch of variable-length samples.
        '''
        # get sequence lengths
        lengths = torch.tensor([t['x'].shape[0] for t in batch])
        # pad
        batch_x = [torch.Tensor(t['x']) for t in batch]
        batch_y = [torch.Tensor(t['label']) for t in batch]
        return {'x': batch_x, 'label': batch_y}


class DigitClassificationDataset2(Custom_Dataset):
    def __init__(self, model):
        mnist_path = get_data_path("mnist.npz")
        training_size = 200
        test_size = 100
        with np.load(mnist_path) as data:
            train_images = data["train_images"][:training_size]
            train_labels = data["train_labels"][:training_size]
            test_images = data["train_images"][:test_size]
            test_labels = data["train_labels"][:test_size]
            assert len(train_images) == len(train_labels) == training_size
            assert len(test_images) == len(test_labels) == test_size
            self.dev_images = test_images[0::2]
            self.dev_labels = test_labels[0::2]
            self.test_images = test_images[1::2]
            self.test_labels = test_labels[1::2]

        train_labels_one_hot = np.zeros((len(train_images), 10))
        train_labels_one_hot[range(len(train_images)), train_labels] = 1

        super().__init__(train_images, train_labels_one_hot)

        self.model = model
        self.epoch = 0
        self.num_items = 0
        self.num_right_items = 0  # referenced by the epoch bookkeeping below

        if use_graphics:
            self.current_accuracy = None
            width = 20  # Width of each row expressed as a multiple of image width
            samples = 100  # Number of images to display per label
            fig = plt.figure()
            ax = {}
            images = collections.defaultdict(list)
            texts = collections.defaultdict(list)
            for i in reversed(range(10)):
                ax[i] = plt.subplot2grid((30, 1), (3 * i, 0), 2, 1,
                                         sharex=ax.get(9))
                plt.setp(ax[i].get_xticklabels(), visible=i == 9)
                ax[i].set_yticks([])
                ax[i].text(-0.03, 0.5, i, transform=ax[i].transAxes,
                           va="center")
                ax[i].set_xlim(0, 28 * width)
                ax[i].set_ylim(0, 28)
                for j in range(samples):
                    images[i].append(ax[i].imshow(
                        np.zeros((28, 28)), vmin=0, vmax=1, cmap="Greens",
                        alpha=0.3))
                    texts[i].append(ax[i].text(
                        0, 0, "", ha="center", va="top", fontsize="smaller"))
            ax[9].set_xticks(np.linspace(0, 28 * width, 11))
            ax[9].set_xticklabels(
                ["{:.1f}".format(num) for num in np.linspace(0, 1, 11)])
            ax[9].tick_params(axis="x", pad=16)
            ax[9].set_xlabel("Probability of Correct Label")
            status = ax[0].text(
                0.5, 1.5, "", transform=ax[0].transAxes, ha="center",
                va="bottom")
            plt.show(block=False)

            self.width = width
            self.samples = samples
            self.fig = fig
            self.images = images
            self.texts = texts
            self.status = status
            self.last_update = time.time()

    def __getitem__(self, idx):
        data = super().__getitem__(idx)

        x = data['x']
        y = data['label']

        if use_graphics and time.time() - self.last_update > 1:
            dev_logits = self.model.run(torch.tensor(self.dev_images)).data
            dev_predicted = np.argmax(dev_logits, axis=1).detach().numpy()
            dev_probs = np.exp(nn.functional.log_softmax(dev_logits, dim=1))

            dev_accuracy = np.mean(dev_predicted == self.dev_labels)
            self.status.set_text(
                "validation accuracy: {:.2%}".format(dev_accuracy))
            for i in range(10):
                predicted = dev_predicted[self.dev_labels == i]
                probs = dev_probs[self.dev_labels == i][:, i]
                linspace = np.linspace(
                    0, len(probs) - 1, self.samples).astype(int)
                indices = probs.argsort()[linspace]
                for j, (prob, image) in enumerate(zip(
                        probs[indices],
                        self.dev_images[self.dev_labels == i][indices])):
                    self.images[i][j].set_data(image.reshape((28, 28)))
                    left = prob * (self.width - 1) * 28
                    if predicted[indices[j]] == i:
                        self.images[i][j].set_cmap("Greens")
                        self.texts[i][j].set_text("")
                    else:
                        self.images[i][j].set_cmap("Reds")
                        self.texts[i][j].set_text(predicted[indices[j]])
                    self.texts[i][j].set_x(left + 14)
                    self.images[i][j].set_extent([left, left + 28, 0, 28])
            self.fig.canvas.draw_idle()
            self.fig.canvas.start_event_loop(1e-3)
            self.last_update = time.time()

        if self.num_items == len(self.x):
            self.current_accuracy = self.num_right_items / len(self.x)
            self.num_right_items = 0
            self.epoch += 1

        return {'x': x, 'label': y}

    def get_validation_accuracy(self):
        dev_logits = self.model.run(torch.tensor(self.dev_images)).data
        dev_predicted = np.argmax(dev_logits, axis=1).detach().numpy()
        dev_probs = np.exp(nn.functional.log_softmax(dev_logits, dim=1))

        dev_accuracy = np.mean(dev_predicted == self.dev_labels)
        return dev_accuracy


def main():
    import models
    model = models.PerceptronModel(3)
    dataset = PerceptronDataset(model)
    model.train(dataset)

    model = models.RegressionModel()
    dataset = RegressionDataset(model)
    model.train(dataset)

    model = models.DigitClassificationModel()
    dataset = DigitClassificationDataset(model)
    model.train(dataset)

    model = models.LanguageIDModel()
    dataset = LanguageIDDataset(model)
    model.train(dataset)

if __name__ == "__main__":
    main()
BIN machinelearning/data/lang_id.npz Normal file (binary file not shown)
BIN machinelearning/data/mnist.npz Normal file (binary file not shown)
381 machinelearning/models.py Normal file
@@ -0,0 +1,381 @@
from torch import no_grad, stack
from torch.utils.data import DataLoader
from torch.nn import Module


"""
Functions you should use.
Please avoid importing any other torch functions or modules.
Your code will not pass if the gradescope autograder detects any changed imports.
"""
from torch.nn import Parameter, Linear
from torch import optim, tensor, tensordot, empty, ones
from torch.nn.functional import cross_entropy, relu, mse_loss
from torch import movedim


class PerceptronModel(Module):
    def __init__(self, dimensions):
        """
        Initialize a new Perceptron instance.

        A perceptron classifies data points as either belonging to a particular
        class (+1) or not (-1). `dimensions` is the dimensionality of the data.
        For example, dimensions=2 would mean that the perceptron must classify
        2D points.

        In order for our autograder to detect your weights, initialize them as
        a pytorch Parameter object as follows:

            Parameter(weight_vector)

        where weight_vector is a pytorch Tensor of dimension 'dimensions'.

        Hint: You can use ones(dim) to create a tensor of dimension dim.
        """
        super(PerceptronModel, self).__init__()

        "*** YOUR CODE HERE ***"
        self.w = None  # Initialize your weights here
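        # One possible initialization (a sketch; the (1, dimensions) shape
        # matches what the autograder's verify_node check expects):
        #     self.w = Parameter(ones(1, dimensions))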

    def get_weights(self):
        """
        Return a Parameter instance with the current weights of the perceptron.
        """
        return self.w

    def run(self, x):
        """
        Calculates the score assigned by the perceptron to a data point x.

        Inputs:
            x: a node with shape (1 x dimensions)
        Returns: a node containing a single number (the score)

        The pytorch function `tensordot` may be helpful here.
        """
        "*** YOUR CODE HERE ***"

    def get_prediction(self, x):
        """
        Calculates the predicted class for a single data point `x`.

        Returns: 1 or -1
        """
        "*** YOUR CODE HERE ***"

    def train(self, dataset):
        """
        Train the perceptron until convergence.
        You can iterate through the DataLoader in order to
        retrieve all the batches you need to train on.

        Each sample in the dataloader is in the form {'x': features, 'label': label} where label
        is the item we need to predict based off of its features.
        """
        with no_grad():
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
            "*** YOUR CODE HERE ***"


class RegressionModel(Module):
    """
    A neural network model for approximating a function that maps from real
    numbers to real numbers. The network should be sufficiently large to be able
    to approximate sin(x) on the interval [-2pi, 2pi] to reasonable precision.
    """
    def __init__(self):
        # Initialize your model parameters here
        "*** YOUR CODE HERE ***"
        super().__init__()

    def forward(self, x):
        """
        Runs the model for a batch of examples.

        Inputs:
            x: a node with shape (batch_size x 1)
        Returns:
            A node with shape (batch_size x 1) containing predicted y-values
        """
        "*** YOUR CODE HERE ***"

    def get_loss(self, x, y):
        """
        Computes the loss for a batch of examples.

        Inputs:
            x: a node with shape (batch_size x 1)
            y: a node with shape (batch_size x 1), containing the true y-values
                to be used for training
        Returns: a tensor of size 1 containing the loss
        """
        "*** YOUR CODE HERE ***"

    def train(self, dataset):
        """
        Trains the model.

        In order to create batches, create a DataLoader object and pass in `dataset` as well as your required
        batch size. You can look at PerceptronModel as a guideline for how you should implement the DataLoader.

        Each sample in the dataloader object will be in the form {'x': features, 'label': label} where label
        is the item we need to predict based off of its features.

        Inputs:
            dataset: a PyTorch dataset object containing data to be trained on
        """
        "*** YOUR CODE HERE ***"


class DigitClassificationModel(Module):
    """
    A model for handwritten digit classification using the MNIST dataset.

    Each handwritten digit is a 28x28 pixel grayscale image, which is flattened
    into a 784-dimensional vector for the purposes of this model. Each entry in
    the vector is a floating point number between 0 and 1.

    The goal is to sort each digit into one of 10 classes (number 0 through 9).

    (See RegressionModel for more information about the APIs of different
    methods here. We recommend that you implement the RegressionModel before
    working on this part of the project.)
    """
    def __init__(self):
        # Initialize your model parameters here
        super().__init__()
        input_size = 28 * 28
        output_size = 10
        "*** YOUR CODE HERE ***"

    def run(self, x):
        """
        Runs the model for a batch of examples.

        Your model should predict a node with shape (batch_size x 10),
        containing scores. Higher scores correspond to greater probability of
        the image belonging to a particular class.

        Inputs:
            x: a tensor with shape (batch_size x 784)
        Output:
            A node with shape (batch_size x 10) containing predicted scores
                (also called logits)
        """
        """ YOUR CODE HERE """

    def get_loss(self, x, y):
        """
        Computes the loss for a batch of examples.

        The correct labels `y` are represented as a tensor with shape
        (batch_size x 10). Each row is a one-hot vector encoding the correct
        digit class (0-9).

        Inputs:
            x: a node with shape (batch_size x 784)
            y: a node with shape (batch_size x 10)
        Returns: a loss tensor
        """
        """ YOUR CODE HERE """

    def train(self, dataset):
        """
        Trains the model.
        """
        """ YOUR CODE HERE """


class LanguageIDModel(Module):
    """
    A model for language identification at a single-word granularity.

    (See RegressionModel for more information about the APIs of different
    methods here. We recommend that you implement the RegressionModel before
    working on this part of the project.)
    """
    def __init__(self):
        # Our dataset contains words from five different languages, and the
        # combined alphabets of the five languages contain a total of 47 unique
        # characters.
        # You can refer to self.num_chars or len(self.languages) in your code
        self.num_chars = 47
        self.languages = ["English", "Spanish", "Finnish", "Dutch", "Polish"]
        super(LanguageIDModel, self).__init__()
        "*** YOUR CODE HERE ***"
        # Initialize your model parameters here

    def run(self, xs):
        """
        Runs the model for a batch of examples.

        Although words have different lengths, our data processing guarantees
        that within a single batch, all words will be of the same length (L).

        Here `xs` will be a list of length L. Each element of `xs` will be a
        tensor with shape (batch_size x self.num_chars), where every row in the
        array is a one-hot vector encoding of a character. For example, if we
        have a batch of 8 three-letter words where the last word is "cat", then
        xs[1] will be a tensor that contains a 1 at position (7, 0). Here the
        index 7 reflects the fact that "cat" is the last word in the batch, and
        the index 0 reflects the fact that the letter "a" is the initial (0th)
        letter of our combined alphabet for this task.

        Your model should use a Recurrent Neural Network to summarize the list
        `xs` into a single tensor of shape (batch_size x hidden_size), for your
        choice of hidden_size. It should then calculate a tensor of shape
        (batch_size x 5) containing scores, where higher scores correspond to
        greater probability of the word originating from a particular language.

        Inputs:
            xs: a list with L elements (one per character), where each element
                is a node with shape (batch_size x self.num_chars)
        Returns:
            A node with shape (batch_size x 5) containing predicted scores
                (also called logits)
        """
        "*** YOUR CODE HERE ***"

    def get_loss(self, xs, y):
        """
        Computes the loss for a batch of examples.

        The correct labels `y` are represented as a node with shape
        (batch_size x 5). Each row is a one-hot vector encoding the correct
        language.

        Inputs:
            xs: a list with L elements (one per character), where each element
                is a node with shape (batch_size x self.num_chars)
            y: a node with shape (batch_size x 5)
        Returns: a loss node
        """
        "*** YOUR CODE HERE ***"

    def train(self, dataset):
        """
        Trains the model.

        Note that when you iterate through the dataloader, each batch will be returned as its own tensor of shape
        (batch_size x length of word x self.num_chars). However, in order to run multiple samples at the same time,
        get_loss() and run() expect each batch to be in the form (length of word x batch_size x self.num_chars), meaning
        that you need to switch the first two dimensions of every batch. This can be done with the movedim() function
        as follows:

            movedim(input_vector, initial_dimension_position, final_dimension_position)

        For more information, look at the pytorch documentation of torch.movedim()
        """
        "*** YOUR CODE HERE ***"


def Convolve(input: tensor, weight: tensor):
    """
    Acts as a convolution layer by applying a 2d convolution with the given inputs and weights.
    DO NOT import any pytorch methods to directly do this, the convolution must be done with only the functions
    already imported.

    There are multiple ways to complete this function. One possible solution would be to use 'tensordot'.
    If you would like to index a tensor, you can do it as such:

        tensor[y:y+height, x:x+width]

    This returns a subtensor whose first element is tensor[y, x], with height 'height' and width 'width'.
    """
    input_tensor_dimensions = input.shape
    weight_dimensions = weight.shape
    Output_Tensor = tensor(())
    "*** YOUR CODE HERE ***"

    "*** End Code ***"
    return Output_Tensor


class DigitConvolutionalModel(Module):
    """
    A model for handwritten digit classification using the MNIST dataset.

    This class is a convolutional model which has already been trained on MNIST.
    If Convolve() has been correctly implemented, this model should be able to achieve a high accuracy
    on the MNIST dataset given the pretrained weights.
    """

    def __init__(self):
        # Initialize your model parameters here
        super().__init__()
        output_size = 10

        self.convolution_weights = Parameter(ones((3, 3)))
        """ YOUR CODE HERE """

    def run(self, x):
        """
        The convolutional layer is already applied, and the output is flattened for you. You should treat x as
        a regular 1-dimensional datapoint now, similar to the previous questions.
        """
        x = x.reshape(len(x), 28, 28)
        x = stack(list(map(lambda sample: Convolve(sample, self.convolution_weights), x)))
        x = x.flatten(start_dim=1)
        """ YOUR CODE HERE """

    def get_loss(self, x, y):
        """
        Computes the loss for a batch of examples.

        The correct labels `y` are represented as a tensor with shape
        (batch_size x 10). Each row is a one-hot vector encoding the correct
        digit class (0-9).

        Inputs:
            x: a node with shape (batch_size x 784)
            y: a node with shape (batch_size x 10)
        Returns: a loss tensor
        """
        """ YOUR CODE HERE """

    def train(self, dataset):
        """
        Trains the model.
        """
        """ YOUR CODE HERE """