Spark ML: Data Modeling and Combined Regression/Clustering Analysis on the Iris Dataset - Spark Commercial ML in Practice
This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. Cases are drawn from real commercial environments, summarized and shared here, together with tuning advice for commercial applications and cluster capacity planning; please keep following this blog series. Copyright notice: reproduction is prohibited; you are welcome to learn from it. QQ email address: [email protected]; feel free to get in touch for any business exchange.
1 The Iris Dataset (warming up)
- The Iris dataset is a classic dataset for classification experiments, collected and organized by Fisher in 1936. Also known as the iris flower dataset, it is a multivariate dataset.
- The dataset contains 150 records divided into 3 classes, 50 records per class, and each record has 4 attributes. It takes iris flower characteristics as its data source and is commonly used for classification. The dataset consists of 50 samples from each of 3 different iris species. One of the species is linearly separable from the other two, while the latter two are not linearly separable from each other.
- The following 4 attributes are generally used to predict which of the three species (Setosa, Versicolour, Virginica) an iris belongs to. The four attributes:
  - Sepal.Length (sepal length), in cm
  - Sepal.Width (sepal width), in cm
  - Petal.Length (petal length), in cm
  - Petal.Width (petal width), in cm
- The three species:
  - Iris Setosa
  - Iris Versicolour
  - Iris Virginica
2 Dataset Preview
| Sepal.Length | Sepal.Width | Petal.Length | Petal.Width | Species |
| --- | --- | --- | --- | --- |
| 5.1 | 3.5 | 1.4 | 0.2 | Iris-setosa |
| 4.9 | 3.0 | 1.4 | 0.2 | Iris-setosa |
| 4.7 | 3.2 | 1.3 | 0.2 | Iris-setosa |
| 4.6 | 3.1 | 1.5 | 0.2 | Iris-setosa |
| 5.0 | 3.6 | 1.4 | 0.2 | Iris-setosa |
| 5.4 | 3.9 | 1.7 | 0.4 | Iris-setosa |
| 4.6 | 3.4 | 1.4 | 0.3 | Iris-setosa |
3 Data Preprocessing and Analysis
3.1 Preprocessing from a CSV File
1 Read in the data

```scala
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/data/iris.csv")
```

Output:

```
+------------+-----------+------------+-----------+-----------+
|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|
+------------+-----------+------------+-----------+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
```

2 Convert the Species label into a numeric feature index

```scala
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("Species").setOutputCol("categoryIndex")
val model = indexer.fit(df)
val indexed = model.transform(df)
indexed.show()   // show() is called on the transformed DataFrame, not on the model
```

Output:

```
+------------+-----------+------------+-----------+-----------+-------------+
|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|categoryIndex|
+------------+-----------+------------+-----------+-----------+-------------+
|5.1|3.5|1.4|0.2|Iris-setosa|0.0|
|4.9|3.0|1.4|0.2|Iris-setosa|0.0|
|4.7|3.2|1.3|0.2|Iris-setosa|0.0|
|4.6|3.1|1.5|0.2|Iris-setosa|0.0|
|5.0|3.6|1.4|0.2|Iris-setosa|0.0|
|5.4|3.9|1.7|0.4|Iris-setosa|0.0|
```
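To sanity-check how StringIndexer assigned the indices, the learned mapping and the class distribution can be inspected directly. This is a minimal sketch that assumes the `model` and `indexed` values defined above; by default StringIndexer orders labels by frequency, and since all three species occur equally often here the exact assignment is simply whatever the indexer chose:

```scala
// List the learned label -> index mapping (index 0.0 is the first entry, and so on).
model.labels.zipWithIndex.foreach { case (label, idx) => println(s"$idx -> $label") }

// Confirm the distribution described in section 1: 3 species, 50 records each.
indexed.groupBy("Species", "categoryIndex").count().show()
```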
3.2 Preprocessing from a txt File
1 Read in the data

```scala
import spark.implicits._   // required for .toDF on an RDD of case-class instances

case class Iris(Sepal_Length: Double, Sepal_Width: Double, Petal_Length: Double, Petal_Width: Double, Species: String)

val data = sc.textFile("/data/iris.txt")
val header = data.first
val df2 = data.filter(_ != header)
  .map(_.split("\t"))
  .map(l => Iris(l(0).toDouble, l(1).toDouble, l(2).toDouble, l(3).toDouble, l(4).toString))
  .toDF
```

Output:

```
+------------+-----------+------------+-----------+-----------+
|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|
+------------+-----------+------------+-----------+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
```

2 Convert the Species label into a numeric feature index

```scala
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("Species").setOutputCol("categoryIndex")
val model = indexer.fit(df2)
// Keep only two of the three classes so that binary logistic regression can be used later.
// val indexed = model.transform(df2).filter(!$"Species".equalTo("Iris-virginica"))
val indexed = model.transform(df2).filter("categoryIndex < 2.0")
indexed.show()
```

Output:

```
+------------+-----------+------------+-----------+-----------+-------------+
|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|categoryIndex|
+------------+-----------+------------+-----------+-----------+-------------+
|5.1|3.5|1.4|0.2|Iris-setosa|0.0|
|4.9|3.0|1.4|0.2|Iris-setosa|0.0|
|4.7|3.2|1.3|0.2|Iris-setosa|0.0|
|4.6|3.1|1.5|0.2|Iris-setosa|0.0|
|5.0|3.6|1.4|0.2|Iris-setosa|0.0|
```
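The same tab-separated file could also be read directly with the DataFrame reader instead of going through an RDD and a case class. This is a minimal alternative sketch; it assumes /data/iris.txt has a header row and tab-separated columns, as in the code above:

```scala
// Alternative: let the CSV reader parse the tab-separated file and infer the schema.
val df2Alt = spark.read.format("csv")
  .option("sep", "\t")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/iris.txt")

df2Alt.printSchema()
df2Alt.show(5)
```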
4 Regression Analysis on the Iris Dataset
1 Get the column indices of the features and the label

```scala
val features = List("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width").map(indexed.columns.indexOf(_))
// features: List[Int] = List(0, 1, 2, 3)

val targetInd = indexed.columns.indexOf("categoryIndex")
// targetInd: Int = 5
```

2 Convert the features into vectors

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

val labeledPointIris = indexed.rdd.map(r =>
  LabeledPoint(r.getDouble(targetInd), Vectors.dense(features.map(r.getDouble(_)).toArray)))
```

```
scala> labeledPointIris.foreach(println)
(0.0,[5.1,3.5,1.4,0.2])
(0.0,[4.9,3.0,1.4,0.2])
(0.0,[4.7,3.2,1.3,0.2])
(0.0,[4.6,3.1,1.5,0.2])
(0.0,[5.0,3.6,1.4,0.2])
(0.0,[5.4,3.9,1.7,0.4])
(0.0,[4.6,3.4,1.4,0.3])
(0.0,[5.0,3.4,1.5,0.2])
(0.0,[4.4,2.9,1.4,0.2])
(0.0,[4.9,3.1,1.5,0.1])

scala> println(labeledPointIris.first.features)
[5.1,3.5,1.4,0.2]

scala> println(labeledPointIris.first.label)
0.0
```

3 Split into training and test sets

```scala
val splits = labeledPointIris.randomSplit(Array(0.8, 0.2), seed = 11L)
val trainingData = splits(0).cache
val testData = splits(1).cache
```

4 Logistic regression prediction 1 - LogisticRegressionWithSGD

```scala
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm

val lr = new LogisticRegressionWithSGD().setIntercept(true)
lr.optimizer.setStepSize(10.0).setRegParam(0.0).setNumIterations(20).setConvergenceTol(0.0005)
```

```
scala> val model = lr.run(trainingData)
model: org.apache.spark.mllib.classification.LogisticRegressionModel =
org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = -0.24895905804746296, numFeatures = 4, numClasses = 2, threshold = 0.5
```

5 Logistic regression prediction 2 - LogisticRegressionWithLBFGS

```scala
val numiteartor = 2   // number of classes for the classifier (binary here)
val model = new LogisticRegressionWithLBFGS().setNumClasses(numiteartor).run(trainingData)

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
```

```
model: org.apache.spark.mllib.classification.LogisticRegressionModel =
org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 4, numClasses = 2, threshold = 0.5
```
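With `labelAndPreds` in hand, the LBFGS model can be scored on the held-out test set. The following is a minimal evaluation sketch; it assumes the `labelAndPreds` and `testData` values from step 5 above and uses only standard MLlib APIs:

```scala
// Test error = fraction of misclassified test points; accuracy is its complement.
val testErr = labelAndPreds.filter { case (label, prediction) => label != prediction }
  .count.toDouble / testData.count
println(s"Test error = $testErr, accuracy = ${1 - testErr}")

// Optionally, a confusion matrix via MulticlassMetrics, which expects (prediction, label) pairs.
import org.apache.spark.mllib.evaluation.MulticlassMetrics
val metrics = new MulticlassMetrics(labelAndPreds.map { case (label, prediction) => (prediction, label) })
println(metrics.confusionMatrix)
```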