Merge remote-tracking branch 'upstream/main'

This commit is contained in:
2024-07-02 12:50:21 +00:00
6 changed files with 1491 additions and 0 deletions

View File

@ -0,0 +1,598 @@
################################################################################
# A mini-framework for autograding
################################################################################
import optparse
import sys
import traceback
class WritableNull:
def write(self, string):
pass
def flush(self):
pass
class Tracker(object):
def __init__(self, questions, maxes, prereqs, mute_output):
self.questions = questions
self.maxes = maxes
self.prereqs = prereqs
self.points = {q: 0 for q in self.questions}
self.current_question = None
self.current_test = None
self.points_at_test_start = None
self.possible_points_remaining = None
self.mute_output = mute_output
self.original_stdout = None
self.muted = False
def mute(self):
if self.muted:
return
self.muted = True
self.original_stdout = sys.stdout
sys.stdout = WritableNull()
def unmute(self):
if not self.muted:
return
self.muted = False
sys.stdout = self.original_stdout
def begin_q(self, q):
assert q in self.questions
text = 'Question {}'.format(q)
print('\n' + text)
print('=' * len(text))
for prereq in sorted(self.prereqs[q]):
if self.points[prereq] < self.maxes[prereq]:
print("""*** NOTE: Make sure to complete Question {} before working on Question {},
*** because Question {} builds upon your answer for Question {}.
""".format(prereq, q, q, prereq))
return False
self.current_question = q
self.possible_points_remaining = self.maxes[q]
return True
def begin_test(self, test_name):
self.current_test = test_name
self.points_at_test_start = self.points[self.current_question]
print("*** {}) {}".format(self.current_question, self.current_test))
if self.mute_output:
self.mute()
def end_test(self, pts):
if self.mute_output:
self.unmute()
self.possible_points_remaining -= pts
if self.points[self.current_question] == self.points_at_test_start + pts:
print("*** PASS: {}".format(self.current_test))
elif self.points[self.current_question] == self.points_at_test_start:
print("*** FAIL")
self.current_test = None
self.points_at_test_start = None
def end_q(self):
assert self.current_question is not None
assert self.possible_points_remaining == 0
print('\n### Question {}: {}/{} ###'.format(
self.current_question,
self.points[self.current_question],
self.maxes[self.current_question]))
self.current_question = None
self.possible_points_remaining = None
def finalize(self):
import time
print('\nFinished at %d:%02d:%02d' % time.localtime()[3:6])
print("\nProvisional grades\n==================")
for q in self.questions:
print('Question %s: %d/%d' % (q, self.points[q], self.maxes[q]))
print('------------------')
print('Total: %d/%d' % (sum(self.points.values()),
sum([self.maxes[q] for q in self.questions])))
print("""
Your grades are NOT yet registered. To register your grades, make sure
to follow your instructor's guidelines to receive credit on your project.
""")
def add_points(self, pts):
self.points[self.current_question] += pts
TESTS = []
PREREQS = {}
def add_prereq(q, pre):
if isinstance(pre, str):
pre = [pre]
if q not in PREREQS:
PREREQS[q] = set()
PREREQS[q] |= set(pre)
def test(q, points):
def deco(fn):
TESTS.append((q, points, fn))
return fn
return deco
def parse_options(argv):
parser = optparse.OptionParser(description = 'Run public tests on student code')
parser.set_defaults(
edx_output=False,
gs_output=False,
no_graphics=False,
mute_output=False,
check_dependencies=False,
)
parser.add_option('--edx-output',
dest = 'edx_output',
action = 'store_true',
help = 'Ignored, present for compatibility only')
parser.add_option('--gradescope-output',
dest = 'gs_output',
action = 'store_true',
help = 'Ignored, present for compatibility only')
parser.add_option('--question', '-q',
dest = 'grade_question',
default = None,
help = 'Grade only one question (e.g. `-q q1`)')
parser.add_option('--no-graphics',
dest = 'no_graphics',
action = 'store_true',
help = 'Do not display graphics (visualizing your implementation is highly recommended for debugging).')
parser.add_option('--mute',
dest = 'mute_output',
action = 'store_true',
help = 'Mute output from executing tests')
parser.add_option('--check-dependencies',
dest = 'check_dependencies',
action = 'store_true',
help = 'check that numpy and matplotlib are installed')
(options, args) = parser.parse_args(argv)
return options
def main():
options = parse_options(sys.argv)
if options.check_dependencies:
check_dependencies()
return
if options.no_graphics:
disable_graphics()
questions = set()
maxes = {}
for q, points, fn in TESTS:
questions.add(q)
maxes[q] = maxes.get(q, 0) + points
if q not in PREREQS:
PREREQS[q] = set()
questions = list(sorted(questions))
if options.grade_question:
if options.grade_question not in questions:
print("ERROR: question {} does not exist".format(options.grade_question))
sys.exit(1)
else:
questions = [options.grade_question]
PREREQS[options.grade_question] = set()
tracker = Tracker(questions, maxes, PREREQS, options.mute_output)
for q in questions:
started = tracker.begin_q(q)
if not started:
continue
for testq, points, fn in TESTS:
if testq != q:
continue
tracker.begin_test(fn.__name__)
try:
fn(tracker)
except KeyboardInterrupt:
tracker.unmute()
print("\n\nCaught KeyboardInterrupt: aborting autograder")
tracker.finalize()
print("\n[autograder was interrupted before finishing]")
sys.exit(1)
except:
tracker.unmute()
print(traceback.format_exc())
tracker.end_test(points)
tracker.end_q()
tracker.finalize()
################################################################################
# Tests begin here
################################################################################
import torch
import matplotlib
import contextlib
from torch import nn, Tensor
import backend
def check_dependencies():
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1, 1)
ax.set_xlim([-1, 1])
ax.set_ylim([-1, 1])
line, = ax.plot([], [], color="black")
plt.show(block=False)
for t in range(400):
angle = t * 0.05
x = torch.sin(torch.tensor(angle))
y = torch.cos(torch.tensor(angle))
line.set_data([x.item(), -x.item()], [y.item(), -y.item()])
fig.canvas.draw_idle()
fig.canvas.start_event_loop(1e-3)
def disable_graphics():
backend.use_graphics = False
@contextlib.contextmanager
def no_graphics():
old_use_graphics = backend.use_graphics
backend.use_graphics = False
yield
backend.use_graphics = old_use_graphics
def verify_node(node, expected_type, expected_shape, method_name):
if expected_type == 'parameter':
assert node is not None, (
"{} should return an instance of nn.Parameter, not None".format(method_name))
assert isinstance(node, nn.Parameter), (
"{} should return an instance of nn.Parameter, instead got type {!r}".format(
method_name, type(node).__name__))
elif expected_type == 'loss':
assert node is not None, (
"{} should return an instance a loss node, not None".format(method_name))
assert isinstance(node, (nn.modules.loss._Loss)), (
"{} should return a loss node, instead got type {!r}".format(
method_name, type(node).__name__))
elif expected_type == 'tensor':
assert node is not None, (
"{} should return a node object, not None".format(method_name))
assert isinstance(node, Tensor), (
"{} should return a node object, instead got type {!r}".format(
method_name, type(node).__name__))
else:
assert False, "If you see this message, please report a bug in the autograder"
if expected_type != 'loss':
assert all([(expected == '?' or actual == expected) for (actual, expected) in zip(node.shape, expected_shape)]), (
"{} should return an object with shape {}, got {}".format(
method_name, expected_shape, node.shape))
@test('q1', points=6)
def check_perceptron(tracker):
import models
print("Sanity checking perceptron...")
torch.manual_seed(0)
# Check that the perceptron weights are initialized to a single vector with `dimensions` entries.
for dimensions in range(1, 10):
p = models.PerceptronModel(dimensions)
p_weights = p.get_weights()
number_of_parameters = 0
for param in p.parameters():
number_of_parameters += 1
verify_node(param, 'parameter', (1, dimensions), 'PerceptronModel.parameters()')
assert number_of_parameters == 1, ('Perceptron Model should only have 1 parameter')
# Check that run returns a Tensor, and that the score in the node is correct
for dimensions in range(1, 10):
p = models.PerceptronModel(dimensions)
point = torch.empty((1, dimensions)).uniform_(-10, 10)
score = p.run(point)
verify_node(score, 'tensor', (1,), "PerceptronModel.run()")
calculated_score = score.item()
# Compare run output to actual value
for param in p.parameters():
expected_score = float(torch.dot(point.flatten(), param.detach().flatten()))
assert torch.isclose(torch.tensor(calculated_score), torch.tensor(expected_score)), (
"The score computed by PerceptronModel.run() ({:.4f}) does not match the expected score ({:.4f})".format(
calculated_score, expected_score))
# Check that get_prediction returns the correct values, including the
# case when a point lies exactly on the decision boundary
for dimensions in range(1, 10):
p = models.PerceptronModel(dimensions)
random_point = torch.empty((1, dimensions)).uniform_(-10, 10)
for point in (random_point, torch.zeros_like(random_point)):
prediction = p.get_prediction(point)
assert prediction == 1 or prediction == -1, (
"PerceptronModel.get_prediction() should return 1 or -1, not {}".format(
prediction))
expected_prediction = torch.where(torch.dot(point.flatten(), p.get_weights().data.T.flatten()) >= 0, torch.tensor(1), torch.tensor(-1)).item()
assert prediction == expected_prediction, (
"PerceptronModel.get_prediction() returned {}; expected {}".format(
prediction, expected_prediction))
tracker.add_points(2) # Partial credit for passing sanity checks
print("Sanity checking perceptron weight updates...")
# Test weight updates. This involves constructing a dataset that
# requires 0 or 1 updates before convergence, and testing that weight
# values change as expected. Note that (multiplier < -1 or multiplier > 1)
# must be true for the testing code to be correct.
dimensions = 2
for multiplier in (-5, -2, 2, 5):
p = models.PerceptronModel(dimensions)
orig_weights = p.get_weights().data.reshape((1, dimensions)).detach().clone()
if torch.abs(orig_weights).sum() == 0.0:
# This autograder test doesn't work when weights are exactly zero
continue
point = multiplier * orig_weights
sanity_dataset = backend.CustomDataset(
x=point.repeat((500, 1)),
y=torch.ones((500, 1)) * -1.0
)
p.train(sanity_dataset)
new_weights = p.get_weights().data.reshape((1, dimensions)).detach().clone()
if multiplier < 0:
expected_weights = orig_weights
else:
expected_weights = orig_weights - point
if not torch.equal(new_weights, expected_weights):
print()
print("Initial perceptron weights were: [{:.4f}, {:.4f}]".format(
orig_weights[0,0], orig_weights[0,1]))
print("All data points in the dataset were identical and had:")
print(" x = [{:.4f}, {:.4f}]".format(
point[0,0], point[0,1]))
print(" y = -1")
print("Your trained weights were: [{:.4f}, {:.4f}]".format(
new_weights[0,0], new_weights[0,1]))
print("Expected weights after training: [{:.4f}, {:.4f}]".format(
expected_weights[0,0], expected_weights[0,1]))
print()
assert False, "Weight update sanity check failed"
print("Sanity checking complete. Now training perceptron")
model = models.PerceptronModel(3)
dataset = backend.PerceptronDataset(model)
model.train(dataset)
backend.maybe_sleep_and_close(1)
assert dataset.epoch != 0, "Perceptron code never iterated over the training data"
accuracy = torch.mean((torch.where(torch.matmul(torch.tensor(dataset.x, dtype=torch.float32), model.get_weights().data.T) >= 0.0, 1.0, -1.0) == torch.tensor(dataset.y)).float())
if accuracy < 1.0:
print("The weights learned by your perceptron correctly classified {:.2%} of training examples".format(accuracy))
print("To receive full points for this question, your perceptron must converge to 100% accuracy")
return
tracker.add_points(4)
@test('q2', points=6)
def check_regression(tracker):
import models
model = models.RegressionModel()
dataset = backend.RegressionDataset(model=model)
detected_parameters = None
for batch_size in (1, 2, 4):
inp_x = torch.tensor(dataset.x[:batch_size], dtype=torch.float, requires_grad=True)
inp_y = torch.tensor(dataset.y[:batch_size], dtype=torch.float, requires_grad=True)
loss = model.get_loss(inp_x, inp_y)
verify_node(loss, 'tensor', (1,), "RegressionModel.get_loss()")
grad_y = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
grad_x = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)
assert grad_x[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided input (x)"
assert grad_y[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided labels (y)"
tracker.add_points(2) # Partial credit for passing sanity checks
model.train(dataset)
backend.maybe_sleep_and_close(1)
data_x = torch.tensor(dataset.x,dtype=torch.float32)
labels = torch.tensor(dataset.y, dtype=torch.float32)
train_loss = model.get_loss(data_x, labels)
verify_node(train_loss, 'tensor', (1,), "RegressionModel.get_loss()")
train_loss = train_loss.item()
# Re-compute the loss ourselves: otherwise get_loss() could be hard-coded
# to always return zero
train_predicted = model(data_x)
verify_node(train_predicted, 'tensor', (dataset.x.shape[0], 1), "RegressionModel()")
error = labels - train_predicted
sanity_loss = torch.mean((error.detach())**2)
assert torch.isclose(torch.tensor(train_loss), sanity_loss), (
"RegressionModel.get_loss() returned a loss of {:.4f}, "
"but the autograder computed a loss of {:.4f} "
"based on the output of RegressionModel()".format(
train_loss, sanity_loss))
loss_threshold = 0.02
if train_loss <= loss_threshold:
print("Your final loss is: {:f}".format(train_loss))
tracker.add_points(4)
else:
print("Your final loss ({:f}) must be no more than {:.4f} to receive full points for this question".format(train_loss, loss_threshold))
@test('q3', points=6)
def check_digit_classification(tracker):
import models
model = models.DigitClassificationModel()
dataset = backend.DigitClassificationDataset(model)
detected_parameters = None
for batch_size in (1, 2, 4):
inp_x = torch.tensor(dataset.x[:batch_size], dtype=torch.float, requires_grad=True)
inp_y = torch.tensor(dataset.y[:batch_size], dtype=torch.float, requires_grad=True)
loss = model.get_loss(inp_x, inp_y)
verify_node(loss, 'tensor', (1,), "DigitClassificationModel.run()")
grad_y = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
grad_x = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)
assert grad_x[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided input (x)"
assert grad_y[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided labels (y)"
tracker.add_points(2) # Partial credit for passing sanity checks
model.train(dataset)
test_logits = model.run(torch.tensor(dataset.test_images)).detach().cpu()
test_predicted = torch.argmax(test_logits, axis=1)
test_accuracy = torch.mean(torch.eq(test_predicted, torch.tensor(dataset.test_labels)).float())
accuracy_threshold = 0.97
if test_accuracy >= accuracy_threshold:
print("Your final test set accuracy is: {:%}".format(test_accuracy))
tracker.add_points(4)
else:
print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))
@test('q4', points=7)
def check_lang_id(tracker):
import models
model = models.LanguageIDModel()
dataset = backend.LanguageIDDataset(model)
detected_parameters = None
for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
start = dataset.dev_buckets[-1, 0]
end = start + batch_size
inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
inp_xs = torch.tensor(inp_xs[:word_length], requires_grad=True)
output_node = model.run(inp_xs)
verify_node(output_node, 'tensor', (batch_size, len(dataset.language_names)), "LanguageIDModel.run()")
grad = torch.autograd.grad(torch.sum(output_node), inp_xs, allow_unused=True, retain_graph=True)
for gradient in grad:
assert gradient != None, "Output returned from LanguageIDModel.run() does not depend on all of the provided inputs (xs)"
# Word length 1 does not use parameters related to transferring the
# hidden state across timesteps, so initial parameter detection is only
# run for longer words
for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
start = dataset.dev_buckets[-1, 0]
end = start + batch_size
inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
inp_xs = torch.tensor(inp_xs[:word_length], requires_grad=True)
loss_node = model.get_loss(inp_xs, inp_y)
grad = torch.autograd.grad(loss_node, inp_xs, allow_unused=True, retain_graph=True)
for gradient in grad:
assert gradient != None, "Output returned from LanguageIDModel.run() does not depend on all of the provided inputs (xs)"
tracker.add_points(2) # Partial credit for passing sanity checks
model.train(dataset)
accuracy_threshold = 0.81
test_accuracy = dataset.get_validation_accuracy()
if test_accuracy >= accuracy_threshold:
print("Your final test set accuracy is: {:%}".format(test_accuracy))
tracker.add_points(5)
else:
print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))
@test('q5', points=0)
def check_convolution(tracker):
import models
model = models.DigitConvolutionalModel()
dataset = backend.DigitClassificationDataset2(model)
def conv2d(a, f):
s = f.shape + tuple(torch.tensor(a.shape) - torch.tensor(f.shape) + 1)
strd = torch.as_strided
subM = strd(a, size = s, stride = a.stride() * 2)
return torch.einsum('ij,ijkl->kl', f, subM)
detected_parameters = None
for batch_size in (1, 2, 4):
inp_x = torch.tensor(dataset[:batch_size]['x'], dtype=torch.float, requires_grad=True)
inp_y = torch.tensor(dataset[:batch_size]['label'], dtype=torch.float, requires_grad=True)
loss = model.get_loss(inp_x, inp_y)
verify_node(loss, 'tensor', (1,), "DigitClassificationModel.run()")
grad_y = torch.autograd.grad(loss, inp_x, allow_unused=True, retain_graph=True)
grad_x = torch.autograd.grad(loss, inp_y, allow_unused=True, retain_graph=True)
assert grad_x[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided input (x)"
assert grad_y[0] != None, "Node returned from RegressionModel.get_loss() does not depend on the provided labels (y)"
for matrix_size in (2, 4, 6): #Test 3 random convolutions to test convolve() function
weights = torch.rand(2,2)
input = torch.rand(matrix_size, matrix_size)
student_output = models.Convolve(input, weights)
actual_output = conv2d(input,weights)
assert torch.isclose(student_output, actual_output).all(), "The convolution returned by Convolve() does not match expected output"
tracker.add_points(1/2) # Partial credit for testing whether convolution function works
model.train(dataset)
test_logits = model.run(torch.tensor(dataset.test_images)).detach().cpu()
test_predicted = torch.argmax(test_logits, axis=1)
test_accuracy = torch.mean(torch.eq(test_predicted, torch.tensor(dataset.test_labels)).float())
accuracy_threshold = 0.80
if test_accuracy >= accuracy_threshold:
print("Your final test set accuracy is: {:%}".format(test_accuracy))
tracker.add_points(0.5)
else:
print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))
if __name__ == '__main__':
main()

512
machinelearning/backend.py Normal file
View File

@ -0,0 +1,512 @@
import collections
import os
import time
import matplotlib.pyplot as plt
import numpy as np
from torch import nn
import torch
from torch.utils.data import Dataset, DataLoader
use_graphics = True
def maybe_sleep_and_close(seconds):
if use_graphics and plt.get_fignums():
time.sleep(seconds)
for fignum in plt.get_fignums():
fig = plt.figure(fignum)
plt.close(fig)
try:
# This raises a TclError on some Windows machines
fig.canvas.start_event_loop(1e-3)
except:
pass
def get_data_path(filename):
path = os.path.join(
os.path.dirname(__file__), os.pardir, "data", filename)
if not os.path.exists(path):
path = os.path.join(
os.path.dirname(__file__), "data", filename)
if not os.path.exists(path):
path = os.path.join(
os.path.dirname(__file__), filename)
if not os.path.exists(path):
raise Exception("Could not find data file: {}".format(filename))
return path
class CustomDataset(Dataset):
def __init__(self, x, y, transform=None):
self.x = torch.tensor(x, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32)
self.transform = transform
def __len__(self):
return len(self.x)
def __getitem__(self, idx):
x = self.x[idx]
y = self.y[idx]
sample = {'x': x, 'label': y}
if self.transform:
sample = self.transform(sample)
return sample
def get_validation_accuracy(self):
raise NotImplementedError(
"No validation data is available for this dataset. "
"In this assignment, only the Digit Classification and Language "
"Identification datasets have validation data.")
class PerceptronDataset(CustomDataset):
def __init__(self, model):
points = 500
x = np.hstack([np.random.randn(points, 2), np.ones((points, 1))])
y = np.where(x[:, 0] + 2 * x[:, 1] - 1 >= 0, 1.0, -1.0)
super().__init__(x, np.expand_dims(y, axis=1))
self.model = model
self.epoch = 0
if use_graphics:
fig, ax = plt.subplots(1, 1)
limits = np.array([-3.0, 3.0])
ax.set_xlim(limits)
ax.set_ylim(limits)
positive = ax.scatter(*x[y == 1, :-1].T, color="red", marker="+")
negative = ax.scatter(*x[y == -1, :-1].T, color="blue", marker="_")
line, = ax.plot([], [], color="black")
text = ax.text(0.03, 0.97, "", transform=ax.transAxes, va="top")
ax.legend([positive, negative], [1, -1])
plt.show(block=False)
self.fig = fig
self.limits = limits
self.line = line
self.text = text
self.last_update = time.time()
def __getitem__(self, idx):
self.epoch += 1
if torch.is_tensor(idx):
idx = idx.tolist()
x = self.x[idx]
y = self.y[idx]
if use_graphics and time.time() - self.last_update > 0.01:
w = self.model.get_weights().data.flatten()
limits = self.limits
if w[1] != 0:
self.line.set_data(limits, (-w[0] * limits - w[2]) / w[1])
elif w[0] != 0:
self.line.set_data(np.full(2, -w[2] / w[0]), limits)
else:
self.line.set_data([], [])
self.text.set_text(
"epoch: {:,}\npoint: {:,}/{:,}\nweights: {}".format(
self.epoch, idx * 1 + 1, len(self.x), w))
self.fig.canvas.draw_idle()
self.fig.canvas.start_event_loop(1e-3)
self.last_update = time.time()
return {'x': x, 'label': y}
class RegressionDataset(CustomDataset):
def __init__(self, model):
x = np.expand_dims(np.linspace(-2 * np.pi, 2 * np.pi, num=200), axis=1)
np.random.RandomState(0).shuffle(x)
self.argsort_x = np.argsort(x.flatten())
y = np.sin(x)
super().__init__(x, y)
self.model = model
self.processed = 0
if use_graphics:
fig, ax = plt.subplots(1, 1)
ax.set_xlim(-2 * np.pi, 2 * np.pi)
ax.set_ylim(-1.4, 1.4)
real, = ax.plot(x[self.argsort_x], y[self.argsort_x], color="blue")
learned, = ax.plot([], [], color="red")
text = ax.text(0.03, 0.97, "", transform=ax.transAxes, va="top")
ax.legend([real, learned], ["real", "learned"])
plt.show(block=False)
self.fig = fig
self.learned = learned
self.text = text
self.last_update = time.time()
def __getitem__(self, idx):
data = super().__getitem__(idx)
x = data['x']
y = data['label']
self.processed += 1
if use_graphics and time.time() - self.last_update > 0.1:
predicted = self.model(torch.tensor(self.x, dtype=torch.float32)).data
loss = self.model.get_loss(x, y).data
self.learned.set_data(self.x[self.argsort_x], predicted[self.argsort_x])
self.text.set_text("processed: {:,}\nloss: {:.6f}".format(
self.processed, loss))
self.fig.canvas.draw_idle()
self.fig.canvas.start_event_loop(1e-3)
self.last_update = time.time()
return {'x': x, 'label': y}
class DigitClassificationDataset(CustomDataset):
def __init__(self, model):
mnist_path = get_data_path("mnist.npz")
with np.load(mnist_path) as data:
train_images = data["train_images"]
train_labels = data["train_labels"]
test_images = data["test_images"]
test_labels = data["test_labels"]
assert len(train_images) == len(train_labels) == 60000
assert len(test_images) == len(test_labels) == 10000
self.dev_images = test_images[0::2]
self.dev_labels = test_labels[0::2]
self.test_images = test_images[1::2]
self.test_labels = test_labels[1::2]
train_labels_one_hot = np.zeros((len(train_images), 10))
train_labels_one_hot[range(len(train_images)), train_labels] = 1
super().__init__(train_images, train_labels_one_hot)
self.model = model
self.epoch = 0
self.num_items = 0
if use_graphics:
self.current_accuracy = None
width = 20 # Width of each row expressed as a multiple of image width
samples = 100 # Number of images to display per label
fig = plt.figure()
ax = {}
images = collections.defaultdict(list)
texts = collections.defaultdict(list)
for i in reversed(range(10)):
ax[i] = plt.subplot2grid((30, 1), (3 * i, 0), 2, 1,
sharex=ax.get(9))
plt.setp(ax[i].get_xticklabels(), visible=i == 9)
ax[i].set_yticks([])
ax[i].text(-0.03, 0.5, i, transform=ax[i].transAxes,
va="center")
ax[i].set_xlim(0, 28 * width)
ax[i].set_ylim(0, 28)
for j in range(samples):
images[i].append(ax[i].imshow(
np.zeros((28, 28)), vmin=0, vmax=1, cmap="Greens",
alpha=0.3))
texts[i].append(ax[i].text(
0, 0, "", ha="center", va="top", fontsize="smaller"))
ax[9].set_xticks(np.linspace(0, 28 * width, 11))
ax[9].set_xticklabels(
["{:.1f}".format(num) for num in np.linspace(0, 1, 11)])
ax[9].tick_params(axis="x", pad=16)
ax[9].set_xlabel("Probability of Correct Label")
status = ax[0].text(
0.5, 1.5, "", transform=ax[0].transAxes, ha="center",
va="bottom")
plt.show(block=False)
self.width = width
self.samples = samples
self.fig = fig
self.images = images
self.texts = texts
self.status = status
self.last_update = time.time()
def __getitem__(self, idx):
data = super().__getitem__(idx)
x = data['x']
y = data['label']
if use_graphics and time.time() - self.last_update > 1:
dev_logits = self.model.run(torch.tensor(self.dev_images, dtype=torch.float32)).detach().cpu()
dev_predicted = torch.argmax(dev_logits, axis=1)
dev_probs = torch.exp(nn.functional.log_softmax(dev_logits, dim=1))
dev_accuracy = torch.mean(torch.eq(dev_predicted, torch.tensor(self.dev_labels)).float())
self.status.set_text(
"validation accuracy: {:.2%}".format(dev_accuracy))
for i in range(10):
predicted = dev_predicted[self.dev_labels == i]
probs = dev_probs[self.dev_labels == i][:, i]
linspace = np.linspace(0, len(probs) - 1, self.samples).astype(int)
indices = probs.argsort()[linspace]
for j, (prob, image) in enumerate(zip(probs[indices], self.dev_images[self.dev_labels == i][indices])):
self.images[i][j].set_data(image.reshape((28, 28)))
left = prob * (self.width - 1) * 28
if predicted[indices[j]] == i:
self.images[i][j].set_cmap("Greens")
self.texts[i][j].set_text("")
else:
self.images[i][j].set_cmap("Reds")
self.texts[i][j].set_text(predicted[indices[j]].detach().cpu().numpy())
self.texts[i][j].set_x(left + 14)
self.images[i][j].set_extent([left, left + 28, 0, 28])
self.fig.canvas.draw_idle()
self.fig.canvas.start_event_loop(1e-3)
self.last_update = time.time()
return {'x': x, 'label': y}
def get_validation_accuracy(self):
dev_logits = self.model.run(torch.tensor(self.dev_images, dtype=torch.float32)).data
dev_predicted = torch.argmax(dev_logits, axis=1).detach()
dev_accuracy = (dev_predicted == self.dev_labels).mean()
return dev_accuracy
class LanguageIDDataset(CustomDataset):
def __init__(self, model):
self.model = model
data_path = get_data_path("lang_id.npz")
with np.load(data_path) as data:
self.chars = data['chars']
self.language_codes = data['language_codes']
self.language_names = data['language_names']
self.train_x = data['train_x']
self.train_y = data['train_y']
self.train_buckets = data['train_buckets']
self.dev_x = data['dev_x']
self.dev_y = data['dev_y']
self.dev_buckets = data['dev_buckets']
self.test_x = data['test_x']
self.test_y = data['test_y']
self.test_buckets = data['test_buckets']
self.epoch = 0
self.bucket_weights = self.train_buckets[:, 1] - self.train_buckets[:, 0]
self.bucket_weights = self.bucket_weights / float(self.bucket_weights.sum())
self.chars_print = self.chars
try:
print(u"Alphabet: {}".format(u"".join(self.chars)))
except UnicodeEncodeError:
self.chars_print = "abcdefghijklmnopqrstuvwxyzaaeeeeiinoouuacelnszz"
print("Alphabet: " + self.chars_print)
self.chars_print = list(self.chars_print)
print("""
NOTE: Your terminal does not appear to support printing Unicode characters.
For the purposes of printing to the terminal, some of the letters in the
alphabet above have been substituted with ASCII symbols.""".strip())
print("")
# Select some examples to spotlight in the monitoring phase (3 per language)
spotlight_idxs = []
for i in range(len(self.language_names)):
idxs_lang_i = np.nonzero(self.dev_y == i)[0]
idxs_lang_i = np.random.choice(idxs_lang_i, size=3, replace=False)
spotlight_idxs.extend(list(idxs_lang_i))
self.spotlight_idxs = np.array(spotlight_idxs, dtype=int)
# Templates for printing updates as training progresses
max_word_len = self.dev_x.shape[1]
max_lang_len = max([len(x) for x in self.language_names])
self.predicted_template = u"Pred: {:<NUM}".replace('NUM', str(max_lang_len))
self.word_template = u" "
self.word_template += u"{:<NUM} ".replace('NUM', str(max_word_len))
self.word_template += u"{:<NUM} ({:6.1%})".replace('NUM', str(max_lang_len))
self.word_template += u" {:<NUM} ".replace('NUM', str(max_lang_len + len('Pred: ')))
for i in range(len(self.language_names)):
self.word_template += u"|{}".format(self.language_codes[i])
self.word_template += "{probs[" + str(i) + "]:4.0%}"
self.last_update = time.time()
def __len__(self):
return len(self.train_x)
def _encode(self, inp_x, inp_y):
xs = []
for i in range(inp_x.shape[1]):
if np.all(np.array(inp_x[:, i]) == -1):
break
assert not np.any(np.array(inp_x[:, i]) == -1), (
"Please report this error in the project: batching by length was done incorrectly in the provided code")
x = np.eye(len(self.chars))[np.array(inp_x[:, i], dtype=int)]
xs.append(x)
y = np.eye(len(self.language_names))[inp_y]
j = [[0 for _ in range(47)]]
if len(inp_x) == 1:
return nn.functional.pad(torch.tensor(xs, dtype=torch.float), (0, 0, 0, 0, 0, 10 - len(xs))), torch.tensor(y, dtype=torch.float)
return torch.tensor(xs, dtype=torch.float), torch.tensor(y, dtype=torch.float)
def _softmax(self, x):
exp = np.exp(x - np.max(x, axis=-1, keepdims=True))
return exp / np.sum(exp, axis=-1, keepdims=True)
def _predict(self, split='test'):
if split == 'dev':
data_x = self.dev_x
data_y = self.dev_y
buckets = self.dev_buckets
else:
data_x = self.test_x
data_y = self.test_y
buckets = self.test_buckets
all_predicted = []
all_correct = []
for bucket_id in range(buckets.shape[0]):
start, end = buckets[bucket_id]
xs, y = self._encode(data_x[start:end], data_y[start:end])
predicted = self.model.run(xs)
all_predicted.extend(list(predicted.data))
all_correct.extend(list(data_y[start:end]))
all_predicted_probs = [nn.functional.softmax(torch.tensor(i), dim=-1) for i in all_predicted]
all_predicted = [i.argmax() for i in all_predicted_probs]
all_correct = np.asarray(all_correct)
return all_predicted_probs, all_predicted, all_correct
def __getitem__(self, idx):
if torch.is_tensor(idx):
idx = idx.tolist()
ret = self._encode(self.train_x[idx:idx+1], self.train_y[idx:idx+1])
return {'x': torch.squeeze(ret[0]), 'label': torch.squeeze(ret[1])}
def get_validation_accuracy(self):
dev_predicted_probs, dev_predicted, dev_correct = self._predict('dev')
dev_accuracy = (torch.tensor(dev_predicted) == torch.tensor(dev_correct)).float().mean().item()
return dev_accuracy
class DigitClassificationDataset2(CustomDataset):
def __init__(self, model):
mnist_path = get_data_path("mnist.npz")
training_size = 200
test_size = 100
with np.load(mnist_path) as data:
train_images = data["train_images"][:training_size]
train_labels = data["train_labels"][:training_size]
test_images = data["train_images"][:test_size]
test_labels = data["train_labels"][:test_size]
assert len(train_images) == len(train_labels) == training_size
assert len(test_images) == len(test_labels) == test_size
self.dev_images = test_images[0::2]
self.dev_labels = test_labels[0::2]
self.test_images = test_images[1::2]
self.test_labels = test_labels[1::2]
train_labels_one_hot = np.zeros((len(train_images), 10))
train_labels_one_hot[range(len(train_images)), train_labels] = 1
super().__init__(train_images, train_labels_one_hot)
self.model = model
self.epoch = 0
self.num_items = 0
if use_graphics:
self.current_accuracy = None
width = 20 # Width of each row expressed as a multiple of image width
samples = 100 # Number of images to display per label
fig = plt.figure()
ax = {}
images = collections.defaultdict(list)
texts = collections.defaultdict(list)
for i in reversed(range(10)):
ax[i] = plt.subplot2grid((30, 1), (3 * i, 0), 2, 1, sharex=ax.get(9))
plt.setp(ax[i].get_xticklabels(), visible=i == 9)
ax[i].set_yticks([])
ax[i].text(-0.03, 0.5, i, transform=ax[i].transAxes, va="center")
ax[i].set_xlim(0, 28 * width)
ax[i].set_ylim(0, 28)
for j in range(samples):
images[i].append(ax[i].imshow(np.zeros((28, 28)), vmin=0, vmax=1, cmap="Greens", alpha=0.3))
texts[i].append(ax[i].text(0, 0, "", ha="center", va="top", fontsize="smaller"))
ax[9].set_xticks(np.linspace(0, 28 * width, 11))
ax[9].set_xticklabels(["{:.1f}".format(num) for num in np.linspace(0, 1, 11)])
ax[9].tick_params(axis="x", pad=16)
ax[9].set_xlabel("Probability of Correct Label")
status = ax[0].text(0.5, 1.5, "", transform=ax[0].transAxes, ha="center", va="bottom")
plt.show(block=False)
self.width = width
self.samples = samples
self.fig = fig
self.images = images
self.texts = texts
self.status = status
self.last_update = time.time()
def __getitem__(self, idx):
data = super().__getitem__(idx)
x = data['x']
y = data['label']
if use_graphics and time.time() - self.last_update > 1:
dev_logits = self.model.run(torch.tensor(self.dev_images, dtype=torch.float32)).detach().cpu()
dev_predicted = torch.argmax(dev_logits, axis=1)
dev_probs = torch.exp(nn.functional.log_softmax(dev_logits, dim=1))
dev_accuracy = torch.mean(torch.eq(dev_predicted, torch.tensor(self.dev_labels)).float())
self.status.set_text("validation accuracy: {:.2%}".format(dev_accuracy))
for i in range(10):
predicted = dev_predicted[self.dev_labels == i]
probs = dev_probs[self.dev_labels == i][:, i]
linspace = np.linspace(0, len(probs) - 1, self.samples).astype(int)
indices = probs.argsort()[linspace]
for j, (prob, image) in enumerate(zip(probs[indices], self.dev_images[self.dev_labels == i][indices])):
self.images[i][j].set_data(image.reshape((28, 28)))
left = (prob * (self.width - 1) * 28).detach().cpu().numpy()
if predicted[indices[j]] == i:
self.images[i][j].set_cmap("Greens")
self.texts[i][j].set_text("")
else:
self.images[i][j].set_cmap("Reds")
self.texts[i][j].set_text(predicted[indices[j]].detach().cpu().numpy())
self.texts[i][j].set_x(left + 14)
self.images[i][j].set_extent([left, left + 28, 0, 28])
self.fig.canvas.draw_idle()
self.fig.canvas.start_event_loop(1e-3)
self.last_update = time.time()
return {'x': x, 'label': y}
def get_validation_accuracy(self):
dev_logits = self.model.run(torch.tensor(self.dev_images, dtype=torch.float32)).data
dev_predicted = torch.argmax(dev_logits, axis=1).detach()
dev_accuracy = torch.mean(torch.eq(dev_predicted, torch.tensor(self.dev_labels)).float())
return dev_accuracy
def main():
import models
model = models.PerceptronModel(3)
dataset = PerceptronDataset(model)
model.train(dataset)
model = models.RegressionModel()
dataset = RegressionDataset(model)
model.train(dataset)
model = models.DigitClassificationModel()
dataset = DigitClassificationDataset(model)
model.train(dataset)
model = models.LanguageIDModel()
dataset = LanguageIDDataset(model)
model.train(dataset)
if __name__ == "__main__":
main()

Binary file not shown.

Binary file not shown.

381
machinelearning/models.py Normal file
View File

@ -0,0 +1,381 @@
from torch import no_grad, stack
from torch.utils.data import DataLoader
from torch.nn import Module
"""
Functions you should use.
Please avoid importing any other torch functions or modules.
Your code will not pass if the gradescope autograder detects any changed imports
"""
from torch.nn import Parameter, Linear
from torch import optim, tensor, tensordot, empty, ones
from torch.nn.functional import cross_entropy, relu, mse_loss
from torch import movedim
class PerceptronModel(Module):
def __init__(self, dimensions):
"""
Initialize a new Perceptron instance.
A perceptron classifies data points as either belonging to a particular
class (+1) or not (-1). `dimensions` is the dimensionality of the data.
For example, dimensions=2 would mean that the perceptron must classify
2D points.
In order for our autograder to detect your weight, initialize it as a
pytorch Parameter object as follows:
Parameter(weight_vector)
where weight_vector is a pytorch Tensor of dimension 'dimensions'
Hint: You can use ones(dim) to create a tensor of dimension dim.
"""
super(PerceptronModel, self).__init__()
"*** YOUR CODE HERE ***"
self.w = None #Initialize your weights here
def get_weights(self):
"""
Return a Parameter instance with the current weights of the perceptron.
"""
return self.w
def run(self, x):
"""
Calculates the score assigned by the perceptron to a data point x.
Inputs:
x: a node with shape (1 x dimensions)
Returns: a node containing a single number (the score)
The pytorch function `tensordot` may be helpful here.
"""
"*** YOUR CODE HERE ***"
def get_prediction(self, x):
"""
Calculates the predicted class for a single data point `x`.
Returns: 1 or -1
"""
"*** YOUR CODE HERE ***"
def train(self, dataset):
"""
Train the perceptron until convergence.
You can iterate through DataLoader in order to
retrieve all the batches you need to train on.
Each sample in the dataloader is in the form {'x': features, 'label': label} where label
is the item we need to predict based off of its features.
"""
with no_grad():
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
"*** YOUR CODE HERE ***"
class RegressionModel(Module):
"""
A neural network model for approximating a function that maps from real
numbers to real numbers. The network should be sufficiently large to be able
to approximate sin(x) on the interval [-2pi, 2pi] to reasonable precision.
"""
def __init__(self):
# Initialize your model parameters here
"*** YOUR CODE HERE ***"
super().__init__()
def forward(self, x):
"""
Runs the model for a batch of examples.
Inputs:
x: a node with shape (batch_size x 1)
Returns:
A node with shape (batch_size x 1) containing predicted y-values
"""
"*** YOUR CODE HERE ***"
def get_loss(self, x, y):
"""
Computes the loss for a batch of examples.
Inputs:
x: a node with shape (batch_size x 1)
y: a node with shape (batch_size x 1), containing the true y-values
to be used for training
Returns: a tensor of size 1 containing the loss
"""
"*** YOUR CODE HERE ***"
def train(self, dataset):
"""
Trains the model.
In order to create batches, create a DataLoader object and pass in `dataset` as well as your required
batch size. You can look at PerceptronModel as a guideline for how you should implement the DataLoader
Each sample in the dataloader object will be in the form {'x': features, 'label': label} where label
is the item we need to predict based off of its features.
Inputs:
dataset: a PyTorch dataset object containing data to be trained on
"""
"*** YOUR CODE HERE ***"
class DigitClassificationModel(Module):
"""
A model for handwritten digit classification using the MNIST dataset.
Each handwritten digit is a 28x28 pixel grayscale image, which is flattened
into a 784-dimensional vector for the purposes of this model. Each entry in
the vector is a floating point number between 0 and 1.
The goal is to sort each digit into one of 10 classes (number 0 through 9).
(See RegressionModel for more information about the APIs of different
methods here. We recommend that you implement the RegressionModel before
working on this part of the project.)
"""
def __init__(self):
# Initialize your model parameters here
super().__init__()
input_size = 28 * 28
output_size = 10
"*** YOUR CODE HERE ***"
def run(self, x):
"""
Runs the model for a batch of examples.
Your model should predict a node with shape (batch_size x 10),
containing scores. Higher scores correspond to greater probability of
the image belonging to a particular class.
Inputs:
x: a tensor with shape (batch_size x 784)
Output:
A node with shape (batch_size x 10) containing predicted scores
(also called logits)
"""
""" YOUR CODE HERE """
def get_loss(self, x, y):
"""
Computes the loss for a batch of examples.
The correct labels `y` are represented as a tensor with shape
(batch_size x 10). Each row is a one-hot vector encoding the correct
digit class (0-9).
Inputs:
x: a node with shape (batch_size x 784)
y: a node with shape (batch_size x 10)
Returns: a loss tensor
"""
""" YOUR CODE HERE """
def train(self, dataset):
"""
Trains the model.
"""
""" YOUR CODE HERE """
class LanguageIDModel(Module):
"""
A model for language identification at a single-word granularity.
(See RegressionModel for more information about the APIs of different
methods here. We recommend that you implement the RegressionModel before
working on this part of the project.)
"""
def __init__(self):
# Our dataset contains words from five different languages, and the
# combined alphabets of the five languages contain a total of 47 unique
# characters.
# You can refer to self.num_chars or len(self.languages) in your code
self.num_chars = 47
self.languages = ["English", "Spanish", "Finnish", "Dutch", "Polish"]
super(LanguageIDModel, self).__init__()
"*** YOUR CODE HERE ***"
# Initialize your model parameters here
def run(self, xs):
"""
Runs the model for a batch of examples.
Although words have different lengths, our data processing guarantees
that within a single batch, all words will be of the same length (L).
Here `xs` will be a list of length L. Each element of `xs` will be a
tensor with shape (batch_size x self.num_chars), where every row in the
array is a one-hot vector encoding of a character. For example, if we
have a batch of 8 three-letter words where the last word is "cat", then
xs[1] will be a tensor that contains a 1 at position (7, 0). Here the
index 7 reflects the fact that "cat" is the last word in the batch, and
the index 0 reflects the fact that the letter "a" is the inital (0th)
letter of our combined alphabet for this task.
Your model should use a Recurrent Neural Network to summarize the list
`xs` into a single tensor of shape (batch_size x hidden_size), for your
choice of hidden_size. It should then calculate a tensor of shape
(batch_size x 5) containing scores, where higher scores correspond to
greater probability of the word originating from a particular language.
Inputs:
xs: a list with L elements (one per character), where each element
is a node with shape (batch_size x self.num_chars)
Returns:
A node with shape (batch_size x 5) containing predicted scores
(also called logits)
"""
"*** YOUR CODE HERE ***"
def get_loss(self, xs, y):
"""
Computes the loss for a batch of examples.
The correct labels `y` are represented as a node with shape
(batch_size x 5). Each row is a one-hot vector encoding the correct
language.
Inputs:
xs: a list with L elements (one per character), where each element
is a node with shape (batch_size x self.num_chars)
y: a node with shape (batch_size x 5)
Returns: a loss node
"""
"*** YOUR CODE HERE ***"
def train(self, dataset):
"""
Trains the model.
Note that when you iterate through dataloader, each batch will returned as its own vector in the form
(batch_size x length of word x self.num_chars). However, in order to run multiple samples at the same time,
get_loss() and run() expect each batch to be in the form (length of word x batch_size x self.num_chars), meaning
that you need to switch the first two dimensions of every batch. This can be done with the movedim() function
as follows:
movedim(input_vector, initial_dimension_position, final_dimension_position)
For more information, look at the pytorch documentation of torch.movedim()
"""
"*** YOUR CODE HERE ***"
def Convolve(input: tensor, weight: tensor):
"""
Acts as a convolution layer by applying a 2d convolution with the given inputs and weights.
DO NOT import any pytorch methods to directly do this, the convolution must be done with only the functions
already imported.
There are multiple ways to complete this function. One possible solution would be to use 'tensordot'.
If you would like to index a tensor, you can do it as such:
tensor[y:y+height, x:x+width]
This returns a subtensor who's first element is tensor[y,x] and has height 'height, and width 'width'
"""
input_tensor_dimensions = input.shape
weight_dimensions = weight.shape
Output_Tensor = tensor(())
"*** YOUR CODE HERE ***"
"*** End Code ***"
return Output_Tensor
class DigitConvolutionalModel(Module):
"""
A model for handwritten digit classification using the MNIST dataset.
This class is a convolutational model which has already been trained on MNIST.
if Convolve() has been correctly implemented, this model should be able to achieve a high accuracy
on the mnist dataset given the pretrained weights.
"""
def __init__(self):
# Initialize your model parameters here
super().__init__()
output_size = 10
self.convolution_weights = Parameter(ones((3, 3)))
""" YOUR CODE HERE """
def run(self, x):
"""
The convolutional layer is already applied, and the output is flattened for you. You should treat x as
a regular 1-dimentional datapoint now, similar to the previous questions.
"""
x = x.reshape(len(x), 28, 28)
x = stack(list(map(lambda sample: Convolve(sample, self.convolution_weights), x)))
x = x.flatten(start_dim=1)
""" YOUR CODE HERE """
def get_loss(self, x, y):
"""
Computes the loss for a batch of examples.
The correct labels `y` are represented as a tensor with shape
(batch_size x 10). Each row is a one-hot vector encoding the correct
digit class (0-9).
Inputs:
x: a node with shape (batch_size x 784)
y: a node with shape (batch_size x 10)
Returns: a loss tensor
"""
""" YOUR CODE HERE """
def train(self, dataset):
"""
Trains the model.
"""
""" YOUR CODE HERE """

Binary file not shown.

Before

Width:  |  Height:  |  Size: 116 KiB