一起學Hadoop——實現兩張表之間的連線操作
---恢復內容開始---
之前我們都是學習使用MapReduce處理一張表的資料(一個檔案可視為一張表,hive和關係型資料庫Mysql、Oracle等都是將資料儲存在檔案中)。但是我們經常會遇到處理多張表的場景,不同的資料儲存在不同的檔案中,因此Hadoop也提供了類似傳統關係型資料庫的join操作。Hadoop生態元件的高階框架Hive、Pig等也都實現了join連線操作,編寫類似SQL的語句,就可以在MapReduce中執行,底層的實現也是基於MapReduce。本文介紹如何使用MapReduce實現join操作,為以後學習hive打下基礎。
1、Map端連。
資料在進入到map函式之前就進行連線操作。適用場景:一個檔案比較大,一個檔案比較小,小到可以載入到記憶體中。如果兩個都是大檔案,就會出現OOM記憶體溢位的異常。實現Map端連線操作需要用到Job類的addCacheFile()方法將小檔案分發到各個計算節點,然後載入到節點的記憶體中。
下面通過一個例子來實現Map端join連線操作:
1、僱員employee表資料如下:
name gender age dept_no
Tom male 30 1
Tony male 35 2
Lily female 28 1
Lucy female 32 3
2、部門表dept資料如下:
dept_no dept_name
1 TSD
2 MCD
3 PPD
程式碼實現如下:
1 package join; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.mapreduce.Job; 7 import org.apache.hadoop.mapreduce.Reducer; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 import org.apache.hadoop.util.Tool; 12 import org.apache.hadoop.io.*; 13 import org.apache.hadoop.util.ToolRunner; 14 import org.apache.hadoop.mapreduce.Mapper; 15 16 import java.io.BufferedReader; 17 import java.io.FileReader; 18 import java.io.IOException; 19 import java.net.URI; 20 import java.util.HashMap; 21 import java.util.Map; 22 import org.apache.hadoop.fs.Path; 23 24 public class MapJoin extends Configured implements Tool { 25 26public static class MapJoinMapper extends Mapper<LongWritable, Text, Text,NullWritable> { 27private Map<Integer, String> deptData = new HashMap<Integer, String>(); 28 29@Override 30protected void setup(Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException, InterruptedException { 31super.setup(context); 32//從快取的中讀取檔案。 33Path[] files = context.getLocalCacheFiles(); 34 //Path file1path = new Path(files[0]); 35BufferedReader reader = new BufferedReader(new FileReader(files[0].toString())); 36String str = null; 37try { 38// 一行一行讀取 39while ((str = reader.readLine()) != null) { 40// 對快取中的資料以" "分隔符進行分隔。 41String[] splits = str.split(" "); 42// 把需要的資料放在Map中。注意不能操作Map的大小,否則會出現OOM的異常 43deptData.put(Integer.parseInt(splits[0]), splits[1]); 44} 45} catch (Exception e) { 46e.printStackTrace(); 47} finally{ 48reader.close(); 49} 50} 51 52@Override 53protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException, 54InterruptedException { 55// 獲取從HDFS中載入的表 56String[] values = value.toString().split(" "); 57// 獲取關聯欄位depNo,這個欄位是關鍵 58int depNo = Integer.parseInt(values[3]); 59// 根據deptNo從記憶體中的關聯表中獲取要關聯的屬性depName 60String depName = deptData.get(depNo); 61String resultData = value.toString() + " " + depName; 62// 將資料通過context寫入到Reduce中。 63context.write(new Text(resultData),NullWritable.get()); 64} 65} 66 67public static class MapJoinReducer extends Reducer<Text,NullWritable,Text,NullWritable> { 68public void reduce(Text key, Iterable<NullWritable> values,Context context)throws IOException,InterruptedException{ 69context.write(key,NullWritable.get()); 70} 71} 72 73@Override 74public int run(String[] args) throws Exception { 75Configuration conf = new Configuration(); 76Job job = Job.getInstance(conf, "Total Sort app"); 77//將小表載入到快取中。 78job.addCacheFile(new URI(args[0])); 79job.setJarByClass(MapJoinMapper.class); 80//1.1 設定輸入目錄和設定輸入資料格式化的類 81FileInputFormat.setInputPaths(job,new Path(args[1])); 82job.setInputFormatClass(TextInputFormat.class); 83 84//1.2 設定自定義Mapper類和設定map函式輸出資料的key和value的型別 85job.setMapperClass(MapJoinMapper.class); 86job.setMapOutputKeyClass(Text.class); 87job.setMapOutputValueClass(NullWritable.class); 88 89//1.3 設定reduce數量 90job.setNumReduceTasks(1); 91//設定實現了reduce函式的類 92job.setReducerClass(MapJoinReducer.class); 93 94//設定reduce函式的key值 95job.setOutputKeyClass(Text.class); 96//設定reduce函式的value值 97job.setOutputValueClass(NullWritable.class); 98 99// 判斷輸出路徑是否存在,如果存在,則刪除 100Path mypath = new Path(args[2]); 101FileSystem hdfs = mypath.getFileSystem(conf); 102if (hdfs.isDirectory(mypath)) { 103hdfs.delete(mypath, true); 104} 105 106FileOutputFormat.setOutputPath(job, new Path(args[2])); 107 108return job.waitForCompletion(true) ? 0 : 1; 109} 110 111public static void main(String[] args)throws Exception{ 112 113int exitCode = ToolRunner.run(new MapJoin(), args); 114System.exit(exitCode); 115} 116 }
執行指令碼檔案如下::
1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar MapJoin.jar \ 2 hdfs://hadoop-master:8020/data/dept.txt \ 3 hdfs://hadoop-master:8020/data/employee.txt \ 4 hdfs://hadoop-master:8020/mapjoin_output
執行結果:
Lily female 28 1 TSD Lucy female 32 3 PPD Tom male 30 1 TSD Tony male 35 2 MCD
2、Reduce端連線(Reduce side join)。
資料在Reduce程序中執行連線操作。實現思路:在Map程序中對來自不同表的資料打上標籤,例如來自表employee的資料打上a標籤,來自檔案dept表的資料打上b標籤。然後在Reduce程序,對同一個key,來自不同表的資料進行笛卡爾積操作。請看下圖,我們對錶employee和表dept的dept_no欄位進行關聯,將dept_no欄位當做key。
在MapReduce中,key相同的資料會放在一起,因此我們只需在reduce函式中判斷資料是來自哪張表,來自相同表的資料不進行join。
程式碼如下:
1 public class ReduceJoin extends Configured implements Tool { 2public static class JoinMapper extends 3Mapper<LongWritable,Text,Text,Text> { 4String employeeValue = ""; 5protected void map(LongWritable key, Text value, Context context) 6throws IOException,InterruptedException { 7/* 8* 根據命令列傳入的檔名,判斷資料來自哪個檔案,來自employee的資料打上a標籤,來自dept的資料打上b標籤 9*/ 10String filepath = ((FileSplit)context.getInputSplit()).getPath().toString(); 11String line = value.toString(); 12if (line == null || line.equals("")) return; 13 14if (filepath.indexOf("employee") != -1) { 15String[] lines = line.split(" "); 16if(lines.length < 4) return; 17 18String deptNo = lines[3]; 19employeeValue = line + " a"; 20context.write(new Text(deptNo),new Text(employeeValue)); 21} 22 23else if(filepath.indexOf("dept") != -1) { 24String[] lines = line.split(" "); 25if(lines.length < 2) return; 26String deptNo = lines[0]; 27context.write(new Text(deptNo), new Text(line + " b")); 28} 29} 30} 31 32public static class JoinReducer extends 33Reducer<Text, Text, Text, NullWritable> { 34protected void reduce(Text key, Iterable<Text> values, 35Context context) throws IOException, InterruptedException{ 36List<String[]> lista = new ArrayList<String[]>(); 37List<String[]> listb = new ArrayList<String[]>(); 38 39for(Text val:values) { 40String[] str = val.toString().split(" "); 41//最後一位是標籤位,因此根據最後一位判斷資料來自哪個檔案,標籤為a的資料放在lista中,標籤為b的資料放在listb中 42String flag = str[str.length -1]; 43if("a".equals(flag)) { 44//String valueA = str[0] + " " + str[1] + " " + str[2]; 45lista.add(str); 46} else if("b".equals(flag)) { 47//String valueB = str[0] + " " + str[1]; 48listb.add(str); 49} 50} 51 52for (int i = 0; i < lista.size(); i++) { 53if (listb.size() == 0) { 54continue; 55} else { 56String[] stra = lista.get(i); 57for (int j = 0; j < listb.size(); j++) { 58String[] strb = listb.get(j); 59String keyValue = stra[0] + " " + stra[1] + " " + stra[2] + " " + stra[3] + " " + strb[1]; 60context.write(new Text(keyValue), NullWritable.get()); 61} 62} 63} 64} 65} 66 67@Override 68public int run(String[] args) throws Exception { 69Configuration conf = getConf(); 70GenericOptionsParser optionparser = new GenericOptionsParser(conf, args); 71conf = optionparser.getConfiguration(); 72Job job = Job.getInstance(conf, "Reduce side join"); 73job.setJarByClass(ReduceJoin.class); 74//1.1 設定輸入目錄和設定輸入資料格式化的類 75//FileInputFormat.setInputPaths(job,new Path(args[0])); 76FileInputFormat.addInputPaths(job, conf.get("input_data")); 77 78job.setInputFormatClass(TextInputFormat.class); 79 80//1.2 設定自定義Mapper類和設定map函式輸出資料的key和value的型別 81job.setMapperClass(JoinMapper.class); 82job.setMapOutputKeyClass(Text.class); 83job.setMapOutputValueClass(Text.class); 84 85//1.3 設定reduce數量 86job.setNumReduceTasks(1); 87//設定實現了reduce函式的類 88job.setReducerClass(JoinReducer.class); 89 90//設定reduce函式的key值 91job.setOutputKeyClass(Text.class); 92//設定reduce函式的value值 93job.setOutputValueClass(NullWritable.class); 94 95// 判斷輸出路徑是否存在,如果存在,則刪除 96Path output_dir = new Path(conf.get("output_dir")); 97FileSystem hdfs = output_dir.getFileSystem(conf); 98if (hdfs.isDirectory(output_dir)) { 99hdfs.delete(output_dir, true); 100} 101 102FileOutputFormat.setOutputPath(job, output_dir); 103 104return job.waitForCompletion(true) ? 0 : 1; 105} 106 107public static void main(String[] args)throws Exception{ 108int exitCode = ToolRunner.run(new ReduceJoin(), args); 109System.exit(exitCode); 110} 111 }
執行MapReduce的shell指令碼如下:
1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar ReduceJoin.jar \ 2 -Dinput_data=hdfs://hadoop-master:8020/data/dept.txt,hdfs://hadoop-master:8020/data/employee.txt \ 3 -Doutput_dir=hdfs://hadoop-master:8020/reducejoin_output
總結:
1、Map side join的執行速度比Reduce side join快,因為Reduce side join在shuffle階段會消耗大量的資源。Map side join由於把小表放在記憶體中,所以執行效率很高。
2、當有一張表的資料很小時,小到可以載入到記憶體中,那麼建議使用Map side join。
歡迎關注本人公眾號瞭解更多關於大資料方面的知識: