The following 15 code examples, extracted from open-source Python projects, illustrate how to use sklearn.svm.OneClassSVM().
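Before the project-specific snippets, here is a minimal, self-contained sketch of the basic API (the synthetic training data is purely illustrative): fit the model on data assumed to be normal, then predict() returns +1 for inliers and -1 for outliers.

import numpy as np
from sklearn import svm

# Train on points clustered near the origin (assumed "normal").
rng = np.random.RandomState(0)
X_train = 0.3 * rng.randn(100, 2)

clf = svm.OneClassSVM(kernel="rbf", nu=0.1, gamma=0.1)
clf.fit(X_train)

# +1 = inlier, -1 = outlier; a far-away point should be rejected.
print(clf.predict([[0.0, 0.0], [4.0, 4.0]]))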
def optimize_training_parameters(self, n):
    # data
    from_timestamp = self.min_timestamp
    to_timestamp = self.min_timestamp + datetime.timedelta(days=365) + datetime.timedelta(hours=1)
    train_timestamps, train_values = self.load_monitor_data(from_timestamp, to_timestamp, "1")
    train_data = np.array(train_values)[:, 0:5]

    # parameters
    nu = np.linspace(start=1e-5, stop=1e-2, num=n)
    gamma = np.linspace(start=1e-6, stop=1e-3, num=n)
    opt_diff = 1.0
    opt_nu = None
    opt_gamma = None
    fw = open("training_param.csv", "w")
    fw.write("nu,gamma,diff\n")
    for i in range(len(nu)):
        for j in range(len(gamma)):
            classifier = svm.OneClassSVM(kernel="rbf", nu=nu[i], gamma=gamma[j])
            classifier.fit(train_data)
            label = classifier.predict(train_data)
            p = 1 - float(sum(label == 1.0)) / len(label)
            diff = math.fabs(p - nu[i])
            if diff < opt_diff:
                opt_diff = diff
                opt_nu = nu[i]
                opt_gamma = gamma[j]
            fw.write(",".join([str(nu[i]), str(gamma[j]), str(diff)]) + "\n")
    fw.close()
    return opt_nu, opt_gamma
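The selection criterion above relies on a property of the nu parameter: it approximately upper-bounds the fraction of training points classified as outliers, so a well-calibrated model should reject roughly a nu-sized share of its own training set. A minimal sketch of that sanity check on synthetic data (the data and values here are illustrative, not from the project above):

import numpy as np
from sklearn import svm

rng = np.random.RandomState(0)
X = rng.randn(1000, 2)

nu = 0.05
clf = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=0.1).fit(X)

# Fraction of training points labeled as outliers; should be close to nu.
outlier_fraction = np.mean(clf.predict(X) == -1)
print(abs(outlier_fraction - nu))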
def test_oneclass_decision_function():
    # Test OneClassSVM decision function
    clf = svm.OneClassSVM()
    rnd = check_random_state(2)

    # Generate train data
    X = 0.3 * rnd.randn(100, 2)
    X_train = np.r_[X + 2, X - 2]

    # Generate some regular novel observations
    X = 0.3 * rnd.randn(20, 2)
    X_test = np.r_[X + 2, X - 2]
    # Generate some abnormal novel observations
    X_outliers = rnd.uniform(low=-4, high=4, size=(20, 2))

    # fit the model
    clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
    clf.fit(X_train)

    # predict things
    y_pred_test = clf.predict(X_test)
    assert_greater(np.mean(y_pred_test == 1), .9)
    y_pred_outliers = clf.predict(X_outliers)
    assert_greater(np.mean(y_pred_outliers == -1), .9)
    dec_func_test = clf.decision_function(X_test)
    assert_array_equal((dec_func_test > 0).ravel(), y_pred_test == 1)
    dec_func_outliers = clf.decision_function(X_outliers)
    assert_array_equal((dec_func_outliers > 0).ravel(), y_pred_outliers == 1)
def detect_outlier(data_train, measurement):
    """
    Detect whether the input measurement is an outlier or not.

    :param data_train: data for training the one-class SVM model
    :param measurement: one row from the chill_untested.csv
    :return: predicted label for the input measurement (1 = inlier, -1 = outlier)
    """
    classifier = svm.OneClassSVM(kernel="rbf", nu=0.005, gamma=0.00001)
    classifier.fit(data_train)
    # predict() expects a 2D array, so wrap the single row before predicting
    label = classifier.predict([measurement])[0]
    return label
def fit(self, X):
    clf = svm.OneClassSVM(nu=0.5, kernel="rbf", gamma=0.9)
    clf.fit(X)
    self.clf = clf
def __init__(self, param_dict={}):
    self.param_dict = param_dict
    print(self.__class__.__name__, self.param_dict)
    self.cls = OneClassSVM(**param_dict)
def learn_structure(self, samples):
    X_train, X_test = self._generate_train_test_sets(samples, 0.75)
    logger.info("Training with " + str(len(X_train)) +
                " samples; testing with " + str(len(X_test)) + " samples.")

    svm_detector = svm.OneClassSVM(nu=0.95 * OUTLIERS_FRACTION + 0.05,
                                   kernel="rbf", gamma=0.1)
    svm_detector.fit(X_train)

    Y_test = svm_detector.predict(X_test)
    num_anomalies = Y_test[Y_test == -1].size
    logger.info("Found " + str(num_anomalies) + " anomalies in testing set")
    return svm_detector
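The nu value here follows a heuristic that also appears in scikit-learn's classical outlier-detection examples: given an assumed contamination level OUTLIERS_FRACTION, nu = 0.95 * OUTLIERS_FRACTION + 0.05 keeps nu inside the valid (0, 1] range even when the assumed fraction is zero. A minimal sketch (OUTLIERS_FRACTION is a module-level constant in the project; the value below is an assumption):

import numpy as np
from sklearn import svm

OUTLIERS_FRACTION = 0.1  # assumed contamination level

X_train = np.random.RandomState(0).randn(500, 3)
detector = svm.OneClassSVM(nu=0.95 * OUTLIERS_FRACTION + 0.05,
                           kernel="rbf", gamma=0.1)
detector.fit(X_train)
# The rejected share of training data tracks the assumed fraction.
print(np.mean(detector.predict(X_train) == -1))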
def test_learn_structure(self):
    data = self.get_testing_data()
    clf = self.svm.learn_structure(data)
    self.assertIsInstance(clf, svm.OneClassSVM)
def fit(self):
    global isFitted
    isFitted = True
    print("fit the model")
    train = np.array(self.model.data)
    X = train[:, 0:2]
    y = train[:, 2]

    lam = float(self.complexity.get())
    gamma = float(self.gamma.get())
    coef0 = float(self.coef0.get())
    degree = int(self.degree.get())
    kernel_map = {0: "linear", 1: "rbf", 2: "poly"}

    # if len(np.unique(y)) == 1:
    #     clf = svm.OneClassSVM(kernel=kernel_map[self.kernel.get()],
    #                           gamma=gamma, coef0=coef0, degree=degree)
    #     clf.fit(X)
    # else:
    #     mysvm = svm.SVC(kernel=kernel_map[self.kernel.get()], C=1000,
    #                     gamma=gamma, coef0=coef0, degree=degree)
    #     mysvm.fit(X, y)
    clf = komd.KOMD(lam=lam, Kf=kernel_map[self.kernel.get()],
                    rbf_gamma=gamma, poly_deg=degree, poly_coeff=coef0)
    clf.fit(X, y)

    if hasattr(clf, 'score'):
        print("Accuracy:", clf.score(X, y) * 100)

    X1, X2, Z = self.decision_surface(clf)
    self.model.clf = clf
    self.clf = clf
    self.model.set_surface((X1, X2, Z))
    self.model.surface_type = self.surface_type.get()
    self.fitted = True
    self.model.changed("surface")
def fit(self):
    print("fit the model")
    train = np.array(self.model.data)
    X = train[:, 0:2]
    y = train[:, 2]

    C = float(self.complexity.get())
    gamma = float(self.gamma.get())
    coef0 = float(self.coef0.get())
    degree = int(self.degree.get())
    kernel_map = {0: "linear", 1: "rbf", 2: "poly"}

    # With only one class in the data, fall back to a one-class SVM
    if len(np.unique(y)) == 1:
        clf = svm.OneClassSVM(kernel=kernel_map[self.kernel.get()],
                              gamma=gamma, coef0=coef0, degree=degree)
        clf.fit(X)
    else:
        clf = svm.SVC(kernel=kernel_map[self.kernel.get()], C=C,
                      gamma=gamma, coef0=coef0, degree=degree)
        clf.fit(X, y)

    if hasattr(clf, 'score'):
        print("Accuracy:", clf.score(X, y) * 100)

    X1, X2, Z = self.decision_surface(clf)
    self.model.clf = clf
    self.model.set_surface((X1, X2, Z))
    self.model.surface_type = self.surface_type.get()
    self.fitted = True
    self.model.changed("surface")
def test_oneclass():
    # Test OneClassSVM
    clf = svm.OneClassSVM()
    clf.fit(X)
    pred = clf.predict(T)

    assert_array_almost_equal(pred, [-1, -1, -1])
    assert_array_almost_equal(clf.intercept_, [-1.008], decimal=3)
    assert_array_almost_equal(clf.dual_coef_,
                              [[0.632, 0.233, 0.633, 0.234, 0.632, 0.633]],
                              decimal=3)
    assert_raises(ValueError, lambda: clf.coef_)
def test_immutable_coef_property():
    # Check that primal coef modifications are not silently ignored
    svms = [
        svm.SVC(kernel='linear').fit(iris.data, iris.target),
        svm.NuSVC(kernel='linear').fit(iris.data, iris.target),
        svm.SVR(kernel='linear').fit(iris.data, iris.target),
        svm.NuSVR(kernel='linear').fit(iris.data, iris.target),
        svm.OneClassSVM(kernel='linear').fit(iris.data),
    ]
    for clf in svms:
        assert_raises(AttributeError, clf.__setattr__, 'coef_', np.arange(3))
        assert_raises((RuntimeError, ValueError),
                      clf.coef_.__setitem__, (0, 0), 0)
def check_svm_model_equal(dense_svm, sparse_svm, X_train, y_train, X_test):
    dense_svm.fit(X_train.toarray(), y_train)
    if sparse.isspmatrix(X_test):
        X_test_dense = X_test.toarray()
    else:
        X_test_dense = X_test
    sparse_svm.fit(X_train, y_train)
    assert_true(sparse.issparse(sparse_svm.support_vectors_))
    assert_true(sparse.issparse(sparse_svm.dual_coef_))
    assert_array_almost_equal(dense_svm.support_vectors_,
                              sparse_svm.support_vectors_.toarray())
    assert_array_almost_equal(dense_svm.dual_coef_,
                              sparse_svm.dual_coef_.toarray())
    if dense_svm.kernel == "linear":
        assert_true(sparse.issparse(sparse_svm.coef_))
        assert_array_almost_equal(dense_svm.coef_, sparse_svm.coef_.toarray())
    assert_array_almost_equal(dense_svm.support_, sparse_svm.support_)
    assert_array_almost_equal(dense_svm.predict(X_test_dense),
                              sparse_svm.predict(X_test))
    assert_array_almost_equal(dense_svm.decision_function(X_test_dense),
                              sparse_svm.decision_function(X_test))
    assert_array_almost_equal(dense_svm.decision_function(X_test_dense),
                              sparse_svm.decision_function(X_test_dense))
    if isinstance(dense_svm, svm.OneClassSVM):
        msg = "cannot use sparse input in 'OneClassSVM' trained on dense data"
    else:
        assert_array_almost_equal(dense_svm.predict_proba(X_test_dense),
                                  sparse_svm.predict_proba(X_test), 4)
        msg = "cannot use sparse input in 'SVC' trained on dense data"
    if sparse.isspmatrix(X_test):
        assert_raise_message(ValueError, msg, dense_svm.predict, X_test)
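The property asserted by this helper, namely that identically-configured models trained on dense and sparse versions of the same data make matching predictions, can be seen directly in a small sketch (iris is used here only for convenience):

import numpy as np
from scipy import sparse
from sklearn import svm, datasets

iris = datasets.load_iris()
X_sp = sparse.csr_matrix(iris.data)

# Identically-parameterized dense and sparse one-class models.
dense_clf = svm.OneClassSVM(kernel="linear", nu=0.1).fit(iris.data)
sparse_clf = svm.OneClassSVM(kernel="linear", nu=0.1).fit(X_sp)

# Predictions agree regardless of the input representation.
assert np.allclose(dense_clf.predict(iris.data), sparse_clf.predict(X_sp))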
def predict(self, nu, gamma):
    # classifier
    classifier = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)

    # data for test
    from_timestamp = self.min_timestamp + datetime.timedelta(days=365)
    to_timestamp = self.max_timestamp
    test_timestamps, test_values = self.load_monitor_data(from_timestamp, to_timestamp, "nan")
    test_data = np.array(test_values)[:, 0:5]

    # data for train
    to_timestamp = self.min_timestamp + datetime.timedelta(days=365) + datetime.timedelta(hours=1)
    train_timestamps, train_values = self.load_monitor_data(self.min_timestamp, to_timestamp, "1")

    for i in range(len(test_timestamps)):
        # predict: refit on everything accepted so far, then score the next point
        train_data = np.array(train_values)[:, 0:5]
        classifier.fit(train_data)
        label = classifier.predict(test_data[i].reshape(1, -1))[0]
        test_values[i][5] = int(label)
        if label == 1:
            test_values[i][6] = 0.0
            train_values.append(test_values[i])  # inliers grow the training pool
        else:
            test_values[i][6] = 1.0
        print(test_timestamps[i], label, test_values[i])

    # write result into monitor file
    fr = open(self.monitor_file, "r")
    header = fr.readline()
    lines = fr.readlines()
    fr.close()

    # update monitor file
    fw = open(self.monitor_file, "w")
    fw.write(header)
    for line in lines:
        timestamp = datetime.datetime.strptime(line.strip().split(",")[0], "%Y-%m-%d %H:%M:%S")
        if timestamp in test_timestamps:
            idx = test_timestamps.index(timestamp)
            value = test_values[idx]
            timestamp = str(timestamp)
            temperature = str(value[0])
            ph = str(value[1])
            conductivity = str(value[2])
            orp = str(value[3])
            do = str(value[4])
            label = str(int(value[5]))
            outlier_prob = str(value[6])
            event_prob = str(value[7])
            m = [timestamp, temperature, ph, conductivity, orp, do, label, outlier_prob, event_prob]
            fw.write(",".join(m) + "\n")
        else:
            fw.write(line)
    fw.close()
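The loop above is a simple walk-forward scheme: before scoring each new point the model is refit on everything accepted so far, points predicted as inliers are absorbed into the training pool, and outliers are only flagged. A stripped-down sketch of that pattern on synthetic data (all names and values here are illustrative):

import numpy as np
from sklearn import svm

rng = np.random.RandomState(0)
train = list(0.3 * rng.randn(200, 2))                 # accepted history
stream = np.r_[0.3 * rng.randn(10, 2), [[5.0, 5.0]]]  # incoming points

clf = svm.OneClassSVM(kernel="rbf", nu=0.01, gamma=0.1)
for x in stream:
    clf.fit(np.array(train))
    if clf.predict(x.reshape(1, -1))[0] == 1:
        train.append(x)  # inlier: grow the training pool
    # outliers are flagged but never added back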
def __init__(self, num_class=2):
    """
    :type num_class: int
    :rtype: None
    """
    self.__ctrl__ = None
    self.__case__ = None

    with open('../../.dbname', 'r') as f:
        self.__DB_NAME__ = json.load(f)['dbname']

    self.__MG_DOCS_COLL__ = 'raw-docs'         # raw docs
    self.__MG_SENTS_COLL__ = 'bag-of-sents'    # raw sentences
    self.__MG_TOKENS_COLL__ = 'sample-tokens'  # clean tokens (words)
    self.__PG_STATS_TBL__ = 'stats'            # stylometric features
    self.__PG_RESULTS_TBL__ = 'results_' + str(num_class) + 'class'  # cross val results
    self.__PG_PROBAS_TBL__ = 'probabilities'   # cross val probabilities

    self.__model__ = Pipeline([
        # ('scaler2', StandardScaler()),
        # ('scaler', MinMaxScaler()),
        # ('scaler3', Normalizer()),
        ('classifier', SVC(probability=True, kernel='poly', degree=2,
                           class_weight='balanced')
                       if num_class - 1
                       else OneClassSVM(kernel='rbf', nu=0.7, gamma=1. / 250))
    ])

    print('Instantiated classifier %s.' %
          self.__model__.named_steps['classifier'].__class__.__name__)

    self.__io__ = DBIO(MG_DB_NAME=self.__DB_NAME__, PG_DB_NAME=self.__DB_NAME__)
    self.__tagger__ = None     # initialise if re-creating samples
    self.__bootstrap__ = None  # initialise in fit
def fit(self, author1, author2, wts1=None, wts2=None,
        bootstrap=False, verbose=False):
    """
    :type author1: str
    :type author2: str
    :type wts1: str/List[str]
    :type wts2: str/List[str]
    :type verbose: bool
    :rtype: bool

    Prepares databases and tables/collections.
    """
    self.__bootstrap__ = bootstrap
    cases = []
    for i, (author, wts) in enumerate([(author1, wts1), (author2, wts2)]):
        if not wts:
            wts = [str(wt)
                   for wt in self.__io__.mg_distinct(self.__MG_DOCS_COLL__,
                                                     'type',
                                                     {'author': author})]
        if not isinstance(wts, list):
            wts = [wts]
        # use 1, -1 to match the output of sklearn's OneClassSVM
        cases += (author, wts, (1, -1)[i]),

    self.__ctrl__ = cases[0]  # assigned label 1 in the y vector
    self.__case__ = cases[1]  # assigned label -1 in the y vector

    suffix = [cases[0][0],
              cases[1][0],
              ''.join(wt[:3] for wt in cases[0][1]),
              ''.join(wt[:3] for wt in cases[1][1]),
              ('nobs', 'bs')[bootstrap]]
    self.__MG_TOKENS_COLL__ += '-' + '-'.join(suffix)
    self.__PG_STATS_TBL__ += '_' + '_'.join(suffix)

    if verbose:
        print('Control:', self.__ctrl__)
        print('Case:   ', self.__case__)
        print('Saving tokens to', self.__MG_TOKENS_COLL__)
        print('Saving stats to', self.__PG_STATS_TBL__)

    return self.__prep_sents__(verbose=verbose)  # err in preparing sentences
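For the one-class branch of the pipeline above (num_class == 1), prediction follows the same 1/-1 convention that fit() relies on when labelling the control and case authors. A minimal hedged sketch (the feature matrix is a stand-in; the real features come from the stylometric stats table):

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.svm import OneClassSVM

rng = np.random.RandomState(0)
X_ctrl = rng.randn(50, 10)  # stand-in for the control author's stylometric features

model = Pipeline([
    ('classifier', OneClassSVM(kernel='rbf', nu=0.7, gamma=1. / 250)),
])
model.fit(X_ctrl)
# 1 = consistent with the control author, -1 = anything else
print(model.predict(rng.randn(3, 10)))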