# Source code for chainer_chemistry.models.prediction.regressor

import numpy

import chainer
from chainer.dataset.convert import concat_examples
from chainer import cuda, Variable  # NOQA
from chainer import reporter
from chainer_chemistry.models.prediction.base import BaseForwardModel


class Regressor(BaseForwardModel):
    """A simple regressor model.

    This is an example of chain that wraps another chain. It computes the
    loss and metrics based on a given input/label pair.

    Args:
        predictor (~chainer.Link): Predictor network.
        lossfun (function): Loss function.
        metrics_fun (function or dict or None): Function that computes
            metrics.
        label_key (int or str): Key to specify label variable from arguments.
            When it is ``int``, a variable in positional arguments is used.
            And when it is ``str``, a variable in keyword arguments is used.
        device (int): GPU device id of this Regressor to be used.
            -1 indicates to use in CPU.

    Attributes:
        predictor (~chainer.Link): Predictor network.
        lossfun (function): Loss function.
        y (~chainer.Variable): Prediction for the last minibatch.
        loss (~chainer.Variable): Loss value for the last minibatch.
        metrics (dict): Metrics computed in last minibatch.
        compute_metrics (bool): If ``True``, compute metrics on the forward
            computation. The default value is ``True``.

    """

    # Class-level default; __init__ overrides it per instance when
    # `metrics_fun` is None.
    compute_metrics = True

    def __init__(self, predictor,
                 lossfun=chainer.functions.mean_squared_error,
                 metrics_fun=None, label_key=-1, device=-1):
        if not isinstance(label_key, (int, str)):
            raise TypeError('label_key must be int or str, but is %s' %
                            type(label_key))
        super(Regressor, self).__init__()
        self.lossfun = lossfun
        # Normalize `metrics_fun` into a dict mapping report-name -> callable.
        if metrics_fun is None:
            self.compute_metrics = False
            self.metrics_fun = {}
        elif callable(metrics_fun):
            self.metrics_fun = {'metrics': metrics_fun}
        elif isinstance(metrics_fun, dict):
            self.metrics_fun = metrics_fun
        else:
            raise TypeError('Unexpected type metrics_fun must be None or '
                            'Callable or dict. actual {}'
                            .format(type(metrics_fun)))
        self.y = None
        self.loss = None
        self.metrics = None
        self.label_key = label_key

        with self.init_scope():
            self.predictor = predictor

        # `initialize` must be called after `init_scope`.
        self.initialize(device)

    def _convert_to_scalar(self, value):
        """Converts an input value to a scalar if its type is a Variable,
        numpy or cupy array, otherwise it returns the value as it is.
        """
        if isinstance(value, Variable):
            value = value.array
        if numpy.isscalar(value):
            return value
        # BUG FIX: the original tested `type(value) is not numpy.array`, but
        # `numpy.array` is a function, not a type, so that comparison was
        # always True. The intent is: copy device (cupy) arrays to host first.
        if not isinstance(value, numpy.ndarray):
            value = cuda.to_cpu(value)
        # `numpy.asscalar` was removed in NumPy 1.23; `ndarray.item()` is the
        # documented replacement.
        return numpy.asarray(value).item()

    def __call__(self, *args, **kwargs):
        """Computes the loss value for an input and label pair.

        It also computes metrics and stores it to the attribute.

        Args:
            args (list of ~chainer.Variable): Input minibatch.
            kwargs (dict of ~chainer.Variable): Input minibatch.

        When ``label_key`` is ``int``, the corresponding element in ``args``
        is treated as ground truth labels. And when it is ``str``, the
        element in ``kwargs`` is used. All elements of ``args`` and
        ``kwargs`` except the ground truth labels are features. It feeds
        features to the predictor and compares the result with ground truth
        labels.

        Returns:
            ~chainer.Variable: Loss value.

        """
        # --- Separate `args` and `t` ---
        if isinstance(self.label_key, int):
            if not (-len(args) <= self.label_key < len(args)):
                msg = 'Label key %d is out of bounds' % self.label_key
                raise ValueError(msg)
            t = args[self.label_key]
            if self.label_key == -1:
                args = args[:-1]
            else:
                args = args[:self.label_key] + args[self.label_key + 1:]
        elif isinstance(self.label_key, str):
            if self.label_key not in kwargs:
                msg = 'Label key "%s" is not found' % self.label_key
                raise ValueError(msg)
            t = kwargs[self.label_key]
            del kwargs[self.label_key]
        else:
            raise TypeError('Label key type {} not supported'
                            .format(type(self.label_key)))

        self.y = None
        self.loss = None
        self.metrics = None
        self.y = self.predictor(*args, **kwargs)
        self.loss = self.lossfun(self.y, t)

        # When the reported data is a numpy array, the loss and metrics values
        # are scalars. When the reported data is a cupy array, sometimes the
        # same values become arrays instead. This seems to be a bug inside the
        # reporter class, which needs to be addressed and fixed. Until then,
        # the reported values will be converted to numpy arrays.
        reporter.report(
            {'loss': self._convert_to_scalar(self.loss)}, self)

        if self.compute_metrics:
            # Note: self.metrics_fun is `dict`,
            # which is different from original chainer implementation
            self.metrics = {key: self._convert_to_scalar(value(self.y, t))
                            for key, value in self.metrics_fun.items()}
            reporter.report(self.metrics, self)
        return self.loss

    def predict(
            self, data, batchsize=16, converter=concat_examples,
            retain_inputs=False, preprocess_fn=None, postprocess_fn=None):
        """Predict label of each category.

        Args:
            data: input data
            batchsize (int): batch size
            converter (Callable): convert from `data` to `inputs`
            preprocess_fn (Callable): Its input is numpy.ndarray or
                cupy.ndarray, it can return either Variable, cupy.ndarray or
                numpy.ndarray
            postprocess_fn (Callable): Its input argument is Variable,
                but this method may return either Variable, cupy.ndarray or
                numpy.ndarray.
            retain_inputs (bool): If True, this instance keeps inputs in
                `self.inputs` or not.

        Returns (tuple or numpy.ndarray): Typically, it is 1-dimensional int
            array with shape (batchsize, ) which represents each examples
            category prediction.

        """
        # Disable backprop and switch to test mode for inference.
        with chainer.no_backprop_mode(), chainer.using_config('train', False):
            predict_labels = self._forward(
                data, fn=self.predictor, batchsize=batchsize,
                converter=converter, retain_inputs=retain_inputs,
                preprocess_fn=preprocess_fn, postprocess_fn=postprocess_fn)
        return predict_labels