大資料技術 - 通俗理解MapReduce之WordCount(二)
上一章我們搭建了分散式的 Hadoop 叢集。本章我們介紹 Hadoop 框架中的一個核心模組 - MapReduce。MapReduce 是平行計算模組,顧名思義,它包含兩個主要的階段,map 階段和 reduce 階段。每個階段輸入和輸出都是鍵值對。map 階段主要是對輸入的原始資料做處理,按照 key-value 形式輸出資料,輸出的資料按照key是有序的。reduce 階段的輸入是 map 任務的輸出,會對輸入的資料會按照 key 做歸併排序,使得輸入 reduce 任務輸入的 key 也是有序的,reduce 階段進行完業務處理之後將把資料輸出到HDFS中。下面以具體的例子說明 MapReduce 的機制。
本章以 WordCount 為例子講解 MapReduce 機制,這個例子相當於學習程式語言的 "Hello, World"。假設我們在 HDFS 上有一個 10T 的檔案,檔案每一行有多個單詞,單詞之間空格分割,現在我們想統計一下這個檔案中每個單詞出現的次數,這就是 word count。我們的例子將在上一章搭建的 Hadoop 叢集上進行。首先準備資料來源,我們實際的例子中的資料量比較少,本地(hadoop0 機器)檔案如下:
word1檔案: hello world hadoop hadoop spark word2檔案: hadoop hbase mapreduce hdfs
需要將這兩個檔案上傳至 HDFS,命令如下:
hadoop fs -mkdir -p /hadoop-ex/wordcount/input#mkdir:建立目錄 -p:遞迴建立多級目錄 hadoop fs -put word1 word2 /hadoop-ex/wordcount/input #上本地檔案上傳至HDFS目錄
有了資料來源,我們開始寫 MapReduce 程式,我用的編輯器是 Intellj IDEA,建立一個 maven 專案,選擇 archetype 為 maven-archetype-quickstart。專案建立完成後引入 Hadoop 依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.9.2</version> </dependency>
先建立在 map 階段執行的類,也叫做 Mapper:
package com.cnblogs.duma.mapreduce; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * WordCountMapper 繼承 Mapper 類,需要指定4個泛型型別,分別是 * 輸入 key 型別:本例中輸入的 key 為每行文字的行號,例子中用不到所以這裡是 Object * 輸入 value 型別:本例中輸入的 value 是每行文字,因此是Text * 輸出 key 型別:map 輸出的是每個單詞,型別為 Text * 輸出 value 型別:單詞出現的次數,為 1,因此型別 IntWritable */ public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> { /** * 把每個單詞對映成 <word, 1> 的格式 */ private final static IntWritable one = new IntWritable(1); private Text outWord = new Text(); /** * 每個 map 函式處理一行資料 * @param key輸入的行號 * @param value每一行文字 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); //空格分割一行中的每個單詞 for (String word : words) { outWord.set(word); System.out.println("<" + outWord + ", " + one + ">"); //列印 context.write(outWord, one); // map輸出 } } }
這個就是 map 階段要執行的邏輯,程式碼本身比較簡單。首先說明幾個問題。第一,map 階段會對輸入的檔案做分割,分割的大小可以通過引數指定,預設按照 HDFS 儲存的塊大小分割。假設 10TB 的檔案,HDFS 塊為128MB,那麼大概會有 81920 個分塊, 每個 map 任務處理一個分塊。也就是會為每個分塊建立 WordCountMapper 物件,遍歷資料塊的每一行,並呼叫 WordCountMapper 類中的 map 函式處理。因此,當前分片的資料有多少行,就會呼叫多少次 map 函式。我們的例子中有兩個檔案,每個檔案都小於 128MB ,因此會啟動2個 map 任務。由於 HDFS 中的資料儲存在不同的機器上,因此map 任務會在儘可能在儲存資料塊的機器上啟動 。 這樣每個 map 任務可以處理本地的資料,如果有資料塊的節點上資源比較緊張無法分配新的 map 任務,只能在其他機器啟動 map 任務,將資料下載到該機器,這種情況將產生網路的消耗。第二,剛才提到過,map 函式會執行多次, 一些變數可以定義成類變數,防止建立過多的物件,浪費記憶體。該例子中,變數 one、outWord 被定義成類變數。第三,Hadoop 為了網路傳輸更優的序列化與反序列化,重新定義了資料型別,Text 對應java中的 String,IntWritable 對應 java 中的 int。第四,map 任務輸出的資料放在本地磁碟上,等待 reduce 任務拉取。
map 函式執行完成後,輸出的結果如下
map 任務1: <hadoop, 1> <hadoop, 1> <hello, 1> <spark, 1> <world, 1> map 任務2: <hadoop, 1> <hbase, 1> <hdfs, 1> <mapreduce, 1>
首先可以看到輸出是有序的。下面再看下 reduce 任務
package com.cnblogs.duma.mapreduce; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * WordCountReducer 繼承 Reducer 類,需要指定4個泛型型別,分別是 * 輸入 key 型別:map 任務輸出的 key 型別, Text * 輸入 value 型別:map 任務輸出的 value 型別,IntWritable * 輸出 key 型別:reduce 輸出的是每個單詞,型別為 Text * 輸出 value 型別:單詞出現的次數,因此型別 IntWritable */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private Log logger = LogFactory.getLog(WordCountMapper.class); private IntWritable result = new IntWritable(); /** * 一次 reduce 函式的呼叫,會處理一個 key * @param key * @param values相同 key 對應的 values 的集合 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { // 同一個單詞出現次數相加,即為該單詞的個數 sum += val.get(); } result.set(sum); System.out.println("<" + key + ", " + result + ">"); context.write(key, result); // 輸出 } }
reduce 任務的輸入便是 map 任務的輸出,這裡也說明幾個問題。第一,reduce 的個數需要通過引數或者程式碼指定, 預設為1。第二,map 任務輸出的 key 去到哪個 reduce 任務,預設是 key 的 hash 值取模。第三,reduce 輸入的key 有多個且經過排序,每個 key 對應的 value 組成一個 list,如 reduce 函式輸入引數所示。輸入多少 key 便呼叫多少次 reduce 函式。在這個例子中,reduce 任務個數為1。第四,reduce 任務的輸出是輸出到 HDFS 中。本例中輸入資料如下
<hadoop, [1, 1, 1]> <hbase, [1]> <hdfs, [1]> <hello, [1]> <mapreduce, [1]> <spark, [1]> <world, [1]>
可以看到,reduce 任務的輸入是有序的。reduce 任務處理完成後,輸出如下
<hadoop, 3> <hbase, 1> <hdfs, 1> <hello, 1> <mapreduce, 1> <spark, 1> <world, 1>
現在需要一個驅動程式把他們串起來,程式碼如下
package com.cnblogs.duma.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WordCount"); //第二引數為程式的名字 job.setJarByClass(WordCount.class); //需要設定類名 job.setMapperClass(WordCountMapper.class); //設定 map 任務的類 //job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); // 設定 reduce 任務的類 job.setOutputKeyClass(Text.class);//設定輸出的 key 型別 job.setOutputValueClass(IntWritable.class); //設定輸出的 value 型別 FileInputFormat.addInputPath(job, new Path(args[0])); //增加輸入檔案 FileOutputFormat.setOutputPath(job, new Path(args[1])); //設定輸出目錄 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
現在需要打包放到叢集上執行這個例子,可以在專案的根目錄執行 mvn package 命令, 也可以利用 IDEA maven 視覺化工具直接打包。打包完成後在專案目錄中會生成 target 目錄,裡面有打包好的 jar 檔案,我們將它上傳到 hadoop0 機器,執行以下命令執行任務
hadoop jar hadoop-ex-1.0-SNAPSHOT.jar com.cnblogs.duma.mapreduce.WordCount /hadoop-ex/wordcount/input /hadoop-ex/wordcount/output
執行日誌如下:
19/03/03 04:16:17 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.29.132:8032 19/03/03 04:16:18 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 19/03/03 04:16:18 INFO input.FileInputFormat: Total input files to process : 2 19/03/03 04:16:19 INFO mapreduce.JobSubmitter: number of splits:2 19/03/03 04:16:19 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 19/03/03 04:16:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1551593879638_0009 19/03/03 04:16:20 INFO impl.YarnClientImpl: Submitted application application_1551593879638_0009 19/03/03 04:16:20 INFO mapreduce.Job: The url to track the job: http://hadoop0:8088/proxy/application_1551593879638_0009/ 19/03/03 04:16:20 INFO mapreduce.Job: Running job: job_1551593879638_0009 19/03/03 04:16:35 INFO mapreduce.Job: Job job_1551593879638_0009 running in uber mode : false 19/03/03 04:16:35 INFO mapreduce.Job:map 0% reduce 0% 19/03/03 04:16:48 INFO mapreduce.Job:map 100% reduce 0% 19/03/03 04:17:01 INFO mapreduce.Job:map 100% reduce 100% 19/03/03 04:17:02 INFO mapreduce.Job: Job job_1551593879638_0009 completed successfully 19/03/03 04:17:02 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=120 。。。
日誌有幾個問題需要說明,第一,可以看到第4行 number of splits:2,說明會啟動2個 map 任務處理資料。第二,第8行 url :http://hadoop0:8088/proxy/application_1551593879638_0009/ ,可以訪問它觀察任務的狀態、任務任性在那個節點、任務的日誌等。
至此,簡單的 MapReduce 入門已經介紹完畢,主要就是 map 任務和 reduce 任務兩個主要的階段。當然這兩個階段之間有一個更重要的過程叫做 shuffle,很多工的優化都需要調這個過程的引數,shuffle 過程的詳細介紹我們在之後會討論。在進行這個例子的時候可能會出一些錯,常見的錯誤我在這裡先記錄一下。第一,啟動任務時候報一下錯誤
The auxService:mapreduce_shuffle does not exist
解決方法,在 yarn-site.xml 檔案中增加引數
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property>
第二,在 web 頁面檢視任務 logs 的時候,可能會報一下錯誤
Aggregation is not enabled
解決方法,在 yarn-site.xml 檔案中增加引數
<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>
總結
這篇文章主要介紹 MapReduce 的機制,用來入門。MapReduce 主要分兩個階段,map 階段會對輸入的檔案分割,分割數決定啟動多少 map 任務,map 任務中進行資料處理,並按照<key, value>的格式輸出, map 任務的輸出資料臨時存放在本地磁碟, 經過 shuffle 過程後, 啟動 reduce 任務, reduce 任務個數可以手動指定,reduce 任務輸入的 key 有序且同一個 key 的 value 會聚合在一起,最終 reduce 任務結果輸出到 HDFS。