Learning Hadoop Together: Implementing the Secondary Sort Algorithm
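Secondary sort, as the name suggests, means sorting the values that belong to a key on top of sorting by the key itself; it is also called auxiliary sort. By default the MapReduce framework sorts only by key, not by the values associated with that key, so the order of the values is usually not fixed. In practice, however, we often need to sort by key and value together. One example comes from Hadoop: The Definitive Guide, where finding the maximum temperature for each year uses the year as the key and the temperature as the value, and the records are ordered by year and, within each year, by temperature in descending order. Another example is a fruit e-commerce site that needs a daily fruit sales ranking. Both cases require sorting by key and value at the same time, as shown in the figure below: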
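How do you design a MapReduce program that sorts by both key and value? It takes three concepts: composite keys, partitioning, and grouping. Partitioning shows up here once again, which shows how important it is in MapReduce; it is worth mastering, since it is a key focus of optimization.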
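Following the data flow in the figure above, we first design a Fruit class with three fields (date, fruit name, and sales) and use all three together as a composite key. Next, we write a custom Partitioner that partitions on the date field, so that records with the same date flow into the same partition. Finally, we define a grouping comparator that groups the records inside a partition by date. The secondary sort by sales within each day comes from the composite key's sort order, so each group reaches the reducer already ordered by sales.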
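To see what the partitioner and the composite-key sort each contribute, the flow can be modeled in plain Java without a cluster. This is a minimal sketch, not Hadoop's actual shuffle code: the class ShuffleModel and its methods are hypothetical, the sort order (date descending, then sales descending, then name descending) is the order this article's Fruit.compareTo produces, and the hash-based partition function is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of "partition, then sort"; not Hadoop's shuffle implementation
class ShuffleModel {
    // Simplified composite key: date + fruit name + sales
    static final class Fruit {
        final String date;
        final String name;
        final int sales;

        Fruit(String date, String name, int sales) {
            this.date = date;
            this.name = name;
            this.sales = sales;
        }

        @Override
        public String toString() {
            return date + " " + name + " " + sales;
        }
    }

    // Composite-key order: date desc, then sales desc, then name desc
    static final Comparator<Fruit> SORT =
            Comparator.comparing((Fruit f) -> f.date).reversed()
                    .thenComparing(Comparator.comparingInt((Fruit f) -> f.sales).reversed())
                    .thenComparing(Comparator.comparing((Fruit f) -> f.name).reversed());

    // Assumed partition function: depends on the date only and is never negative
    static int partition(Fruit f, int numPartitions) {
        return (f.date.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Route each record to its partition, then sort every partition by the composite key
    static List<List<Fruit>> shuffle(List<Fruit> input, int numPartitions) {
        List<List<Fruit>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Fruit f : input) {
            partitions.get(partition(f, numPartitions)).add(f);
        }
        for (List<Fruit> p : partitions) {
            p.sort(SORT);
        }
        return partitions;
    }

    // The test data used in this article
    static List<Fruit> sampleData() {
        return Arrays.asList(
                new Fruit("20180906", "Apple", 200),
                new Fruit("20180904", "Apple", 200),
                new Fruit("20180905", "Banana", 100),
                new Fruit("20180906", "Orange", 300),
                new Fruit("20180906", "Banana", 400),
                new Fruit("20180904", "Orange", 100),
                new Fruit("20180905", "Apple", 400),
                new Fruit("20180904", "Banana", 300),
                new Fruit("20180905", "Orange", 500));
    }

    public static void main(String[] args) {
        // With a single partition the printed order matches the article's output
        for (Fruit f : shuffle(sampleData(), 1).get(0)) {
            System.out.println(f);
        }
    }
}
```

With more than one partition, each partition is sorted on its own and every date lives in exactly one partition, which is what the date-based partitioner is meant to guarantee.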
Implementation steps:
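1. Define the Fruit class, implement the WritableComparable interface, and override compareTo, equals, and hashCode, together with the serialization and deserialization methods write and readFields. An object sent over the network, as map output is during the shuffle, must be serializable. In the map function the Fruit object is used as the key. compareTo compares two keys; here it determines the relative order of two Fruit objects.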
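2. Define the sort order (the first sort). Either implement compareTo in a key class that implements the WritableComparable interface, or extend the WritableComparator class, override compare, and register it with job.setSortComparatorClass. This is the sort applied to the map output keys, primarily on the first field of Fruit (the date). The code below does not write a separate class for this; Fruit.compareTo provides the order.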
3. Define a custom partitioner by extending the Partitioner abstract class and overriding getPartition, so that Fruit objects with the same date are sent to the same partition.
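4. Define the grouping class by extending the WritableComparator class and overriding compare. Its compare method decides which keys belong together: Fruit objects with the same date fall into the same group and are handed to a single reduce() call, in the order produced by the sort from step 2. Register the class with job.setGroupingComparatorClass. If no grouping class is set, keys are grouped with the sort comparator, so only keys that compare as equal share a group.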
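Step 4 can be made concrete with a small model of how the reduce side forms groups: it walks the keys in sorted order and starts a new reduce() call whenever the grouping comparator reports that a key differs from the previous one. This is a sketch in plain Java; the class GroupingModel and the comparators BY_DATE and BY_DATE_AND_SALES are hypothetical illustrations, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of how reduce() groups are formed from already-sorted keys
class GroupingModel {
    static final class Fruit {
        final String date;
        final String name;
        final int sales;

        Fruit(String date, String name, int sales) {
            this.date = date;
            this.name = name;
            this.sales = sales;
        }
    }

    // Group only by date: one reduce() call per day
    static final Comparator<Fruit> BY_DATE = Comparator.comparing((Fruit f) -> f.date);

    // Group by date and sales: every distinct (date, sales) pair gets its own call
    static final Comparator<Fruit> BY_DATE_AND_SALES =
            BY_DATE.thenComparingInt((Fruit f) -> f.sales);

    // Start a new group whenever the grouping comparator says the key changed
    static List<List<Fruit>> group(List<Fruit> sorted, Comparator<Fruit> grouping) {
        List<List<Fruit>> groups = new ArrayList<>();
        Fruit previous = null;
        for (Fruit f : sorted) {
            if (previous == null || grouping.compare(previous, f) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(f);
            previous = f;
        }
        return groups;
    }

    // The article's output, i.e. the keys in sorted order
    static List<Fruit> sortedSample() {
        return Arrays.asList(
                new Fruit("20180906", "Banana", 400),
                new Fruit("20180906", "Orange", 300),
                new Fruit("20180906", "Apple", 200),
                new Fruit("20180905", "Orange", 500),
                new Fruit("20180905", "Apple", 400),
                new Fruit("20180905", "Banana", 100),
                new Fruit("20180904", "Banana", 300),
                new Fruit("20180904", "Apple", 200),
                new Fruit("20180904", "Orange", 100));
    }

    public static void main(String[] args) {
        System.out.println(group(sortedSample(), BY_DATE).size());           // 3
        System.out.println(group(sortedSample(), BY_DATE_AND_SALES).size()); // 9
    }
}
```

Grouping by date gives three groups of three records, one per day. Grouping by date and sales gives nine single-record groups here, but two different fruits with the same date and the same sales would be merged into one group. Because Hadoop refills the reducer's key object as the values are iterated, a reducer that loops over the values of a by-date group still sees every record in sorted order.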
The code is as follows:
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondrySort extends Configured implements Tool {

    // Composite key: date + fruit name + sales
    static class Fruit implements WritableComparable<Fruit> {
        private String date;
        private String name;
        private Integer sales;

        // Hadoop creates key instances by reflection, so a no-arg constructor is required
        public Fruit() {
        }

        public Fruit(String date, String name, Integer sales) {
            this.date = date;
            this.name = name;
            this.sales = sales;
        }

        public String getDate() { return this.date; }

        public String getName() { return this.name; }

        public Integer getSales() { return this.sales; }

        // Deserialization: read the fields in exactly the order write() writes them
        @Override
        public void readFields(DataInput in) throws IOException {
            this.date = in.readUTF();
            this.name = in.readUTF();
            this.sales = in.readInt();
        }

        // Serialization
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.date);
            out.writeUTF(this.name);
            out.writeInt(this.sales);
        }

        // Sort order: date descending, then sales descending, then name descending
        @Override
        public int compareTo(Fruit other) {
            int result = other.date.compareTo(this.date);
            if (result != 0) {
                return result;
            }
            result = Integer.compare(other.sales, this.sales);
            if (result != 0) {
                return result;
            }
            return other.name.compareTo(this.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(date, name, sales);
        }

        @Override
        public boolean equals(Object object) {
            if (this == object) {
                return true;
            }
            if (!(object instanceof Fruit)) {
                return false;
            }
            Fruit r = (Fruit) object;
            // Compare the Integer sales with equals(); == would compare references
            return Objects.equals(date, r.date) && Objects.equals(name, r.name)
                    && Objects.equals(sales, r.sales);
        }

        @Override
        public String toString() {
            return this.date + " " + this.name + " " + this.sales;
        }
    }

    // Partition by date only, so all records of one day reach the same reducer
    static class FruitPartition extends Partitioner<Fruit, NullWritable> {
        @Override
        public int getPartition(Fruit key, NullWritable value, int numPartitions) {
            // Mask the sign bit so the result is never negative
            // (the date * 127 product could overflow an int)
            return (key.getDate().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Grouping comparator: keys with the same date go to a single reduce() call
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(Fruit.class, true); // true: create key instances for deserialization
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            Fruit f1 = (Fruit) w1;
            Fruit f2 = (Fruit) w2;
            return f1.getDate().compareTo(f2.getDate());
        }
    }

    public static class Map extends Mapper<LongWritable, Text, Fruit, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line format: <date> <name> <sales>, separated by whitespace
            String[] str = value.toString().trim().split("\\s+");
            if (str.length < 3) {
                return; // skip blank or malformed lines
            }
            Fruit fruit = new Fruit(str[0], str[1], Integer.parseInt(str[2]));
            context.write(fruit, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Fruit, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Fruit key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop refills `key` as the values are iterated, so each iteration
            // sees the next record of the day in sorted order
            for (NullWritable ignored : values) {
                context.write(new Text(key.toString()), NullWritable.get());
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options are kept
        Configuration conf = getConf();
        // Delete the output path if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "Secondry Sort app");
        // Main class
        job.setJarByClass(SecondrySort.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Mapper and Reducer
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Partitioner
        job.setPartitionerClass(FruitPartition.class);
        // Grouping comparator
        job.setGroupingComparatorClass(GroupingComparator.class);

        // Map output key/value types
        job.setMapOutputKeyClass(Fruit.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SecondrySort(), args);
        System.exit(exitCode);
    }
}
```
Test data:
20180906 Apple 200
20180904 Apple 200
20180905 Banana 100
20180906 Orange 300
20180906 Banana 400
20180904 Orange 100
20180905 Apple 400
20180904 Banana 300
20180905 Orange 500
Output:
20180906 Banana 400
20180906 Orange 300
20180906 Apple 200
20180905 Orange 500
20180905 Apple 400
20180905 Banana 100
20180904 Banana 300
20180904 Apple 200
20180904 Orange 100
Summary:
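1. When a custom key implements the WritableComparable interface, it must have a no-argument constructor, because Hadoop creates key instances by reflection before calling readFields. Otherwise the job fails with the error "Unable to initialize any output collector".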
2. readFields and write must process the fields in the same order; otherwise the job fails with the MapReduce error java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197).
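Both points can be reproduced with plain java.io, without Hadoop. In this sketch, FruitRecord is a hypothetical stand-in for Fruit (not the article's class). It is created through its no-argument constructor by reflection, in the same spirit as Hadoop's reflective key instantiation, and then filled by readFields. Reading the fields in a different order than write() produced them makes DataInputStream interpret part of a string as a length prefix, which runs off the end of the data with an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical stand-in for the Fruit key, using only java.io
class FruitRecord {
    String date;
    String name;
    int sales;

    // Needed for reflective creation; without it getDeclaredConstructor() throws NoSuchMethodException
    FruitRecord() {
    }

    FruitRecord(String date, String name, int sales) {
        this.date = date;
        this.name = name;
        this.sales = sales;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(name);
        out.writeInt(sales);
    }

    // Same field order as write()
    void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        name = in.readUTF();
        sales = in.readInt();
    }

    // Deliberately wrong: reads the int first, out of order with write()
    void readFieldsWrongOrder(DataInput in) throws IOException {
        sales = in.readInt();
        date = in.readUTF();
        name = in.readUTF();
    }

    static byte[] serialize(FruitRecord f) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        f.write(out);
        out.flush();
        return bytes.toByteArray();
    }

    static FruitRecord deserialize(byte[] data) throws ReflectiveOperationException, IOException {
        // Create an empty instance through the no-arg constructor, then fill it
        FruitRecord f = FruitRecord.class.getDeclaredConstructor().newInstance();
        f.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return f;
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new FruitRecord("20180906", "Apple", 200));
        FruitRecord copy = deserialize(data);
        System.out.println(copy.date + " " + copy.name + " " + copy.sales); // 20180906 Apple 200

        try {
            new FruitRecord().readFieldsWrongOrder(new DataInputStream(new ByteArrayInputStream(data)));
            System.out.println("no error");
        } catch (EOFException e) {
            // The mismatched order reads a bogus string length and runs out of bytes
            System.out.println("EOFException");
        }
    }
}
```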
To learn more about big data, follow my WeChat official account: summer_bigdata