# `latpy.ml` — Machine Learning Pure stdlib ML algorithms backed by NDArray. Every model operates on `latpy.latmath.array.NDArray` and requires zero third-party dependencies — no numpy, scipy, or sklearn. --- ## Clustering ```python from latpy.ml import kmeans ``` | Signature | Description | |---|---| | `kmeans(X, k, max_iter=100, tol=1e-4, seed=42) -> (centroids, labels, inertia)` | K-Means clustering. Returns (k×d centroids, n×1 labels, inertia). | ### Algorithm: Lloyd's K-Means K-Means minimizes the within-cluster sum of squares: \[ \arg\min_{\{c_j\},\{z_i\}} \sum_{i=1}^n \|x_i - c_{z_i}\|_2^2 \] by alternating between **assignment** (each point to nearest centroid) and **update** (recompute centroids as cluster means). It is a greedy, iterative algorithm that converges to a **local optimum** (the objective is non-convex). Initial centroids are chosen as `k` distinct random samples from `X` (seeded for reproducibility). **Why K-Means?** Simple, interpretable, O(nkd) per iteration. Works well on spherical, well-separated clusters. Sensitive to initialization and outliers. **Complexity:** O(nkd) per iteration, O(kd) memory for centroids. ### Example ```python from latpy.latmath.array import array from latpy.ml import kmeans X = array([[1.0, 2.0], [1.5, 1.8], [5.0, 8.0], [8.0, 8.0], [1.0, 0.6], [9.0, 11.0]]) centroids, labels, inertia = kmeans(X, k=2, seed=42) print(centroids.tolist()) # [[1.167, 1.467], [7.333, 9.0]] print(labels.tolist()) # [0, 0, 1, 1, 0, 1] print(inertia) # 9.207 (sum of squared distances to nearest centroid) ``` ### Edge Cases | Scenario | Behavior | Rationale | |---|---|---| | **k=1** | Returns single centroid at data mean, all labels 0 | A single cluster is just the global mean; every point is assigned to it. | | **k >= n_samples** | Raises `ValueError` | Cannot choose `k` distinct random samples from fewer points. | | **k=0 or negative** | Raises `ValueError` | Guarded at entry. | | **Convergence tolerance (tol)** | Halts when max centroid shift < tol (default 1e-4) | Avoids wasted iterations once movement is negligible. Also halts early if no labels change between iterations. | | **Different seeds** | Different random initial centroids → different local optima | Lloyd's is sensitive to initialization; seed gives reproducible restarts. | | **Axis-aligned data** | Works, but may split elongated clusters artificially | K-Means assumes isotropic (spherical) clusters; axis-aligned elongated clusters can be split across the "middle" where Euclidean distance is ambiguous. | | **Single-sample clusters** | Centroids collapse to the lone point, inertia contribution zero for that point | A cluster with count=1 yields zero shift but is still valid. | --- ## Linear Models ```python from latpy.ml import LinearRegression ``` | Signature | Description | |---|---| | `LinearRegression(fit_intercept=True)` | OLS via normal equations | | `.fit(X, y) -> self` | Fit model | | `.predict(X) -> NDArray` | Predict | | `.score(X, y) -> float` | R² coefficient of determination | ### Algorithm: Ordinary Least Squares (Closed-Form) Solves the normal equations: \[ \hat{w} = (X^T X)^{-1} X^T y \] via Gaussian elimination with partial pivoting. When `fit_intercept=True`, a column of ones is prepended to the design matrix; the first coefficient becomes `intercept_` and the rest become `coef_`. **Why OLS closed-form?** No learning rate, no iteration, globally optimal for the linear least-squares objective. The normal equations are the standard approach for small-to-moderate `d` (number of features). For very large `d`, iterative methods (gradient descent) would be more efficient. **Complexity:** O(d²n + d³) — constructing X^T X is O(d²n), solving the d×d system is O(d³). ### Example ```python from latpy.latmath.array import array from latpy.ml import LinearRegression X = array([[1.0], [2.0], [3.0], [4.0]]) y = array([2.0, 4.0, 6.0, 8.0]) lr = LinearRegression(fit_intercept=True) lr.fit(X, y) print(lr.coef_.tolist()) # [2.0] print(lr.intercept_) # ~0.0 print(lr.predict(array([[5.0]])).tolist()) # [10.0] print(lr.score(X, y)) # 1.0 (perfect fit) ``` ```python # Without intercept — forces line through origin lr2 = LinearRegression(fit_intercept=False) lr2.fit(array([[1.0], [2.0], [3.0]]), array([1.0, 2.0, 3.0])) print(lr2.coef_.tolist()) # [1.0] print(lr2.predict(array([[4.0]])).tolist()) # [4.0] ``` ### Edge Cases | Scenario | Behavior | Rationale | |---|---|---| | **Singular X (rank-deficient)** | Gaussian elimination produces a solution, but coefficients may be numerically unstable; no explicit rank check | X^T X is singular when features are linearly dependent. The pivot search in `_gaussian_elimination` may skip zero columns, producing an arbitrary solution among infinitely many. Always check that features are linearly independent. | | **n < d (underdetermined)** | OLS still runs but is ill-posed — infinitely many solutions | Normal equations produce a particular solution; use regularization or reduce features. | | **Single feature** | Works as simple linear regression; slope in `coef_[0]`, intercept in `intercept_` | Trivially correct. | | **Constant features (entire column same value)** | Makes X^T X nearly singular; coefficients may be large or NaN | A constant column is collinear with the intercept column when `fit_intercept=True`. Either drop the column or set `fit_intercept=False`. | | **fit_intercept=False** | No column of ones; model forced through origin | Useful when the relationship is known to be proportional. R² can be negative if the origin-constrained fit is worse than predicting the mean. | | **Empty X (n=0)** | Raises `ValueError` | Need at least one sample. | | **Zero-feature X (p=0)** | Raises `ValueError` | Need at least one feature. | --- ## Metrics ```python from latpy.ml import accuracy, precision, recall, f1_score, confusion_matrix from latpy.ml import mean_squared_error, mean_absolute_error, r2_score ``` | Signature | Description | |---|---| | `accuracy(y_true, y_pred) -> float` | Classification accuracy (correct / total) | | `precision(y_true, y_pred, pos_label=1) -> float` | Precision = TP / (TP + FP) | | `recall(y_true, y_pred, pos_label=1) -> float` | Recall = TP / (TP + FN) | | `f1_score(y_true, y_pred, pos_label=1) -> float` | F1 = 2·P·R / (P + R) | | `confusion_matrix(y_true, y_pred, labels=None) -> NDArray(I64)` | Confusion matrix | | `mean_squared_error(y_true, y_pred) -> float` | MSE | | `mean_absolute_error(y_true, y_pred) -> float` | MAE | | `r2_score(y_true, y_pred) -> float` | R² coefficient of determination | ### Examples ```python from latpy.latmath.array import array from latpy.ml import accuracy, precision, recall, f1_score, confusion_matrix from latpy.ml import mean_squared_error, mean_absolute_error, r2_score # --- Classification --- y_true = array([1, 0, 1, 1, 0]) y_pred = array([1, 0, 0, 1, 0]) print(accuracy(y_true, y_pred)) # 0.8 print(precision(y_true, y_pred)) # 1.0 (TP=2, FP=0) print(recall(y_true, y_pred)) # 0.6667 (TP=2, FN=1) print(f1_score(y_true, y_pred)) # 0.8 cm = confusion_matrix(y_true, y_pred) print(cm.tolist()) # [[2, 0], # [1, 2]] # Rows: true, Cols: pred. Order: [0, 1] # --- Multiclass confusion matrix --- y_mc_t = array([0, 1, 2, 0, 1, 2]) y_mc_p = array([0, 2, 1, 0, 1, 2]) cm3 = confusion_matrix(y_mc_t, y_mc_p) print(cm3.tolist()) # [[2, 0, 0], # [0, 1, 1], # [0, 1, 1]] # Custom label ordering cm_custom = confusion_matrix(y_mc_t, y_mc_p, labels=[2, 1, 0]) print(cm_custom.tolist()) # [[1, 1, 0], # [1, 1, 0], # [0, 0, 2]] # --- Regression --- y_t = array([1.0, 2.0, 3.0]) y_p = array([1.5, 2.5, 3.5]) print(mean_squared_error(y_t, y_p)) # 0.25 print(mean_absolute_error(y_t, y_p)) # 0.5 print(r2_score(y_t, y_p)) # negative (worse than mean predictor) ``` ### Edge Cases | Scenario | Behavior | Rationale | |---|---|---| | **Empty y_true/y_pred (n=0)** | Accuracy returns 0.0; MSE/MAE return nan via division by zero | No samples to evaluate. Accuracy explicitly guards with `n > 0`. | | **All-correct predictions** | accuracy=1.0, precision=1.0, recall=1.0, f1=1.0, MSE=0.0, MAE=0.0, R²=1.0 | Ideal scenario; all metrics at their best possible values. | | **All-wrong predictions** | accuracy=0.0; precision depends on FP; recall=0.0; F1=0.0 if P+R=0; R² may be negative | No correct predictions. R² can be < 0 because predicting the mean would be better. | | **No positive predictions (TP+FP=0)** | Precision returns 0.0 | Guarded: `tp / (tp + fp)` when denominator is 0 → 0. | | **No positive ground truth (TP+FN=0)** | Recall returns 0.0 | Guarded: `tp / (tp + fn)` when denominator is 0 → 0. | | **P + R = 0** | F1 returns 0.0 | Guarded before the harmonic mean formula to avoid division by zero. | | **R² with constant y_true** | Returns 0.0 (ss_tot=0 guard) | If the target has zero variance, any model that doesn't predict exactly the constant will have ss_res > 0, but the guard returns 0.0 instead of -inf. | | **Multi-class confusion matrix** | Auto-detects labels from the union of y_true and y_pred | Ensures all classes (even those with zero predictions) appear in the matrix. | --- ## SOV Models (State-Observation-Vector) ```python from latpy.ml.sov import SOVRegression, SOVClassifier, SOVDynamics ``` ### Architecture & Rationale SOV models use a **two-stage** architecture: 1. **Encoder (random projection):** A fixed random matrix \( W_{\text{enc}} \in \mathbb{R}^{p \times m} \) (entries drawn i.i.d. from \( \mathcal{N}(0, 1/m) \)) projects the `p`-dimensional input into an `m`-dimensional latent **state space**. The encoder is never trained — it is seeded with `random.seed(42)` and fixed after construction. 2. **Decoder (learned):** A simple model operates in the reduced state space: - **SOVRegression:** Least-squares linear decoder from states to output. - **SOVClassifier:** Nearest-centroid classification in state space. - **SOVDynamics:** Linear state transition matrix A + observation matrix C. **Why random projections?** The Johnson-Lindenstrauss lemma guarantees that random projections approximately preserve pairwise distances with high probability when projecting into dimension \( O(\log n) \). This makes the state space a "sketch" of the input that retains enough structure for linear methods to work. The seed ensures reproducibility — the same projection is always used for a given input dimension. **Why SOV exists:** For very high-dimensional inputs where memory or compute is constrained, random projections provide a fast, lightweight dimensionality reduction that doesn't require the full SVD or gradient-based training. The encoder is O(pm) once, the decoder is trained in the smaller state space. **Complexity:** O(nds) where n = samples, d = features, s = n_states (the latent dimension). --- ### SOVRegression Random-projection encoder + least-squares decoder. **Training:** Normalizes features, projects to latent states via random matrix, then solves \( Z^T Z w = Z^T y \) via Gaussian elimination on the state-space representation. | Signature | Description | |---|---| | `SOVRegression(n_states=2)` | SOV regression model | | `.fit(X, y) -> self` | Fit encoder/decoder | | `.predict(X) -> NDArray` | Predict | | `.score(X, y) -> float` | R² | #### Example ```python from latpy.latmath.array import array from latpy.ml.sov import SOVRegression X = array([[1.0], [2.0], [3.0], [4.0]]) y = array([2.0, 4.0, 6.0, 8.0]) sov = SOVRegression(n_states=2) sov.fit(X, y) pred = sov.predict(X) print(pred.tolist()) # e.g. [2.0, 4.0, 6.0, 8.0] (near-perfect fit) print(sov.score(X, y)) # ~1.0 (perfect linear relationship captured) ``` #### Edge Cases | Scenario | Behavior | |---|---| | **n_states > n_features** | Works but the random projection expands dimensionality rather than reducing it; decoder still fits via least squares | | **n_states = 0** | Not guarded at init; fit will likely fail with singular Z^T Z | | **Predict before fit** | Raises `RuntimeError` | --- ### SOVClassifier Random-projection encoder + nearest-centroid classification. **Training:** Projects inputs to latent states via random encoder, then computes class centroids in state space. Prediction assigns each point to the nearest centroid. | Signature | Description | |---|---| | `SOVClassifier(n_states=4)` | SOV classifier | | `.fit(X, y) -> self` | Fit model | | `.predict(X) -> NDArray(I64)` | Predict labels | | `.score(X, y) -> float` | Accuracy | #### Example ```python from latpy.latmath.array import array from latpy.ml.sov import SOVClassifier X = array([[1.0, 2.0], [2.0, 1.0], [5.0, 5.0], [6.0, 6.0]]) y = array([0, 0, 1, 1]) clf = SOVClassifier(n_states=2) clf.fit(X, y) pred = clf.predict(X) print(pred.tolist()) # e.g. [0, 0, 1, 1] print(clf.score(X, y)) # 1.0 (linearly separable in projected space) ``` #### Edge Cases | Scenario | Behavior | |---|---| | **Predict before fit** | Raises `RuntimeError` | | **Large n_states** | Can overfit; state space becomes as large as feature space, nearest-centroid may memorize | | **Single class** | Works; all predictions collapse to that class | --- ### SOVDynamics Discrete-time linear state-space model: \[ z_{t+1} = A \, z_t \quad (\text{state transition}) \] \[ x_t = C \, z_t \quad (\text{observation}) \] The system is designed to be **contractive**: A is scaled so its spectral radius < 0.9, ensuring stability. The **equilibrium state** is the dominant eigenvector of A (the direction that survives repeated application). | Signature | Description | |---|---| | `SOVDynamics(n_states=2, n_obs=3)` | SOV dynamics model | | `.fit_random(seed=42) -> self` | Initialize stable random A, C | | `.equilibrium(max_iter=1000, tol=1e-10) -> NDArray` | Dominant eigenvector (equilibrium state) | | `.step(z) -> NDArray` | Advance state by one step: z_{t+1} = A @ z | | `.observe(z) -> NDArray` | Project state to observation: x = C @ z | | `.simulate(n_steps, z0=None) -> (states, obs)` | Simulate full trajectory | #### Example ```python from latpy.latmath.array import array from latpy.ml.sov import SOVDynamics dyn = SOVDynamics(n_states=2, n_obs=3) dyn.fit_random(seed=42) # Step z = array([1.0, 0.0]) z1 = dyn.step(z) print(z1.tolist()) # e.g. [0.246, -0.121] # Observe x = dyn.observe(z) print(x.tolist()) # e.g. [0.185, -0.291, -0.084] # Simulate 10 steps states, obs = dyn.simulate(n_steps=10) print(states.shape) # (11, 2) print(obs.shape) # (11, 3) # Find equilibrium z_star = dyn.equilibrium() print(z_star.tolist()) # unit-norm vector, dominant eigenvector of A # Verify equilibrium: A @ z_star ≈ λ z_star (direction preserved) z_next = dyn.step(z_star) cos_sim = sum(z_next[i] * z_star[i] for i in range(2)) / \ (sum(z_next[i]**2 for i in range(2))**0.5 * sum(z_star[i]**2 for i in range(2))**0.5) print(cos_sim) # > 0.9 (direction nearly preserved) ``` #### Edge Cases | Scenario | Behavior | Rationale | |---|---|---| | **n_states > n_features** | Works; A is an n_states×n_states matrix independent of input dimension | The dynamics operate purely in latent state space; input dimension is irrelevant. | | **Different random seeds** | Different A, C matrices → different dynamics, different equilibrium | The seed parameterizes different random systems. | | **Unstable system (spectral radius > 1)** | `fit_random` scales A to have spectral radius < 0.9, so this is prevented by design | Without scaling, power iteration in `equilibrium()` would diverge. | | **Zero initial state** | `simulate` with z0 = zeros produces all-zero trajectory | Linear system: A·0 = 0. Equilibrium is also zero (trivial eigenvector). | | **fit_random before equilibrium/step/simulate** | Required; raises `RuntimeError` if A_ is None | The matrices must be initialized before use. | | **Equilibrium on unstable system** | Power iteration still converges to the dominant eigenvector, but its norm grows without bound | Since `fit_random` guarantees stability, this is not an issue in practice. |