The following 50 code examples, extracted from open-source Python projects, illustrate how to use sklearn.cross_validation.StratifiedKFold().
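Before the project examples, here is a minimal usage sketch (not taken from any of the projects below; the toy X and y are invented for illustration). In the old cross_validation API the class labels are passed directly to the constructor, and iterating over the resulting object yields (train_indices, test_indices) pairs whose folds preserve the class proportions of y. Note that sklearn.cross_validation was deprecated in scikit-learn 0.18 and removed in 0.20; its replacement, sklearn.model_selection.StratifiedKFold, takes n_splits in the constructor and receives the labels via split(X, y) instead.

import numpy as np
from sklearn.cross_validation import StratifiedKFold  # old API, scikit-learn < 0.20

# Toy data for illustration only: 6 samples, 2 features, two balanced classes.
X = np.arange(12).reshape(6, 2)
y = np.array([0, 0, 0, 1, 1, 1])

# The label vector is the first argument; each fold keeps the class ratio of y.
skf = StratifiedKFold(y, n_folds=3, shuffle=True, random_state=0)

for train_idx, test_idx in skf:
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    # fit and evaluate a model on this fold here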
def train_and_calibrate_cv(model, X_tr, y_tr, cv=5):
    y_pred_xval = np.zeros(len(y_tr))
    skf = cross_validation.StratifiedKFold(y_tr, n_folds=cv, shuffle=True)
    i = 0
    for train, test in skf:
        i = i + 1
        print("training fold {} of {}".format(i, cv))
        X_train_xval = np.array(X_tr)[train, :]
        X_test_xval = np.array(X_tr)[test, :]
        y_train_xval = np.array(y_tr)[train]
        # We could also copy the model first and then fit it
        model_copy = clone(model)
        model_copy.fit(X_train_xval, y_train_xval)
        y_pred_xval[test] = model_copy.predict_proba(X_test_xval)[:, 1]
    print("training full model")
    model_copy = clone(model)
    model_copy.fit(X_tr, y_tr)
    print("calibrating function")
    calib_func = prob_calibration_function(y_tr, y_pred_xval)
    return model_copy, calib_func
def test_homonym(H, sent, features, C=1.0):
    X_0 = features(matching(sent, H[0]))
    X_1 = features(matching(sent, H[1]))
    y_0 = numpy.zeros(len(X_0))
    y_1 = numpy.ones(len(X_1))
    X = normalize(numpy.vstack([X_0, X_1]), norm='l2')
    y = numpy.hstack([y_0, y_1])
    classifier = LogisticRegression(C=C)
    fold = StratifiedKFold(y, n_folds=10)
    score = []
    count = []
    for tr, te in fold:
        X_tr, X_te = X[tr], X[te]
        y_tr, y_te = y[tr], y[te]
        classifier.fit(X_tr, y_tr)
        score.append(sum(classifier.predict(X_te) == y_te))
        count.append(len(y_te))
    score = numpy.array(score, dtype='float')
    count = numpy.array(count, dtype='float')
    result = {'word1_count': len(y_0),
              'word2_count': len(y_1),
              'majority': 1.0 * max(len(y_0), len(y_1)) / len(y),
              'kfold_acc': score / count}
    return result
def getFolds(labels, number_folds):
    """
    Provides train/test indices to split data in train test sets.

    Parameters
    ----------
    labels: array-like of shape = [number_samples]
        The target values (class labels in classification).
    number_folds: int
        The number of folds for the k-fold cross-validation.

    Return
    ----------
    folds: StratifiedKFold
        The train/test indices of the split data.
    """
    return StratifiedKFold(y=labels, n_folds=number_folds, shuffle=True)
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.lda

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    try:
        for train, test in kf:
            lda = sklearn.lda.LDA()
            if len(y.shape) == 1 or y.shape[1] == 1:
                lda.fit(X[train], y[train])
            else:
                lda = OneVsRestClassifier(lda)
                lda.fit(X[train], y[train])
            predictions = lda.predict(X[test])
            accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
        return accuracy / 10
    except LinAlgError as e:
        self.logger.warning("LDA failed: %s Returned 0 instead!" % e)
        return np.NaN
    except ValueError as e:
        self.logger.warning("LDA failed: %s Returned 0 instead!" % e)
        return np.NaN
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.naive_bayes

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        nb = sklearn.naive_bayes.GaussianNB()
        if len(y.shape) == 1 or y.shape[1] == 1:
            nb.fit(X[train], y[train])
        else:
            nb = OneVsRestClassifier(nb)
            nb.fit(X[train], y[train])
        predictions = nb.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        tree = sklearn.tree.DecisionTreeClassifier(random_state=random_state)
        if len(y.shape) == 1 or y.shape[1] == 1:
            tree.fit(X[train], y[train])
        else:
            tree = OneVsRestClassifier(tree)
            tree.fit(X[train], y[train])
        predictions = tree.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        node = sklearn.tree.DecisionTreeClassifier(
            criterion="entropy", max_depth=1, random_state=random_state,
            min_samples_split=1, min_samples_leaf=1, max_features=None)
        if len(y.shape) == 1 or y.shape[1] == 1:
            node.fit(X[train], y[train])
        else:
            node = OneVsRestClassifier(node)
            node.fit(X[train], y[train])
        predictions = node.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        node = sklearn.tree.DecisionTreeClassifier(
            criterion="entropy", max_depth=1, random_state=random_state,
            min_samples_split=1, min_samples_leaf=1, max_features=1)
        if len(y.shape) == 1 or y.shape[1] == 1:
            node.fit(X[train], y[train])
        else:
            node = OneVsRestClassifier(node)
            node.fit(X[train], y[train])
        predictions = node.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def test_stacked_classfier_extkfold(self):
    bclf = LogisticRegression(random_state=1)
    clfs = [RandomForestClassifier(n_estimators=40, criterion='gini', random_state=1),
            RidgeClassifier(random_state=1),
            ]
    sl = StackedClassifier(bclf,
                           clfs,
                           n_folds=3,
                           verbose=0,
                           Kfold=StratifiedKFold(self.iris.target, 3),
                           stack_by_proba=False,
                           oob_score_flag=True,
                           oob_metrics=log_loss)
    sl.fit(self.iris.data, self.iris.target)
    score = sl.score(self.iris.data, self.iris.target)
    self.assertGreater(score, 0.9, "Failed with score = {0}".format(score))
def test_fwls_classfier(self):
    feature_func = lambda x: np.ones(x.shape)
    bclf = LogisticRegression(random_state=1)
    clfs = [RandomForestClassifier(n_estimators=40, criterion='gini', random_state=1),
            RidgeClassifier(random_state=1),
            ]
    sl = FWLSClassifier(bclf,
                        clfs,
                        feature_func=feature_func,
                        n_folds=3,
                        verbose=0,
                        Kfold=StratifiedKFold(self.iris.target, 3),
                        stack_by_proba=False)
    sl.fit(self.iris.data, self.iris.target)
    score = sl.score(self.iris.data, self.iris.target)
    self.assertGreater(score, 0.9, "Failed with score = {0}".format(score))
def validation(self, X, Y, kind):
    """
    2-fold cross-validation
    """
    print 'validating...'
    fold_n = 2
    folds = list(StratifiedKFold(Y, n_folds=fold_n, random_state=0))
    score = np.zeros(fold_n)
    for j, (train_idx, test_idx) in enumerate(folds):
        print j + 1, '-fold'
        X_train = X[train_idx]
        y_train = Y[train_idx]
        X_test = X[test_idx]
        y_test = Y[test_idx]
        res = self.fit(X_train, y_train, X_test)
        cur = sum(y_test == res) * 1.0 / len(res)
        score[j] = cur
    print score, score.mean()
    return score.mean()
def _devset_cv(self, a_y_train, a_n_dev, a_n_folds):
    """Generate train-test split from training and development data.

    Args:
      a_y_train (list[int]): list of training instances' tags
      a_n_dev (int): number of devset instances
      a_n_folds (int): number of folds

    Returns:
      list[tuple]: list of training/testing folds

    """
    folds = []
    n_train = len(a_y_train)
    dev_ids = [n_train + i for i in xrange(a_n_dev)]
    # create stratified K-folds over the training data
    skf = StratifiedKFold(a_y_train, a_n_folds)
    for train_ids, test_ids in skf:
        folds.append((train_ids, np.concatenate((test_ids, dev_ids))))
    return folds
def run(self, X_train, y_train, X_test, y_test, profiler):
    skf = StratifiedKFold(y_train, n_folds=self.n_folds,
                          shuffle=True, random_state=123)
    fold = 1
    for train_index, test_index in skf:
        X_train_fold, y_train_fold = [X_train[i] for i in train_index], [y_train[i] for i in train_index]
        X_test_fold, y_test_fold = [X_train[i] for i in test_index], [y_train[i] for i in test_index]

        logger.info('Training on {} instances!'.format(len(train_index)))
        profiler.train(X_train_fold, y_train_fold)

        logger.info('Testing on fold {} with {} instances'.format(fold, len(test_index)))
        y_pred_fold = profiler.predict(X_test_fold)
        print_accuracy(y_test_fold, y_pred_fold)
        fold = fold + 1

    if X_test:
        logger.info('Training on {} instances!'.format(len(X_train)))
        profiler.train(X_train, y_train)
        logger.info('Testing on {} instances!'.format(len(X_test)))
        y_pred = profiler.predict(X_test)
        print_confusion_matrix(y_test, y_pred)
        print_accuracy(y_test, y_pred)
def test_model(self, n_folds=10):
    """
    Evaluate the model with stratified K-fold cross-validation
    (Stratified K-folds cross-validating).
    """
    logging.debug("testing model with {}-folds CV".format(n_folds))
    model = self.init_model()
    X = self.data.data
    y = self.data.target

    cv = cross_validation.StratifiedKFold(y, n_folds=n_folds, random_state=42)

    t0 = time()
    y_pred = cross_validation.cross_val_predict(model, X=X, y=y, n_jobs=-1, cv=cv)
    t = time() - t0
    print("=" * 52)
    print("time cost: {}".format(t))
    print()
    print("confusion matrix\n", metrics.confusion_matrix(y, y_pred))
    print()
    print("\t\taccuracy: {}".format(metrics.accuracy_score(y, y_pred)))
    print()
    print("\t\tclassification report")
    print("-" * 52)
    print(metrics.classification_report(y, y_pred))
def crossValidation(clf, X, Y, num=None):
    '''
    num: can be number of trees or nearest neighbours
    '''
    scores = []
    cv = StratifiedKFold(Y, n_folds=5)
    for train, test in cv:
        X_train, y_train = X[train], Y[train]
        X_test, y_test = X[test], Y[test]
        clf.fit(X_train, y_train)
        scores.append(clf.score(X_test, y_test))
    if num:
        print("Classifier: " + str(clf.__str__) + "\t Mean(scores)= " + str(np.mean(scores)) +
              "\tStddev(scores)= " + str(np.std(scores)) +
              "\t Number of neighbours / trees= " + str(num) + "\n")
        logFile("Classifier: " + str(clf.__str__) + "\t Mean(scores)= " + str(np.mean(scores)) +
                "\tStddev(scores)= " + str(np.std(scores)) +
                "\t Number of neighbours / trees= " + str(num) + "\n")
    else:
        print("Classifier: " + str(clf.__str__) + "\t Mean(scores)= " + str(np.mean(scores)) +
              "\tStddev(scores)= " + str(np.std(scores)) + "\n")
        logFile("Classifier: " + str(clf.__str__) + "\t Mean(scores)= " + str(np.mean(scores)) +
                "\tStddev(scores)= " + str(np.std(scores)) + "\n")
def score(self, params):
    print "Training with params : "
    print params
    N_boost_round = []
    Score = []
    skf = cross_validation.StratifiedKFold(self.train_y, n_folds=6, shuffle=True, random_state=25)
    for train, test in skf:
        X_Train, X_Test, y_Train, y_Test = self.train_X[train], self.train_X[test], self.train_y[train], self.train_y[test]
        dtrain = xgb.DMatrix(X_Train, label=y_Train)
        dvalid = xgb.DMatrix(X_Test, label=y_Test)
        watchlist = [(dtrain, 'train'), (dvalid, 'eval')]
        model = xgb.train(params, dtrain, num_boost_round=150, evals=watchlist, early_stopping_rounds=10)
        predictions = model.predict(dvalid)
        N = model.best_iteration
        N_boost_round.append(N)
        score = model.best_score
        Score.append(score)
    Average_best_num_boost_round = np.average(N_boost_round)
    Average_best_score = np.average(Score)
    print "\tAverage of best iteration {0}\n".format(Average_best_num_boost_round)
    print "\tScore {0}\n\n".format(Average_best_score)
    return {'loss': Average_best_score, 'status': STATUS_OK,
            'Average_best_num_boost_round': Average_best_num_boost_round}
def create_cv_id(target, n_folds_=5, cv_id_name=cv_id_name, seed=407):
    try:
        a = StratifiedKFold(target['target'], n_folds=n_folds_, shuffle=True, random_state=seed)
        cv_index = a.test_folds
        print 'Done StratifiedKFold'
    except:
        cv_index = np.empty(len(target))
        a = KFold(len(target), n_folds=n_folds_, shuffle=True, random_state=seed)
        for idx, i in enumerate(a):
            cv_index[i[1]] = idx
        cv_index = cv_index.astype(int)
        print 'Done Kfold'
    np.save(INPUT_PATH + cv_id_name, cv_index)
    return


######### Utils #########
def test_stratified_kfold_no_shuffle():
    # Manually check that StratifiedKFold preserves the data ordering as much
    # as possible on toy datasets in order to avoid hiding sample dependencies
    # when possible
    splits = iter(cval.StratifiedKFold([1, 1, 0, 0], 2))
    train, test = next(splits)
    assert_array_equal(test, [0, 2])
    assert_array_equal(train, [1, 3])

    train, test = next(splits)
    assert_array_equal(test, [1, 3])
    assert_array_equal(train, [0, 2])

    splits = iter(cval.StratifiedKFold([1, 1, 1, 0, 0, 0, 0], 2))
    train, test = next(splits)
    assert_array_equal(test, [0, 1, 3, 4])
    assert_array_equal(train, [2, 5, 6])

    train, test = next(splits)
    assert_array_equal(test, [2, 5, 6])
    assert_array_equal(train, [0, 1, 3, 4])
def test_stratified_kfold_ratios():
    # Check that stratified kfold preserves label ratios in individual splits
    # Repeat with shuffling turned off and on
    n_samples = 1000
    labels = np.array([4] * int(0.10 * n_samples) +
                      [0] * int(0.89 * n_samples) +
                      [1] * int(0.01 * n_samples))
    for shuffle in [False, True]:
        for train, test in cval.StratifiedKFold(labels, 5, shuffle=shuffle):
            assert_almost_equal(np.sum(labels[train] == 4) / len(train), 0.10, 2)
            assert_almost_equal(np.sum(labels[train] == 0) / len(train), 0.89, 2)
            assert_almost_equal(np.sum(labels[train] == 1) / len(train), 0.01, 2)
            assert_almost_equal(np.sum(labels[test] == 4) / len(test), 0.10, 2)
            assert_almost_equal(np.sum(labels[test] == 0) / len(test), 0.89, 2)
            assert_almost_equal(np.sum(labels[test] == 1) / len(test), 0.01, 2)
def test_cross_val_generator_with_indices():
    X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
    y = np.array([1, 1, 2, 2])
    labels = np.array([1, 2, 3, 4])
    # explicitly passing indices value is deprecated
    loo = cval.LeaveOneOut(4)
    lpo = cval.LeavePOut(4, 2)
    kf = cval.KFold(4, 2)
    skf = cval.StratifiedKFold(y, 2)
    lolo = cval.LeaveOneLabelOut(labels)
    lopo = cval.LeavePLabelOut(labels, 2)
    ps = cval.PredefinedSplit([1, 1, 2, 2])
    ss = cval.ShuffleSplit(2)
    for cv in [loo, lpo, kf, skf, lolo, lopo, ss, ps]:
        for train, test in cv:
            assert_not_equal(np.asarray(train).dtype.kind, 'b')
            assert_not_equal(np.asarray(test).dtype.kind, 'b')
            X[train], X[test]
            y[train], y[test]
def test_cross_val_generator_with_default_indices():
    X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
    y = np.array([1, 1, 2, 2])
    labels = np.array([1, 2, 3, 4])
    loo = cval.LeaveOneOut(4)
    lpo = cval.LeavePOut(4, 2)
    kf = cval.KFold(4, 2)
    skf = cval.StratifiedKFold(y, 2)
    lolo = cval.LeaveOneLabelOut(labels)
    lopo = cval.LeavePLabelOut(labels, 2)
    ss = cval.ShuffleSplit(2)
    ps = cval.PredefinedSplit([1, 1, 2, 2])
    for cv in [loo, lpo, kf, skf, lolo, lopo, ss, ps]:
        for train, test in cv:
            assert_not_equal(np.asarray(train).dtype.kind, 'b')
            assert_not_equal(np.asarray(test).dtype.kind, 'b')
            X[train], X[test]
            y[train], y[test]
def gridSearchPipeline(pipeline, paramsGrid, Xtrain, Ytrain, **cvParams):
    print("Grid Searching pipeline:")
    print(pipeline)

    # use 5-fold stratified cross-validation by default to maintain
    # consistent class balance across training and testing
    if 'cv' not in cvParams:
        # print "Ytrain: ", Ytrain
        # numClasses = len(np.unique(Ytrain))
        # examplesPerClass = len(Ytrain) / numClasses
        # nFolds = max(5, examplesPerClass / 5)
        # if nFolds < 5:
        # if True:
        #     r, c = Ytrain.shape
        #     print "tiny Ytrain size: (%d, %d)" % Ytrain.shape  # (r, c)
        #     for row in Ytrain: print row
        # cvParams['cv'] = StratifiedKFold(Ytrain, n_folds=nFolds)
        cvParams['cv'] = StratifiedKFold(Ytrain, n_folds=5)

    cv = GridSearchCV(pipeline, paramsGrid, **cvParams)
    cv.fit(Xtrain, Ytrain)
    return cv
def test_grtm():
    l = language(1000)
    n_iter = 1000
    KL_thresh = 0.3

    mu = 0.
    nu2 = 1.
    np.random.seed(l['seed'])
    H = np.random.normal(loc=mu, scale=nu2, size=(l['K'], l['K']))
    zeta = pd.DataFrame([(i, j, np.dot(np.dot(l['thetas'][i], H), l['thetas'][j]))
                         for i, j in product(range(l['D']), repeat=2)],
                        columns=('tail', 'head', 'zeta'))
    zeta['y'] = (zeta.zeta >= 0).astype(int)
    y = zeta[['tail', 'head', 'y']].values
    skf = StratifiedKFold(y[:, 2], n_folds=100)
    _, train_idx = next(iter(skf))

    _K = l['K']
    _alpha = l['alpha'][:_K]
    _beta = np.repeat(0.01, l['V'])
    _b = 1.
    grtm = GRTM(_K, _alpha, _beta, mu, nu2, _b, n_iter, seed=l['seed'],
                n_report_iter=l['n_report_iters'])

    grtm.fit(l['doc_term_matrix'], y[train_idx])

    assert_probablity_distribution(grtm.phi)
    check_KL_divergence(l['topics'], grtm.phi, KL_thresh)
def grid_search(estimator, data, featTypes=('BoW',), nFolds=10, random_seed=44, param_grid=()):
    labels = [x.severity for x in data]

    generatePrimaryFeats(data, featTypes)

    featurized = []
    for d in data:
        instance = {}
        for featname, values in d.feats.items():
            # Give each feature a unique name to avoid overwriting features.
            # If e.g. a concept feature has the same name as a bow word, the old code
            # would overwrite one of the features.
            instance.update({"{0}-{1}".format(featname, k): v for k, v in values.items()})
        featurized.append(instance)

    d = DictVectorizer()
    x_train = d.fit_transform(featurized)

    folds = cross_validation.StratifiedKFold(labels, n_folds=nFolds, shuffle=True, random_state=random_seed)
    grid = GridSearchCV(estimator, param_grid=param_grid, scoring="f1", n_jobs=-1, cv=folds)
    fit_grid = grid.fit(x_train, labels)

    print(fit_grid.best_params_)
    return fit_grid.best_params_
def train_test_split(X, y, test_size=0.25, random_state=42, stratify=True):
    if stratify:
        n_folds = int(round(1 / test_size))
        sss = StratifiedKFold(y, n_folds=n_folds, random_state=random_state)
    else:
        sss = ShuffleSplit(len(y), test_size=test_size, random_state=random_state)
    train_idx, test_idx = iter(sss).next()
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]
def __call__(self, X, y, net):
    if self.eval_size is not None:
        if net.regression or not self.stratify:
            # test_size = self.eval_size
            # kf = ShuffleSplit(
            #     y.shape[0], test_size=test_size,
            #     random_state=self.random_state
            # )
            # train_indices, valid_indices = next(iter(kf))
            # valid_indices = shuffle(valid_indices)
            test_size = 1 - self.eval_size
            kf = ShuffleSplit(
                y.shape[0], test_size=test_size,
                random_state=self.random_state
            )
            valid_indices, train_indices = next(iter(kf))
        else:
            n_folds = int(round(1 / self.eval_size))
            kf = StratifiedKFold(y, n_folds=n_folds, random_state=self.random_state)
            train_indices, valid_indices = next(iter(kf))

        X_train, y_train = X[train_indices], y[train_indices]
        X_valid, y_valid = X[valid_indices], y[valid_indices]
    else:
        X_train, y_train = X, y
        X_valid, y_valid = X[len(X):], y[len(y):]

    return X_train, X_valid, y_train, y_valid
def get_cv_generator(training_data, do_segment_split=True, random_state=None):
    """
    Returns a cross validation generator.

    :param training_data: The training data to create the folds from.
    :param do_segment_split: If True, the folds will be generated based on the segment names.
    :param random_state: A constant to use as a random seed.
    :return: A generator which can be used by the grid search to generate cross validation folds.
    """
    k_fold_kwargs = dict(n_folds=10, random_state=random_state)
    if do_segment_split:
        cv = dataset.SegmentCrossValidator(training_data, cross_validation.StratifiedKFold, **k_fold_kwargs)
    else:
        cv = sklearn.cross_validation.StratifiedKFold(training_data['Preictal'], **k_fold_kwargs)
    return cv
def __init__(self, dataframe, base_cv=None, **cv_kwargs):
    # We create a copy of the dataframe with a new last level
    # index which is an enumeration of the rows (like proper indices)
    self.all_segments = pd.DataFrame({'Preictal': dataframe['Preictal'],
                                      'i': np.arange(len(dataframe))})
    self.all_segments.set_index('i', append=True, inplace=True)

    # Now create a series with only the segments as rows. This is what we will pass into
    # the wrapped cross validation generator
    self.segments = self.all_segments['Preictal'].groupby(level='segment').first()
    self.segments.sort(inplace=True)

    if base_cv is None:
        self.cv = cross_validation.StratifiedKFold(self.segments, **cv_kwargs)
    else:
        self.cv = base_cv(self.segments, **cv_kwargs)
def split_dataset(dataframe, training_ratio=.8, do_segment_split=True, shuffle=False, random_state=None):
    """
    Splits the dataset into a training and test partition.

    :param dataframe: A data frame to split. Should have a 'Preictal' column.
    :param training_ratio: The ratio of the data to use for the first part.
    :param do_segment_split: If True, the split will be done on whole segments.
    :param shuffle: If true, the split will shuffle the data before splitting.
    :param random_state: Seed
    :return: A pair of disjoint data frames, where the first frame contains
             *training_ratio* of all the data.
    """
    # We'll make the splits based on the sklearn cross validators.
    # We calculate the number of folds which correspond to the
    # desired training ratio. If *r* is the training ratio and *k*
    # the number of folds, we'd like *r* = (*k* - 1)/*k*, that is,
    # the ratio should be the same as all the included folds divided
    # by the total number of folds. This gives us *k* = 1/(1-*r*)
    k = int(np.floor(1 / (1 - training_ratio)))

    if do_segment_split:
        # We use the segment based cross validator to get a stratified split.
        cv = SegmentCrossValidator(dataframe, n_folds=k, shuffle=shuffle, random_state=random_state)
    else:
        # Don't split by segment, but still do a stratified split
        cv = cross_validation.StratifiedKFold(dataframe['Preictal'], n_folds=k,
                                              shuffle=shuffle, random_state=random_state)

    training_indices, test_indices = first(cv)
    return dataframe.iloc[training_indices], dataframe.iloc[test_indices]
def cv_score(classifier, dataset, metric=accuracy_score, n_folds=10):
    """
    Calculate K-fold cross validation score.
    """
    true_labels = []
    predicted_labels = []

    for train_idx, test_idx in StratifiedKFold(list(dataset.get_labels()), n_folds=n_folds):
        # clear the classifier (call `clear` RPC).
        classifier.clear()

        # split the dataset to train/test dataset.
        (train_ds, test_ds) = (dataset[train_idx], dataset[test_idx])

        # train the classifier using train dataset.
        for (idx, label) in classifier.train(train_ds):
            pass

        # test the classifier using test dataset.
        for (idx, label, result) in classifier.classify(test_ds):
            # labels are already desc sorted by score values, so you can get a label
            # name with the highest prediction score by:
            pred_label = result[0][0]

            # store the result.
            true_labels.append(label)
            predicted_labels.append(pred_label)

    # return cross-validation score
    return metric(true_labels, predicted_labels)
def _make_kfold(self, Y):
    if self.MyKfold is not None:
        return self.MyKfold
    else:
        return StratifiedKFold(Y, self.n_folds)
def validation(self, X, Y, wv_X, kind):
    """
    2-fold validation
    :param X: train text
    :param Y: train label
    :param wv_X: train wv_vec
    :param kind: age/gender/education
    :return: mean score of 2-fold validation
    """
    print 'validating...'
    X = np.array(X)
    fold_n = 2
    folds = list(StratifiedKFold(Y, n_folds=fold_n, shuffle=False, random_state=0))
    score = np.zeros(fold_n)
    for j, (train_idx, test_idx) in enumerate(folds):
        print j + 1, '-fold'
        X_train = X[train_idx]
        y_train = Y[train_idx]
        X_test = X[test_idx]
        y_test = Y[test_idx]

        wv_X_train = wv_X[train_idx]
        wv_X_test = wv_X[test_idx]

        vec = TfidfVectorizer(use_idf=True, sublinear_tf=False, max_features=50000, binary=True)
        vec.fit(X_train, y_train)
        X_train = vec.transform(X_train)
        X_test = vec.transform(X_test)
        print 'shape', X_train.shape

        ypre = self.stacking(X_train, y_train, X_test, wv_X_train, wv_X_test, kind)
        cur = sum(y_test == ypre) * 1.0 / len(ypre)
        score[j] = cur
    print score
    print score.mean(), kind
    return score.mean()
def __call__(self, X, y):
    """
    Given a dataset X, y we split it, in order to do cross validation,
    according to the procedure explained below:
    if n_folds is not None, then we do cross validation
    based on stratified folds
    if n_class_samples is not None, then we do cross validation
    using only <n_class_samples> training samples per class
    if n_test_samples is not None, then we do cross validation
    using only <n_test_samples> cross validation samples per class
    Assumes that each datapoint is in a column of X.
    """
    n_classes = len(set(y))

    if self.n_folds is not None:
        # generate the folds
        self.folds = StratifiedKFold(y, n_folds=self.n_folds, shuffle=False, random_state=None)
    elif self.n_class_samples is not None:
        self.folds = []
        for i in range(self.n_tests):
            if type(self.n_class_samples) is not list:
                self.n_class_samples = (np.ones(n_classes) * self.n_class_samples).astype(int)
            if self.n_test_samples is not None:
                self.n_test_samples = (np.ones(n_classes) * self.n_test_samples).astype(int)
            data_idx = split_dataset(self.n_class_samples, self.n_test_samples, y)
            train_idx = data_idx[0]
            test_idx = data_idx[1]
            self.folds.append((train_idx, test_idx))

    self.cross_validate(X, y)
def train_evaluate_stratified(clf, X, y, labels):
    skf = StratifiedKFold(y, n_folds=10)
    for fold_number, (train_index, test_index) in enumerate(skf):
        X_train, y_train = X[train_index], y[train_index]
        X_test, y_test = X[test_index], y[test_index]
        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        save_results(y_test, y_pred, labels, fold_number)
def threshold_estimate_cv(x, y, k_fold):
    print "%d %d %d" % (y.shape[0], sum(y == 1), sum(y == 0))
    kf1 = StratifiedKFold(y, n_folds=k_fold, shuffle=True, random_state=0)
    threshold = np.zeros((k_fold), dtype="float32")
    cnt = 0
    for train_index, test_index in kf1:
        x_train, x_test = x[train_index], x[test_index]
        y_train, y_test = y[train_index], y[test_index]
        w1 = np.array([1] * y_train.shape[0])
        weight = float(len(y_train[y_train == 0])) / float(len(y_train[y_train == 1]))
        w1 = np.array([1] * y_train.shape[0])
        w1[y_train == 1] = weight
        estimator = xgb.XGBClassifier(max_depth=10, learning_rate=0.1, n_estimators=1000, nthread=50)
        estimator.fit(x_train, y_train, sample_weight=w1)
        y_scores = estimator.predict_proba(x_test)[:, 1]
        precision, recall, thresholds = precision_recall_curve(y_test, y_scores)
        f1 = 2 * precision[2:] * recall[2:] / (precision[2:] + recall[2:])
        m_idx = np.argmax(f1)
        threshold[cnt] = thresholds[2 + m_idx]
        cnt += 1
        print("%d %f %f" % (precision.shape[0], f1[m_idx], thresholds[2 + m_idx]))
    return np.mean(threshold), threshold


# Cross validation using gradient tree boosting
def print_metrics(clf):
    # scores = cross_validation.cross_val_score(clf, features, labels, cv=5, scoring='accuracy')
    # print 'Accuracy:', scores.mean()
    cv = cross_validation.StratifiedKFold(labels, n_folds=5)
    mean_tpr = 0.0
    mean_fpr = np.linspace(0, 1, 100)
    all_tpr = []
    for i, (train, test) in enumerate(cv):
        probas_ = clf.fit(features[train], labels[train]).predict_proba(features[test])
        fpr, tpr, thresholds = metrics.roc_curve(labels[test], probas_[:, 1])
        mean_tpr += interp(mean_fpr, fpr, tpr)
        mean_tpr[0] = 0.0
        roc_auc = metrics.auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=1, label='ROC fold %d (area = %0.2f)' % (i, roc_auc))
    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')
    mean_tpr /= len(cv)
    mean_tpr[-1] = 1.0
    mean_auc = metrics.auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic')
    plt.legend(loc="lower right")
    plt.savefig('auc_sent.png')
def test_classifier(clf, X, Y, loc):
    folds = StratifiedKFold(Y, 5)
    mean_tpr = 0.0
    mean_fpr = numpy.linspace(0, 1, 100)
    aucs = []
    for i, (train, test) in enumerate(folds):
        clf.fit(X[train], Y[train])
        prediction = clf.predict_proba(X[test])
        aucs.append(roc_auc_score(Y[test], prediction[:, 1]))
        false_positive_rate, true_positive_rate, thresholds = roc_curve(Y[test], prediction[:, 1])
        mean_tpr += interp(mean_fpr, false_positive_rate, true_positive_rate)
        mean_tpr[0] = 0.0
        roc_auc = auc(false_positive_rate, true_positive_rate)
        plt.plot(false_positive_rate, true_positive_rate, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (i, roc_auc))

    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')
    mean_tpr /= len(folds)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.title('Receiver Operating Characteristic')
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.legend(loc='lower right')
    plt.show()
    plt.savefig('plots/' + loc + '/' + clf.__class__.__name__ + '.png')
    plt.clf()
    print clf.__class__.__name__, aucs, numpy.mean(aucs)
def classify_with_cross_validation(X, y, clf, n_folds=5):
    cv_matrices = []
    cv_measures = collections.defaultdict(list)  # FIXME: use collections.OrderedDict too

    logging.info("classifying and predicting with cross validation")
    skf = cross_validation.StratifiedKFold(y, n_folds=n_folds)
    for train_indices, test_indices in skf:
        X_train = X[train_indices]
        X_test = X[test_indices]
        y_train = y[train_indices]
        y_test = y[test_indices]

        clf.fit(X_train, y_train)
        y_predicted = clf.predict(X_test)

        confusion_matrix = metrics.confusion_matrix(y_test, y_predicted).flatten()
        cv_matrices.append(confusion_matrix)
        for measure_name, measure_value in calculate_measures(*confusion_matrix).items():
            cv_measures[measure_name].append(measure_value)

    for measure_name, measure_values in cv_measures.items():
        mean = np.mean(measure_values)
        delta = np.std(measure_values) * 1.96 / math.sqrt(n_folds)  # 95% confidence interval
        cv_measures[measure_name] = (mean, delta)

    return cv_measures


# noinspection PyPep8Naming
def __init__(self, X, y, Xstatic=[], ystatic=[], nfolds=5, score='r2',
             classifier=RegressorWrapper, random_state=None):
    self.nfolds = nfolds
    self.score = score
    # self.X = np.array(X)
    self.X = X
    self.Xstatic = Xstatic
    self.le = preprocessing.LabelEncoder().fit(y)
    self.y = self.le.transform(y)
    if len(ystatic) > 0:
        self.ystatic = self.le.transform(ystatic)
    else:
        self.ystatic = []
    self.test_y = self.y
    self.create_classifier = classifier
    self.kfolds = cross_validation.StratifiedKFold(y, n_folds=nfolds, shuffle=True,
                                                   random_state=random_state)
def __init__(self, X, y, Xstatic=[], ystatic=[], nfolds=5, score='macrof1',
             classifier=ClassifierWrapper, random_state=None):
    self.nfolds = nfolds
    self.score = score
    # self.X = np.array(X)
    self.X = X
    self.Xstatic = Xstatic
    self.le = preprocessing.LabelEncoder().fit(y)
    self.y = self.le.transform(y)
    if len(ystatic) > 0:
        self.ystatic = self.le.transform(ystatic)
    else:
        self.ystatic = []
    self.test_y = self.y
    self.create_classifier = classifier
    self.kfolds = cross_validation.StratifiedKFold(y, n_folds=nfolds, shuffle=True,
                                                   random_state=random_state)
def cross_predict(feat, f_name, X=X, y=y):
    if os.name == 'nt':
        n_jobs = 1
    else:
        n_jobs = -1

    # classifiers
    # clf_1 = MultinomialNB(alpha=5)
    clf_2 = LinearSVC(C=0.02)

    # cross-validation (CV)
    # This cross-validation object is a merge of StratifiedKFold and ShuffleSplit,
    # which returns stratified randomized folds. The folds are made by preserving
    # the percentage of samples for each class.
    #
    # Note: like the ShuffleSplit strategy, stratified random splits do not guarantee
    # that all folds will be different, although this is still
    # very likely for sizeable datasets.
    #
    # Passing this cv to cross_val_predict will raise
    # ValueError: cross_val_predict only works for partitions
    # because the shuffled folds may overlap instead of forming a partition
    # cv = cross_validation.StratifiedShuffleSplit(y, test_size=0.2, random_state=42)

    # This cross-validation object is a variation of KFold that returns stratified folds.
    # The folds are made by preserving the percentage of samples for each class.
    cv = cross_validation.StratifiedKFold(y, n_folds=5, random_state=42)

    model = Pipeline([('feat', feat), ('clf', clf_2)])

    t0 = time()
    y_pred = cross_validation.cross_val_predict(model, X=X, y=y, n_jobs=n_jobs, cv=cv)
    t = time() - t0
    print("=" * 20, f_name, "=" * 20)
    print("time cost: {}".format(t))
    # print("y_predict: {}".format(y_pred))
    print()
    print('confusion matrix:\n', confusion_matrix(y, y_pred))
    print()
    print('\t\taccuracy: {}'.format(accuracy_score(y, y_pred)))
    print()
    print("\t\tclassification report")
    print("-" * 52)
    print(classification_report(y, y_pred))


# feature extraction (tfidf: baseline feature)
def make_mf_lr(X, y, clf, X_test, n_round=3):
    n = X.shape[0]
    '''
    Fit metafeature by @clf and get prediction for test. Assumed that @clf -- regressor
    '''
    print clf
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=2, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            X_tr = X[ind_tr]
            X_te = X[ind_te]
            # print('X_tr shape', X_tr.shape)
            # print('X_te shape', X_te.shape)
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr)
            mf_tr[ind_te] += clf.predict_proba(X_te)[:, 1]
            mf_te += clf.predict_proba(X_test)[:, 1] * 0.5
            y_pred = clf.predict_proba(X_te)[:, 1]
            score = roc_auc_score(y_te, y_pred)
            print 'pred[{}] score:{}'.format(i, score)
    return (mf_tr / n_round, mf_te / n_round)
def make_mf_lsvc(X, y, clf, X_test, n_round=3):
    n = X.shape[0]
    '''
    Fit metafeature by @clf and get prediction for test. Assumed that @clf -- regressor
    '''
    print clf
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=2, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            X_tr = X[ind_tr]
            X_te = X[ind_te]
            # print('X_tr shape', X_tr.shape)
            # print('X_te shape', X_te.shape)
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr)
            mf_tr[ind_te] += clf.decision_function(X_te)
            mf_te += clf.decision_function(X_test) * 0.5
            y_pred = clf.decision_function(X_te)
            score = roc_auc_score(y_te, y_pred)
            print 'pred[{}] score:{}'.format(i, score)
    return (mf_tr / n_round, mf_te / n_round)
def make_mf_nn(X, y, X_test, n_round=3):
    n = X.shape[0]
    '''
    Fit metafeature by @clf and get prediction for test. Assumed that @clf -- regressor
    '''
    from kaggler.online_model.ftrl import FTRL
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=2, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            clf = build_model(X)
            X_tr = [X[:, 0][ind_tr], X[:, 1][ind_tr]]
            X_te = [X[:, 0][ind_te], X[:, 1][ind_te]]
            # print('X_tr shape', X_tr.shape)
            # print('X_te shape', X_te.shape)
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr, nb_epoch=2, batch_size=128, validation_data=[X_te, y_te])
            mf_tr[ind_te] += clf.predict(X_te).ravel()
            mf_te += clf.predict([X_test[:, 0], X_test[:, 1]]).ravel() * 0.5
            y_pred = clf.predict(X_te).ravel()
            score = roc_auc_score(y_te, y_pred)
            print 'pred[{}] score:{}'.format(i, score)
    return (mf_tr / n_round, mf_te / n_round)