Source code for chainer_chemistry.links.scaler.standard_scaler

from logging import getLogger

import numpy
from chainer import cuda

from chainer_chemistry.links.scaler.base import BaseScaler, to_array  # NOQA


def format_x(x):
    """Return ``x`` laid out as a 2-dim ``(batchsize, feature)`` object.

    Args:
        x: array or Variable. Only ``ndim``, ``shape`` and indexing are
            used here.

    Returns:
        ``x`` itself when it is already 2-dim. A 1-dim ``x`` is treated as
        one feature with several samples and gets a trailing axis added.

    Raises:
        ValueError: if the result is not 2-dim.
    """
    # A 1-dim input is one feature observed over several samples.
    formatted = x[:, None] if x.ndim == 1 else x
    if formatted.ndim == 2:
        return formatted
    raise ValueError(
        "Unexpected value x.shape={}, only x.ndim=2 is supported."
        .format(formatted.shape))


class StandardScaler(BaseScaler):
    """Scaler that shifts by a fitted mean and divides by a fitted std.

    ``fit`` stores the per-feature ``mean`` and ``std`` (optionally only
    for the columns listed in ``indices``). ``transform`` computes
    ``(x - mean) / std`` and ``inverse_transform`` computes
    ``x * std + mean``. Columns outside ``indices`` use mean 0 and std 1,
    and columns whose fitted std is 0 use std 1.
    """

    def __init__(self):
        super(StandardScaler, self).__init__()
        # Each fitted value starts as None and is registered as a
        # persistent value of this link.
        for name in ('indices', 'mean', 'std'):
            setattr(self, name, None)
            self.register_persistent(name)

    def fit(self, x, indices=None):
        """Fitting parameter.

        Args:
            x: data to fit on. It is converted with ``to_array`` and then
                ``format_x``, so 1-dim input is handled as one feature.
            indices (list or tuple or None): indices for applying
                standard scaling. A list or tuple is converted to a numpy
                array; any other value is stored unchanged. ``None`` means
                all features are used.

        Returns:
            self (StandardScaler): this instance.

        Raises:
            NotImplementedError: if ``self.xp`` is not numpy and ``x``
                contains nan.
        """
        data = format_x(to_array(x))

        if isinstance(indices, (list, tuple)):
            indices = numpy.asarray(indices)
        self.indices = indices
        if self.indices is not None:
            # Statistics are computed only on the selected columns.
            data = data[:, self.indices]

        xp = self.xp
        if xp is numpy:
            mean_fn, std_fn = xp.nanmean, xp.nanstd
        else:
            # cupy.nanmean, cupy.nanstd is not implemented yet.
            nan_count = int(xp.sum(xp.isnan(data)))
            if nan_count > 0:
                raise NotImplementedError(
                    "StandardScaling with nan value on GPU "
                    "is not supported.")
            mean_fn, std_fn = xp.mean, xp.std
        self.mean = mean_fn(data, axis=0)
        self.std = std_fn(data, axis=0)

        # result consistency check
        zero_std_count = xp.sum(self.std == 0)
        if zero_std_count > 0:
            logger = getLogger(__name__)
            std_on_cpu = cuda.to_cpu(self.std)
            ind = numpy.argwhere(std_on_cpu == 0)[:, 0]
            message = 'fit: std was 0 at indices {}'.format(ind)
            logger.warning(message)
        return self

    def _compute_mean_std_all(self, input_dim):
        """Expand the fitted statistics to vectors of length ``input_dim``.

        Args:
            input_dim (int): number of features of the data to scale.

        Returns:
            tuple: ``(mean_all, std_all)``. ``std_all`` is a float32 array
            that is 1 wherever the fitted std is 0 or the feature is not
            in ``self.indices``. ``mean_all`` is ``self.mean`` itself when
            ``self.indices`` is None, otherwise a float32 array that is 0
            outside ``self.indices``.
        """
        xp = self.xp
        # Entries with zero std keep the default 1 to avoid dividing by 0.
        has_spread = self.std != 0
        std_all = xp.ones(input_dim, dtype=xp.float32)

        if self.indices is None:
            std_all[has_spread] = self.std[has_spread]
            return self.mean, std_all

        mean_all = xp.zeros(input_dim, dtype=xp.float32)
        mean_all[self.indices] = self.mean
        scaled_columns = self.indices[has_spread]
        std_all[scaled_columns] = self.std[has_spread]
        return mean_all, std_all

    def transform(self, x):
        """Scale ``x`` with the fitted statistics: ``(x - mean) / std``.

        Args:
            x: array or Variable; formatted with ``format_x`` first.

        Returns:
            The scaled data in 2-dim ``(batchsize, feature)`` layout.

        Raises:
            AttributeError: if ``fit`` has not set ``self.mean`` yet.
        """
        if self.mean is None:
            raise AttributeError('[Error] mean is None, call fit beforehand!')
        batch = format_x(x)
        n_features = batch.shape[1]
        mean_all, std_all = self._compute_mean_std_all(n_features)
        centered = batch - mean_all[None, :]
        return centered / std_all[None, :]

    def inverse_transform(self, x):
        """Undo ``transform``: ``x * std + mean``.

        Args:
            x: array or Variable; formatted with ``format_x`` first.

        Returns:
            The unscaled data in 2-dim ``(batchsize, feature)`` layout.

        Raises:
            AttributeError: if ``fit`` has not set ``self.mean`` yet.
        """
        if self.mean is None:
            raise AttributeError('[Error] mean is None, call fit beforehand!')
        batch = format_x(x)
        n_features = batch.shape[1]
        mean_all, std_all = self._compute_mean_std_all(n_features)
        rescaled = batch * std_all[None, :]
        return rescaled + mean_all[None, :]