Spark MLlib分为基于RDD的API和基于DataStream的API,其中基于RDD的API是MLlib的主要API。数据类型
MLlib支持存储在一台机器上的本地向量和矩阵,以及由一个或多个 RDD 支持的分布式矩阵。本地向量和本地矩阵用于公共接口的简单数据。
MLlib支持两种本地向量:密集向量和稀疏向量。
本地向量的基类为Vector。其实也相当于c++中的数据类型。使用工厂方法Vectors来创建本地向量。
Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))) //创建稀疏向量:(1.0, 0.0, 0.0, -2.0) Vectors.dense(4.0, 5.0, 0.0, 3.0) //创建密集向量(4.0, 5.0, 0.0, 3.0)
对于监督学习而言,数据总是以\(D=\{(\vec{x_1}, y_1),(\vec{x_2}, y_2),...(\vec{x_n}, y_n),\}\)的形式出现,其中\((\vec{x_1}, y_1)\)就是一个标记点。
标记点使用LabeledPoint创建。
//使用密集向量创建标记点 val point1: LabeledPoint = LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)) //使用稀疏向量创建标记点 val point2: LabeledPoint = LabeledPoint(0.0, Vectors.sparse(3, Seq((1, 0.9))))
在实践中,我们往往遇到的是稀疏数据。也就是使用稀疏向量创建标记点。所以MLlib提供了一种支持读入这种创建标记点的方法。MLlib支持读取LIBLINEAR形式的数据。该数据格式如下:
label index1:value1 index2:value2 ...indexn:valuen
注意:该数据类型的index序列是从小到大。
object Main { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("readData").setMaster("local[*]") val sc = new SparkContext(conf) val dataRdd: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "src/main/resources/data.txt") //读取文件 dataRdd.foreach(println) sc.stop() } }
data.txt文件的内容如下:
1.0 1:0.1 2:1.1 4:2.1
本地矩阵具有整数类型的行和列索引以及双类型值,存储在一台机器上。MLlib支持密集矩阵和稀疏矩阵。
\(\begin{bmatrix}
{1.2}&{2.1}\\
{2.1}&{2.2}\\
{3.2}&{2.3}\\
\end{bmatrix}\)和\(\begin{bmatrix}
{9.0}&{0.0}\\
{0.0}&{8.0}\\
{0.0}&{6.0}\\
\end{bmatrix}\)
下面分别是密集矩阵和稀疏矩阵的创建:
object Main { def main(args: Array[String]): Unit = { val dm1: Matrix = Matrices.dense(3, 2, Array(1.2, 2.1, 3.2, 2.1, 2.2, 2.3)) //使用密集矩阵创建爱你 val dm2: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8)) //使用稀疏矩阵 println(dm1) println(dm2) } }
注意:Matrices.sparse的参数有点难以理解。该代码源的定义为:
def sparse(numRows : scala.Int, numCols : scala.Int, colPtrs : scala.Array[scala.Int], rowIndices : scala.Array[scala.Int], values : scala.Array[scala.Double]) Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8)) numRows=3 numCols=2 colPtrs=Array(0, 1, 3) // (0, 1, 3) % 2 = (0, 1, 1) rowIndices=Array(0, 2, 1) // (0, 2, 1) % 3 = (0, 2, 1) values=Array(9, 6, 8)
这参数有点扯。
分布式矩阵有(Long)长类型的行和列索引值使用Double存储值,分布式存储在一个或者多个RDD中。注意:采用分布式矩阵的形式非常重要,因为当一种形式的分布式矩阵转化为另一种分布式矩阵可能需要全局Shuffle操作。目前MLlib中有:行矩阵, 索引矩阵, 坐标矩阵, 块矩阵。
行矩阵是一个面向行的分布式矩阵,其中的每一行为本地向量,其行由RDD支持。
val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") val sc = new SparkContext(conf) val vlist = Seq(Vectors.dense(4.0, 2.0, 1.0), Vectors.dense(2.1, 2.2, 1.7), Vectors.dense(4.1, 2.3, 2.0)) val vrdd: RDD[linalg.Vector] = sc.makeRDD(vlist) val mat = new RowMatrix(vrdd) //创建行矩阵
更多行矩阵的API
索引矩阵就是一行为索引index和本地向量Vectors所组成的case class IndexedRow(index:Long, vector: Vector)类。一般适用于矩阵很稀疏的情况。
val indexedlist = Seq(new IndexedRow(0L, Vectors.dense(1.0, 2.0, 3.0)), new IndexedRow(1L, Vectors.dense(2.0, 1.0, 4.0))) val indexedRdd: RDD[IndexedRow] = sc.makeRDD(indexedlist) val mat = new IndexedRowMatrix(indexedRdd) //获取行数和列数 val n: Long = mat.numRows() val m: Long = mat.numCols() println(n) println(m)
更多索引矩阵的API
坐标矩阵本质就是RDD[Tuple3(x, y, value)], 其中每一行就是case class MatrixEntry(val i : scala.Long, val j : scala.Long, val value : scala.Double)对象。以下是坐标矩阵的创建代码:
val coordinatelist = Seq(new MatrixEntry(0L, 1L, 8.0), new MatrixEntry(2L, 3L, 1.0)) val coordinateRdd: RDD[MatrixEntry] = sc.makeRDD(coordinatelist) val mat = new CoordinateMatrix(coordinateRdd)
更多坐标矩阵API
块矩阵本质上是RDD[(Int, Int), Matrix],其中(Int, Int)是块的索引。BlockMatrix还有一个辅助功能validate,用来检测是否BlockMatrix设置正确。
val coordinatelist = Seq(new MatrixEntry(0L, 1L, 8.0), new MatrixEntry(2L, 3L, 1.0)) val coordinateRdd: RDD[MatrixEntry] = sc.makeRDD(coordinatelist) val mat3 = new CoordinateMatrix(coordinateRdd) val mat4: BlockMatrix = mat3.toBlockMatrix().cache()
在任务中常常存在对平均值,最大小值,相关性等。在Spark MLlib中使用Statistics对象中进行调度。
在Statistics对象中使用colStats()来统计RDD[Vector]列的最大,小值, 平均值,方差等。
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) val datalist = Seq( Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(2.0, 5.0, 6.0), Vectors.dense(4.0, 1.0, 3.0) ) val dataRdd: RDD[linalg.Vector] = sc.makeRDD(datalist) val summary: MultivariateStatisticalSummary = Statistics.colStats(dataRdd) println(summary.mean) //输出列的平均值 println(summary.variance) //输出方差 println(summary.numNonzeros) //输出列的标准差 sc.stop()
Stattistics对象的corr支持计算两个RDD[Double]之间的相关系数或者计算一个矩阵各个列之间的相关系数。并且可以指定相关系数。
常见的相关系数:
Pearson系数用于衡量两个随机变量X,Y的线性相关性。Pearson系数在+1~-1之间。其中+1表示正线性相关,-1表示负线性相关,0表示非线性相关。Pearson系数的重要特性是在两个变量的位置和尺度的单独变化下是不变的。,也就是对进行如下转换:
\[X_1=aX+b, Y_1=cY+d \]其中X1和Y1的线性关系也依旧是X,Y的线性关系。
Pearson的计算公式:
Spearman秩相关系数:使用两个变量的秩次大小作为线性相关分析,对原始变量的分布不做要求,属于非参数统计方法。因此它的适用范围比Pearson相关系数要广的多。因为对原始变量的分布不做要求,所以统计的效能比Pearson相关系数要低一些。(不容易检测出两个随机变量的相关性)计算公式如下:
\[\rho=\frac{\sum_{i}{(x_i-\bar{x})(y_i-\bar{y})}}{\sqrt{\sum_{i}{(x_i-\bar{x})^2}\sum_{i=1}{(y_i-\bar{y})^2}}} \]Kendall秩相关系数是一种秩相关系数,用于反映分类变量相关性的指标,适合用于两个变量均为有序分类的情况。无序分类,如男女,正反等。有序分类,如上中下,轻重缓等。Kendall秩相关系数的范围在+1和-1之间,+1表示两个随机变量拥有一定的等级相关性;-1表示两个随机变量拥有完全相反的等级相关性;0表示两个随机变量相互独立。
\[\tau=\frac{2}{n(n-1)}\sum_{i<j}sgn(x_i-x_j)sgn(y_i-y_j) \]下面的代码分别计算了两个RDD和一个矩阵的Pearson相关系数,分别输出一个double值和一个矩阵。
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //数据 val seriesX: RDD[Double] = sc.makeRDD(Seq(1.0, 2.0, 3.0, 4.0, 5.0)) val seriesY: RDD[Double] = sc.makeRDD(Seq(11.0, 22.0, 33.0, 44.0, 55.0)) val corr: Double = Statistics.corr(seriesX, seriesY, "pearson") //计算出皮尔逊相关系数 println(s"Corr is : $corr") val dataList = Seq( Vectors.dense(1.0, 10.0, 100.0), Vectors.dense(2.0, 12.0, 200.0), Vectors.dense(5.0, 22.0, 266.0) ) val dataRdd: RDD[linalg.Vector] = sc.makeRDD(dataList) val corrMat: Matrix = Statistics.corr(dataRdd, "pearson") println(corrMat.toString) sc.stop() }
多层抽样方法sampleByKey和sampleByExact可以在RDD的键值对上进行操作,对于分成抽样来说,键可以做标签,值可以看作是指定的属性。sampleByKey方法会随机决定某个观察值是否需要抽样,因此需要提供一个确定抽样数量的抽样方法sampleByKeyExact,就是在各个层必须保证抽取所需数量的样本。
sampleByKeyExact()方法具体为\([P_k.n_k]\ \forall k\in set(K)\),其中\(P_k\)为k层的抽样比例,\(n_K\)为实际K层的数量。set(K)表示分层抽样后的样本集合。k是样本。
def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) val datalist = Seq((1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')) val dataRdd = sc.makeRDD(datalist) val fractions = Map(1 -> 0.1, 2 -> 0.6, 3 -> 0.3) val approxSample = dataRdd.sampleByKey(withReplacement = false, fractions = fractions) val exactSample = dataRdd.sampleByKeyExact(withReplacement = false, fractions = fractions) }
Spark MLlib目前支持Pearson的卡方检验。
Spark MLlib提供了一些在线实现,以支持A/B测试等用例。这些测试可以在Spark StreamingDStream[(Boolean, Double)]上执行。
Spark MLlib中支持产生随机RDD,该RDD中的每一个元素的值都是符合相同的分布,Spark MLlib支持的分布有均匀分布,标准分布和泊松分布。
Spark MLlib中使用RandomRDDs的方式来创建随机RDD。
def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) val normalDataRDD = RandomRDDs.normalRDD(sc, 100L, 10) //元素为100个并且服从标准正太分布的RDD, 分区为10 RandomRDDs.poissonRDD(sc, 0.8, 100L, 10) //元素为100个并且服从均值为0.8的泊松分布,分区为10 }