# Source code for adeft.modeling.classify

import gzip
import json
import logging
import warnings
import numpy as np
from hashlib import md5
from datetime import datetime
from collections import Counter, defaultdict

from sklearn.pipeline import Pipeline
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import f1_score, precision_score, recall_score,\
    make_scorer


from adeft import __version__
from adeft.nlp import english_stopwords

# Install a process-wide filter (at import time) that hides sklearn's
# ConvergenceWarning.
warnings.filterwarnings("ignore", category=ConvergenceWarning)

# Module-level logger. Note that it is named after this file's path
# (__file__), not the dotted module name.
logger = logging.getLogger(__file__)


class AdeftClassifier(object):
    """Trains classifiers to disambiguate shortforms based on context

    Fits logistic regression models with tfidf vectorized ngram features.
    Uses sklearn's LogisticRegression and TfidfVectorizer classes. Models can
    be serialized and loaded for later use.

    Parameters
    ----------
    shortforms : str or list of str
        Shortform to disambiguate or list of shortforms to build models
        for multiple synonymous shortforms.
    pos_labels : list of str
        Labels for positive classes. These correspond to the longforms of
        interest in an application. For adeft pretrained models these are
        typically genes and other relevant biological terms.
    random_state : Optional[int]
        Optional specification of seed used when calculating crossvalidation
        folds and fitting the logistic regression model.
        Default: None

    Attributes
    ----------
    estimator : py:class:`sklearn.pipeline.Pipeline`
        An sklearn pipeline that transforms text data with a TfidfVectorizer
        and fits a logistic regression.
    stats : dict
        Statistics describing model performance. Only available after model
        is fit with crossvalidation
    stop : set of str
        Stopwords to exclude when performing tfidf vectorization.
        These consist of the set of stopwords in adeft.nlp.english_stopwords
        along with the (lowercased) shortform(s) for which the model is
        being built
    params : dict
        Dictionary mapping parameters to their values. If fit with cv, this
        contains the parameters with best micro averaged f1 score over
        crossvalidation runs.
    best_score : float
        Best micro averaged f1 score for positive labels over crossvalidation
        runs. This information can also be found in the stats dict and is not
        included when models are serialized. Only available if model is fit
        with the cv method.
    grid_search : py:class:`sklearn.model_selection.GridSearchCV`
        sklearn gridsearch object if model was fit with cv. This is not
        included when model is serialized.
    confusion_info : dict
        Contains the confusion matrix for each pair of labels per
        crossvalidation split. Only available if the model has been fit with
        crossvalidation. Nested dictionary,
        `confusion_info[label1][label2][i]` gives the number of test examples
        where the true label is label1 and the classifier has made prediction
        label2 in split i.
    other_metadata : dict
        Data set here by the user will be included when the model is
        serialized and remain available when the classifier is loaded again.
    version : str
        Adeft version used when model was fit
    timestamp : str
        Human readable timestamp for when model was fit
    training_set_digest : str
        Digest of training set calculated using md5 hash. Can be used at a
        glance to determine if two models used the same training set.
    _std : py:class:`numpy.ndarray`
        Array of standard deviations of feature values over training set.
        This is used to calculate feature importance
    """

    def __init__(self, shortforms, pos_labels, random_state=None):
        # handle case where single string is passed
        if isinstance(shortforms, str):
            shortforms = [shortforms]
        self.shortforms = shortforms
        self.pos_labels = pos_labels
        self.random_state = random_state
        self.estimator = None
        self.stats = None
        self.confusion_info = None
        self.other_metadata = None
        # Add shortforms to list of stopwords
        self.stop = set(english_stopwords).union(
            [sf.lower() for sf in self.shortforms])
        self.best_score = None
        self.grid_search = None
        self.version = __version__
        self._std = None
        self.params = None
        self.timestamp = None
        self.training_set_digest = None

    def train(self, texts, y, C=1.0, ngram_range=(1, 2), max_features=1000,
              class_weight=None):
        """Fits a disambiguation model

        Parameters
        ----------
        texts : iterable of str
            Training texts
        y : iterable of str
            True labels for training texts
        C : Optional[float]
            L1 regularization parameter logistic regression model. Follows
            convention of support vector machines with smaller values
            corresponding to stronger regularization.
            Default: 1.0
        ngram_range : Optional[tuple of int]
            Range of ngram features to use. Must be a tuple of ints of the
            form (a, b) with a <= b. When ngram_range is (1, 2), unigrams and
            bigrams will be used as features.
            Default: (1, 2)
        max_features : int
            Maximum number of tfidf-vectorized ngrams to use as features in
            model. Selects top_features by term frequency
            Default: 1000
        class_weight : Optional[dict or 'balanced']
            Weights associated with classes in the form
            {class_label: weight}. If not given, all classes are supposed to
            have weight one. The "balanced" mode uses the values of y to
            automatically adjust weights inversely proportional to class
            frequencies in the input data as
            n_samples / (n_classes * np.bincount(y)).
        """
        # Initialize pipeline
        seed = self.random_state
        # The stopwords are handed over as a list: sklearn documents a list
        # for this parameter and newer releases reject a set.
        tfidf = TfidfVectorizer(ngram_range=ngram_range,
                                max_features=max_features,
                                stop_words=sorted(self.stop))
        logit = LogisticRegression(C=C, solver='saga', penalty='l1',
                                   multi_class='auto',
                                   class_weight=class_weight,
                                   random_state=seed)
        logit_pipeline = Pipeline([('tfidf', tfidf), ('logit', logit)])
        logit_pipeline.fit(texts, y)

        # Keys match the ones produced by the cv method ('ngram_range' was
        # previously misspelled here).
        self.params = {'C': C, 'ngram_range': ngram_range,
                       'max_features': max_features,
                       'class_weight': class_weight,
                       'random_state': self.random_state}
        self.estimator = logit_pipeline
        # Crossvalidation results do not apply to a model fit this way
        self.best_score = None
        self.grid_search = None
        self.timestamp = self._get_current_time()
        self.training_set_digest = self._training_set_digest(texts)
        self._set_variance(texts)

    def cv(self, texts, y, param_grid, n_jobs=1, cv=5):
        """Performs grid search to select and fit a disambiguation model

        Parameters
        ----------
        texts : iterable of str
            Training texts
        y : iterable of str
            True labels for the training texts
        param_grid : dict
            Grid search parameters. Can contain the parameters 'C',
            'class_weight', 'max_features' and 'ngram_range' from the train
            method. Any other key raises a KeyError.
        n_jobs : Optional[int]
            Number of jobs to use when performing grid_search
            Default: 1
        cv : Optional[int]
            Number of folds to use in crossvalidation.
            Default: 5

        Example
        -------
        >>> params = {'C': [1.0, 10.0, 100.0],
        ...           'max_features': [3000, 6000, 9000],
        ...           'ngram_range': [(1, 1), (1, 2), (1, 3)]}
        >>> classifier = AdeftClassifier('IR', ['insulin receptor'])
        >>> classifier.cv(texts, labels, param_grid=params, n_jobs=4)
        """
        # Initialize pipeline
        seed = self.random_state
        tfidf = TfidfVectorizer(ngram_range=(1, 2), max_features=1000,
                                stop_words=sorted(self.stop))
        logit = LogisticRegression(C=100., solver='saga', penalty='l1',
                                   multi_class='auto', random_state=seed)
        logit_pipeline = Pipeline([('tfidf', tfidf), ('logit', logit)])

        # Create scorer for use in grid search. Best params decided using
        # f1 score. The positive labels are specified when the classifier is
        # initialized. Uses micro-average f1, precision, and recall scores.
        # This means metrics are calculated globally by counting all true
        # positives, false negatives, and false positives
        scorer = {
            'f1': make_scorer(f1_score, labels=self.pos_labels,
                              average='micro'),
            'pr': make_scorer(precision_score, labels=self.pos_labels,
                              average='micro'),
            'rc': make_scorer(recall_score, labels=self.pos_labels,
                              average='micro'),
        }

        # Per-label metrics: 'pr' is precision and 'rc' is recall
        all_labels = sorted(set(y))
        for label in all_labels:
            scorer['f1_%s' % label] = make_scorer(
                f1_score, labels=[label], average=None)
            scorer['pr_%s' % label] = make_scorer(
                precision_score, labels=[label], average=None)
            scorer['rc_%s' % label] = make_scorer(
                recall_score, labels=[label], average=None)

        # One counting scorer per (true label, predicted label) pair. These
        # give the entries of the confusion matrix for every split.
        for label1 in all_labels:
            for label2 in all_labels:
                scorer['count_%s_%s' % (label1, label2)] = make_scorer(
                    _count_score, label1=label1, label2=label2)

        logger.info('Beginning grid search in parameter space:\n%s',
                    param_grid)

        # Translate the user facing parameter names into pipeline step names
        param_mapping = {'C': 'logit__C',
                         'class_weight': 'logit__class_weight',
                         'max_features': 'tfidf__max_features',
                         'ngram_range': 'tfidf__ngram_range'}
        inverse_param_mapping = {value: key
                                 for key, value in param_mapping.items()}
        param_grid = {param_mapping[key]: value
                      for key, value in param_grid.items()}

        num_splits = cv
        splitter = StratifiedKFold(n_splits=num_splits, shuffle=True,
                                   random_state=seed)

        # Fit grid_search and set the estimator for the instance of the class
        grid_search = GridSearchCV(logit_pipeline, param_grid,
                                   cv=splitter, n_jobs=n_jobs,
                                   scoring=scorer, refit='f1',
                                   return_train_score=False)
        grid_search.fit(texts, y)
        logger.info('Best f1 score of %s found for parameter values:\n%s',
                    grid_search.best_score_, grid_search.best_params_)

        results = grid_search.cv_results_
        # Row of cv_results_ belonging to the refit (best f1) candidate
        best_index = grid_search.best_index_

        def summarize(metric):
            """Mean and std of a test metric for the best candidate"""
            return {'mean': np.round(results['mean_test_%s' % metric]
                                     [best_index], 6),
                    'std': np.round(results['std_test_%s' % metric]
                                    [best_index], 6)}

        stats = {'label_distribution': dict(Counter(y)),
                 'f1': summarize('f1'),
                 'precision': summarize('pr'),
                 'recall': summarize('rc')}
        for label in all_labels:
            stats[label] = {'f1': summarize('f1_%s' % label),
                            'pr': summarize('pr_%s' % label),
                            'rc': summarize('rc_%s' % label)}

        confusion = {}
        for label1 in all_labels:
            confusion[label1] = {}
            for label2 in all_labels:
                confusion[label1][label2] = [
                    int(results['split%s_test_count_%s_%s'
                                % (i, label1, label2)][best_index])
                    for i in range(num_splits)]

        params = {inverse_param_mapping[key]: value
                  for key, value in grid_search.best_params_.items()}
        params['random_state'] = self.random_state

        self.params = params
        self.estimator = grid_search.best_estimator_
        self.best_score = grid_search.best_score_
        self.grid_search = grid_search
        self.stats = stats
        self.confusion_info = confusion
        self.timestamp = self._get_current_time()
        self.training_set_digest = self._training_set_digest(texts)
        self._set_variance(texts)

    def predict_proba(self, texts):
        """Predict class probabilities for a list-like of texts

        Returns a list with one dict per text, mapping each class label to
        its predicted probability.
        """
        labels = self.estimator.classes_
        preds = self.estimator.predict_proba(texts)
        return [{labels[i]: prob for i, prob in enumerate(probs)}
                for probs in preds]

    def predict(self, texts):
        """Predict class labels for a list-like of texts"""
        return self.estimator.predict(texts)

    def get_model_info(self):
        """Return a JSON object representing a model for portability.

        Returns
        -------
        dict
            A JSON object representing the attributes of the classifier needed
            to make it portable/serializable and enabling its reload.

        Raises
        ------
        RuntimeError
            If the classifier has no fitted estimator.
        """
        if self.estimator is None:
            raise RuntimeError('Estimator has not been fit.')
        logit = self.estimator.named_steps['logit']
        if not hasattr(logit, 'coef_'):
            raise RuntimeError('Estimator has not been fit.')
        tfidf = self.estimator.named_steps['tfidf']

        # Convert numpy containers and scalars into plain JSON types
        vocabulary_ = {term: int(frequency)
                       for term, frequency in tfidf.vocabulary_.items()}
        model_info = {'logit': {'classes_': logit.classes_.tolist(),
                                'intercept_': logit.intercept_.tolist(),
                                'coef_': logit.coef_.tolist()},
                      'tfidf': {'vocabulary_': vocabulary_,
                                'idf_': tfidf.idf_.tolist(),
                                'ngram_range': tfidf.ngram_range},
                      'shortforms': self.shortforms,
                      'pos_labels': self.pos_labels}

        # Model statistics may not be available depending on how the model
        # was fit, and some attributes may not exist in older models. Only
        # attributes that are present and not None are written out.
        optional = (('stats', 'stats'),
                    ('_std', 'std'),
                    ('timestamp', 'timestamp'),
                    ('training_set_digest', 'training_set_digest'),
                    ('params', 'params'),
                    ('version', 'version'),
                    ('confusion_info', 'confusion_info'),
                    ('other_metadata', 'other_metadata'))
        for attribute, key in optional:
            value = getattr(self, attribute, None)
            if value is None:
                continue
            if attribute == '_std':
                value = value.tolist()
            model_info[key] = value
        return model_info

    def dump_model(self, filepath):
        """Serialize model to gzipped json

        Parameters
        ----------
        filepath : str
           Path to output file
        """
        model_info = self.get_model_info()
        json_str = json.dumps(model_info)
        json_bytes = json_str.encode('utf-8')
        with gzip.GzipFile(filepath, 'w') as fout:
            fout.write(json_bytes)

    def feature_importances(self):
        """Return feature importance scores for each label

        The feature importance scores are given by multiplying the
        coefficients of the logistic regression model by the standard
        deviations of the tf-idf scores for the associated features over
        all texts. Note that there is a coefficient associated to each label
        feature pair.

        One can interpret the feature importance score as the change in the
        linear predictor for a given label associated to a one standard
        deviation change in a feature's value. The predicted probability
        being given by the composition of the logit link function and the
        linear predictor.

        Returns
        -------
        dict
            Dictionary with class labels as keys. The associated values
            are lists of two element tuples each with first element an ngram
            feature and second element a feature importance score. None is
            returned (and a warning logged) if the standard deviations of the
            features are not available for this model.
        """
        if getattr(self, '_std', None) is None:
            logger.warning('Feature importance information not available for'
                           ' this model.')
            return None
        output = {}
        tfidf = self.estimator.named_steps['tfidf']
        logit = self.estimator.named_steps['logit']
        # get_feature_names was replaced by get_feature_names_out in newer
        # sklearn releases; use whichever this installation provides.
        if hasattr(tfidf, 'get_feature_names_out'):
            feature_names = tfidf.get_feature_names_out()
        else:
            feature_names = tfidf.get_feature_names()
        classes = logit.classes_
        # Binary and multiclass cases must be handled separately.
        # When there are greater than two classes, the logistic
        # regression model will have a row of coefficients for
        # each class. When there are only two classes, there is
        # only one row of coefficients corresponding to the label classes[1]
        if len(classes) > 2:
            for index, label in enumerate(classes):
                importance = np.round(logit.coef_[index] * self._std, 4)
                output[label] = sorted(zip(feature_names, importance),
                                       key=lambda x: -x[1])
        else:
            # Take the single row explicitly so that a model with only one
            # feature still yields a one dimensional array.
            importance = np.round(logit.coef_[0] * self._std, 4)
            output[classes[1]] = sorted(zip(feature_names, importance),
                                        key=lambda x: -x[1])
            # Scores for the other label are the negatives, listed in
            # reverse so they are again in descending order
            output[classes[0]] = [(feature, -value)
                                  for feature, value
                                  in output[classes[1]][::-1]]
        return output

    def _set_variance(self, texts):
        """Set attribute containing standard deviations of the features

        Computes, for each tfidf feature, the standard deviation of its
        value over the given texts and stores the result in self._std.

        Parameters
        ----------
        texts : iterable of str
            Training texts
        """
        tfidf = self.estimator.named_steps['tfidf']
        X = tfidf.transform(texts)
        temp = X.copy()
        temp.data **= 2
        second_moment = temp.mean(0)
        first_moment_squared = np.square(X.mean(0))
        variance = np.asarray(second_moment - first_moment_squared).ravel()
        # Rounding error can push a zero variance slightly below zero, which
        # would turn into NaN under the square root.
        self._std = np.sqrt(np.maximum(variance, 0.0))

    def _get_current_time(self):
        """Return the current local time as an ISO 8601 formatted string"""
        return datetime.now().isoformat()

    def _training_set_digest(self, texts):
        """Returns a hash corresponding to training set

        Does not depend on order of texts
        """
        hashed_texts = ''.join(md5(text.encode('utf-8')).hexdigest()
                               for text in sorted(texts))
        return md5(hashed_texts.encode('utf-8')).hexdigest()
def load_model(filepath):
    """Load previously serialized model

    Reads a gzipped, utf-8 encoded JSON file and rebuilds the classifier
    from its contents with load_model_info.

    Parameters
    ----------
    filepath : str
       path to model file

    Returns
    -------
    longform_model : py:class:`adeft.classify.AdeftClassifier`
        The classifier that was loaded from the given path.
    """
    with gzip.GzipFile(filepath, 'r') as handle:
        payload = handle.read()
    return load_model_info(json.loads(payload.decode('utf-8')))
def load_model_info(model_info):
    """Return a longform model from a model info JSON object.

    Parameters
    ----------
    model_info : dict
        The JSON object containing the attributes of a model.

    Returns
    -------
    longform_model : py:class:`adeft.classify.AdeftClassifier`
        The classifier that was loaded from the given JSON object.
    """
    shortforms = model_info['shortforms']
    pos_labels = model_info['pos_labels']
    longform_model = AdeftClassifier(shortforms=shortforms,
                                     pos_labels=pos_labels)
    # JSON has no tuples, so the range comes back as a list
    ngram_range = tuple(model_info['tfidf']['ngram_range'])
    # Use the classifier's own stopwords (adeft stopwords plus shortforms),
    # the same ones the vectorizer is given when a model is fit. A different
    # stop list would change which ngrams are formed from a text.
    tfidf = TfidfVectorizer(ngram_range=ngram_range,
                            stop_words=sorted(longform_model.stop))
    logit = LogisticRegression(multi_class='auto')
    # Restore the fitted state directly instead of refitting
    tfidf.vocabulary_ = model_info['tfidf']['vocabulary_']
    tfidf.idf_ = model_info['tfidf']['idf_']
    logit.classes_ = np.array(model_info['logit']['classes_'], dtype='<U64')
    logit.intercept_ = np.array(model_info['logit']['intercept_'])
    logit.coef_ = np.array(model_info['logit']['coef_'])
    estimator = Pipeline([('tfidf', tfidf),
                          ('logit', logit)])
    longform_model.estimator = estimator
    # These attributes do not exist in older adeft models.
    # For backwards compatibility we check if they are present
    if 'stats' in model_info:
        longform_model.stats = model_info['stats']
    if 'std' in model_info:
        longform_model._std = np.array(model_info['std'])
    if 'timestamp' in model_info:
        longform_model.timestamp = model_info['timestamp']
    if 'training_set_digest' in model_info:
        longform_model.training_set_digest = \
            model_info['training_set_digest']
    if 'params' in model_info:
        longform_model.params = model_info['params']
    if 'version' in model_info:
        # Assignment, not comparison: report the version that fit the model
        longform_model.version = model_info['version']
    if 'confusion_info' in model_info:
        longform_model.confusion_info = model_info['confusion_info']
    if 'other_metadata' in model_info:
        longform_model.other_metadata = model_info['other_metadata']
    return longform_model
def _count_score(y_true, y_pred, label1=0, label2=1): return sum((y == label1 and pred == label2) for y, pred in zip(y_true, y_pred))