Source code for rime.util.score_array

import numpy as np, pandas as pd
import torch, dataclasses, warnings, operator, builtins, numbers, os, itertools, functools
from typing import List, ClassVar
from torch.utils.data import DataLoader
import scipy.sparse as sps


def get_batch_size(shape):
    """Round to similar batch sizes.

    Splits ``shape[0]`` rows into near-equal batches whose size is bounded by
    a memory budget.

    The budget is the total memory of CUDA device 0 (or 16e9 when torch sees
    no CUDA device), divided by 8 and by the number of columns, then scaled by
    the ``BATCH_SIZE_FRAC`` environment variable (default 0.1).

    :param shape: ``(n_users, n_items)`` pair.
    :return: number of rows per batch, as an ``int``.
    """
    n_users, n_items = shape
    frac = float(os.environ.get("BATCH_SIZE_FRAC", 0.1))

    total_memory = (
        torch.cuda.get_device_properties(0).total_memory
        if torch.cuda.device_count()
        else 16e9
    )

    max_batch_size = total_memory / 8 / n_items * frac
    # Always at least one batch; more once n_users exceeds the budget.
    n_batches = int(n_users / max_batch_size) + 1
    rows_per_batch = np.ceil(n_users / n_batches)
    return int(rows_per_batch)
def find_iloc(old_index, new_index, allow_missing=False):
    """Locate each label of ``new_index`` inside ``old_index``.

    :param old_index: a ``pd.Index`` or anything ``pd.Index`` can wrap.
    :param new_index: labels to look up.
    :param allow_missing: when false, a label absent from ``old_index``
        raises; when true it is reported as ``-1``.
    :return: the integer positions produced by ``Index.get_indexer``.
    :raises IndexError: if a label is missing and ``allow_missing`` is false.
    """
    lookup = old_index if isinstance(old_index, pd.Index) else pd.Index(old_index)
    positions = lookup.get_indexer(new_index)

    if allow_missing:
        return positions
    if -1 in positions:
        raise IndexError("missing indices detected in a disallowed case")
    return positions
def matrix_reindex(csr, old_index, new_index, axis, fill_value=0):
    """pandas.reindex functionality on sparse or dense matrices as well as 1d arrays.

    :param csr: scipy sparse matrix, numpy array, or a ``LazyScoreBase``.
    :param old_index: labels currently attached to the chosen axis of ``csr``.
    :param new_index: labels wanted along that axis.
    :param axis: 0 to reindex rows, 1 to reindex columns.
    :param fill_value: value used for labels missing from ``old_index``
        (not applied to ``LazyScoreBase`` inputs).
    :return: ``csr`` re-ordered along ``axis`` to follow ``new_index``.
    """
    if axis == 1:
        # Columns are handled as rows of the transpose.
        reindexed_t = matrix_reindex(csr.T, old_index, new_index, 0, fill_value)
        return reindexed_t.T.copy()

    assert axis == 0, "axis must be 0 or 1"
    assert csr.shape[0] == len(old_index), "shape must match between csr and old_index"

    if not isinstance(csr, LazyScoreBase):  # lazy scores do not support extrapolation
        # Append one extra row holding fill_value. Missing labels come back
        # from find_iloc as -1, which then selects this last row.
        fill_row = csr[:1] * 0 + fill_value
        if sps.issparse(csr):
            csr = sps.vstack([csr, fill_row], "csr")
            csr.eliminate_zeros()
        else:
            csr = np.concatenate([csr, fill_row], axis=0)

    iloc = find_iloc(old_index, new_index, allow_missing=True)
    return csr[iloc]
def sps_to_torch(x, device):
    """Convert scipy.sparse to torch.sparse.

    :param x: a scipy sparse matrix (anything providing ``tocoo()``).
    :param device: torch device for the resulting tensor.
    :return: a sparse COO tensor with the same shape and entries as ``x``.
    """
    coo = x.tocoo()
    # torch expects the indices as a (2, nnz) array: row ids on top of column ids.
    row_col = np.vstack([coo.row, coo.col])
    return torch.sparse_coo_tensor(row_col, coo.data, coo.shape, device=device)
def auto_device():
    """Return ``'cuda'`` when torch reports CUDA as available, else ``'cpu'``."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'
def auto_tensor(x, device=None):
    """Convert ``x`` to a dense torch tensor on ``device``.

    * objects exposing ``as_tensor`` convert themselves;
    * scipy sparse matrices go through ``sps_to_torch`` and are densified;
    * anything else is handed to ``torch.as_tensor``.

    :param x: lazy score, scipy sparse matrix, or array-like data.
    :param device: target device; chosen by ``auto_device()`` when ``None``.
    :return: a dense ``torch.Tensor``.
    """
    if device is None:
        device = auto_device()

    if hasattr(x, "as_tensor"):
        return x.as_tensor(device)
    if sps.issparse(x):
        return sps_to_torch(x, device).to_dense()
    # device must be passed by keyword: the second positional parameter of
    # torch.as_tensor is dtype, so a positional device raises TypeError.
    return torch.as_tensor(x, device=device)
class LazyScoreBase:
    """Low-rank, sparse, and other expressions with deferred evaluations after subsampling.

    * Support expressions like ``(a @ b.T + c).exp() * d + e``
    * Support operations ``T``, ``__getitem__``, ``collate_fn``
    * Lazy evaluation until ``as_tensor(device)`` or ``numpy()``

    Drop support for reindex because it is nontrivial to find fill_values
    through activation.
    """

    def __init__(self, shape):
        self.shape = shape

    def __repr__(self):
        return f"<{type(self).__name__} {self.shape}>"

    # ---- evaluation and slicing hooks; subclasses override these ----

    def numpy(self):
        """Evaluate through ``as_tensor()`` and convert the result to numpy."""
        tensor = self.as_tensor()
        return tensor.numpy()

    def as_tensor(self, device=None):
        """Evaluate to a tensor on ``device``; not implemented on the base class."""
        raise NotImplementedError

    @property
    def T(self):
        """Transposed expression; not implemented on the base class."""
        raise NotImplementedError

    def __getitem__(self, key):
        """ slice by rows for pytorch dataloaders """
        raise NotImplementedError

    @staticmethod
    def collate_fn(D):
        """ collate by rows for pytorch dataloaders """
        raise NotImplementedError

    # ---- shape helpers ----

    def __len__(self):
        return self.shape[0]

    @property
    def size(self):
        """Product of the entries of ``shape``."""
        return np.prod(self.shape)

    @property
    def batch_size(self):
        """Batch size that ``get_batch_size`` returns for this shape."""
        return get_batch_size(self.shape)

    # ---- binary operators: each builds a deferred expression node ----

    def __matmul__(self, other):
        return MatMulExpression(operator.matmul, [self, other])

    def __add__(self, other):
        return ElementWiseExpression(operator.add, [self, other])

    def __mul__(self, other):
        return ElementWiseExpression(operator.mul, [self, other])

    def __sub__(self, other):
        return ElementWiseExpression(operator.sub, [self, other])

    def __truediv__(self, other):
        return ElementWiseExpression(operator.truediv, [self, other])

    # ---- unary element-wise transforms ----

    def exp(self):
        return ElementWiseExpression(torch.exp, [self])

    def softplus(self):
        return ElementWiseExpression(torch.nn.functional.softplus, [self])

    def sigmoid(self):
        return ElementWiseExpression(torch.sigmoid, [self])

    def apply(self, op):
        """Wrap an arbitrary single-argument ``op`` as an element-wise expression."""
        return ElementWiseExpression(op, [self])

    def vae_module(self, module):
        """Pair this score with "randn" noise in a ``VAEExpression``.

        The op is built from ``module.training``, ``module.prior`` and
        ``module.beta``; the noise has half as many columns as this score.
        """
        op = _VAEOp(module.training, module.prior, module.beta)
        n_rows, n_cols = self.shape[0], self.shape[1]
        noise = RandScore.create((n_rows, n_cols // 2), "randn")
        return VAEExpression(op, [self, noise])
def auto_cast_lazy_score(other):
    """Wrap ``other`` in the matching lazy-score class.

    ``None`` and existing ``LazyScoreBase`` objects pass through unchanged.
    Scipy sparse matrices become ``LazySparseMatrix``. DataFrames, torch
    tensors, scalars and numpy arrays become ``LazyDenseMatrix``.
    """
    if other is None:
        return None  # prior_score=None -> None
    if isinstance(other, LazyScoreBase):
        return other
    if sps.issparse(other):
        return LazySparseMatrix(other)
    if isinstance(other, pd.DataFrame):
        return LazyDenseMatrix(other.values)
    if torch.is_tensor(other):
        as_array = other.detach().cpu().numpy()
        return LazyDenseMatrix(as_array)
    # scalar or numpy arrays of 0, 1, 2 dimensions
    return LazyDenseMatrix(other)
class LazySparseMatrix(LazyScoreBase):
    """Lazy score backed by a scipy CSR matrix, sliceable by rows."""

    def __init__(self, c):
        self.c = c.tocsr()
        self.shape = c.shape

    def numpy(self):
        return self.c.toarray()

    def as_tensor(self, device=None):
        return sps_to_torch(self.c, device).to_dense()

    @property
    def T(self):
        return self.__class__(self.c.T)

    def __getitem__(self, key):
        """Slice by rows.

        A scalar ``key`` returns a lightweight ``_LazySparseDictFast`` holding
        that row's stored values and column indices. Any other key is applied
        to the underlying CSR matrix and re-wrapped.

        :raises IndexError: if a scalar ``key`` is outside ``[-n_rows, n_rows)``.
        """
        if not np.isscalar(key):
            return self.__class__(self.c[key])

        n_rows = self.shape[0]
        # Negative keys count from the end. Used directly on indptr they would
        # give a reversed or misplaced slice and silently return the wrong row.
        row = key + n_rows if key < 0 else key
        if not 0 <= row < n_rows:
            raise IndexError(f"row index {key} is out of range for {n_rows} rows")

        slc = slice(self.c.indptr[row], self.c.indptr[row + 1])
        _dict = {
            "values": self.c.data[slc],
            "keys": self.c.indices[slc],
            "shape": self.c.shape[1],
        }
        return _LazySparseDictFast(_dict)

    @staticmethod
    def collate_fn(D):
        """Stack the CSR matrices of the items in ``D`` row-wise."""
        return D[0].__class__(sps.vstack([d.c for d in D]))
class _LazySparseDictFast(LazyScoreBase):
    """One sparse row kept as a plain dict.

    The dict has the keys ``"values"``, ``"keys"`` (column indices) and
    ``"shape"`` (number of columns).
    """

    def __init__(self, c):
        self.c = c
        self.shape = (1, self.c['shape'])

    @staticmethod
    def collate_fn(D):
        """Assemble single-row dicts into one ``LazySparseMatrix``."""
        rows = [d.c for d in D]
        data = np.hstack([row['values'] for row in rows])
        indices = np.hstack([row['keys'] for row in rows])
        # indptr: a leading 0, then the running total of stored entries per row
        entries_per_row = [len(row['keys']) for row in rows]
        indptr = np.hstack([[0], np.cumsum(entries_per_row)])
        csr = sps.csr_matrix(
            (data, indices, indptr),
            shape=(len(rows), rows[0]['shape']),
        )
        return LazySparseMatrix(csr)
class LazyDenseMatrix(LazyScoreBase):
    """ cast scalar and arrays to 2-d arrays """

    def __init__(self, c):
        self.c = np.array(c, ndmin=2)
        self.shape = self.c.shape

    def numpy(self):
        return self.c

    def as_tensor(self, device=None):
        return torch.as_tensor(self.c, device=device)

    @property
    def T(self):
        transposed = self.c.T
        return self.__class__(transposed)

    def __getitem__(self, key):
        """ take modulos for broadcasting purposes """
        if isinstance(key, slice):
            # Expand the slice against its own stop, not the row count, so the
            # positions can run past the number of rows and wrap around below.
            key = range(key.stop)[key]
        rows = np.array(key, ndmin=1) % self.shape[0]
        return self.__class__(self.c[rows])

    @staticmethod
    def collate_fn(D):
        """Stack the arrays of the items in ``D`` row-wise."""
        stacked = np.vstack([d.c for d in D])
        return D[0].__class__(stacked)
def _get_op_name(op): if hasattr(op, "__name__"): return op.__name__ elif hasattr(op, "training"): return f"{op}({op.training})" else: return repr(op)
class LazyExpressionBase:
    """Expression-tree node: an ``op`` applied to ``children``.

    Every child is passed through ``auto_cast_lazy_score``.
    """

    def __init__(self, op, children):
        self.op = op
        self.children = [auto_cast_lazy_score(child) for child in children]
        self.__post_init__()

    def __post_init__(self):
        """Hook run at the end of ``__init__``; a no-op on the base class."""
        pass

    def traverse(self, op_func=_get_op_name):
        """Walk the tree and build its text form.

        Child expressions are traversed recursively and wrapped in
        parentheses; other children are formatted as-is. ``op_func(self.op)``
        is called once, right after the first child, and its result is
        inserted there.
        """
        pieces = []
        for position, child in enumerate(self.children):
            if hasattr(child, "traverse"):
                pieces.append(f"({child.traverse(op_func)})")
            else:
                pieces.append(f"{child}")
            if position == 0:
                pieces.append(f" {op_func(self.op)} ")
        return "".join(pieces)

    def train(self):
        """Set ``training = True`` on every op in the tree that has the attribute."""
        def switch_on(op):
            if hasattr(op, "training"):
                setattr(op, "training", True)

        self.traverse(switch_on)

    def eval(self):
        """Set ``training = False`` on every op in the tree that has the attribute."""
        def switch_off(op):
            if hasattr(op, "training"):
                setattr(op, "training", False)

        self.traverse(switch_off)

    def __repr__(self):
        return self.traverse()

    def as_tensor(self, device=None):
        """Evaluate every child on ``device`` and apply ``op`` to the results."""
        operands = [child.as_tensor(device) for child in self.children]
        return self.op(*operands)
class ElementWiseExpression(LazyExpressionBase, LazyScoreBase):
    """ Tree representation of an element-wise expression; auto-broadcast """

    def __post_init__(self):
        # Row 0 holds every child's row count, row 1 every child's column count.
        dims = np.transpose([child.shape for child in self.children])
        self.shape = (max(dims[0]), max(dims[1]))  # consider broadcast

    @property
    def T(self):
        transposed = [child.T for child in self.children]
        return self.__class__(self.op, transposed)

    def __getitem__(self, key):
        sliced = [child[key] for child in self.children]
        return self.__class__(self.op, sliced)

    @staticmethod
    def collate_fn(batch):
        """Collate child-by-child, using the first item's children as templates."""
        first = batch[0]
        # Regroup so that each tuple holds the i-th child of every batch item.
        per_child = zip(*[item.children for item in batch])
        merged = [
            template.collate_fn(group)
            for template, group in zip(first.children, per_child)
        ]
        return first.__class__(first.op, merged)
class MatMulExpression(LazyExpressionBase, LazyScoreBase):
    """Deferred matrix product of a left and a right child."""

    def __post_init__(self):
        self.left, self.right = self.children
        self.shape = (self.left.shape[0], self.right.shape[1])
        inner_dims_match = self.left.shape[1] == self.right.shape[0]
        assert inner_dims_match, \
            f"matmul shape check fail: {self.left.shape} vs {self.right.shape}"

    @property
    def T(self):
        # (A @ B).T == B.T @ A.T
        return self.__class__(self.op, [self.right.T, self.left.T])

    def __getitem__(self, key):
        # Row slicing only touches the left factor.
        left_rows = self.left[key]
        return self.__class__(self.op, [left_rows, self.right])

    @staticmethod
    def collate_fn(batch):
        """Collate the left factors; reuse the first item's op and right factor."""
        first = batch[0]
        left_cls = first.left.__class__
        left = left_cls.collate_fn([item.left for item in batch])
        return first.__class__(first.op, [left, first.right])
class VAEExpression(LazyExpressionBase, LazyScoreBase):
    """ op=_VAEOp, [weight (n, 2d), randn (n, d)], auto-broadcast on d-axis """

    @property
    def axis(self):
        """1 when the right child is a "randn" ``RandScore``, otherwise 0."""
        return int(isinstance(self.right, RandScore) and self.right.distn == "randn")

    def __post_init__(self):
        self.left, self.right = self.children
        self.shape = (self.left.shape[0], self.right.shape[1])
        assert _vae_shape_check(self.left.shape, self.right.shape, self.axis), \
            f"vae(axis={self.axis}) shape check fail: {self.left.shape} vs {self.right.shape}"

    @property
    def T(self):
        # Swapping and transposing the children moves the noise to the left,
        # which flips ``axis`` to 0.
        return self.__class__(self.op, [self.right.T, self.left.T])

    def __getitem__(self, key):
        """Slice rows: both children when ``axis == 1``, only the left one otherwise."""
        if self.axis == 1:
            return ElementWiseExpression.__getitem__(self, key)
        return MatMulExpression.__getitem__(self, key)

    @staticmethod
    def collate_fn(batch):
        """Collate with the same per-axis rule as ``__getitem__``."""
        if batch[0].axis == 1:
            return ElementWiseExpression.collate_fn(batch)
        return MatMulExpression.collate_fn(batch)

    def as_tensor(self, device=None):
        """Evaluate to a tensor of shape ``self.shape`` on ``device``.

        ``device`` defaults to ``None`` like every other ``as_tensor`` in this
        module, so the inherited ``numpy()``, which calls ``as_tensor()`` with
        no argument, works on VAE expressions too.
        """
        if self.axis == 1:
            weight = self.left.as_tensor(device)
            randn = self.right.as_tensor(device)
            return self.op(weight, randn).expand(self.shape)
        # axis == 0: evaluate the transposed (axis == 1) form and transpose back.
        return self.T.as_tensor(device).T
def _vae_shape_check(left_shape, right_shape, axis): if axis == 1: ndim = left_shape[1] // 2 return left_shape[0] == right_shape[0] and right_shape[1] % ndim == 0 else: return _vae_shape_check(right_shape[::-1], left_shape[::-1], 1 - axis) class _VAEOp: def __init__(self, training=True, prior=0, beta=1): self.training = training self.beta = beta self.prior_std = np.exp(prior / 2) self.__name__ = f"vae({training}*{beta})" def __call__(self, weight, randn): mean, logvar = torch.split(weight, weight.shape[-1] // 2, -1) noise = randn * (logvar / 2).exp() * self.training * self.beta return (mean + noise) * self.prior_std
@dataclasses.dataclass(repr=False)
class RandScore(LazyScoreBase):
    """ add random noise to break ties """
    row_seeds: list  # np.array for fast indexing
    col_seeds: list  # np.array for fast indexing
    distn: str = "rand"
    _DEFAULT_MAX_SEED: ClassVar[int] = 10000

    @property
    def shape(self):
        n_rows, n_cols = len(self.row_seeds), len(self.col_seeds)
        return (n_rows, n_cols)

    @classmethod
    def create(cls, shape, distn="rand"):
        """Build seeds as consecutive integers from two random offsets.

        The offsets come from ``np.random.choice(_DEFAULT_MAX_SEED)``.
        """
        row_seeds = np.random.choice(cls._DEFAULT_MAX_SEED) + np.arange(shape[0])
        col_seeds = np.random.choice(cls._DEFAULT_MAX_SEED) + np.arange(shape[1])
        return cls(row_seeds, col_seeds, distn)

    def numpy(self):
        """Generate every row from its own seeded ``RandomState``.

        Only the number of column seeds is used.
        """
        n_cols = len(self.col_seeds)
        rows = []
        for seed in self.row_seeds:
            sampler = getattr(np.random.RandomState(int(seed)), self.distn)
            rows.append(sampler(n_cols))
        return np.vstack(rows)

    def as_tensor(self, device=None):
        """Generate every row from its own seeded ``torch.Generator`` on ``device``."""
        n_cols = len(self.col_seeds)
        rows = [
            getattr(torch, self.distn)(
                n_cols,
                device=device,
                generator=torch.Generator(device).manual_seed(int(seed)),
            )
            for seed in self.row_seeds
        ]
        return torch.vstack(rows)

    @property
    def T(self):
        warnings.warn("transpose changes rand seed; only for evaluate_user_rec")
        return self.__class__(self.col_seeds, self.row_seeds, self.distn)

    def __getitem__(self, key):
        """Select row seeds by ``key``; positions wrap modulo the row count."""
        if isinstance(key, slice):
            key = range(key.stop)[key]
        positions = np.array(key, ndmin=1) % self.shape[0]
        return self.__class__(self.row_seeds[positions], self.col_seeds, self.distn)

    @staticmethod
    def collate_fn(batch):
        """Concatenate row seeds; keep the first item's column seeds and ``distn``."""
        first = batch[0]
        row_seeds = np.hstack([item.row_seeds for item in batch])
        return first.__class__(row_seeds, first.col_seeds, first.distn)
def batch_op_iter(S, op, device=None):
    """Yield ``op(batch)`` for consecutive row batches of ``S``.

    :param S: lazy score supporting ``len``, row slicing, ``batch_size`` and
        ``as_tensor(device)`` on each slice.
    :param op: a callable applied to each batch tensor, or the name of a
        ``torch`` function (e.g. max, min, sum).
    :param device: device each batch is evaluated on.
    :return: generator with one result per batch; empty when ``S`` has no rows.
    """
    if isinstance(op, str):
        op = getattr(torch, op)  # max, min, sum

    n_rows = len(S)
    # A batch size of 0 (e.g. reported for an empty S) would make range()
    # raise ValueError, so step by at least one row.
    step = max(S.batch_size, 1)
    for start in range(0, n_rows, step):
        # Clamp the stop explicitly rather than relying on slice semantics.
        stop = min(n_rows, start + step)
        yield op(S[start:stop].as_tensor(device))
def score_op(S, op, device=None, reduce_fn=None):
    """ aggregation operations (e.g., max, min, sum) across entire matrix

    ``op`` is applied to each batch by ``batch_op_iter``, and the per-batch
    results are folded with ``reduce_fn``. When ``reduce_fn`` is omitted it is
    looked up from ``op``, which must then be "max", "min" or "sum".
    """
    if reduce_fn is None:
        default_reducers = {"max": max, "min": min, "sum": operator.add}
        reduce_fn = default_reducers[op]
    per_batch = batch_op_iter(S, op, device)
    return functools.reduce(reduce_fn, per_batch)
@dataclasses.dataclass
class LazyScoreModel:
    """Pairs a lazy score matrix with the user and item indices labelling it.

    When ``lazy_score`` is omitted, an all-zero sparse score of matching shape
    is used.
    """
    user_index: pd.Index
    item_index: pd.Index
    lazy_score: LazyScoreBase = None
    tie_breaker: float = 0

    def __post_init__(self):
        shape = (len(self.user_index), len(self.item_index))
        if self.lazy_score is None:
            self.lazy_score = auto_cast_lazy_score(sps.csr_matrix(shape))
        assert self.lazy_score.shape == shape, f"shape inconsistent: {shape}, {self.lazy_score.shape}"

    def transform(self, D):
        """Select the scores for ``D``'s test requests (rows) and test items (columns).

        Trailing index levels are dropped from whichever of the user index and
        the request index has more levels, until both have the same number.
        When ``tie_breaker > 0``, random noise scaled by ``tie_breaker`` is
        added to the result.
        """
        def drop_to_depth(index, depth):
            # Drop trailing levels until ``index`` has at most ``depth`` levels.
            while index.nlevels > depth:
                index = index.droplevel(-1)
            return index

        requests_index = D.test_requests.index
        depth = min(self.user_index.nlevels, requests_index.nlevels)
        old_index = drop_to_depth(self.user_index, depth)
        new_index = drop_to_depth(requests_index, depth)

        row_ind = old_index.get_indexer(new_index)
        col_ind = self.item_index.get_indexer(D.item_in_test.index)

        # Lazy scores slice by rows only, so columns are picked on the transpose.
        by_items = self.lazy_score.T[col_ind]
        S = by_items.T[row_ind]

        if self.tie_breaker > 0:
            S = S + RandScore.create(S.shape) * self.tie_breaker
        return S