Exercises
ex-sp-ch02-01
Easy: Write a pure function standardize(x: np.ndarray) -> np.ndarray that
returns an array with zero mean and unit variance. Verify that it does
not modify the input array. Add a complete NumPy-style docstring.
Compute mean and std, then return (x - mean) / std.
Test purity: check that the original array is unchanged after the call.
Implementation
import numpy as np

def standardize(x: np.ndarray) -> np.ndarray:
    """Standardize an array to zero mean and unit variance.

    Parameters
    ----------
    x : np.ndarray
        Input array.

    Returns
    -------
    np.ndarray
        Standardized array with mean ~0 and std ~1.
    """
    return (x - x.mean()) / x.std()

# Verify purity
data = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
original = data.copy()
result = standardize(data)
assert np.array_equal(data, original), "Input was modified!"
assert np.isclose(result.mean(), 0.0)
assert np.isclose(result.std(), 1.0)
ex-sp-ch02-02
Easy: Write a function apply_transforms(data, *transforms) that takes
a NumPy array and applies a series of transformation functions in
order. Use *args to accept any number of transforms.
Loop through transforms and apply each one sequentially.
Use functools.reduce as an alternative.
Implementation
import numpy as np
from functools import reduce

def apply_transforms(data: np.ndarray, *transforms) -> np.ndarray:
    """Apply a sequence of transformations to data.

    Parameters
    ----------
    data : np.ndarray
        Input data.
    *transforms : callable
        Functions to apply in order.

    Returns
    -------
    np.ndarray
        Transformed data.
    """
    return reduce(lambda d, f: f(d), transforms, data)

# Usage
result = apply_transforms(
    np.array([1, -2, 3, -4, 5]),
    np.abs,
    np.sqrt,
    lambda x: x / x.max(),
)
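For comparison, the explicit loop from the first hint does the same job; apply_transforms_loop is an illustrative name:

def apply_transforms_loop(data: np.ndarray, *transforms) -> np.ndarray:
    """Loop-based equivalent of apply_transforms."""
    result = data
    for transform in transforms:
        result = transform(result)
    return result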
ex-sp-ch02-03
Easy: Fix the following function that has the mutable default argument bug:
def log_experiment(result, history=[]):
    history.append(result)
    return history
Demonstrate the bug and write the corrected version.
Call the function multiple times and observe that history accumulates.
Use None as the default and create a new list inside.
Bug demonstration
# Bug: shared mutable default
print(log_experiment("run1"))  # ['run1']
print(log_experiment("run2"))  # ['run1', 'run2'] ← unexpected!
Fix
def log_experiment(result, history=None):
    if history is None:
        history = []
    history.append(result)
    return history

print(log_experiment("run1"))  # ['run1']
print(log_experiment("run2"))  # ['run2'] ← correct!
ex-sp-ch02-04
Easy: Write a simple @timer decorator that prints the execution time
of any function. Use functools.wraps to preserve metadata.
Test it on a function that computes the eigenvalues of a random matrix.
Use time.perf_counter() for timing.
The wrapper should accept *args, **kwargs.
Implementation
import functools
import time
import numpy as np

def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timer
def compute_eigenvalues(n: int) -> np.ndarray:
    """Compute eigenvalues of a random n x n matrix."""
    A = np.random.randn(n, n)
    return np.linalg.eigvals(A)

eigs = compute_eigenvalues(500)
print(compute_eigenvalues.__name__)  # 'compute_eigenvalues'
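To see what functools.wraps preserves, compare against a wrapper that omits it (timer_no_wraps is an illustrative name):

def timer_no_wraps(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@timer_no_wraps
def f():
    """This docstring is lost too."""

print(f.__name__)  # 'wrapper' - the original metadata is gone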
ex-sp-ch02-05
Easy: Write a context manager using contextlib.contextmanager that
prints "Starting..." when entering and "Done in X.XXXXs" when
exiting. Test it on a NumPy matrix multiplication.
Use @contextmanager with a try/finally around yield.
Use time.perf_counter() for timing.
Implementation
import time
from contextlib import contextmanager
import numpy as np

@contextmanager
def timed_block(label: str = "Block"):
    print(f"Starting {label}...")
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"Done in {elapsed:.4f}s")

with timed_block("Matrix multiply"):
    A = np.random.randn(1000, 1000)
    B = np.random.randn(1000, 1000)
    C = A @ B
ex-sp-ch02-06
Medium: Create a function factory make_activation(name: str) that returns
activation functions commonly used in neural networks. Support
"relu", "sigmoid", "tanh", and "leaky_relu" (with a configurable
alpha parameter for leaky ReLU). Each returned function should
operate on NumPy arrays.
Use closures to capture the alpha parameter for leaky_relu.
sigmoid: 1 / (1 + exp(-x)).
Attach a name attribute to the returned function for introspection.
Implementation
import numpy as np

def make_activation(name: str, alpha: float = 0.01):
    """Create an activation function by name.

    Parameters
    ----------
    name : str
        One of 'relu', 'sigmoid', 'tanh', 'leaky_relu'.
    alpha : float
        Slope for negative values in leaky ReLU.
    """
    if name == "relu":
        def activation(x):
            return np.maximum(0, x)
    elif name == "sigmoid":
        def activation(x):
            return 1 / (1 + np.exp(-x))
    elif name == "tanh":
        def activation(x):
            return np.tanh(x)
    elif name == "leaky_relu":
        def activation(x):
            return np.where(x > 0, x, alpha * x)
    else:
        raise ValueError(f"Unknown activation: {name}")
    activation.__name__ = name
    activation.name = name
    return activation

# Test
relu = make_activation("relu")
leaky = make_activation("leaky_relu", alpha=0.1)
x = np.linspace(-3, 3, 100)
print(relu(x).min())  # 0.0
print(leaky(-1.0))    # -0.1
ex-sp-ch02-07
Medium: Write a @count_calls decorator that tracks how many times a function
has been called. The count should be accessible via func.call_count
and resettable via func.reset_count().
Store the count as an attribute on the wrapper function.
Use functools.wraps to preserve the original function metadata.
Implementation
import functools
import numpy as np

def count_calls(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.call_count += 1
        return func(*args, **kwargs)
    wrapper.call_count = 0
    wrapper.reset_count = lambda: setattr(wrapper, 'call_count', 0)
    return wrapper

@count_calls
def simulate(snr_db):
    return np.random.rand() < 10**(-snr_db/10)

for _ in range(1000):
    simulate(10)
print(simulate.call_count)  # 1000
simulate.reset_count()
print(simulate.call_count)  # 0
ex-sp-ch02-08
Medium: Write a decorator factory @retry(max_attempts=3, delay=1.0) that
retries a function if it raises an exception, with exponential
backoff. This is useful for network calls or flaky I/O operations
in scientific pipelines.
The factory returns a decorator, which returns a wrapper.
Use time.sleep(delay * 2**attempt) for exponential backoff.
Re-raise the last exception if all retries fail.
Implementation
import functools
import time

def retry(max_attempts: int = 3, delay: float = 1.0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        break  # final attempt failed: no point sleeping
                    wait = delay * (2 ** attempt)
                    print(f"{func.__name__} failed (attempt {attempt+1}/"
                          f"{max_attempts}), retrying in {wait:.1f}s: {e}")
                    time.sleep(wait)
            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=3, delay=0.5)
def download_data(url: str) -> bytes:
    import urllib.request
    return urllib.request.urlopen(url).read()
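To watch the backoff without touching the network, a deliberately flaky function works; flaky and _calls are illustrative names:

_calls = {"n": 0}

@retry(max_attempts=4, delay=0.1)
def flaky():
    _calls["n"] += 1
    if _calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())  # fails twice, prints two retry messages, then returns 'ok'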
ex-sp-ch02-09
Medium: Create a class-based context manager WorkingDirectory that temporarily
changes the working directory and restores it on exit:

with WorkingDirectory("/tmp/experiments"):
    # cwd is now /tmp/experiments
    save_results(data)
# cwd is restored to original
Then rewrite it using @contextmanager.
Use os.getcwd() and os.chdir() in __enter__ and __exit__.
Create the directory if it does not exist using os.makedirs.
Class-based version
import os

class WorkingDirectory:
    def __init__(self, path: str, create: bool = True):
        self.path = path
        self.create = create
        self._old_cwd = None

    def __enter__(self):
        self._old_cwd = os.getcwd()
        if self.create:
            os.makedirs(self.path, exist_ok=True)
        os.chdir(self.path)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        os.chdir(self._old_cwd)
        return False
Generator-based version
from contextlib import contextmanager

@contextmanager
def working_directory(path: str, create: bool = True):
    old_cwd = os.getcwd()
    if create:
        os.makedirs(path, exist_ok=True)
    os.chdir(path)
    try:
        yield path
    finally:
        os.chdir(old_cwd)
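A quick round trip with the generator-based version, using a temporary directory so nothing is left behind:

import tempfile

original = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    with working_directory(tmp):
        print(os.getcwd())      # inside tmp
print(os.getcwd() == original)  # True - the cwd is restored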
ex-sp-ch02-10
Medium: Rewrite the following loop-variable closure bug so that each callback
correctly captures its own value. Provide three different solutions:
default argument, functools.partial, and a factory function.

# Bug:
callbacks = []
for freq in [100, 200, 400, 800]:
    callbacks.append(lambda t: np.sin(2 * np.pi * freq * t))
# All callbacks use freq=800
Default argument: lambda t, f=freq: ...
partial: partial(lambda t, f: np.sin(2*np.pi*f*t), f=freq)
Three solutions
import numpy as np
from functools import partial

# Solution 1: Default argument
callbacks_1 = []
for freq in [100, 200, 400, 800]:
    callbacks_1.append(lambda t, f=freq: np.sin(2 * np.pi * f * t))

# Solution 2: functools.partial
def sine_wave(t, freq):
    return np.sin(2 * np.pi * freq * t)

callbacks_2 = [partial(sine_wave, freq=f) for f in [100, 200, 400, 800]]

# Solution 3: Factory function
def make_sine(freq):
    def sine(t):
        return np.sin(2 * np.pi * freq * t)
    return sine

callbacks_3 = [make_sine(f) for f in [100, 200, 400, 800]]
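A quick check that each solution now captures its own frequency; at t = 1/1600 s the four waves give distinct values:

t = 1 / 1600
for cb in callbacks_3:
    print(round(float(cb(t)), 3))  # 0.383, 0.707, 1.0, 0.0 - one value per frequency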
ex-sp-ch02-11
Medium: Write a @validate_shapes decorator factory that checks NumPy array
argument shapes before calling the function. Usage:

@validate_shapes({"X": (None, None), "y": (None,)})
def fit(X: np.ndarray, y: np.ndarray):
    ...
None means "any size in this dimension". The decorator should
raise ValueError with a clear message if shapes do not match.
Use inspect.signature to map argument names to positions.
Compare each dimension: skip if spec is None, check equality otherwise.
Implementation
import functools
import inspect
import numpy as np

def validate_shapes(shape_specs: dict):
    def decorator(func):
        sig = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            for name, expected in shape_specs.items():
                arr = bound.arguments.get(name)
                if arr is None or not isinstance(arr, np.ndarray):
                    continue
                if len(arr.shape) != len(expected):
                    raise ValueError(
                        f"{func.__name__}: {name} expected "
                        f"{len(expected)}D, got {len(arr.shape)}D"
                    )
                for i, (actual, spec) in enumerate(zip(arr.shape, expected)):
                    if spec is not None and actual != spec:
                        raise ValueError(
                            f"{func.__name__}: {name} dim {i} "
                            f"expected {spec}, got {actual}"
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorator

@validate_shapes({"X": (None, 10), "y": (None,)})
def fit(X, y):
    return X.shape, y.shape

fit(np.zeros((100, 10)), np.zeros(100))   # OK
# fit(np.zeros((100, 5)), np.zeros(100))  # ValueError
ex-sp-ch02-12
Hard: Implement a @memoize_disk decorator that caches function results to
disk using pickle. The cache key should be based on the function
name and arguments (handle NumPy arrays by hashing their contents).
Include a cache_dir parameter and a way to invalidate the cache.
Use hashlib.sha256 on argument representations.
For NumPy arrays, hash arr.tobytes() plus str(arr.dtype) plus str(arr.shape).
Store each result as a separate pickle file.
Implementation
import functools
import hashlib
import os
import pickle
import numpy as np

def memoize_disk(cache_dir: str = ".cache"):
    def decorator(func):
        os.makedirs(cache_dir, exist_ok=True)

        def _hash_arg(arg):
            if isinstance(arg, np.ndarray):
                h = hashlib.sha256(arg.tobytes())
                h.update(str(arg.dtype).encode())
                h.update(str(arg.shape).encode())
                return h.hexdigest()
            return hashlib.sha256(repr(arg).encode()).hexdigest()

        def _cache_key(args, kwargs):
            parts = [func.__name__]
            parts.extend(_hash_arg(a) for a in args)
            for k, v in sorted(kwargs.items()):
                parts.append(f"{k}={_hash_arg(v)}")
            combined = "_".join(parts)
            return hashlib.sha256(combined.encode()).hexdigest()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = _cache_key(args, kwargs)
            path = os.path.join(cache_dir, f"{key}.pkl")
            if os.path.exists(path):
                with open(path, "rb") as f:
                    return pickle.load(f)
            result = func(*args, **kwargs)
            with open(path, "wb") as f:
                pickle.dump(result, f)
            return result

        def clear_cache():
            import shutil
            if os.path.exists(cache_dir):
                shutil.rmtree(cache_dir)
            os.makedirs(cache_dir)

        wrapper.clear_cache = clear_cache
        return wrapper
    return decorator
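A usage sketch; slow_square and the one-second sleep are illustrative stand-ins for a real computation:

import time

@memoize_disk(cache_dir=".demo_cache")
def slow_square(x: np.ndarray) -> np.ndarray:
    time.sleep(1)  # stand-in for an expensive computation
    return x ** 2

a = np.arange(5)
slow_square(a)             # slow: computes and writes a pickle file
slow_square(a)             # fast: loaded from disk
slow_square.clear_cache()  # invalidate by wiping the cache directory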
ex-sp-ch02-13
Hard: Write a Pipeline class that uses closures and the >> operator
(via __rshift__) to compose data transformation functions:

pipe = Pipeline(remove_nans) >> log_transform >> normalize
result = pipe(raw_data)

The pipeline should support len() (number of steps), iteration
over steps, and a describe() method that lists each function.
Store a list of functions internally.
__rshift__ should return a new Pipeline with the added function.
Implement __len__, __iter__, and __call__.
Implementation
import numpy as np

class Pipeline:
    def __init__(self, *funcs):
        self._funcs = list(funcs)

    def __rshift__(self, other):
        if callable(other):
            return Pipeline(*self._funcs, other)
        raise TypeError(f"Cannot compose with {type(other)}")

    def __call__(self, data):
        result = data
        for func in self._funcs:
            result = func(result)
        return result

    def __len__(self):
        return len(self._funcs)

    def __iter__(self):
        return iter(self._funcs)

    def __repr__(self):
        names = [f.__name__ for f in self._funcs]
        return f"Pipeline({' >> '.join(names)})"

    def describe(self):
        for i, func in enumerate(self._funcs):
            doc = (func.__doc__ or "No description").strip().split('\n')[0]
            print(f"  Step {i+1}: {func.__name__} - {doc}")

# Usage
pipe = Pipeline(np.abs) >> np.sqrt >> np.log1p
print(pipe)       # Pipeline(absolute >> sqrt >> log1p)
print(len(pipe))  # 3
result = pipe(np.array([-4, -1, 0, 1, 4]))
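describe() then prints one line per step; the exact text depends on each function's docstring:

pipe.describe()  # Step 1: absolute - ..., Step 2: sqrt - ..., Step 3: log1p - ...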
ex-sp-ch02-14
Hard: Implement a @deprecated(message, version) decorator factory that:
- Issues a DeprecationWarning on first call
- Includes the replacement function name in the message
- Only warns once per function (not on every call)
- Preserves the original function's metadata

@deprecated("Use compute_ber_v2 instead", version="3.0")
def compute_ber(tx, rx):
    ...
Use warnings.warn with DeprecationWarning category.
Track whether the warning has been issued using a closure variable.
Implementation
import functools
import warnings
import numpy as np

def deprecated(message: str, version: str = ""):
    def decorator(func):
        warned = False

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal warned
            if not warned:
                ver_info = f" (since v{version})" if version else ""
                warnings.warn(
                    f"{func.__name__} is deprecated{ver_info}. {message}",
                    category=DeprecationWarning,
                    stacklevel=2,
                )
                warned = True
            return func(*args, **kwargs)
        return wrapper
    return decorator

@deprecated("Use compute_ber_v2 instead", version="3.0")
def compute_ber(tx, rx):
    """Old BER computation."""
    return np.mean(tx != rx)
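A quick demo; the simplefilter call just makes the warning visible, since DeprecationWarning is often hidden by default:

warnings.simplefilter("always", DeprecationWarning)
tx = np.array([0, 1, 1, 0])
rx = np.array([0, 1, 0, 0])
compute_ber(tx, rx)  # warns: compute_ber is deprecated (since v3.0). Use compute_ber_v2 instead
compute_ber(tx, rx)  # silent: the closure flag suppresses repeat warnings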
ex-sp-ch02-15
Hard: Build an ExitStack-based context manager that manages multiple
resources dynamically. Write a function that opens N HDF5-like
files (simulate with regular files), processes them, and ensures
all are closed even if an error occurs partway through.
Use contextlib.ExitStack and demonstrate it with at least 5 files.
Use stack.enter_context(open(...)) to register each file.
ExitStack guarantees all registered cleanups run.
Implementation
import os
import tempfile
from contextlib import ExitStack

def process_multiple_files(file_paths: list[str]) -> dict:
    """Process multiple files, ensuring all are properly closed."""
    results = {}
    with ExitStack() as stack:
        # Open all files; ExitStack closes them all on exit
        handles = []
        for path in file_paths:
            fh = stack.enter_context(open(path, 'r'))
            handles.append((path, fh))
        # Process each file
        for path, fh in handles:
            content = fh.read()
            results[path] = len(content)
    # All files are guaranteed closed here
    return results

# Demo with temporary files
with tempfile.TemporaryDirectory() as tmpdir:
    paths = []
    for i in range(5):
        path = os.path.join(tmpdir, f"data_{i}.txt")
        with open(path, 'w') as f:
            f.write(f"Data for experiment {i}\n" * 100)
        paths.append(path)

    results = process_multiple_files(paths)
    for path, size in results.items():
        print(f"{os.path.basename(path)}: {size} chars")
ex-sp-ch02-16
Hard: Write a @register decorator that registers functions in a
dispatch table (dictionary). Then use it to build a simple
plugin system for different optimization algorithms:

@register("sgd")
def sgd_optimizer(params, lr=0.01): ...

@register("adam")
def adam_optimizer(params, lr=0.001, beta1=0.9): ...

# Dispatch by name
optimizer = get_optimizer("adam")
The registry is a module-level dict.
register(name) is a decorator factory that adds to the dict.
Implementation
_OPTIMIZER_REGISTRY = {}

def register(name: str):
    def decorator(func):
        _OPTIMIZER_REGISTRY[name] = func
        return func  # registration is a side effect; no wrapper needed
    return decorator

def get_optimizer(name: str):
    if name not in _OPTIMIZER_REGISTRY:
        available = ", ".join(_OPTIMIZER_REGISTRY.keys())
        raise ValueError(f"Unknown optimizer: {name}. Available: {available}")
    return _OPTIMIZER_REGISTRY[name]

def list_optimizers():
    return list(_OPTIMIZER_REGISTRY.keys())

@register("sgd")
def sgd_optimizer(params, lr=0.01):
    """Stochastic Gradient Descent."""
    return {p: p - lr * g for p, g in params.items()}

@register("adam")
def adam_optimizer(params, lr=0.001, beta1=0.9, beta2=0.999):
    """Adam optimizer."""
    return {p: p - lr * g for p, g in params.items()}

# Dispatch
opt = get_optimizer("adam")
print(opt.__name__)       # 'adam_optimizer'
print(list_optimizers())  # ['sgd', 'adam']
ex-sp-ch02-17
Challenge: Build a complete @experiment_tracker decorator that:
- Logs function name, arguments, start time, end time, and duration
- Captures the return value and any exceptions
- Saves all runs to a JSON file with unique run IDs (UUID)
- Supports nested tracked functions (tracks the call tree)
- Provides a report() class method to summarize all runs

This simulates a lightweight version of MLflow's tracking functionality.
Use a class-based decorator to maintain state across calls.
Use uuid.uuid4() for run IDs.
Use a thread-local stack for tracking nested calls.
Handle NumPy arrays in arguments by converting to shape/dtype strings for JSON.
Implementation sketch
import functools
import json
import time
import uuid
import threading
from datetime import datetime

class ExperimentTracker:
    _runs = []
    _call_stack = threading.local()

    @classmethod
    def track(cls, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            run_id = str(uuid.uuid4())[:8]
            if not hasattr(cls._call_stack, 'stack'):
                cls._call_stack.stack = []
            parent = cls._call_stack.stack[-1] if cls._call_stack.stack else None
            run = {
                "run_id": run_id,
                "parent_id": parent,
                "function": func.__name__,
                "start_time": datetime.now().isoformat(),
                "args": [repr(a)[:100] for a in args],
                "kwargs": {k: repr(v)[:100] for k, v in kwargs.items()},
            }
            cls._call_stack.stack.append(run_id)
            try:
                start = time.perf_counter()
                result = func(*args, **kwargs)
                run["duration"] = time.perf_counter() - start
                run["status"] = "success"
                run["result"] = repr(result)[:200]
                return result
            except Exception as e:
                run["duration"] = time.perf_counter() - start
                run["status"] = "error"
                run["error"] = str(e)
                raise
            finally:
                cls._call_stack.stack.pop()
                cls._runs.append(run)
        return wrapper

    @classmethod
    def save(cls, path: str = "experiment_log.json"):
        with open(path, "w") as f:
            json.dump(cls._runs, f, indent=2)

    @classmethod
    def report(cls):
        print(f"Total runs: {len(cls._runs)}")
        for run in cls._runs:
            status = run["status"]
            print(f"  [{run['run_id']}] {run['function']} "
                  f"({run['duration']:.4f}s) [{status}]")

experiment_tracker = ExperimentTracker.track
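A minimal usage sketch; preprocess and run_experiment are illustrative names:

import numpy as np

@experiment_tracker
def preprocess(x):
    return x - x.mean()

@experiment_tracker
def run_experiment(x):
    return preprocess(x).std()  # nested call: recorded with a parent_id

run_experiment(np.random.randn(1000))
ExperimentTracker.report()  # two runs: preprocess (child) and run_experiment (parent)
ExperimentTracker.save()    # writes experiment_log.json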
ex-sp-ch02-18
Challenge: Implement a @jit_fallback decorator that tries to JIT-compile a
function with Numba, but gracefully falls back to the pure Python
version if Numba is not installed or compilation fails. It should:
- Detect whether Numba is available at import time
- If available, apply @numba.jit(nopython=True) with error handling
- If compilation fails, warn and use the original function
- Provide a .is_jitted attribute to check which version is active
- Include a benchmark method that compares JIT vs non-JIT performance

Test it on a Monte Carlo simulation function.
Use try/except ImportError to detect Numba.
Numba compilation errors are numba.core.errors.TypingError.
Store both versions (original and compiled) to allow benchmarking.
Implementation
import functools
import time
import warnings
import numpy as np

def jit_fallback(func=None, *, nopython=True):
    # Support both @jit_fallback and @jit_fallback(nopython=...)
    if func is None:
        return lambda f: jit_fallback(f, nopython=nopython)

    original = func
    try:
        import numba
        try:
            jitted = numba.jit(nopython=nopython)(func)
            # numba.jit compiles lazily, so typing errors usually surface
            # at the first call; the wrapper below guards that path too.
            is_jitted = True
        except Exception as e:
            warnings.warn(f"Numba JIT setup failed for {func.__name__}: {e}")
            jitted = func
            is_jitted = False
    except ImportError:
        jitted = func
        is_jitted = False

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        nonlocal jitted
        if wrapper.is_jitted:
            try:
                return jitted(*args, **kwargs)
            except Exception as e:
                # Broad except keeps the sketch simple; a stricter version
                # would catch numba.core.errors.NumbaError only.
                warnings.warn(f"Numba JIT failed for {func.__name__}: {e}; "
                              "falling back to pure Python")
                jitted = original
                wrapper.is_jitted = False
        return original(*args, **kwargs)

    wrapper.is_jitted = is_jitted
    wrapper.original = original
    wrapper.jitted = jitted

    def benchmark(*args, n=100, **kwargs):
        # Time the pure Python version
        start = time.perf_counter()
        for _ in range(n):
            original(*args, **kwargs)
        t_orig = (time.perf_counter() - start) / n
        # Warm up so compilation time does not pollute the JIT timing
        jitted(*args, **kwargs)
        start = time.perf_counter()
        for _ in range(n):
            jitted(*args, **kwargs)
        t_jit = (time.perf_counter() - start) / n
        speedup = t_orig / t_jit if t_jit > 0 else float('inf')
        print(f"{func.__name__}: original={t_orig:.6f}s, "
              f"jit={t_jit:.6f}s, speedup={speedup:.1f}x")
        return {"original": t_orig, "jit": t_jit, "speedup": speedup}

    wrapper.benchmark = benchmark
    return wrapper
@jit_fallback
def monte_carlo_pi(n_samples: int) -> float:
    count = 0
    for _ in range(n_samples):
        x = np.random.random()
        y = np.random.random()
        if x*x + y*y <= 1.0:
            count += 1
    return 4.0 * count / n_samples
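A quick sanity check; the timings and speedup depend on whether Numba is installed:

print(monte_carlo_pi(1_000_000))  # roughly 3.14
print(monte_carlo_pi.is_jitted)   # True if Numba is installed and compilation succeeded
monte_carlo_pi.benchmark(100_000, n=10)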