Spark MLlib
是 Spark 的可扩展机器学习库,提供基于 RDD
的 API。Spark 2.x
版本中,MLlib
将向基于 DataFrame
的 API 添加功能。并且 MLlib
中基于DataFrame
的 API 将成为主流,MLlib
的 API 更加偏向于底层,可以灵活多变的修改逻辑。MLlib
的 API不 会被 ML
替代。Spark ML
提供了一个基于 DataFrame(数据帧)
构建的更高级的 API, 从而用于构建机器学习工作流 ML Pipeline
。Spark ML
为主要学习的技术, 因为 API 能更灵活、更具弹性地使用 DataFrame
。转换器
是 特征变换 和 机器学习 模型的抽象。转换器必须实现 transform
方法,这个方法将一个DataFrame
转换成另一个 DataFrame
,通常是附加一个或者多个列。也称为了评估器吗?
(1) Estimators
模型学习器是 拟合 和 训练数据 的机器学习算法或者其他算法的抽象。
(2) Estimator
实现 fit()
方法,这个方法输入一个 DataFrame
并产生一个 Model
即一个 Transformer
(转换器)。
(3) 例如:一个机器学习算法是一个 Estimator
模型学习器 ,比如这个算法是 LogisticRegression
(逻辑回归),调用 fit()
方法训练出一个 LogisticRegressionModel
,这是一个 Model
,因此也是一个Transformer
(转换器)。
import org.apache.spark.ml.feature._ import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.m1.{ Pipeline, PipelineModel } import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession // 1.训练样本 val training = spark.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF("label", "features") // 2.创建逻辑回归 Estimator val lr = new LogisticRegression( ) println("LogisticRegression parameters:\n" + lr.explainParams() + "\n") // 3.通过setter方法设置模型参数 lr.setMaxIter(10) .setRegParam(0.01) // 4.训练模型 val model1 = lr.fit(training) println("Model 1 was fit using parameters: " + model1.parent.extractParamMap) // 5.通过ParamMap设置参数方法 val paramMap = ParamMap(lr.maxIter -> 20) .put(lr.maxIter, 30) .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // 5.ParamMap合并 val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") val paramMapCombined = paramMap ++ paramMap2 // 6.训练模型,采用paramMap参数 // paramMapCombined会覆盖所有lr.set设置的参数 val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap) // 7.测试样本 val test = spark.createDataFrame(Seq( (1.0, Vectors.dense(-1.0, 1.5, 1.3 )), (0.0, Vectors.dense(3.0, 2.0, -0.1)), (1.0, Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features") // 8.对模型进行测试 model2.transform(test) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => println(s"($features, $label) -> prob=$prob, prediction=$prediction") }