Exercises
ex-sp-ch03-01
Easy: Create a @dataclass called ExperimentConfig with fields: n_samples (int, default 1000),
snr_db (float, default 20.0), algorithm (str, default "lasso"), and tags (list of str,
default empty). Make it frozen and verify that you cannot modify attributes after creation.
Store tags as a tuple (for example tags: tuple[str, ...] = () or field(default_factory=tuple)): dataclasses reject mutable defaults such as [], and a frozen dataclass is only hashable if all of its field values are hashable.
Try config.snr_db = 10.0 and verify it raises dataclasses.FrozenInstanceError (a subclass of AttributeError).
Implementation
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ExperimentConfig:
    n_samples: int = 1000
    snr_db: float = 20.0
    algorithm: str = "lasso"
    tags: tuple[str, ...] = ()

config = ExperimentConfig(snr_db=15.0, tags=("sparse", "noiseless"))
print(config)
# ExperimentConfig(n_samples=1000, snr_db=15.0, algorithm='lasso', tags=('sparse', 'noiseless'))

try:
    config.snr_db = 10.0
except AttributeError as e:  # dataclasses.FrozenInstanceError is a subclass of AttributeError
    print(f"Cannot modify: {e}")

# Hashable — can be used as dict key
results = {config: {"nmse": -25.3}}
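A variant that follows the hint literally, using field(default_factory=tuple) for the empty default; it is functionally equivalent to the = () default above, and the class name ExperimentConfigV2 is just for illustration:

from dataclasses import dataclass, field

@dataclass(frozen=True)
class ExperimentConfigV2:
    n_samples: int = 1000
    snr_db: float = 20.0
    algorithm: str = "lasso"
    # default_factory builds a fresh default per instance; for an immutable
    # tuple this is optional, but it mirrors the hint
    tags: tuple[str, ...] = field(default_factory=tuple)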
ex-sp-ch03-02
Easy: Implement a Vector3D class with x, y, z attributes and the following
dunder methods: __repr__, __add__ (vector addition), __mul__ (scalar
multiplication), and __abs__ (Euclidean norm). Verify with test cases.
Use math.sqrt(self.x**2 + self.y**2 + self.z**2) for __abs__.
Return a new Vector3D from __add__ and __mul__ (keep it pure).
Implementation
import math

class Vector3D:
    def __init__(self, x: float, y: float, z: float):
        self.x, self.y, self.z = x, y, z

    def __repr__(self):
        return f"Vector3D({self.x}, {self.y}, {self.z})"

    def __add__(self, other):
        return Vector3D(self.x + other.x, self.y + other.y, self.z + other.z)

    def __mul__(self, scalar):
        return Vector3D(self.x * scalar, self.y * scalar, self.z * scalar)

    def __rmul__(self, scalar):
        return self.__mul__(scalar)

    def __abs__(self):
        return math.sqrt(self.x**2 + self.y**2 + self.z**2)
v1 = Vector3D(1, 2, 3)
v2 = Vector3D(4, 5, 6)
print(v1 + v2) # Vector3D(5, 7, 9)
print(v1 * 2) # Vector3D(2, 4, 6)
print(abs(v1)) # 3.7416...
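Because the sketch also defines __rmul__ (not required by the exercise), scalar-on-the-left multiplication works as well:

print(2 * v1)  # Vector3D(2, 4, 6), dispatched through __rmul__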
ex-sp-ch03-03
Easy: Write a base class Estimator with an @abstractmethod called estimate
that takes y: np.ndarray and returns np.ndarray. Create two subclasses:
MeanEstimator (returns the mean repeated) and MedianEstimator (returns
the median repeated). Verify that attempting Estimator() raises TypeError.
Import ABC and abstractmethod from abc.
np.full_like(y, np.mean(y)) creates an array of the same shape filled with the mean.
Implementation
from abc import ABC, abstractmethod
import numpy as np

class Estimator(ABC):
    @abstractmethod
    def estimate(self, y: np.ndarray) -> np.ndarray:
        ...

class MeanEstimator(Estimator):
    def estimate(self, y):
        return np.full_like(y, np.mean(y))

class MedianEstimator(Estimator):
    def estimate(self, y):
        return np.full_like(y, np.median(y))
# Test
try:
Estimator() # TypeError!
except TypeError as e:
print(f"Cannot instantiate ABC: {e}")
y = np.array([1.0, 5.0, 2.0, 8.0, 3.0])
print(MeanEstimator().estimate(y)) # [3.8, 3.8, 3.8, 3.8, 3.8]
print(MedianEstimator().estimate(y)) # [3.0, 3.0, 3.0, 3.0, 3.0]
ex-sp-ch03-04
Easy: Define a typing.Protocol called HasLength with a single method __len__() -> int.
Make it @runtime_checkable. Verify that list, str, np.ndarray, and dict
all satisfy the protocol using isinstance.
All built-in containers already have __len__.
Test with isinstance([], HasLength) — it should return True.
Implementation
from typing import Protocol, runtime_checkable
import numpy as np

@runtime_checkable
class HasLength(Protocol):
    def __len__(self) -> int: ...
# All of these should satisfy HasLength
assert isinstance([], HasLength)
assert isinstance("hello", HasLength)
assert isinstance(np.zeros(5), HasLength)
assert isinstance({}, HasLength)
assert isinstance((1, 2), HasLength)
# This does NOT satisfy HasLength
assert not isinstance(42, HasLength)
assert not isinstance(3.14, HasLength)
print("All checks passed!")
ex-sp-ch03-05
Easy: Create a class NamedArray that wraps a NumPy array with a name attribute.
Implement __array__ so that np.asarray(named_array) returns the underlying
data. Verify that np.mean(NamedArray("temperature", data)) works.
Store the data as self._data = np.asarray(value).
__array__ should return self._data (or a copy for safety).
Implementation
import numpy as np

class NamedArray:
    def __init__(self, name: str, data):
        self.name = name
        self._data = np.asarray(data)

    def __array__(self, dtype=None, copy=None):
        if dtype is not None:
            return self._data.astype(dtype)
        return self._data.copy()

    def __repr__(self):
        return f"NamedArray('{self.name}', shape={self._data.shape})"
temp = NamedArray("temperature", [20.1, 21.5, 19.8, 22.3])
print(np.mean(temp)) # 20.925
print(np.array(temp).dtype) # float64
print(np.std(temp))         # ~1.02
ex-sp-ch03-06
Medium: Implement a Solver base class (ABC) with methods solve(A, y) -> np.ndarray
and convergence_history() -> list[float]. Create two subclasses:
GradientDescentSolver and ProximalSolver. Each should track the cost
at every iteration. Write a function compare_solvers(solvers, A, y) that
runs all solvers and returns a dict mapping solver names to their final costs.
Store self._history = [] in the base __init__ and append costs in each iteration.
The proximal solver uses soft thresholding after each gradient step.
Implementation
from abc import ABC, abstractmethod
import numpy as np

class Solver(ABC):
    def __init__(self, n_iters=100, step_size=0.01):
        self.n_iters = n_iters
        self.step_size = step_size
        self._history = []

    @abstractmethod
    def solve(self, A: np.ndarray, y: np.ndarray) -> np.ndarray: ...

    def convergence_history(self) -> list[float]:
        return self._history.copy()

class GradientDescentSolver(Solver):
    def solve(self, A, y):
        x = np.zeros(A.shape[1])
        self._history = []
        for _ in range(self.n_iters):
            grad = A.T @ (A @ x - y)
            x -= self.step_size * grad
            self._history.append(float(np.linalg.norm(A @ x - y)))
        return x

class ProximalSolver(Solver):
    def __init__(self, n_iters=100, step_size=0.01, lam=0.1):
        super().__init__(n_iters, step_size)
        self.lam = lam

    def solve(self, A, y):
        x = np.zeros(A.shape[1])
        self._history = []
        for _ in range(self.n_iters):
            grad = A.T @ (A @ x - y)
            z = x - self.step_size * grad
            x = np.sign(z) * np.maximum(np.abs(z) - self.lam * self.step_size, 0)
            cost = 0.5 * np.linalg.norm(A @ x - y)**2 + self.lam * np.linalg.norm(x, 1)
            self._history.append(float(cost))
        return x

def compare_solvers(solvers, A, y):
    results = {}
    for s in solvers:
        s.solve(A, y)
        results[s.__class__.__name__] = s.convergence_history()[-1]
    return results
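A quick usage sketch for compare_solvers; the problem sizes, sparsity level, and the 1/L step size are illustrative choices, not part of the exercise statement:

rng = np.random.default_rng(0)
A = rng.standard_normal((50, 100))
x_true = np.zeros(100)
x_true[rng.choice(100, size=5, replace=False)] = rng.standard_normal(5)
y = A @ x_true + 0.01 * rng.standard_normal(50)

step = 1.0 / np.linalg.norm(A, 2)**2  # 1/L with L = ||A||_2^2; any step below 2/L keeps gradient descent stable
solvers = [GradientDescentSolver(n_iters=200, step_size=step),
           ProximalSolver(n_iters=200, step_size=step, lam=0.05)]
print(compare_solvers(solvers, A, y))  # {'GradientDescentSolver': ..., 'ProximalSolver': ...}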
ex-sp-ch03-07
Medium: Design a ForwardOperator protocol with forward(x), adjoint(y), and a
shape property. Implement three concrete operators: DenseMatrix (stores
full matrix), DiagonalOperator (stores only diagonal), and FFTOperator
(uses FFT/IFFT). Write a function that accepts any ForwardOperator and
computes A^T A x for a given x.
For FFTOperator, forward = np.fft.fft and adjoint = np.fft.ifft (scaled).
The shape property returns (m, n) — for FFT, m == n.
Implementation
from typing import Protocol, runtime_checkable
import numpy as np

@runtime_checkable
class ForwardOperator(Protocol):
    def forward(self, x: np.ndarray) -> np.ndarray: ...
    def adjoint(self, y: np.ndarray) -> np.ndarray: ...
    @property
    def shape(self) -> tuple[int, int]: ...

class DenseMatrix:
    def __init__(self, A):
        self._A = np.asarray(A)
    def forward(self, x): return self._A @ x
    def adjoint(self, y): return self._A.T @ y
    @property
    def shape(self): return self._A.shape

class DiagonalOperator:
    def __init__(self, d):
        self._d = np.asarray(d)
    def forward(self, x): return self._d * x
    def adjoint(self, y): return self._d.conj() * y
    @property
    def shape(self): return (len(self._d), len(self._d))

class FFTOperator:
    def __init__(self, n):
        self._n = n
    def forward(self, x): return np.fft.fft(x) / np.sqrt(self._n)
    def adjoint(self, y): return np.fft.ifft(y) * np.sqrt(self._n)
    @property
    def shape(self): return (self._n, self._n)

def apply_gram(op: ForwardOperator, x: np.ndarray) -> np.ndarray:
    return np.real(op.adjoint(op.forward(x)))

# Test
for op in [DenseMatrix(np.eye(5)), DiagonalOperator(np.ones(5)), FFTOperator(5)]:
    assert isinstance(op, ForwardOperator)
    result = apply_gram(op, np.ones(5))
    print(f"{op.__class__.__name__}: {np.allclose(result, np.ones(5))}")
ex-sp-ch03-08
Medium: Implement a UnitArray class with __array_ufunc__ that preserves unit
strings through addition (same units required), multiplication (units
concatenated), and division (units as fraction). Raise ValueError for
addition of different units.
Check ufunc identity: np.add, np.subtract require same units.
For np.multiply, combine units as "unit1*unit2"; for np.divide, use "unit1/unit2".
Implementation
import numpy as np

class UnitArray:
    def __init__(self, data, unit="1"):
        self._data = np.asarray(data, dtype=float)
        self.unit = unit

    def __repr__(self):
        return f"UnitArray({self._data}, '{self.unit}')"

    def __array__(self, dtype=None, copy=None):
        return self._data if dtype is None else self._data.astype(dtype)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        raw = []
        units = []
        for inp in inputs:
            if isinstance(inp, UnitArray):
                raw.append(inp._data)
                units.append(inp.unit)
            else:
                raw.append(np.asarray(inp))
                units.append("1")
        result = getattr(ufunc, method)(*raw, **kwargs)
        if ufunc in (np.add, np.subtract):
            unique = set(u for u in units if u != "1")
            if len(unique) > 1:
                raise ValueError(f"Cannot {ufunc.__name__} units: {units}")
            return UnitArray(result, unique.pop() if unique else "1")
        elif ufunc == np.multiply:
            non_trivial = [u for u in units if u != "1"]
            return UnitArray(result, "*".join(non_trivial) or "1")
        elif ufunc == np.divide:
            return UnitArray(result, f"{units[0]}/{units[1]}")
        return UnitArray(result, units[0] if units else "1")
# Test
volts = UnitArray([1, 2, 3], "V")
amps = UnitArray([0.1, 0.2, 0.3], "A")
print(np.add(volts, UnitArray([1, 1, 1], "V"))) # UnitArray([2, 3, 4], 'V')
print(np.multiply(volts, amps)) # UnitArray([0.1, 0.4, 0.9], 'V*A')
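The exercise also requires that adding arrays with different units fail; a quick check of that path:

try:
    np.add(volts, amps)
except ValueError as e:
    print(f"Rejected as expected: {e}")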
ex-sp-ch03-09
Medium: Implement the strategy pattern for a signal processing pipeline. Define a
Filter protocol with apply(signal) -> signal. Create three filters:
LowPassFilter, HighPassFilter, and BandPassFilter. Build a Pipeline
class that chains any sequence of filters.
You can use scipy.signal.butter with scipy.signal.sosfilt for the filters, or a simple FFT mask as in the reference implementation below.
The Pipeline stores a list of filters and applies them sequentially.
Implementation
from typing import Protocol
import numpy as np

class Filter(Protocol):
    def apply(self, signal: np.ndarray) -> np.ndarray: ...

class LowPassFilter:
    def __init__(self, cutoff: float = 0.3):
        self.cutoff = cutoff
    def apply(self, signal):
        n = len(signal)
        freqs = np.fft.fftfreq(n)
        spectrum = np.fft.fft(signal)
        spectrum[np.abs(freqs) > self.cutoff] = 0
        return np.real(np.fft.ifft(spectrum))

class HighPassFilter:
    def __init__(self, cutoff: float = 0.1):
        self.cutoff = cutoff
    def apply(self, signal):
        n = len(signal)
        freqs = np.fft.fftfreq(n)
        spectrum = np.fft.fft(signal)
        spectrum[np.abs(freqs) < self.cutoff] = 0
        return np.real(np.fft.ifft(spectrum))

class BandPassFilter:
    def __init__(self, low: float = 0.1, high: float = 0.3):
        self.low, self.high = low, high
    def apply(self, signal):
        n = len(signal)
        freqs = np.fft.fftfreq(n)
        spectrum = np.fft.fft(signal)
        mask = (np.abs(freqs) >= self.low) & (np.abs(freqs) <= self.high)
        spectrum[~mask] = 0
        return np.real(np.fft.ifft(spectrum))

class Pipeline:
    def __init__(self, *filters: Filter):
        self.filters = list(filters)
    def process(self, signal: np.ndarray) -> np.ndarray:
        result = signal.copy()
        for f in self.filters:
            result = f.apply(result)
        return result
# Usage: build a demo noisy signal, then chain a high-pass and a low-pass filter
rng = np.random.default_rng(0)
t = np.arange(256)
noisy_signal = np.sin(2 * np.pi * 0.1 * t) + 0.5 * rng.standard_normal(t.size)
pipe = Pipeline(HighPassFilter(0.05), LowPassFilter(0.4))
clean = pipe.process(noisy_signal)
ex-sp-ch03-10
Medium: Implement dependency injection for a MonteCarloSimulation class. It should
accept a DataGenerator, Solver, and MetricsComputer as constructor arguments.
Write a test using mock implementations that return fixed values.
The mock DataGenerator can return a fixed identity matrix and known signal.
The mock Solver can return the true signal (perfect reconstruction).
Implementation
from typing import Protocol
import numpy as np

class DataGenerator(Protocol):
    def generate(self, n, m, seed) -> tuple[np.ndarray, np.ndarray, np.ndarray]: ...

class Solver(Protocol):
    def solve(self, A, y) -> np.ndarray: ...

class MetricsComputer(Protocol):
    def compute(self, x_true, x_hat) -> dict: ...

class MonteCarloSimulation:
    def __init__(self, generator: DataGenerator, solver: Solver, metrics: MetricsComputer):
        self.generator = generator
        self.solver = solver
        self.metrics = metrics

    def run(self, n_trials, n, m, seed=42):
        results = []
        for i in range(n_trials):
            A, y, x_true = self.generator.generate(n, m, seed + i)
            x_hat = self.solver.solve(A, y)
            results.append(self.metrics.compute(x_true, x_hat))
        return results

# Test with mocks
class MockGenerator:
    def generate(self, n, m, seed):
        return np.eye(n, m), np.ones(n), np.ones(m)

class MockSolver:
    def solve(self, A, y):
        return y  # perfect reconstruction

class MockMetrics:
    def compute(self, x_true, x_hat):
        return {"nmse": float(np.linalg.norm(x_hat - x_true)**2)}
sim = MonteCarloSimulation(MockGenerator(), MockSolver(), MockMetrics())
results = sim.run(n_trials=3, n=5, m=5)
assert all(r["nmse"] == 0.0 for r in results)
print("All tests passed!")
ex-sp-ch03-11
Medium: Create a ChannelModel ABC with abstract methods apply(x) -> y and
get_matrix(n_rx, n_tx) -> np.ndarray. Implement AWGNChannel,
RayleighChannel, and RicianChannel subclasses. Show the MRO
for a class that inherits from both RayleighChannel and a LoggingMixin.
Rayleigh: H has i.i.d. complex Gaussian entries.
Rician: H = sqrt(K/(K+1)) * H_los + sqrt(1/(K+1)) * H_rayleigh where K is the Rician factor.
Implementation
from abc import ABC, abstractmethod
import numpy as np

class ChannelModel(ABC):
    @abstractmethod
    def apply(self, x: np.ndarray) -> np.ndarray: ...

    @abstractmethod
    def get_matrix(self, n_rx: int, n_tx: int) -> np.ndarray: ...

class AWGNChannel(ChannelModel):
    def __init__(self, snr_db: float):
        self.snr_db = snr_db

    def apply(self, x):
        noise_std = np.sqrt(np.mean(np.abs(x)**2) * 10**(-self.snr_db/10))
        return x + noise_std * (np.random.randn(*x.shape) + 1j * np.random.randn(*x.shape)) / np.sqrt(2)

    def get_matrix(self, n_rx, n_tx):
        return np.eye(n_rx, n_tx)

class RayleighChannel(ChannelModel):
    def __init__(self, snr_db: float, seed: int = 42):
        self.snr_db = snr_db
        self.seed = seed

    def apply(self, x):
        H = self.get_matrix(len(x), len(x))
        return H @ x

    def get_matrix(self, n_rx, n_tx):
        rng = np.random.default_rng(self.seed)
        return (rng.standard_normal((n_rx, n_tx)) + 1j * rng.standard_normal((n_rx, n_tx))) / np.sqrt(2 * n_tx)

class RicianChannel(ChannelModel):
    def __init__(self, snr_db: float, K: float = 3.0, seed: int = 42):
        self.snr_db = snr_db
        self.K = K
        self.seed = seed

    def apply(self, x):
        H = self.get_matrix(len(x), len(x))
        return H @ x

    def get_matrix(self, n_rx, n_tx):
        rng = np.random.default_rng(self.seed)
        H_los = np.ones((n_rx, n_tx)) / np.sqrt(n_tx)
        H_nlos = (rng.standard_normal((n_rx, n_tx)) + 1j * rng.standard_normal((n_rx, n_tx))) / np.sqrt(2 * n_tx)
        return np.sqrt(self.K/(self.K+1)) * H_los + np.sqrt(1/(self.K+1)) * H_nlos

class LoggingMixin:
    def log(self, msg):
        print(f"[{self.__class__.__name__}] {msg}")

class LoggedRayleigh(LoggingMixin, RayleighChannel):
    pass
print(LoggedRayleigh.__mro__)
# (LoggedRayleigh, LoggingMixin, RayleighChannel, ChannelModel, ABC, object)
ex-sp-ch03-12
Hard: Build a complete SignalArray class that wraps a NumPy array with metadata
(sample_rate, channel_name) and implements both __array_ufunc__ and
__array_function__. Support np.concatenate (all arrays must have the same
sample_rate) and np.mean/np.std (return plain floats). Write at least
5 test cases demonstrating the interop.
Use the HANDLED_FUNCTIONS dispatch pattern from the section.
For np.concatenate, validate that all arrays have the same sample_rate.
Key implementation points
import numpy as np

HANDLED_FUNCTIONS = {}

def implements(np_function):
    def decorator(func):
        HANDLED_FUNCTIONS[np_function] = func
        return func
    return decorator

class SignalArray:
    def __init__(self, data, sample_rate=1.0, channel="ch0"):
        self._data = np.asarray(data)
        self.sample_rate = sample_rate
        self.channel = channel

    def __array__(self, dtype=None, copy=None):
        return self._data.copy() if dtype is None else self._data.astype(dtype)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        raw = [np.asarray(i) if isinstance(i, SignalArray) else i for i in inputs]
        sigs = [i for i in inputs if isinstance(i, SignalArray)]
        result = getattr(ufunc, method)(*raw, **kwargs)
        if isinstance(result, np.ndarray) and sigs:
            return SignalArray(result, sigs[0].sample_rate, sigs[0].channel)
        return result

    def __array_function__(self, func, types, args, kwargs):
        if func in HANDLED_FUNCTIONS:
            return HANDLED_FUNCTIONS[func](*args, **kwargs)
        return NotImplemented

@implements(np.concatenate)
def concat(arrays, axis=0):
    sigs = [a for a in arrays if isinstance(a, SignalArray)]
    rates = {s.sample_rate for s in sigs}
    if len(rates) > 1:
        raise ValueError(f"Cannot concatenate signals with different sample rates: {rates}")
    raw = [np.asarray(a) for a in arrays]
    return SignalArray(np.concatenate(raw, axis=axis), sigs[0].sample_rate, sigs[0].channel)
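The exercise also asks for np.mean and np.std to return plain floats and for at least five interop tests. One way to finish the sketch under the same HANDLED_FUNCTIONS pattern (the handler names below are illustrative):

@implements(np.mean)
def mean(a, *args, **kwargs):
    return float(np.mean(np.asarray(a), *args, **kwargs))

@implements(np.std)
def std(a, *args, **kwargs):
    return float(np.std(np.asarray(a), *args, **kwargs))

# Test cases
s1 = SignalArray([1.0, 2.0, 3.0], sample_rate=100.0)
s2 = SignalArray([4.0, 5.0], sample_rate=100.0)
s3 = SignalArray([0.0, 1.0], sample_rate=200.0)
assert isinstance(np.add(s1, 1.0), SignalArray)               # 1. ufuncs keep the wrapper
assert np.asarray(np.concatenate([s1, s2])).shape == (5,)     # 2. same rate: concatenation works
assert np.concatenate([s1, s2]).sample_rate == 100.0          # 3. metadata propagates
assert isinstance(np.mean(s1), float) and np.mean(s1) == 2.0  # 4. reductions return plain floats
try:                                                          # 5. mismatched rates are rejected
    np.concatenate([s1, s3])
except ValueError as e:
    print(f"Rejected: {e}")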
ex-sp-ch03-13
Hard: Implement a Plugin system using Protocols. Define a PluginProtocol with
name: str, version: str, and execute(data) -> data. Create a PluginManager
that discovers plugins, validates they conform to the protocol, and runs them
in a configurable order. Include error handling for plugins that fail.
Use isinstance(plugin, PluginProtocol) with @runtime_checkable for validation.
Wrap each plugin.execute() in try/except to isolate failures.
Implementation
from typing import Protocol, runtime_checkable
import numpy as np

@runtime_checkable
class PluginProtocol(Protocol):
    name: str
    version: str
    def execute(self, data: np.ndarray) -> np.ndarray: ...

class PluginManager:
    def __init__(self):
        self._plugins: list[PluginProtocol] = []
        self._errors: list[tuple[str, Exception]] = []

    def register(self, plugin: PluginProtocol) -> None:
        if not isinstance(plugin, PluginProtocol):
            raise TypeError(f"{plugin} does not conform to PluginProtocol")
        self._plugins.append(plugin)

    def run_all(self, data: np.ndarray) -> np.ndarray:
        self._errors = []
        result = data.copy()
        for plugin in self._plugins:
            try:
                result = plugin.execute(result)
            except Exception as e:
                self._errors.append((plugin.name, e))
        return result

class NormalizerPlugin:
    name = "normalizer"
    version = "1.0"
    def execute(self, data):
        return (data - data.mean()) / (data.std() + 1e-8)

class ClipPlugin:
    name = "clipper"
    version = "1.0"
    def __init__(self, low=-3, high=3):
        self.low, self.high = low, high
    def execute(self, data):
        return np.clip(data, self.low, self.high)
mgr = PluginManager()
mgr.register(NormalizerPlugin())
mgr.register(ClipPlugin())
result = mgr.run_all(np.random.randn(100) * 10)
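To exercise the failure-isolation path, register a deliberately broken plugin and confirm its exception is recorded rather than propagated (the plugin below is illustrative):

class BrokenPlugin:
    name = "broken"
    version = "0.1"
    def execute(self, data):
        raise RuntimeError("simulated plugin failure")

mgr.register(BrokenPlugin())
_ = mgr.run_all(np.random.randn(100))
print(mgr._errors)  # [('broken', RuntimeError('simulated plugin failure'))]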
ex-sp-ch03-14
Hard: Design a ComposablePipeline that uses the builder pattern to chain
processing steps. Each step conforms to a Step protocol with
process(data) -> data. The pipeline should support: adding steps,
removing steps by name, reordering steps, and running with intermediate
result logging. Compare this design to a deep inheritance hierarchy.
Use a list of (name, step) tuples for ordered, named steps.
The builder pattern returns self for chaining: pipeline.add(a).add(b).run(data).
Key design
from typing import Protocol
import numpy as np

class Step(Protocol):
    def process(self, data: np.ndarray) -> np.ndarray: ...

class ComposablePipeline:
    def __init__(self):
        self._steps: list[tuple[str, Step]] = []
        self._log: list[tuple[str, np.ndarray]] = []

    def add(self, name: str, step: Step) -> "ComposablePipeline":
        self._steps.append((name, step))
        return self  # builder pattern

    def remove(self, name: str) -> "ComposablePipeline":
        self._steps = [(n, s) for n, s in self._steps if n != name]
        return self

    def run(self, data: np.ndarray, log=False) -> np.ndarray:
        self._log = []
        result = data.copy()
        for name, step in self._steps:
            result = step.process(result)
            if log:
                self._log.append((name, result.copy()))
        return result

    def get_log(self) -> list[tuple[str, np.ndarray]]:
        return self._log

# Usage with builder pattern
class Scale:
    def __init__(self, factor): self.factor = factor
    def process(self, data): return data * self.factor

class Offset:
    def __init__(self, value): self.value = value
    def process(self, data): return data + self.value

result = (ComposablePipeline()
          .add("scale", Scale(2.0))
          .add("offset", Offset(-1.0))
          .add("clip", type("Clip", (), {"process": lambda self, d: np.clip(d, 0, 10)})())
          .run(np.array([1, 2, 3, 4, 5]), log=True))
ex-sp-ch03-15
Hard: Implement a MemoizedOperator wrapper that caches the result of a
ForwardOperator's forward() method using a hash of the input array.
Use composition (not inheritance) to wrap any operator. Implement cache
statistics (hits, misses, hit rate) and a clear_cache() method.
Use hash(x.tobytes()) to hash NumPy arrays.
Store results in a dict keyed by input hash.
Implementation
import numpy as np

class MemoizedOperator:
    def __init__(self, operator):
        self._operator = operator
        self._cache: dict[int, np.ndarray] = {}
        self.hits = 0
        self.misses = 0

    def forward(self, x: np.ndarray) -> np.ndarray:
        key = hash(x.tobytes())
        if key in self._cache:
            self.hits += 1
            return self._cache[key].copy()
        self.misses += 1
        result = self._operator.forward(x)
        self._cache[key] = result.copy()
        return result

    def adjoint(self, y: np.ndarray) -> np.ndarray:
        return self._operator.adjoint(y)

    @property
    def shape(self):
        return self._operator.shape

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    def clear_cache(self):
        self._cache.clear()
        self.hits = self.misses = 0
# Usage (DenseMatrix is the ForwardOperator implementation from ex-sp-ch03-07)
base_op = DenseMatrix(np.random.randn(50, 100))
cached_op = MemoizedOperator(base_op)
x = np.random.randn(100)
_ = cached_op.forward(x) # miss
_ = cached_op.forward(x) # hit!
print(f"Hit rate: {cached_op.hit_rate:.0%}") # 50%
ex-sp-ch03-16
Hard: Create a multi-backend Tensor class that dispatches operations to NumPy
or CuPy depending on a backend attribute. Implement __array_ufunc__
that routes ufunc calls to the appropriate backend. The class should support
.to("numpy") and .to("cupy") for backend transfer (use NumPy-only for
the implementation, simulating CuPy with a wrapper).
Define a Backend protocol with array, add, multiply methods.
The .to() method creates a new Tensor with the data moved to the target backend.
Core implementation
import numpy as np

class NumpyBackend:
    name = "numpy"
    @staticmethod
    def array(data): return np.asarray(data)
    @staticmethod
    def add(a, b): return np.add(a, b)
    @staticmethod
    def multiply(a, b): return np.multiply(a, b)

class FakeCupyBackend:
    """Simulated CuPy backend using NumPy."""
    name = "cupy"
    @staticmethod
    def array(data): return np.asarray(data)  # would be cupy.asarray
    @staticmethod
    def add(a, b): return np.add(a, b)
    @staticmethod
    def multiply(a, b): return np.multiply(a, b)

BACKENDS = {"numpy": NumpyBackend, "cupy": FakeCupyBackend}

class Tensor:
    def __init__(self, data, backend="numpy"):
        self.backend_name = backend
        self._backend = BACKENDS[backend]
        self._data = self._backend.array(data)

    def to(self, backend: str) -> "Tensor":
        return Tensor(np.asarray(self._data), backend=backend)

    def __array__(self, dtype=None, copy=None):
        return np.asarray(self._data)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        raw = [i._data if isinstance(i, Tensor) else i for i in inputs]
        result = getattr(ufunc, method)(*raw, **kwargs)
        return Tensor(result, backend=self.backend_name)

    def __repr__(self):
        return f"Tensor({self._data}, backend='{self.backend_name}')"
t = Tensor([1, 2, 3])
t_gpu = t.to("cupy")
print(np.add(t, Tensor([4, 5, 6]))) # Tensor([5, 7, 9], backend='numpy')
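Printing the transferred tensor shows the backend tag travelling with the data:

print(t_gpu)  # Tensor([1 2 3], backend='cupy')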
ex-sp-ch03-17
Challenge: Design and implement a complete simulation framework using all the patterns from this chapter. The framework should:
- Use @dataclass(frozen=True) for experiment configuration
- Define ForwardOperator and Denoiser protocols
- Implement at least 2 operators and 2 denoisers
- Use composition to build solvers from (operator, denoiser) pairs
- Include dependency injection for testability
- Support serialization of configs and results to JSON
- Run a parameter sweep over SNR values and plot convergence curves
The entire framework should be under 200 lines of code.
Start with the Config dataclass and work outward to protocols, implementations, and the runner.
Use dataclasses.asdict() for JSON serialization.
The parameter sweep is a list comprehension over configs.
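A minimal starting skeleton under stated assumptions: the names (Config, IdentityOperator, SoftThreshold, PnPSolver, run_sweep) are illustrative, JSON serialization uses dataclasses.asdict as the hint suggests, and plotting of the convergence curves is left to the exercise.

from dataclasses import dataclass, asdict
from typing import Protocol
import json
import numpy as np

@dataclass(frozen=True)
class Config:
    n: int = 100
    snr_db: float = 20.0
    n_iters: int = 50

class ForwardOperator(Protocol):
    def forward(self, x: np.ndarray) -> np.ndarray: ...
    def adjoint(self, y: np.ndarray) -> np.ndarray: ...

class Denoiser(Protocol):
    def denoise(self, x: np.ndarray) -> np.ndarray: ...

class IdentityOperator:
    def forward(self, x): return x
    def adjoint(self, y): return y

class SoftThreshold:
    def __init__(self, lam=0.1): self.lam = lam
    def denoise(self, x): return np.sign(x) * np.maximum(np.abs(x) - self.lam, 0)

class PnPSolver:
    """Composition: a forward operator plus a denoiser, injected at construction."""
    def __init__(self, op: ForwardOperator, denoiser: Denoiser, step=0.5):
        self.op, self.denoiser, self.step = op, denoiser, step

    def solve(self, y: np.ndarray, cfg: Config) -> tuple[np.ndarray, list[float]]:
        x, history = np.zeros_like(y), []
        for _ in range(cfg.n_iters):
            grad = self.op.adjoint(self.op.forward(x) - y)
            x = self.denoiser.denoise(x - self.step * grad)
            history.append(float(np.linalg.norm(self.op.forward(x) - y)))
        return x, history

def run_sweep(snrs, solver_factory, seed=0):
    results = []
    rng = np.random.default_rng(seed)
    for snr in snrs:
        cfg = Config(snr_db=snr)
        x_true = rng.standard_normal(cfg.n)
        noise_std = 10 ** (-cfg.snr_db / 20) * np.linalg.norm(x_true) / np.sqrt(cfg.n)
        y = x_true + noise_std * rng.standard_normal(cfg.n)
        _, history = solver_factory().solve(y, cfg)
        results.append({"config": asdict(cfg), "final_cost": history[-1]})
    return json.dumps(results)  # configs and results serialize straight to JSON

print(run_sweep([10.0, 20.0], lambda: PnPSolver(IdentityOperator(), SoftThreshold())))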
ex-sp-ch03-18
Challenge: Extend the PhysicalQuantity class from this chapter to support:
- Full __array_function__ protocol (at least np.concatenate, np.stack, np.linalg.norm, np.fft.fft)
- Unit algebra: m * m = m^2, m / s = m*s^-1, with automatic simplification
- Unit conversion: quantity.to("km") converts meters to kilometers
- A registry of known unit conversions loaded from a YAML file
- Integration with matplotlib for auto-labeled axes
Write comprehensive tests including edge cases (zero-dimensional arrays, complex units, broadcasting).
Parse unit strings into a dict of {base_unit: exponent} for algebra.
Store conversion factors as a graph and use BFS/DFS for multi-hop conversions.
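A small sketch of the hint's unit-string representation, assuming units are written as products of base^exponent terms (for example "m*s^-1"); parsing and multiplication only, with no conversions:

from collections import Counter

def parse_unit(unit: str) -> dict[str, int]:
    """Parse e.g. 'm*s^-1' into {'m': 1, 's': -1}."""
    exponents: Counter[str] = Counter()
    for term in unit.split("*"):
        base, _, exp = term.partition("^")
        exponents[base] += int(exp) if exp else 1
    return {b: e for b, e in exponents.items() if e != 0}  # drop cancelled units

def multiply_units(u1: str, u2: str) -> dict[str, int]:
    combined = Counter(parse_unit(u1))
    combined.update(parse_unit(u2))
    return {b: e for b, e in combined.items() if e != 0}

print(parse_unit("m*s^-1"))           # {'m': 1, 's': -1}
print(multiply_units("m", "m"))       # {'m': 2}
print(multiply_units("m*s^-1", "s"))  # {'m': 1}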