The following 50 code examples, extracted from open-source Python projects, illustrate how to use sklearn.datasets.make_blobs().
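Before the extracted examples, a minimal self-contained sketch of a typical call may help (the parameter values here are illustrative, not taken from any of the projects below): make_blobs draws points from isotropic Gaussian clusters and returns a feature matrix of shape (n_samples, n_features) together with an integer cluster label for each point.

import numpy as np
from sklearn.datasets import make_blobs

# Illustrative settings: 150 points drawn from 3 Gaussian blobs in 2 dimensions.
X, y = make_blobs(n_samples=150, n_features=2, centers=3,
                  cluster_std=1.0, random_state=0)

print(X.shape)       # (150, 2) -- one row per sample
print(np.unique(y))  # [0 1 2]  -- integer cluster label per sample

Fixing random_state makes the draw reproducible; centers may also be given as an explicit array of cluster centers, as several of the examples below demonstrate.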
def make_classification_example(axis, random_state):
    X, y = make_blobs(n_samples=100, n_features=2, centers=2, cluster_std=2.7,
                      random_state=random_state)

    axis.scatter(X[y == 0, 0], X[y == 0, 1], color="red", s=10, label="Disease")
    axis.scatter(X[y == 1, 0], X[y == 1, 1], color="blue", s=10, label="Healthy")

    clf = LinearSVC().fit(X, y)

    # get the separating hyperplane
    w = clf.coef_[0]
    a = -w[0] / w[1]
    xx = np.linspace(-5, 7)
    yy = a * xx - (clf.intercept_[0]) / w[1]

    # plot the line, the points, and the nearest vectors to the plane
    axis.plot(xx, yy, 'k-', color="black", label="Model")

    axis.tick_params(labelbottom='off', labelleft='off')
    axis.set_xlabel("Gene 1")
    axis.set_ylabel("Gene 2")
    axis.legend()

def generate_data(N, seed=10):
    """
    This generates some test data that we can use to test our pairwise-
    distance functions.

    Required arguments:
        N    -- The number of datapoints in the test data.

    Optional arguments:
        seed -- The seed for NumPy's random module.
    """
    # Generate some data:
    np.random.seed(seed)
    n_samples1 = N * 3 // 4  # same as floor(3/4 * N)
    n_samples2 = N - n_samples1

    # Blob set 1
    centers1 = [[0., 0.], [1., 0.], [0.5, np.sqrt(0.75)]]
    cluster_std1 = [0.3] * len(centers1)
    data, _ = make_blobs(n_samples=n_samples1, centers=centers1,
                         cluster_std=cluster_std1)
    # Make sure Blob 1 checks out

    # Blob set 2
    centers2 = [[0.5, np.sqrt(0.75)]]
    cluster_std2 = [0.3] * len(centers2)
    extra, _ = make_blobs(n_samples=n_samples2, centers=centers2,
                          cluster_std=cluster_std2)

    return np.concatenate((data, extra), axis=0)

def main(): # Load the dataset X, y = datasets.make_blobs() # Cluster the data clf = GaussianMixtureModel(k=3) y_pred = clf.predict(X) p = Plot() p.plot_in_2d(X, y_pred, title="GMM Clustering") p.plot_in_2d(X, y, title="Actual Clustering")
def main():
    # Load the dataset
    X, y = datasets.make_blobs()

    # Cluster the data using K-Medoids
    clf = PAM(k=3)
    y_pred = clf.predict(X)

    # Project the data onto the 2 primary principal components
    p = Plot()
    p.plot_in_2d(X, y_pred, title="PAM Clustering")
    p.plot_in_2d(X, y, title="Actual Clustering")

def main():
    # Load the dataset
    X, y = datasets.make_blobs()

    # Cluster the data using K-Means
    clf = KMeans(k=3)
    y_pred = clf.predict(X)

    # Project the data onto the 2 primary principal components
    p = Plot()
    p.plot_in_2d(X, y_pred, title="K-Means Clustering")
    p.plot_in_2d(X, y, title="Actual Clustering")

def data_labels():
    return make_blobs(random_state=2)

def test_n_samples_leaves_roots():
    # Sanity check for the number of samples in leaves and roots
    X, y = make_blobs(n_samples=10)
    brc = Birch()
    brc.fit(X)
    n_samples_root = sum([sc.n_samples_ for sc in brc.root_.subclusters_])
    n_samples_leaves = sum([sc.n_samples_ for leaf in brc._get_leaves()
                            for sc in leaf.subclusters_])
    assert_equal(n_samples_leaves, X.shape[0])
    assert_equal(n_samples_root, X.shape[0])

def test_n_clusters():
    # Test that n_clusters param works properly
    X, y = make_blobs(n_samples=100, centers=10)
    brc1 = Birch(n_clusters=10)
    brc1.fit(X)
    assert_greater(len(brc1.subcluster_centers_), 10)
    assert_equal(len(np.unique(brc1.labels_)), 10)

    # Test that n_clusters = Agglomerative Clustering gives
    # the same results.
    gc = AgglomerativeClustering(n_clusters=10)
    brc2 = Birch(n_clusters=gc)
    brc2.fit(X)
    assert_array_equal(brc1.subcluster_labels_, brc2.subcluster_labels_)
    assert_array_equal(brc1.labels_, brc2.labels_)

    # Test that the wrong global clustering step raises an Error.
    clf = ElasticNet()
    brc3 = Birch(n_clusters=clf)
    assert_raises(ValueError, brc3.fit, X)

    # Test that a small number of clusters raises a warning.
    brc4 = Birch(threshold=10000.)
    assert_warns(UserWarning, brc4.fit, X)

def test_sparse_X():
    # Test that sparse and dense data give same results
    X, y = make_blobs(n_samples=100, centers=10)
    brc = Birch(n_clusters=10)
    brc.fit(X)

    csr = sparse.csr_matrix(X)
    brc_sparse = Birch(n_clusters=10)
    brc_sparse.fit(csr)

    assert_array_equal(brc.labels_, brc_sparse.labels_)
    assert_array_almost_equal(brc.subcluster_centers_,
                              brc_sparse.subcluster_centers_)

def test_branching_factor():
    # Test that nodes have at max branching_factor number of subclusters
    X, y = make_blobs()
    branching_factor = 9

    # Purposefully set a low threshold to maximize the subclusters.
    brc = Birch(n_clusters=None, branching_factor=branching_factor,
                threshold=0.01)
    brc.fit(X)
    check_branching_factor(brc.root_, branching_factor)
    brc = Birch(n_clusters=3, branching_factor=branching_factor,
                threshold=0.01)
    brc.fit(X)
    check_branching_factor(brc.root_, branching_factor)

    # Raises error when branching_factor is set to one.
    brc = Birch(n_clusters=None, branching_factor=1, threshold=0.01)
    assert_raises(ValueError, brc.fit, X)

def test_threshold():
    # Test that the leaf subclusters have a radius no greater than the
    # threshold
    X, y = make_blobs(n_samples=80, centers=4)
    brc = Birch(threshold=0.5, n_clusters=None)
    brc.fit(X)
    check_threshold(brc, 0.5)

    brc = Birch(threshold=5.0, n_clusters=None)
    brc.fit(X)
    check_threshold(brc, 5.)

def test_birch_example_reproducibility(example_id):
    # check reproducibility of the Birch example
    rng = np.random.RandomState(42)
    X, y = make_blobs(n_samples=1000, n_features=10, random_state=rng)

    cluster_model = Birch(threshold=0.9, branching_factor=20,
                          compute_sample_indices=True)
    cluster_model.fit(X)
    #assert len(cluster_model.root_.subclusters_[1].child_.subclusters_) == 3

    htree, n_subclusters = birch_hierarchy_wrapper(cluster_model)

    assert htree.tree_size == n_subclusters

    # same random seed as in the birch hierarchy example
    assert htree.tree_size == 78
    sc = htree.flatten()[example_id]
    if example_id == 34:
        # this is true in both cases, but example_id fails on circle ci
        assert sc.current_depth == 1
        assert len(sc.children) == 3

    assert_array_equal([sc['cluster_id'] for sc in htree.flatten()],
                       np.arange(htree.tree_size))

def load_dataset(dataset, n_samples, random_state=1, n_features=3):
    # wrapper function to load one of the 3d datasets
    if dataset == 's_curve':
        return make_s_curve(n_samples, random_state=random_state)
    elif dataset == 'swiss_roll':
        return make_swiss_roll(n_samples, random_state=random_state)
    elif dataset == 'broken_swiss_roll':
        return make_broken_swiss_roll(n_samples, random_state=random_state)
    elif dataset == 'sphere':
        return make_sphere(n_samples, random_state=random_state)
    elif dataset == '3_circles':
        return make_3_circles(n_samples, random_state=random_state)
    elif dataset == 'peaks':
        return make_peaks(n_samples, random_state=random_state)
    elif dataset == 'blobs':
        return make_blobs(n_samples, n_features=n_features, centers=3,
                          random_state=random_state)
    else:
        print("unknown dataset")

def dataset_generator():
    """
    generate multi-class dataset
    :return: data X and its labels
    """
    plt.title("Three blobs", fontsize='small')
    X, y = make_blobs(n_features=2, centers=3)
    plt.scatter(X[:, 0], X[:, 1], marker='o', c=y)
    plt.show()
    #np.save('X_multi.npy', X)
    #np.save('y_multi.npy', y)
    return X, y

def gaussian_blobs(n_samples=200, return_centers=False):
    random_state = 0
    centers = [(-10, -10), (-10, 0), (0, -10)]
    centers.extend([(10, 10), (10, 0), (0, 10)])
    centers = np.array(centers)
    X, gt = sk_datasets.make_blobs(n_samples=n_samples, centers=centers,
                                   n_features=2, shuffle=False,
                                   random_state=random_state)
    if return_centers:
        return X, gt, centers
    else:
        return X, gt

def make_clusters(skew=True, *arg, **kwargs):
    X, y = datasets.make_blobs(*arg, **kwargs)
    if skew:
        nrow = X.shape[1]
        for i in np.unique(y):
            X[y == i] = X[y == i].dot(np.random.random((nrow, nrow)) - 0.5)
    return X, y

def kmeans_example(plot=False):
    X, y = make_blobs(centers=4, n_samples=500, n_features=2,
                      shuffle=True, random_state=42)
    clusters = len(np.unique(y))
    k = KMeans(K=clusters, max_iters=150, init='++')
    k.fit(X)
    k.predict()

    if plot:
        k.plot()

def test_integrated_kmeans_elbow(self):
    """
    Test no exceptions for kmeans k-elbow visualizer on blobs dataset

    See #182: cannot use occupancy dataset because of memory usage
    """
    # Generate a blobs data set
    X, y = make_blobs(
        n_samples=1000, n_features=12, centers=6, shuffle=True
    )

    try:
        visualizer = KElbowVisualizer(KMeans(), k=4)
        visualizer.fit(X)
        visualizer.poof()
    except Exception as e:
        self.fail("error during k-elbow: {}".format(e))

def test_integrated_mini_batch_kmeans_elbow(self):
    """
    Test no exceptions for mini-batch kmeans k-elbow visualizer

    See #182: cannot use occupancy dataset because of memory usage
    """
    # Generate a blobs data set
    X, y = make_blobs(
        n_samples=1000, n_features=12, centers=6, shuffle=True
    )

    try:
        visualizer = KElbowVisualizer(MiniBatchKMeans(), k=4)
        visualizer.fit(X)
        visualizer.poof()
    except Exception as e:
        self.fail("error during k-elbow: {}".format(e))

def test_integrated_kmeans_silhouette(self):
    """
    Test no exceptions for kmeans silhouette visualizer on blobs dataset

    See #182: cannot use occupancy dataset because of memory usage
    """
    # Generate a blobs data set
    X, y = make_blobs(
        n_samples=1000, n_features=12, centers=8, shuffle=True,
    )

    try:
        visualizer = SilhouetteVisualizer(KMeans())
        visualizer.fit(X)
        visualizer.poof()
    except Exception as e:
        self.fail("error during silhouette: {}".format(e))

def test_integrated_mini_batch_kmeans_silhouette(self):
    """
    Test no exceptions for mini-batch kmeans silhouette visualizer

    See #182: cannot use occupancy dataset because of memory usage
    """
    # Generate a blobs data set
    X, y = make_blobs(
        n_samples=1000, n_features=12, centers=8, shuffle=True,
    )

    try:
        visualizer = SilhouetteVisualizer(MiniBatchKMeans())
        visualizer.fit(X)
        visualizer.poof()
    except Exception as e:
        self.fail("error during silhouette: {}".format(e))

def generate_data(n_samples, n_features):
    """Generate random blob-ish data with noisy features.

    This returns an array of input data with shape `(n_samples, n_features)`
    and an array of `n_samples` target labels.

    Only one feature contains discriminative information, the other features
    contain only noise.
    """
    X, y = make_blobs(n_samples=n_samples, n_features=1, centers=[[-2], [2]])

    # add non-discriminative features
    if n_features > 1:
        X = np.hstack([X, np.random.randn(n_samples, n_features - 1)])
    return X, y

def make_data(n_samples, n_features, n_queries, random_state=0):
    """Create index and query data."""
    print('Generating random blob-ish data')
    X, _ = make_blobs(n_samples=n_samples + n_queries,
                      n_features=n_features, centers=100,
                      shuffle=True, random_state=random_state)

    # Keep the last samples as held out query vectors: note since we used
    # shuffle=True we have ensured that index and query vectors are
    # samples from the same distribution (a mixture of 100 gaussians in this
    # case)
    return X[:n_samples], X[n_samples:]

def test_kde_pipeline_gridsearch():
    # test that kde plays nice in pipelines and grid-searches
    X, _ = make_blobs(cluster_std=.1, random_state=1,
                      centers=[[0, 1], [1, 0], [0, 0]])
    pipe1 = make_pipeline(StandardScaler(with_mean=False, with_std=False),
                          KernelDensity(kernel="gaussian"))
    params = dict(kerneldensity__bandwidth=[0.001, 0.01, 0.1, 1, 10])
    search = GridSearchCV(pipe1, param_grid=params, cv=5)
    search.fit(X)
    assert_equal(search.best_params_['kerneldensity__bandwidth'], .1)

def test_grid_search_no_score():
    # Test grid-search on classifier that has no score function.
    clf = LinearSVC(random_state=0)
    X, y = make_blobs(random_state=0, centers=2)
    Cs = [.1, 1, 10]
    clf_no_score = LinearSVCNoScore(random_state=0)

    grid_search = GridSearchCV(clf, {'C': Cs}, scoring='accuracy')
    grid_search.fit(X, y)

    grid_search_no_score = GridSearchCV(clf_no_score, {'C': Cs},
                                        scoring='accuracy')
    # smoketest grid search
    grid_search_no_score.fit(X, y)

    # check that best params are equal
    assert_equal(grid_search_no_score.best_params_, grid_search.best_params_)
    # check that we can call score and that it gives the correct result
    assert_equal(grid_search.score(X, y), grid_search_no_score.score(X, y))

    # giving no scoring function raises an error
    grid_search_no_score = GridSearchCV(clf_no_score, {'C': Cs})
    assert_raise_message(TypeError, "no scoring",
                         grid_search_no_score.fit, [[1]])

def test_grid_search_iid():
    # test the iid parameter
    # noise-free simple 2d-data
    X, y = make_blobs(centers=[[0, 0], [1, 0], [0, 1], [1, 1]],
                      random_state=0, cluster_std=0.1, shuffle=False,
                      n_samples=80)
    # split dataset into two folds that are not iid
    # first one contains data of all 4 blobs, second only from two.
    mask = np.ones(X.shape[0], dtype=np.bool)
    mask[np.where(y == 1)[0][::2]] = 0
    mask[np.where(y == 2)[0][::2]] = 0
    # this leads to perfect classification on one fold and a score of 1/3 on
    # the other
    svm = SVC(kernel='linear')
    # create "cv" for splits
    cv = [[mask, ~mask], [~mask, mask]]
    # once with iid=True (default)
    grid_search = GridSearchCV(svm, param_grid={'C': [1, 10]}, cv=cv)
    grid_search.fit(X, y)
    first = grid_search.grid_scores_[0]
    assert_equal(first.parameters['C'], 1)
    assert_array_almost_equal(first.cv_validation_scores, [1, 1. / 3.])
    # for first split, 1/4 of dataset is in test, for second 3/4.
    # take weighted average
    assert_almost_equal(first.mean_validation_score,
                        1 * 1. / 4. + 1. / 3. * 3. / 4.)

    # once with iid=False
    grid_search = GridSearchCV(svm, param_grid={'C': [1, 10]}, cv=cv,
                               iid=False)
    grid_search.fit(X, y)
    first = grid_search.grid_scores_[0]
    assert_equal(first.parameters['C'], 1)
    # scores are the same as above
    assert_array_almost_equal(first.cv_validation_scores, [1, 1. / 3.])
    # averaged score is just mean of scores
    assert_almost_equal(first.mean_validation_score,
                        np.mean(first.cv_validation_scores))

def test_gridsearch_no_predict():
    # test grid-search with an estimator without predict.
    # slight duplication of a test from KDE
    def custom_scoring(estimator, X):
        return 42 if estimator.bandwidth == .1 else 0
    X, _ = make_blobs(cluster_std=.1, random_state=1,
                      centers=[[0, 1], [1, 0], [0, 0]])
    search = GridSearchCV(KernelDensity(),
                          param_grid=dict(bandwidth=[.01, .1, 1]),
                          scoring=custom_scoring)
    search.fit(X)
    assert_equal(search.best_params_['bandwidth'], .1)
    assert_equal(search.best_score_, 42)

def test_grid_search_score_consistency():
    # test that correct scores are used
    clf = LinearSVC(random_state=0)
    X, y = make_blobs(random_state=0, centers=2)
    Cs = [.1, 1, 10]
    for score in ['f1', 'roc_auc']:
        grid_search = GridSearchCV(clf, {'C': Cs}, scoring=score)
        grid_search.fit(X, y)
        cv = StratifiedKFold(n_folds=3)
        for C, scores in zip(Cs, grid_search.grid_scores_):
            clf.set_params(C=C)
            scores = scores[2]  # get the separate runs from grid scores
            i = 0
            for train, test in cv.split(X, y):
                clf.fit(X[train], y[train])
                if score == "f1":
                    correct_score = f1_score(y[test], clf.predict(X[test]))
                elif score == "roc_auc":
                    dec = clf.decision_function(X[test])
                    correct_score = roc_auc_score(y[test], dec)
                assert_almost_equal(correct_score, scores[i])
                i += 1

def test_grid_search_score_consistency():
    # test that correct scores are used
    clf = LinearSVC(random_state=0)
    X, y = make_blobs(random_state=0, centers=2)
    Cs = [.1, 1, 10]
    for score in ['f1', 'roc_auc']:
        grid_search = GridSearchCV(clf, {'C': Cs}, scoring=score)
        grid_search.fit(X, y)
        cv = StratifiedKFold(n_folds=3, y=y)
        for C, scores in zip(Cs, grid_search.grid_scores_):
            clf.set_params(C=C)
            scores = scores[2]  # get the separate runs from grid scores
            i = 0
            for train, test in cv:
                clf.fit(X[train], y[train])
                if score == "f1":
                    correct_score = f1_score(y[test], clf.predict(X[test]))
                elif score == "roc_auc":
                    dec = clf.decision_function(X[test])
                    correct_score = roc_auc_score(y[test], dec)
                assert_almost_equal(correct_score, scores[i])
                i += 1

def test_lda_coefs():
    # Test if the coefficients of the solvers are approximately the same.
    n_features = 2
    n_classes = 2
    n_samples = 1000
    X, y = make_blobs(n_samples=n_samples, n_features=n_features,
                      centers=n_classes, random_state=11)

    clf_lda_svd = LinearDiscriminantAnalysis(solver="svd")
    clf_lda_lsqr = LinearDiscriminantAnalysis(solver="lsqr")
    clf_lda_eigen = LinearDiscriminantAnalysis(solver="eigen")

    clf_lda_svd.fit(X, y)
    clf_lda_lsqr.fit(X, y)
    clf_lda_eigen.fit(X, y)

    assert_array_almost_equal(clf_lda_svd.coef_, clf_lda_lsqr.coef_, 1)
    assert_array_almost_equal(clf_lda_svd.coef_, clf_lda_eigen.coef_, 1)
    assert_array_almost_equal(clf_lda_eigen.coef_, clf_lda_lsqr.coef_, 1)

def test_lda_explained_variance_ratio():
    # Test if the sum of the normalized eigenvalues equals 1. Also tests
    # whether the explained_variance_ratio_ formed by the eigen solver is
    # the same as the explained_variance_ratio_ formed by the svd solver.
    n_features = 2
    n_classes = 2
    n_samples = 1000
    X, y = make_blobs(n_samples=n_samples, n_features=n_features,
                      centers=n_classes, random_state=11)

    clf_lda_eigen = LinearDiscriminantAnalysis(solver="eigen")
    clf_lda_eigen.fit(X, y)
    assert_almost_equal(clf_lda_eigen.explained_variance_ratio_.sum(), 1.0, 3)

    clf_lda_svd = LinearDiscriminantAnalysis(solver="svd")
    clf_lda_svd.fit(X, y)
    assert_almost_equal(clf_lda_svd.explained_variance_ratio_.sum(), 1.0, 3)

    assert_array_almost_equal(clf_lda_svd.explained_variance_ratio_,
                              clf_lda_eigen.explained_variance_ratio_)

def test_covariance():
    x, y = make_blobs(n_samples=100, n_features=5, centers=1,
                      random_state=42)

    # make features correlated
    x = np.dot(x, np.arange(x.shape[1] ** 2).reshape(x.shape[1], x.shape[1]))

    c_e = _cov(x, 'empirical')
    assert_almost_equal(c_e, c_e.T)

    c_s = _cov(x, 'auto')
    assert_almost_equal(c_s, c_s.T)

def test_sag_pobj_matches_logistic_regression():
    """tests if the sag pobj matches log reg"""
    n_samples = 100
    alpha = 1.0
    max_iter = 20
    X, y = make_blobs(n_samples=n_samples, centers=2, random_state=0,
                      cluster_std=0.1)

    clf1 = LogisticRegression(solver='sag', fit_intercept=False, tol=.0000001,
                              C=1. / alpha / n_samples, max_iter=max_iter,
                              random_state=10)
    clf2 = clone(clf1)
    clf3 = LogisticRegression(fit_intercept=False, tol=.0000001,
                              C=1. / alpha / n_samples, max_iter=max_iter,
                              random_state=10)

    clf1.fit(X, y)
    clf2.fit(sp.csr_matrix(X), y)
    clf3.fit(X, y)

    pobj1 = get_pobj(clf1.coef_, alpha, X, y, log_loss)
    pobj2 = get_pobj(clf2.coef_, alpha, X, y, log_loss)
    pobj3 = get_pobj(clf3.coef_, alpha, X, y, log_loss)

    assert_array_almost_equal(pobj1, pobj2, decimal=4)
    assert_array_almost_equal(pobj2, pobj3, decimal=4)
    assert_array_almost_equal(pobj3, pobj1, decimal=4)

def test_sag_classifier_computed_correctly():
    """tests if the binary classifier is computed correctly"""
    alpha = .1
    n_samples = 50
    n_iter = 50
    tol = .00001
    fit_intercept = True
    X, y = make_blobs(n_samples=n_samples, centers=2, random_state=0,
                      cluster_std=0.1)
    step_size = get_step_size(X, alpha, fit_intercept, classification=True)
    classes = np.unique(y)
    y_tmp = np.ones(n_samples)
    y_tmp[y != classes[1]] = -1
    y = y_tmp

    clf1 = LogisticRegression(solver='sag', C=1. / alpha / n_samples,
                              max_iter=n_iter, tol=tol, random_state=77,
                              fit_intercept=fit_intercept)
    clf2 = clone(clf1)

    clf1.fit(X, y)
    clf2.fit(sp.csr_matrix(X), y)

    spweights, spintercept = sag_sparse(X, y, step_size, alpha,
                                        n_iter=n_iter, dloss=log_dloss,
                                        fit_intercept=fit_intercept)
    spweights2, spintercept2 = sag_sparse(X, y, step_size, alpha,
                                          n_iter=n_iter, dloss=log_dloss,
                                          sparse=True,
                                          fit_intercept=fit_intercept)

    assert_array_almost_equal(clf1.coef_.ravel(), spweights.ravel(),
                              decimal=2)
    assert_almost_equal(clf1.intercept_, spintercept, decimal=1)

    assert_array_almost_equal(clf2.coef_.ravel(), spweights2.ravel(),
                              decimal=2)
    assert_almost_equal(clf2.intercept_, spintercept2, decimal=1)

def test_thresholded_scorers():
    # Test scorers that take thresholds.
    X, y = make_blobs(random_state=0, centers=2)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
    clf = LogisticRegression(random_state=0)
    clf.fit(X_train, y_train)
    score1 = get_scorer('roc_auc')(clf, X_test, y_test)
    score2 = roc_auc_score(y_test, clf.decision_function(X_test))
    score3 = roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])
    assert_almost_equal(score1, score2)
    assert_almost_equal(score1, score3)

    logscore = get_scorer('log_loss')(clf, X_test, y_test)
    logloss = log_loss(y_test, clf.predict_proba(X_test))
    assert_almost_equal(-logscore, logloss)

    # same for an estimator without decision_function
    clf = DecisionTreeClassifier()
    clf.fit(X_train, y_train)
    score1 = get_scorer('roc_auc')(clf, X_test, y_test)
    score2 = roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])
    assert_almost_equal(score1, score2)

    # test with a regressor (no decision_function)
    reg = DecisionTreeRegressor()
    reg.fit(X_train, y_train)
    score1 = get_scorer('roc_auc')(reg, X_test, y_test)
    score2 = roc_auc_score(y_test, reg.predict(X_test))
    assert_almost_equal(score1, score2)

    # Test that an exception is raised on more than two classes
    X, y = make_blobs(random_state=0, centers=3)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
    clf.fit(X_train, y_train)
    assert_raises(ValueError, get_scorer('roc_auc'), clf, X_test, y_test)

def test_unsupervised_scorers():
    # Test clustering scorers against gold standard labeling.
    # We don't have any real unsupervised Scorers yet.
    X, y = make_blobs(random_state=0, centers=2)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
    km = KMeans(n_clusters=3)
    km.fit(X_train)
    score1 = get_scorer('adjusted_rand_score')(km, X_test, y_test)
    score2 = adjusted_rand_score(y_test, km.predict(X_test))
    assert_almost_equal(score1, score2)

def test_raises_on_score_list():
    # Test that when a list of scores is returned, we raise proper errors.
    X, y = make_blobs(random_state=0)
    f1_scorer_no_average = make_scorer(f1_score, average=None)
    clf = DecisionTreeClassifier()
    assert_raises(ValueError, cross_val_score, clf, X, y,
                  scoring=f1_scorer_no_average)
    grid_search = GridSearchCV(clf, scoring=f1_scorer_no_average,
                               param_grid={'max_depth': [1, 2]})
    assert_raises(ValueError, grid_search.fit, X, y)

def test_class_weights():
    # check that the class weights are updated
    # simple 3 cluster dataset
    X, y = make_blobs(random_state=1)
    for Model in [DPGMM, VBGMM]:
        dpgmm = Model(n_components=10, random_state=1, alpha=20, n_iter=50)
        dpgmm.fit(X)
        # get indices of components that are used:
        indices = np.unique(dpgmm.predict(X))
        active = np.zeros(10, dtype=np.bool)
        active[indices] = True
        # used components are important
        assert_array_less(.1, dpgmm.weights_[active])
        # others are not
        assert_array_less(dpgmm.weights_[~active], .05)

def test_verbose_boolean():
    # checks that the output for the verbose output is the same
    # for the flag values '1' and 'True'
    # simple 3 cluster dataset
    X, y = make_blobs(random_state=1)
    for Model in [DPGMM, VBGMM]:
        dpgmm_bool = Model(n_components=10, random_state=1, alpha=20,
                           n_iter=50, verbose=True)
        dpgmm_int = Model(n_components=10, random_state=1, alpha=20,
                          n_iter=50, verbose=1)

        old_stdout = sys.stdout
        sys.stdout = StringIO()
        try:
            # generate output with the boolean flag
            dpgmm_bool.fit(X)
            verbose_output = sys.stdout
            verbose_output.seek(0)
            bool_output = verbose_output.readline()
            # generate output with the int flag
            dpgmm_int.fit(X)
            verbose_output = sys.stdout
            verbose_output.seek(0)
            int_output = verbose_output.readline()
            assert_equal(bool_output, int_output)
        finally:
            sys.stdout = old_stdout

def test_verbose_second_level():
    # simple 3 cluster dataset
    X, y = make_blobs(random_state=1)
    for Model in [DPGMM, VBGMM]:
        dpgmm = Model(n_components=10, random_state=1, alpha=20, n_iter=50,
                      verbose=2)
        old_stdout = sys.stdout
        sys.stdout = StringIO()
        try:
            dpgmm.fit(X)
        finally:
            sys.stdout = old_stdout

def test_optimization_minimizes_kl_divergence():
    """t-SNE should give a lower KL divergence with more iterations."""
    random_state = check_random_state(0)
    X, _ = make_blobs(n_features=3, random_state=random_state)
    kl_divergences = []
    for n_iter in [200, 250, 300]:
        tsne = TSNE(n_components=2, perplexity=10, learning_rate=100.0,
                    n_iter=n_iter, random_state=0)
        tsne.fit_transform(X)
        kl_divergences.append(tsne.kl_divergence_)
    assert_less_equal(kl_divergences[1], kl_divergences[0])
    assert_less_equal(kl_divergences[2], kl_divergences[1])

def test_pipeline():
    # check that LocallyLinearEmbedding works fine as a Pipeline
    # only checks that no error is raised.
    # TODO check that it actually does something useful
    from sklearn import pipeline, datasets
    X, y = datasets.make_blobs(random_state=0)
    clf = pipeline.Pipeline(
        [('filter', manifold.LocallyLinearEmbedding(random_state=0)),
         ('clf', neighbors.KNeighborsClassifier())])
    clf.fit(X, y)
    assert_less(.9, clf.score(X, y))

def test_pipeline():
    # check that Isomap works fine as a transformer in a Pipeline
    # only checks that no error is raised.
    # TODO check that it actually does something useful
    X, y = datasets.make_blobs(random_state=0)
    clf = pipeline.Pipeline(
        [('isomap', manifold.Isomap()),
         ('clf', neighbors.KNeighborsClassifier())])
    clf.fit(X, y)
    assert_less(.9, clf.score(X, y))

def check_transformer_general(name, Transformer):
    X, y = make_blobs(n_samples=30, centers=[[0, 0, 0], [1, 1, 1]],
                      random_state=0, n_features=2, cluster_std=0.1)
    X = StandardScaler().fit_transform(X)
    X -= X.min()
    _check_transformer(name, Transformer, X, y)
    _check_transformer(name, Transformer, X.tolist(), y.tolist())

def check_pipeline_consistency(name, Estimator):
    if name in ('CCA', 'LocallyLinearEmbedding', 'KernelPCA') and _is_32bit():
        # Those transformers yield non-deterministic output when executed on
        # a 32bit Python. The same transformers are stable on 64bit Python.
        # FIXME: try to isolate a minimalistic reproduction case only
        # depending on scipy and/or maybe generate a test dataset that does
        # not cause such unstable behaviors.
        msg = name + ' is non deterministic on 32bit Python'
        raise SkipTest(msg)

    # check that make_pipeline(est) gives same score as est
    X, y = make_blobs(n_samples=30, centers=[[0, 0, 0], [1, 1, 1]],
                      random_state=0, n_features=2, cluster_std=0.1)
    X -= X.min()
    y = multioutput_estimator_convert_y_2d(name, y)
    estimator = Estimator()
    set_testing_parameters(estimator)
    set_random_state(estimator)
    pipeline = make_pipeline(estimator)
    estimator.fit(X, y)
    pipeline.fit(X, y)

    if name in DEPRECATED_TRANSFORM:
        funcs = ["score"]
    else:
        funcs = ["score", "fit_transform"]

    for func_name in funcs:
        func = getattr(estimator, func_name, None)
        if func is not None:
            func_pipeline = getattr(pipeline, func_name)
            result = func(X, y)
            result_pipe = func_pipeline(X, y)
            assert_array_almost_equal(result, result_pipe)

def check_estimators_pickle(name, Estimator):
    """Test that we can pickle all estimators"""
    if name in DEPRECATED_TRANSFORM:
        check_methods = ["predict", "decision_function", "predict_proba"]
    else:
        check_methods = ["predict", "transform", "decision_function",
                         "predict_proba"]

    X, y = make_blobs(n_samples=30, centers=[[0, 0, 0], [1, 1, 1]],
                      random_state=0, n_features=2, cluster_std=0.1)

    # some estimators can't do features less than 0
    X -= X.min()

    # some estimators only take multioutputs
    y = multioutput_estimator_convert_y_2d(name, y)

    # catch deprecation warnings
    with warnings.catch_warnings(record=True):
        estimator = Estimator()

    set_random_state(estimator)
    set_testing_parameters(estimator)
    estimator.fit(X, y)

    result = dict()
    for method in check_methods:
        if hasattr(estimator, method):
            result[method] = getattr(estimator, method)(X)

    # pickle and unpickle!
    pickled_estimator = pickle.dumps(estimator)
    unpickled_estimator = pickle.loads(pickled_estimator)

    for method in result:
        unpickled_result = getattr(unpickled_estimator, method)(X)
        assert_array_almost_equal(result[method], unpickled_result)

def check_estimators_partial_fit_n_features(name, Alg):
    # check if number of features changes between calls to partial_fit.
    if not hasattr(Alg, 'partial_fit'):
        return
    X, y = make_blobs(n_samples=50, random_state=1)
    X -= X.min()
    with warnings.catch_warnings(record=True):
        alg = Alg()
    if not hasattr(alg, 'partial_fit'):
        # check again as for mlp this depends on algorithm
        return

    set_testing_parameters(alg)
    try:
        if isinstance(alg, ClassifierMixin):
            classes = np.unique(y)
            alg.partial_fit(X, y, classes=classes)
        else:
            alg.partial_fit(X, y)
    except NotImplementedError:
        return

    assert_raises(ValueError, alg.partial_fit, X[:, :-1], y)