# Source code for Orange.preprocess.score

import re
from collections import defaultdict
from itertools import chain

import numpy as np
from sklearn import feature_selection as skl_fss

from Orange.data import Domain, Variable, DiscreteVariable, ContinuousVariable
from Orange.data.filter import HasClass
from Orange.misc.wrapper_meta import WrapperMeta
from Orange.preprocess.preprocess import Discretize, SklImpute, RemoveNaNColumns
from Orange.statistics import contingency, distribution
from Orange.util import Reprable

# Public API of the module: the documented scorer classes.
__all__ = ["Chi2",
           "ANOVA",
           "UnivariateLinearRegression",
           "InfoGain",
           "GainRatio",
           "Gini",
           "ReliefF",
           "RReliefF",
           "FCBF"]

class Scorer(Reprable):
    """Base class for feature scorers.

    Calling an instance validates the target and the feature types, runs
    the class-level ``preprocessors`` over the data and delegates the actual
    computation to :meth:`score_data`, which subclasses must implement.
    """
    # Type every scored feature must be an instance of (set by subclasses).
    feature_type = None
    # Type the target variable must be an instance of (set by subclasses).
    class_type = None
    supports_sparse_data = None
    # Callables applied to the data, in order, before scoring.
    preprocessors = [HasClass()]

    @property
    def friendly_name(self):
        """Return type name with camel-case separated into words.

        Derived classes can provide a better property or a class attribute.
        """
        # Insert a space at every lower-to-upper case boundary and lowercase
        # the letter that starts the new word, e.g. "GainRatio" -> "Gain ratio".
        return re.sub("([a-z])([A-Z])",
                      lambda mo: mo.group(1) + " " + mo.group(2).lower(),
                      type(self).__name__)

    @staticmethod
    def _friendly_vartype_name(vartype):
        """Return a human-readable name for the variable type `vartype`."""
        if vartype == DiscreteVariable:
            return "categorical"
        if vartype == ContinuousVariable:
            return "numeric"
        # Fallbacks
        name = vartype.__name__
        if name.endswith("Variable"):
            # Strip the trailing "variable" (8 characters).
            return name.lower()[:-8]
        return name

    def __call__(self, data, feature=None):
        """Score all features of `data`, or only `feature` if it is given.

        Parameters
        ----------
        data
            Data whose domain has a target variable of type `class_type`.
        feature
            Anything accepted by ``data.domain[...]``; when not None, only
            this feature is scored.

        Returns
        -------
        The result of ``score_data(data, feature)`` when `feature` is given;
        otherwise an array with one entry per original attribute, holding
        NaN for attributes that did not survive preprocessing.

        Raises
        ------
        ValueError
            If the data has no target variable, the target is not of
            `class_type`, or a preprocessed feature is not of `feature_type`.
        """
        if not data.domain.class_var:
            raise ValueError(
                "{} requires data with a target variable."
                .format(self.friendly_name))
        if not isinstance(data.domain.class_var, self.class_type):
            raise ValueError(
                "{} requires a {} target variable."
                .format(self.friendly_name,
                        self._friendly_vartype_name(self.class_type)))

        if feature is not None:
            # Restrict the data to the single requested feature.
            f = data.domain[feature]
            data = data.transform(Domain([f], data.domain.class_vars))

        # Remember the domain before preprocessing so that scores can be
        # aligned with the attributes the caller knows about.
        orig_domain = data.domain
        for pp in self.preprocessors:
            data = pp(data)

        for var in data.domain.attributes:
            if not isinstance(var, self.feature_type):
                raise ValueError(
                    "{} cannot score {} variables."
                    .format(self.friendly_name,
                            self._friendly_vartype_name(type(var))))

        if feature is not None:
            return self.score_data(data, feature)

        scores = np.full(len(orig_domain.attributes), np.nan)
        # Match attributes by name; those missing after preprocessing keep NaN.
        names = {a.name for a in data.domain.attributes}
        mask = np.array([a.name in names for a in orig_domain.attributes])
        if len(mask):
            scores[mask] = self.score_data(data, feature)
        return scores

    def score_data(self, data, feature):
        """Compute the scores on preprocessed `data`; implemented by subclasses."""
        raise NotImplementedError

# Base class for scorers that wrap a scikit-learn scoring function.
# Subclasses provide `score(X, y)` returning one score per column of X.
class SklScorer(Scorer, metaclass=WrapperMeta):
    supports_sparse_data = True

    preprocessors = Scorer.preprocessors + [
        SklImpute()
    ]

    def score_data(self, data, feature):
        # Score on the raw arrays of the table.
        score = self.score(data.X, data.Y)
        if feature is not None:
            # The data holds just the requested feature, so its score is first.
            return score[0]
        return score

class Chi2(SklScorer):
    """
    A wrapper for `${sklname}`. The following is the documentation
    from `scikit-learn <http://scikit-learn.org>`_.

    ${skldoc}
    """
    __wraps__ = skl_fss.chi2
    feature_type = DiscreteVariable
    class_type = DiscreteVariable
    # Features are discretized (constant ones are kept) before scoring.
    preprocessors = SklScorer.preprocessors + [
        Discretize(remove_const=False)
    ]

    def score(self, X, y):
        # skl_fss.chi2 returns a pair; only the first element is the score.
        f, _ = skl_fss.chi2(X, y)
        return f
class ANOVA(SklScorer):
    """
    A wrapper for `${sklname}`. The following is the documentation
    from `scikit-learn <http://scikit-learn.org>`_.

    ${skldoc}
    """
    __wraps__ = skl_fss.f_classif
    feature_type = ContinuousVariable
    class_type = DiscreteVariable

    def score(self, X, y):
        # skl_fss.f_classif returns a pair; only the first element is the score.
        f, _ = skl_fss.f_classif(X, y)
        return f
class UnivariateLinearRegression(SklScorer):
    """
    A wrapper for `${sklname}`. The following is the documentation
    from `scikit-learn <http://scikit-learn.org>`_.

    ${skldoc}
    """
    __wraps__ = skl_fss.f_regression
    feature_type = ContinuousVariable
    class_type = ContinuousVariable

    def score(self, X, y):
        # skl_fss.f_regression returns a pair; the second element is unused.
        f, _ = skl_fss.f_regression(X, y)
        return f
class LearnerScorer(Scorer):
    """Scorer that derives feature scores from a fitted model.

    Subclasses implement :meth:`score`, returning the scores together with
    the attributes of the model's domain they refer to.
    """

    def score(self, data):
        """Return ``(scores, model_attributes)`` for `data`."""
        raise NotImplementedError

    def score_data(self, data, feature=None):
        """Return scores aligned with ``data.domain.attributes``.

        The result is two-dimensional (one row per score vector); when
        `feature` is truthy only its column is returned.
        """

        def join_derived_features(scores):
            """ Obtain scores for original attributes.

            If a learner preprocessed the data before building a model, current scores do not
            directly correspond to the domain. For example, continuization creates multiple
            features from a discrete feature. """
            scores_grouped = defaultdict(list)
            for attr, score in zip(model_attributes, scores):
                # Go up the chain of preprocessors to obtain the original variable, but no further
                # than the data.domain, because the data is perhaps already preprocessed.
                while not (attr in data.domain) and attr.compute_value is not None:
                    if hasattr(attr.compute_value, 'variable'):
                        attr = getattr(attr.compute_value, 'variable')
                    else:
                        # The attribute's parent can not be identified. Thus, the attribute's
                        # score will be ignored.
                        break
                scores_grouped[attr].append(score)
            # An original attribute gets the best score among its derived
            # features, or 0 when none of them maps back to it.
            return [max(scores_grouped[attr])
                    if attr in scores_grouped else 0
                    for attr in data.domain.attributes]

        scores, model_attributes = self.score(data)
        scores = np.atleast_2d(scores)
        if data.domain.attributes != model_attributes:
            scores = np.array([join_derived_features(row) for row in scores])

        return scores[:, data.domain.attributes.index(feature)] \
            if feature else scores


class ClassificationScorer(Scorer):
    """
    Base class for feature scores in a class-labeled dataset.

    Parameters
    ----------
    feature : int, string, Feature
        Feature id
    data : Dataset

    Attributes
    ----------
    feature_type :
        Required type of features.
    class_type :
        Required type of class variable.
    """
    feature_type = DiscreteVariable
    class_type = DiscreteVariable
    supports_sparse_data = True
    preprocessors = Scorer.preprocessors + [
        Discretize(remove_const=False)
    ]

    def score_data(self, data, feature):
        """Score every attribute via the subclass's ``from_contingency``.

        Returns a list with one score per attribute, or just the first
        score when `feature` is not None.
        """
        instances_with_class = \
            np.sum(distribution.Discrete(data, data.domain.class_var))

        def score_from_contingency(f):
            cont = contingency.Discrete(data, f)
            # The second argument scales the score by the share of instances
            # whose value of `f` is known.
            return self.from_contingency(
                cont, 1. - np.sum(cont.unknowns)/instances_with_class)

        scores = [score_from_contingency(f) for f in data.domain.attributes]
        if feature is not None:
            return scores[0]
        return scores


def _entropy(dist):
    """Entropy of class-distribution matrix"""
    p = dist / np.sum(dist, axis=0)
    # Clip only the argument of the logarithm so that log2(0) is never taken.
    pc = np.clip(p, 1e-15, 1)
    return np.sum(np.sum(- p * np.log2(pc), axis=0) * np.sum(dist, axis=0) / np.sum(dist))


def _gini(dist):
    """Gini index of class-distribution matrix"""
    p = np.asarray(dist / np.sum(dist, axis=0))
    return np.sum((1 - np.sum(p ** 2, axis=0)) *
                  np.sum(dist, axis=0) / np.sum(dist))


def _symmetrical_uncertainty(data, attr1, attr2):
    """Symmetrical uncertainty, Press et al., 1988."""
    cont = np.asarray(contingency.Discrete(data, attr1, attr2), dtype=float)
    ig = InfoGain().from_contingency(cont, 1)
    return 2 * ig / (_entropy(cont) + _entropy(cont.T))
class FCBF(ClassificationScorer):
    """
    Fast Correlation-Based Filter. Described in:

    Yu, L., Liu, H.,
    Feature selection for high-dimensional data: A fast correlation-based filter solution.
    2003.
    """

    def score_data(self, data, feature=None):
        """Return FCBF scores in the order of ``data.domain.attributes``.

        Returns an array of scores, or only the first score when `feature`
        is truthy.
        """
        attributes = data.domain.attributes
        # (symmetrical uncertainty with the class, attribute index) pairs,
        # sorted ascending so that s[-1] is the most relevant feature.
        s = []
        for i, attr in enumerate(attributes):
            s.append((_symmetrical_uncertainty(data, attr, data.domain.class_var), i))
        s.sort()
        worst = []

        # Walk from the best feature downwards (p) and compare it with every
        # lower-ranked feature (q); the IndexError marks the end of the list.
        p = 1
        while True:
            try:
                _, Fp = s[-p]
            except IndexError:
                break
            q = p + 1
            while True:
                try:
                    suqc, Fq = s[-q]
                except IndexError:
                    break
                if _symmetrical_uncertainty(data,
                                            attributes[Fp],
                                            attributes[Fq]) >= suqc:
                    # Fq is at least as correlated with Fp as with the class:
                    # remove it and keep a heavily down-weighted score.
                    # Deleting shifts the list, so q is not advanced.
                    del s[-q]
                    worst.append((1e-4*suqc, Fq))
                else:
                    q += 1
            p += 1
        best = s
        # Restore the original attribute order.
        scores = [i[0] for i in sorted(chain(best, worst), key=lambda i: i[1])]
        return np.array(scores) if not feature else scores[0]
class InfoGain(ClassificationScorer):
    """
    Information gain is the expected decrease of entropy. See `Wikipedia entry on information gain
    <https://en.wikipedia.org/wiki/Information_gain_in_decision_trees>`_.
    """

    def from_contingency(self, cont, nan_adjustment):
        """Return the information gain for contingency `cont`, scaled by
        `nan_adjustment`."""
        h_class = _entropy(np.sum(cont, axis=1))
        # Columns whose sum is zero are dropped before computing the
        # residual entropy.
        h_residual = _entropy(np.compress(np.sum(cont, axis=0), cont, axis=1))
        return nan_adjustment * (h_class - h_residual)
class GainRatio(ClassificationScorer):
    """
    Information gain ratio is the ratio between information gain and
    the entropy of the feature's
    value distribution. The score was introduced in [Quinlan1986]_
    to alleviate overestimation for multi-valued features. See `Wikipedia entry on gain ratio
    <https://en.wikipedia.org/wiki/Information_gain_ratio>`_.

    .. [Quinlan1986] J R Quinlan: Induction of Decision Trees, Machine Learning, 1986.
    """

    def from_contingency(self, cont, nan_adjustment):
        """Return the gain ratio for contingency `cont`, scaled by
        `nan_adjustment`."""
        h_class = _entropy(np.sum(cont, axis=1))
        # Columns whose sum is zero are dropped before computing the
        # residual entropy.
        h_residual = _entropy(np.compress(np.sum(cont, axis=0), cont, axis=1))
        h_attribute = _entropy(np.sum(cont, axis=0))
        if h_attribute == 0:
            # Avoid division by zero when the feature's entropy is zero.
            h_attribute = 1
        return nan_adjustment * (h_class - h_residual) / h_attribute
class Gini(ClassificationScorer):
    """
    Gini impurity is the probability that two randomly chosen instances will have different
    classes. See `Wikipedia entry on Gini impurity
    <https://en.wikipedia.org/wiki/Decision_tree_learning#Gini_impurity>`_.
    """

    def from_contingency(self, cont, nan_adjustment):
        """Return the decrease of the Gini index for contingency `cont`,
        scaled by `nan_adjustment`."""
        return (_gini(np.sum(cont, axis=1)) - _gini(cont)) * nan_adjustment
class ReliefF(Scorer):
    """
    ReliefF algorithm. Contrary to most other scorers, Relief family of
    algorithms is not as myopic but tends to give unreliable results with
    datasets with lots (hundreds) of features.

    Robnik-Šikonja, M., Kononenko, I.
    Theoretical and empirical analysis of ReliefF and RReliefF.
    2003.
    """
    feature_type = Variable
    class_type = DiscreteVariable
    supports_sparse_data = False
    friendly_name = "ReliefF"
    preprocessors = Scorer.preprocessors + [RemoveNaNColumns()]

    def __init__(self, n_iterations=50, k_nearest=10, random_state=None):
        # All three values are stored as given and forwarded to the
        # `relieff` routine in `score_data`.
        self.n_iterations = n_iterations
        self.k_nearest = k_nearest
        self.random_state = random_state

    def score_data(self, data, feature):
        """Return ReliefF weights for the attributes of `data`.

        Returns the array of weights, or only the first weight when
        `feature` is truthy.

        Raises
        ------
        ValueError
            If the data does not have exactly one class variable or the
            class variable is not discrete.
        """
        # NOTE(review): `feature` is tested for truthiness here, so an index
        # of 0 behaves like "no feature"; callers in this file pass False.
        if len(data.domain.class_vars) != 1:
            raise ValueError('ReliefF requires one single class')
        if not data.domain.class_var.is_discrete:
            raise ValueError('ReliefF supports classification; use RReliefF '
                             'for regression')
        if len(data.domain.class_var.values) == 1:  # Single-class value non-problem
            return 0 if feature else np.zeros(data.X.shape[1])
        # Accept either a ready RandomState or anything RandomState can be
        # constructed from.
        if isinstance(self.random_state, np.random.RandomState):
            rstate = self.random_state
        else:
            rstate = np.random.RandomState(self.random_state)

        from Orange.preprocess._relieff import relieff
        weights = np.asarray(relieff(data.X, data.Y,
                                     self.n_iterations, self.k_nearest,
                                     np.array([a.is_discrete for a in data.domain.attributes],
                                              dtype=bool),
                                     rstate))
        if feature:
            return weights[0]
        return weights
class RReliefF(Scorer):
    """RReliefF scorer for data with a single continuous class variable."""
    feature_type = Variable
    class_type = ContinuousVariable
    supports_sparse_data = False
    friendly_name = "RReliefF"
    preprocessors = Scorer.preprocessors + [RemoveNaNColumns()]

    def __init__(self, n_iterations=50, k_nearest=50, random_state=None):
        # All three values are stored as given and forwarded to the
        # `rrelieff` routine in `score_data`.
        self.n_iterations = n_iterations
        self.k_nearest = k_nearest
        self.random_state = random_state

    def score_data(self, data, feature):
        """Return RReliefF weights for the attributes of `data`.

        Returns the array of weights, or only the first weight when
        `feature` is truthy.

        Raises
        ------
        ValueError
            If the data does not have exactly one class variable or the
            class variable is not continuous.
        """
        # NOTE(review): `feature` is tested for truthiness here, so an index
        # of 0 behaves like "no feature"; callers in this file pass False.
        if len(data.domain.class_vars) != 1:
            raise ValueError('RReliefF requires one single class')
        if not data.domain.class_var.is_continuous:
            raise ValueError('RReliefF supports regression; use ReliefF '
                             'for classification')
        # Accept either a ready RandomState or anything RandomState can be
        # constructed from.
        if isinstance(self.random_state, np.random.RandomState):
            rstate = self.random_state
        else:
            rstate = np.random.RandomState(self.random_state)

        from Orange.preprocess._relieff import rrelieff
        weights = np.asarray(rrelieff(data.X, data.Y,
                                      self.n_iterations, self.k_nearest,
                                      np.array([a.is_discrete for a in data.domain.attributes]),
                                      rstate))
        if feature:
            return weights[0]
        return weights
if __name__ == '__main__':
    # Small demo: score random data with ReliefF, RReliefF and FCBF.
    from Orange.data import Table
    X = np.random.random((500, 20))
    # Turn roughly 5% of the values into missing values.
    X[np.random.random(X.shape) > .95] = np.nan
    # The class depends on the first two columns only ...
    y_cls = np.zeros(X.shape[0])
    y_cls[(X[:, 0] > .5) ^ (X[:, 1] > .6)] = 1
    # ... and the regression target on the first three.
    y_reg = np.nansum(X[:, 0:3], 1)
    for relief, y in ((ReliefF(), y_cls),
                      (RReliefF(), y_reg)):
        data = Table.from_numpy(None, X, y)
        weights = relief.score_data(data, False)
        print(relief.__class__.__name__)
        print('Best =', weights.argsort()[::-1])
        print('Weights =', weights[weights.argsort()[::-1]])
    X *= 10
    data = Table.from_numpy(None, X, y_cls)
    weights = FCBF().score_data(data, False)
    print('FCBF')
    print('Best =', weights.argsort()[::-1])
    print('Weights =', weights[weights.argsort()[::-1]])