The following 42 code examples, extracted from open-source Python projects, illustrate how to use pyspark.sql.SQLContext().
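Before the extracted examples, here is a minimal sketch of the pattern most of them share: wrap an existing SparkContext in an SQLContext, build a DataFrame, register it as a temporary table, and run SQL against it. The application name, sample rows, and table name below are illustrative assumptions, not taken from any of the projects listed.

from pyspark import SparkContext
from pyspark.sql import Row, SQLContext

# Illustrative local context; the app name and data are placeholders.
sc = SparkContext('local[2]', 'sqlcontext-demo')
sqlContext = SQLContext(sc)

# Build a small DataFrame and register it as a temporary table.
df = sqlContext.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
df.registerTempTable('people')

# Query the table with SQL through the same SQLContext.
adults = sqlContext.sql('SELECT name FROM people WHERE age >= 5')
print(adults.collect())

sc.stop()

The examples extracted from the projects follow.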
def generate_code(self):
    code = dedent(u"""
        from pyspark.sql import SQLContext

        # Input data
        sql_context = SQLContext(spark_session.sparkContext)
        if {in1} is not None:
            sql_context.registerDataFrameAsTable({in1}, 'ds1')
        if {in2} is not None:
            sql_context.registerDataFrameAsTable({in2}, 'ds2')
        query = {query}
        {out} = sql_context.sql(query)
        names = {names}
        if names is not None and len(names) > 0:
            old_names = {out}.schema.names
            if len(old_names) != len(names):
                raise ValueError('{invalid_names}')
            rename = [functions.col(pair[0]).alias(pair[1])
                      for pair in zip(old_names, names)]
            {out} = {out}.select(*rename)
        """.format(
        in1=self.input1, in2=self.input2, query=repr(self.query),
        out=self.output, names=repr(self.names),
        invalid_names=_('Invalid names. Number of attributes in '
                        'result differs from names informed.')))
    return code
def test_fit_maximize_metric(self):
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([
        (10, 10.0),
        (50, 50.0),
        (100, 100.0),
        (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="r2")

    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    bestModel = cvModel.bestModel
    bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

    self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
def run(self):
    self.args = self.parse_arguments()

    conf = SparkConf().setAll((
        ("spark.task.maxFailures", "10"),
        ("spark.locality.wait", "20s"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ))
    sc = SparkContext(
        appName=self.name,
        conf=conf)
    sqlc = SQLContext(sparkContext=sc)

    self.records_processed = sc.accumulator(0)
    self.warc_input_processed = sc.accumulator(0)
    self.warc_input_failed = sc.accumulator(0)

    self.run_job(sc, sqlc)

    sc.stop()
def test_fit_minimize_metric(self):
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([
        (10, 10.0),
        (50, 50.0),
        (100, 100.0),
        (500, 500.0)] * 10,
        ["feature", "label"])

    iee = InducedErrorEstimator()
    evaluator = RegressionEvaluator(metricName="rmse")

    grid = (ParamGridBuilder()
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
            .build())
    cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    bestModel = cvModel.bestModel
    bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

    self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                     "Best model should have zero induced error")
    self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
def test_infer_schema(self):
    sqlCtx = SQLContext(self.sc)
    rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
    df = rdd.toDF()
    schema = df.schema
    field = [f for f in schema.fields if f.name == "features"][0]
    self.assertEqual(field.dataType, self.udt)
    vectors = df.map(lambda p: p.features).collect()
    self.assertEqual(len(vectors), 2)
    for v in vectors:
        if isinstance(v, SparseVector):
            self.assertEqual(v, self.sv1)
        elif isinstance(v, DenseVector):
            self.assertEqual(v, self.dv1)
        else:
            raise TypeError("expecting a vector but got %r of type %r" % (v, type(v)))
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    train_df = spark.read.parquet(sys.argv[1])

    # Persist the data in memory and disk
    train_df.persist(StorageLevel(True, True, False, False, 1))

    rfc = RandomForestClassifier(maxDepth=8, maxBins=2400000, numTrees=128, impurity="gini")
    rfc_model = rfc.fit(train_df)
    rfc_model.save(sys.argv[2] + "rfc_model")
def getOrCreateSC(cls, type="sparkContext", master=None, name=None):
    from pyspark.sql import SQLContext
    from pyspark.sql import SparkSession

    ss = SparkSession.builder
    if name:
        ss.appName(name)
    if master:
        ss.master(master)
    sparkSession = ss.getOrCreate()
    sc = sparkSession.sparkContext
    sqlContext = SQLContext(sc)

    if type == "SparkSessionBuilder":
        return sc
    elif type == "sparkContext":
        return sc
    elif type == "sparkSession":
        return ss
    elif type == "sqlContext":
        return sqlContext
    else:
        raise ValueError("Unknown type.")
def getOrCreateSC_old(cls, type="sparkContext", master=None, name=None):
    from pyspark.sql import SQLContext
    from pyspark.sql import SparkSession

    ss = SparkSession.builder
    if name:
        ss.appName(name)
    if master:
        ss.master(master)
    sparkSession = ss.getOrCreate()
    sc = sparkSession.sparkContext
    sqlContext = SQLContext(sc)

    if type == "SparkSessionBuilder":
        return sc
    elif type == "sparkContext":
        return sc
    elif type == "sparkSession":
        return ss
    elif type == "sqlContext":
        return sqlContext
    else:
        raise ValueError("Unknown type.")
def getUserReadNews(self, user_id):
    sqlContext = SQLContext(self.sc)

    # load records
    df_r = sqlContext.read.format('jdbc').options(
        url="jdbc:mysql://localhost/RS_News?user=root&password=10081008",
        dbtable="app_userrecords").load()
    records = df_r.filter(df_r["user_id"] == user_id)
    records_list = [i.news_id for i in records.collect()]

    # load read news
    df_news = sqlContext.read.format('jdbc').options(
        url="jdbc:mysql://localhost/RS_News?user=root&password=10081008",
        dbtable="app_caixinnews").load()
    user_news_df = df_news.filter(df_news['news_id'].isin(records_list))
    user_news = [preprocess_per_news(i.content) for i in user_news_df.collect()]
    user_topics = [i.topic for i in user_news_df.collect()]

    candidates_df = df_news.filter(df_news['topic'].isin(user_topics))
    candidates = [preprocess_per_news(i.content) for i in candidates_df.collect()]
    candidates_newsid = [i.news_id for i in candidates_df.collect()]

    return user_news, candidates, candidates_newsid
def setUp(self):
    super(SetAggregatedMetricNameTest, self).setUp()
    self.sql_context = SQLContext(self.spark_context)
def setUp(self):
    super(UsageComponentTest, self).setUp()
    self.sql_context = SQLContext(self.spark_context)
def registerTempTable(self, name):
    """Registers this RDD as a temporary table using the given name.

    The lifetime of this temporary table is tied to the :class:`SQLContext`
    that was used to create this :class:`DataFrame`.

    >>> df.registerTempTable("people")
    >>> df2 = spark.sql("select * from people")
    >>> sorted(df.collect()) == sorted(df2.collect())
    True
    >>> spark.catalog.dropTempView("people")

    .. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
    """
    self._jdf.createOrReplaceTempView(name)
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext, SparkSession
    import pyspark.sql.dataframe
    from pyspark.sql.functions import from_unixtime

    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['spark'] = SparkSession(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df2'] = sc.parallelize([Row(name='Tom', height=80),
                                   Row(name='Bob', height=85)]).toDF()
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                   Row(name='Bob', age=5)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()
    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
                                   Row(name='Bob', time=1479442946)]).toDF()

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def _test():
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext

    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)

    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def __init__(self, uri, *args, **kwargs):
    super(ParquetDAL, self).__init__()
    self._tables = ParquetPool()
    self._uri = uri
    self._context = SQLContext(*args, **kwargs)
def setup_env(cls):
    cls.sc = SparkContext('local[*]', cls.__name__)
    cls.sql = SQLContext(cls.sc)
    cls.session = SparkSession.builder.getOrCreate()
def get_sqlcontext_instance(spark_context):
    """
    :type spark_context: pyspark.SparkContext
    :param spark_context: The currently active Spark Context
    :return: Returns the SQLContext
    :rtype: sql.SQLContext
    """
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = sql.SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']
def setUp(self):
    self.sc = SparkContext()
    self.sqlContext = SQLContext(self.sc)
def test_idf(self):
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([
        (DenseVector([1.0, 2.0]),),
        (DenseVector([0.0, 1.0]),),
        (DenseVector([3.0, 0.2]),)], ["tf"])
    idf0 = IDF(inputCol="tf")
    self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol])
    idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"})
    self.assertEqual(idf0m.uid, idf0.uid,
                     "Model should inherit the UID from its parent estimator.")
    output = idf0m.transform(dataset)
    self.assertIsNotNone(output.head().idf)
def test_ngram(self):
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([
        Row(input=["a", "b", "c", "d", "e"])])
    ngram0 = NGram(n=4, inputCol="input", outputCol="output")
    self.assertEqual(ngram0.getN(), 4)
    self.assertEqual(ngram0.getInputCol(), "input")
    self.assertEqual(ngram0.getOutputCol(), "output")
    transformedDF = ngram0.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])
def test_stopwordsremover(self):
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])])
    stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")

    # Default
    self.assertEqual(stopWordRemover.getInputCol(), "input")
    transformedDF = stopWordRemover.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["panda"])

    # Custom
    stopwords = ["panda"]
    stopWordRemover.setStopWords(stopwords)
    self.assertEqual(stopWordRemover.getInputCol(), "input")
    self.assertEqual(stopWordRemover.getStopWords(), stopwords)
    transformedDF = stopWordRemover.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["a"])
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext
    import pyspark.sql.group

    globs = pyspark.sql.group.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2, height=80),
                                   Row(name='Bob', age=5, height=85)]).toDF()
    globs['df4'] = sc.parallelize([Row(course="dotNET", year=2012, earnings=10000),
                                   Row(course="Java", year=2012, earnings=20000),
                                   Row(course="dotNET", year=2012, earnings=5000),
                                   Row(course="dotNET", year=2013, earnings=48000),
                                   Row(course="Java", year=2013, earnings=30000)]).toDF()

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.group, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def registerTempTable(self, name):
    """Registers this RDD as a temporary table using the given name.

    The lifetime of this temporary table is tied to the :class:`SQLContext`
    that was used to create this :class:`DataFrame`.

    >>> df.registerTempTable("people")
    >>> df2 = sqlContext.sql("select * from people")
    >>> sorted(df.collect()) == sorted(df2.collect())
    True
    """
    self._jdf.registerTempTable(name)
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext
    import pyspark.sql.dataframe

    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df2'] = sc.parallelize([Row(name='Tom', height=80),
                                   Row(name='Bob', height=85)]).toDF()
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                   Row(name='Bob', age=5)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def test_infer_schema(self): sqlCtx = SQLContext(self.sc) rdd = self.sc.parallelize([("dense", self.dm1), ("sparse", self.sm1)]) df = rdd.toDF() schema = df.schema self.assertTrue(schema.fields[1].dataType, self.udt) matrices = df.map(lambda x: x._2).collect() self.assertEqual(len(matrices), 2) for m in matrices: if isinstance(m, DenseMatrix): self.assertTrue(m, self.dm1) elif isinstance(m, SparseMatrix): self.assertTrue(m, self.sm1) else: raise ValueError("Expected a matrix but got type %r" % type(m))
def __init__(self):
    conf = SparkConf().setAppName("ntu-speech").setMaster("local")
    self.sc = SparkContext(conf=conf)
    self.sqlCtx = SQLContext(self.sc)
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    # Loading the test data
    df_test = spark.read.parquet(sys.argv[1])
    df_test, df_discard = df_test.randomSplit([0.2, 0.8])

    # Load the model
    rf_model = RandomForestClassificationModel.load(sys.argv[2])

    # Make the predictions
    predictions = rf_model.transform(df_test)

    # predictionsRDD = predictions.rdd
    # predictionsRDD.saveAsTextFile(sys.argv[3] + "output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="label",
                                                      metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)
    print "accuracy *******************"
    print accuracy

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="label",
                                                      metricName="weightedPrecision")
    print "precision *******************"
    print evaluator_pre.evaluate(predictions)

    print "recall **********************"
    print MulticlassClassificationEvaluator(predictionCol="prediction",
                                            labelCol="label",
                                            metricName="weightedRecall").evaluate(predictions)
def setUp(self):
    self.df = mock.Mock(spec=DataFrame)
    self.df.sql_ctx = mock.Mock(spec=SQLContext)
    self.df.sql_ctx.sparkSession = mock.Mock(spec=SparklySession)
    self.write_ext = SparklyWriter(self.df)
def getSqlContextInstance(sparkContext):
    """Lazily instantiated global instance of SQLContext.

    Below from
    https://spark.apache.org/docs/1.5.2/streaming-programming-guide.html#dataframe-and-sql-operations.
    """
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
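The docstring above points at the DataFrame-and-SQL recipe from the Spark streaming guide, where a lazily created SQLContext is reused across micro-batches. A hedged sketch of how such a getter is typically consumed inside foreachRDD follows; the socket source, schema, and table name are assumptions for illustration, not part of the project the example was extracted from.

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]', 'streaming-sql-demo')
ssc = StreamingContext(sc, 10)
words = ssc.socketTextStream('localhost', 9999).flatMap(lambda line: line.split(' '))

def process(time, rdd):
    if rdd.isEmpty():
        return
    # Reuse one SQLContext per driver instead of creating one per batch.
    sqlContext = getSqlContextInstance(rdd.context)
    df = sqlContext.createDataFrame(rdd.map(lambda w: Row(word=w)))
    df.registerTempTable('words')
    sqlContext.sql('SELECT word, COUNT(*) AS total FROM words GROUP BY word').show()

words.foreachRDD(process)
ssc.start()
ssc.awaitTermination()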
def run(self):
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    # sqlContext = HiveContext(sc)

    start_scrape = datetime.now()
    begin, begin_parts = scrape.get_boundary(self.begin)
    end, end_parts = scrape.get_boundary(self.end)
    print "here"

    all_years_months_days = self.getYearsMonths()
    print "all_years=", all_years_months_days

    game_ids = scrape.get_games(all_years_months_days, source=scrape.filesystem_scraper)
    print "games=", game_ids

    gamesRDD = sc.parallelize(game_ids)
    gamesRDD.cache()
    print "fileRDD=", gamesRDD
    print "# parttions:", gamesRDD.getNumPartitions()
    print "count=", gamesRDD.count()

    # create RDDs
    self.createRawParquet(sc, sqlContext, gamesRDD)

    # Hitter Stats
    batter_games = self.createHitterStats(sqlContext)

    # create Pitcher Stats
    self.createPitcherStats(sqlContext)

    print "STOPPING"
    sc.stop()
def get_distinct_dataframe(self, data_frame, table_name, columns):
    """
    get distinct table columns
    :param table_name: name of table you want to get data
    :param query_str: sql strings
    :return: query result as json Object
    """
    try:
        sc = self.spark_session_create("get_distinct_dataframe")
        tfmsa_logger("start find distinct column !")
        hdfs_path = settings.HDFS_DF_ROOT + "/" + data_frame + "/" + table_name
        query_str = "select * from " + table_name
        sqlContext = SQLContext(sc)
        df = sqlContext.read.load(hdfs_path, "parquet")
        df.registerTempTable(table_name)
        result = sqlContext.sql(str(query_str))

        return_data = {}
        for column in columns:
            return_data[column.encode("UTF8")] = result.select(column).map(
                lambda x: str(x[0]).encode("UTF8")).distinct().collect()

        tfmsa_logger("End find distinct column !")
        return return_data
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df.unpersist()
        sqlContext.clearCache()
        sqlContext.dropTempTable(table_name)
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def query_data(self, data_frame, table_name, query_str, limit_cnt=0):
    """
    get query data from spark
    :param table_name: name of table you want to get data
    :param query_str: sql strings
    :return: query result as json Object
    """
    try:
        sc = self.spark_session_create("query_data")
        tfmsa_logger("start query data !")
        hdfs_path = settings.HDFS_DF_ROOT + "/" + data_frame + "/" + table_name
        sqlContext = SQLContext(sc)
        df = sqlContext.read.load(hdfs_path, "parquet")
        df.registerTempTable(table_name)

        if (limit_cnt == 0):
            result = sqlContext.sql(str(query_str)).collect()
        else:
            result = sqlContext.sql(str(query_str)).limit(limit_cnt).collect()

        return result
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df.unpersist()
        sqlContext.clearCache()
        sqlContext.dropTempTable(table_name)
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def post_json_data(self, data_frame, table_name, json_data):
    """
    create table with json data
    :param table_name: name of table want to create
    :param json_data: json form schema data
    :return: success or failure
    """
    try:
        sc = self.spark_session_create("post_json_data")
        tfmsa_logger("start create_table !")
        hdfs_path = settings.HDFS_DF_ROOT + "/" + data_frame + "/" + table_name
        sqlContext = SQLContext(sc)
        df_writer = sqlContext.createDataFrame(str(json_data)).write
        df_writer.parquet(hdfs_path, mode="append", partitionBy=None)
        tfmsa_logger("End create_table !")
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df_writer.unpersist()
        sqlContext.clearCache()
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def put_json_data(self, data_frame, table_name, json_data):
    """
    append data on exist table
    :param table_name: name of table want to add data
    :param json_data: json form schema data
    :return: success or failure
    """
    try:
        sc = self.spark_session_create("put_json_data")
        tfmsa_logger("start append_data !")
        hdfs_path = settings.HDFS_DF_ROOT + "/" + data_frame + "/" + table_name
        sqlContext = SQLContext(sc)
        df = sqlContext.read.load(hdfs_path, "parquet")
        df_writer = sqlContext.createDataFrame(str(json_data))
        df.unionAll(df_writer)
        df.write.parquet(hdfs_path, mode="append", partitionBy=None)
        tfmsa_logger("End append_data !")
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df.unpersist()
        df_writer.unpersist()
        sqlContext.clearCache()
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def save_csv_to_df(self, data_frame, table_name, csv_file):
    """
    create new table with inserted csv data
    :param net_id:
    :return:
    """
    try:
        sc = self.spark_session_create("save_csv_to_df")
        tfmsa_logger("start uploading csv on Hadoop")

        # clear current exist table
        self.reset_table(data_frame, table_name)

        sqlContext = SQLContext(sc)
        file_path = settings.FILE_ROOT + "/" + data_frame + "/" + table_name + "/" + csv_file
        df = sqlContext.createDataFrame(pd.read_csv(file_path))
        df.write.parquet("{0}/{1}/{2}".format(settings.HDFS_DF_ROOT, data_frame, table_name),
                         mode="append", partitionBy=None)
        tfmsa_logger("uploading csv on Hadoop finished")
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df.unpersist()
        sqlContext.clearCache()
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def query_random_sample(self, data_frame, table_name, query_str, sample_per=0.1):
    """
    get query data from spark
    :param table_name: name of table you want to get data
    :param query_str: sql strings
    :return: query result as json Object
    """
    try:
        sc = self.spark_session_create("query_radom_sample")
        tfmsa_logger("start query data !")
        hdfs_path = settings.HDFS_DF_ROOT + "/" + data_frame + "/" + table_name
        sqlContext = SQLContext(sc)
        df = sqlContext.read.load(hdfs_path, "parquet")
        df.registerTempTable(table_name)
        result = sqlContext.sql(str(query_str)).sample(False, float(sample_per), seed=0).collect()
        return result
    except Exception as e:
        tfmsa_logger(e)
        raise Exception(e)
    finally:
        df.unpersist()
        sqlContext.clearCache()
        sqlContext.dropTempTable(table_name)
        sc.clearFiles()
        sc.stop()
        tfmsa_logger("stop context")
def get_sql_context_instance(spark_context):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']
def get_spark_env():
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')
    sql_context = SQLContext(sc)
    return (conf, sc, sql_context)
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    # Loading the test data
    df_test = spark.read.parquet(sys.argv[1])
    df_test, df_train = df_test.randomSplit([0.3, 0.7])

    df_train_indexed = df_train.selectExpr("label as indexedLabel", "features as indexedFeatures")
    df_test_indexed = df_test.selectExpr("label as indexedLabel", "features as indexedFeatures")

    # # Load the model
    # rf_model = RandomForestClassificationModel.load(sys.argv[2])
    #
    # # Make the predictions
    # predictions = rf_model.transform(df_test)

    gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                        maxIter=100, maxBins=24000000)
    model = gbt.fit(df_train_indexed)
    predictions = model.transform(df_test_indexed)

    # predictionsRDD = predictions.rdd
    # predictionsRDD.saveAsTextFile(sys.argv[3] + "output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="indexedLabel",
                                                      metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)
    print "accuracy *******************"
    print accuracy

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="indexedLabel",
                                                      metricName="weightedPrecision")
    print "precision *******************"
    print evaluator_pre.evaluate(predictions)

    print "recall **********************"
    print MulticlassClassificationEvaluator(predictionCol="prediction",
                                            labelCol="indexedLabel",
                                            metricName="weightedRecall").evaluate(predictions)
def main(sc):
    parser = OptionParser()
    parser.add_option('', '--input_data_path', action='store', dest='input_data_path',
                      help='path for input data')
    parser.add_option('', '--model_path', action='store', dest='model_path',
                      help='path for model data')
    parser.add_option('', '--data_format', default='json', action='store', dest='data_format',
                      help='format of input data (json, csv)')
    options, args = parser.parse_args()

    sqlContext = SQLContext(sc)

    if options.data_format == 'json':
        df = sqlContext.read.json(options.input_data_path)
    elif options.data_format == 'csv':
        df = get_kdd_csv_dataframe(sqlContext, options.input_data_path)
    else:
        raise Exception('Unknown data format')

    # Drop duplicate records based on uuid.
    # Duplicate records may be created due to various failure conditions in Spark Streaming, Kafka, or Flume.
    # Although duplicate records may not have a significant impact with Random Forest, we remove them here
    # in case we use another algorithm that is more sensitive to them.
    df = df.dropDuplicates(['uuid'])

    # Build feature vector.
    df = build_features_vector(df)

    # Show feature vector.
    # df.select([df['features']]).show(100)
    # print(df.select([df['features']]).rdd.collect())

    # Train model.
    # We must use RDDs, not dataframes, because we can't save/load the pipelined ML model using PySpark yet.
    # The best parameters for training should be determined using cross validation but that is not done in this demo.
    features_rdd = extract_features(df)
    unsupervised_forest = supervised2unsupervised(RandomForest.trainClassifier, fraction=0.1)
    model = unsupervised_forest(features_rdd,
                                numClasses=2,
                                categoricalFeaturesInfo={},
                                numTrees=10,
                                featureSubsetStrategy='auto',
                                impurity='gini',
                                maxDepth=15,
                                maxBins=50)

    # Save model to disk.
    model.save(sc, options.model_path)