How to Upload and Download MaxCompute Complex Type Data with the Tunnel SDK
How do you upload complex type data to MaxCompute with the Tunnel SDK? Let's start with an introduction to MaxCompute's complex data types.
Complex Data Types
MaxCompute's SQL engine is based on ODPS 2.0 and has rich support for complex data types. MaxCompute supports the ARRAY, MAP, and STRUCT types, allows them to be nested arbitrarily, and provides a matching set of built-in functions.
Type | Definition | Construction |
---|---|---|
ARRAY | array&lt;int&gt;; array&lt;array&lt;int&gt;&gt; | array(1, 2, 3); array(array(1, 2), array(3, 4)) |
MAP | map&lt;string, string&gt;; map&lt;smallint, array&lt;string&gt;&gt; | map('k1', 'v1', 'k2', 'v2'); map(1S, array('a', 'b'), 2S, array('x', 'y')) |
STRUCT | struct&lt;x:int, y:int&gt;; struct&lt;field1:bigint, field2:array&lt;int&gt;, field3:map&lt;int, int&gt;&gt; | named_struct('x', 1, 'y', 2); named_struct('field1', 100L, 'field2', array(1, 2), 'field3', map(1, 100, 2, 200)) |
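On the SDK side, these same types are represented by TypeInfo objects. The following is a minimal sketch, assuming the TypeInfoFactory helpers from odps-sdk-core (getArrayTypeInfo, getMapTypeInfo, getStructTypeInfo), showing how the ARRAY&lt;BIGINT&gt;, MAP&lt;STRING, BIGINT&gt;, and STRUCT&lt;name:STRING, age:BIGINT&gt; columns used later in this article could be described programmatically:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;

public class ComplexTypeInfoSketch {
    public static void main(String[] args) {
        // ARRAY<BIGINT>
        ArrayTypeInfo arrayType = TypeInfoFactory.getArrayTypeInfo(TypeInfoFactory.BIGINT);

        // MAP<STRING, BIGINT>
        MapTypeInfo mapType = TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.STRING, TypeInfoFactory.BIGINT);

        // STRUCT<name:STRING, age:BIGINT>, built from parallel name/type lists
        List<String> fieldNames = Arrays.asList("name", "age");
        List<TypeInfo> fieldTypes = new ArrayList<TypeInfo>();
        fieldTypes.add(TypeInfoFactory.STRING);
        fieldTypes.add(TypeInfoFactory.BIGINT);
        StructTypeInfo structType = TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);

        // print the full type names, e.g. ARRAY<BIGINT>
        System.out.println(arrayType.getTypeName());
        System.out.println(mapType.getTypeName());
        System.out.println(structType.getTypeName());
    }
}
```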
Complex Type Construction and Operation Functions
Return type | Signature | Description |
---|---|---|
MAP&lt;K, V&gt; | map(K key1, V value1, K key2, V value2, ...) | Creates a map from the given key/value pairs; all keys must share one type, which must be a primitive type, and all values must share one type, which may be any type |
ARRAY&lt;K&gt; | map_keys(MAP&lt;K, V&gt; m) | Returns all keys of the given map as an array; returns NULL for NULL input |
ARRAY&lt;V&gt; | map_values(MAP&lt;K, V&gt; m) | Returns all values of the given map as an array; returns NULL for NULL input |
int | size(MAP&lt;K, V&gt;) | Returns the number of elements in the given MAP |
TABLE | explode(MAP&lt;K, V&gt;) | Table-generating function; expands the given MAP into one row per key/value pair, with two columns per row for the key and the value |
ARRAY&lt;T&gt; | array(T value1, T value2, ...) | Creates an ARRAY from the given values; all values must share one type |
int | size(ARRAY&lt;T&gt;) | Returns the number of elements in the given ARRAY |
boolean | array_contains(ARRAY&lt;T&gt; a, value v) | Tests whether the given ARRAY a contains v |
ARRAY&lt;T&gt; | sort_array(ARRAY&lt;T&gt;) | Sorts the given array |
ARRAY&lt;T&gt; | collect_list(T col) | Aggregate function; within a group, aggregates the expression col into an array |
ARRAY&lt;T&gt; | collect_set(T col) | Aggregate function; within a group, aggregates the expression col into an array of distinct elements |
TABLE | explode(ARRAY&lt;T&gt;) | Table-generating function; expands the given ARRAY into one row per element, with a single column per row holding the element |
TABLE (int, T) | posexplode(ARRAY&lt;T&gt;) | Table-generating function; expands the given ARRAY into one row per element, with two columns per row for the zero-based index and the element |
STRUCT&lt;col1:T1, col2:T2, ...&gt; | struct(T1 value1, T2 value2, ...) | Creates a struct from the given value list; each value may be of any type, and the resulting struct's field names are col1, col2, ... |
STRUCT&lt;name1:T1, name2:T2, ...&gt; | named_struct(name1, value1, name2, value2, ...) | Creates a struct from the given name/value list; each value may be of any type, and the resulting struct's field names are name1, name2, ... |
TABLE (f1 T1, f2 T2, ...) | inline(ARRAY&lt;STRUCT&lt;f1:T1, f2:T2, ...&gt;&gt;) | Table-generating function; expands the given array of structs into one row per element, with one column per struct field |
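These construction functions are SQL built-ins, so one way to experiment with them from Java is to submit a query with SQLTask. The following is a minimal sketch, assuming the SQLTask.run/getResult API from odps-sdk-core and that the ODPS 2.0 type system is enabled via the odps.sql.type.system.odps2 hint; the credentials and endpoint are placeholders:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;

public class ComplexFunctionSketch {
    public static void main(String[] args) throws OdpsException {
        Odps odps = new Odps(new AliyunAccount("<your access id>", "<your access key>"));
        odps.setEndpoint("<your odps endpoint>");
        odps.setDefaultProject("<your project>");

        // the ODPS 2.0 type system must be switched on for complex types
        Map<String, String> hints = new HashMap<String, String>();
        hints.put("odps.sql.type.system.odps2", "true");

        // construct a map and read one of its values back
        String sql = "SELECT map('k1', 'v1', 'k2', 'v2')['k1'];";
        Instance i = SQLTask.run(odps, odps.getDefaultProject(), sql, hints, null);
        i.waitForSuccess();
        List<Record> result = SQLTask.getResult(i);
        System.out.println(result.get(0).getString(0)); // expected: v1
    }
}
```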
Introduction to the Tunnel SDK
Tunnel is the data channel of ODPS: users upload data to and download data from ODPS through Tunnel.
TableTunnel is the entry class for accessing the ODPS Tunnel service; it only supports uploading and downloading table data (not views).
The process of uploading to or downloading from a single table or partition is called a session. A session consists of one or more HTTP requests to the Tunnel RESTful API.
A session is identified by its session ID and times out after 24 hours; if a bulk transfer would take longer than 24 hours, you must split it into multiple sessions yourself.
Uploads and downloads are handled by two session classes, TableTunnel.UploadSession and TableTunnel.DownloadSession, respectively.
TableTunnel provides the methods for creating UploadSession and DownloadSession objects.
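Because a session is identified by its ID, a client can re-attach to a session that is still within its 24-hour window, for example to resume an interrupted upload. A minimal fragment, assuming TableTunnel's getUploadSession overload that looks a session up by ID (the tunnel, project, table, and partitionSpec variables are as in the complete sample at the end of this article):

```java
// create a new upload session and remember its ID
TableTunnel.UploadSession session =
        tunnel.createUploadSession(project, table, partitionSpec);
String sessionId = session.getId();

// later, within the 24-hour window, re-attach to the same session by ID
TableTunnel.UploadSession resumed =
        tunnel.getUploadSession(project, table, partitionSpec, sessionId);
```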
- Typical table data upload flow (see the combined sketch after these lists):
1) Create a TableTunnel
2) Create an UploadSession
3) Create a RecordWriter and write Records
4) Commit the upload
- Typical table data download flow:
1) Create a TableTunnel
2) Create a DownloadSession
3) Create a RecordReader and read Records
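The two flows map onto the SDK as in the minimal fragment below. It also shows a detail the single-block examples later in this article do not: an UploadSession can open one RecordWriter per block ID (blocks can be written concurrently), and commit must be passed the IDs of all written blocks. This is a sketch that reuses the odps, project, table, and partitionSpec variables from the complete sample at the end; error handling and record filling are omitted:

```java
TableTunnel tunnel = new TableTunnel(odps);

// ---- upload flow ----
TableTunnel.UploadSession upload =
        tunnel.createUploadSession(project, table, partitionSpec);
RecordWriter w0 = upload.openRecordWriter(0);   // block 0
RecordWriter w1 = upload.openRecordWriter(1);   // block 1; blocks may be written in parallel
// ... fill and write records with w0/w1 ...
w0.close();
w1.close();
upload.commit(new Long[]{0L, 1L});              // commit every written block

// ---- download flow ----
TableTunnel.DownloadSession download =
        tunnel.createDownloadSession(project, table, partitionSpec);
RecordReader reader = download.openRecordReader(0, download.getRecordCount());
// ... call reader.read() until it returns null ...
```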
Constructing Complex Type Data with the Tunnel SDK
Code example:
```java
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

// prepare data (the column types are ARRAY<BIGINT>, MAP<STRING, BIGINT> and
// STRUCT<name:STRING, age:BIGINT>, so the element values must be Longs)
List<Long> arrayData = Arrays.asList(1L, 2L, 3L);
Map<String, Long> mapData = new HashMap<String, Long>();
mapData.put("a", 1L);
mapData.put("c", 2L);
List<Object> structData = new ArrayList<Object>();
structData.add("Lily");
structData.add(18L);

// set data to record
record.setArray(0, arrayData);
record.setMap(1, mapData);
record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(), structData));

// write the record
recordWriter.write(record);

// close the writer and commit the block so the upload takes effect
recordWriter.close();
uploadSession.commit(new Long[]{0L});
```
Downloading Complex Type Data from MaxCompute
Code example:
```java
RecordReader recordReader = downloadSession.openRecordReader(0, 1);

// read the record
ArrayRecord record1 = (ArrayRecord) recordReader.read();

// get array field data
List field0 = record1.getArray(0);
List<Long> longField0 = record1.getArray(Long.class, 0);

// get map field data
Map field1 = record1.getMap(1);
Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);

// get struct field data
Struct field2 = record1.getStruct(2);
```
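To read the individual members of the returned struct, the com.aliyun.odps.data.Struct interface exposes field-level accessors. A small sketch, assuming its getFieldCount/getFieldName/getFieldValue methods, continuing from the field2 variable above:

```java
// iterate over the struct fields, e.g. name = Lily, age = 18
for (int i = 0; i < field2.getFieldCount(); ++i) {
    System.out.println(field2.getFieldName(i) + " = " + field2.getFieldValue(i));
}
```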
Runnable Example
The complete code is as follows:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.StructTypeInfo;

public class TunnelComplexTypeSample {

  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String odpsUrl = "<your odps endpoint>";
  private static String project = "<your project>";
  private static String table = "<your table name>";

  // partitions of a partitioned table, eg: "pt=\'1\',ds=\'2\'"
  // if the table is not a partitioned table, do not need it
  private static String partition = "<your partition spec>";

  public static void main(String[] args) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      PartitionSpec partitionSpec = new PartitionSpec(partition);

      // ---------- Upload Data ---------------
      // create upload session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      // get table schema
      TableSchema schema = uploadSession.getSchema();
      // open record writer
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();
      // prepare data (BIGINT columns take Long values)
      List<Long> arrayData = Arrays.asList(1L, 2L, 3L);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);
      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18L);
      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(), structData));
      // write the record
      recordWriter.write(record);
      // close writer
      recordWriter.close();
      // commit uploadSession, the upload finish
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");

      // ---------- Download Data ---------------
      // create download session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      schema = downloadSession.getSchema();
      // open record reader, read one record here for example
      RecordReader recordReader = downloadSession.openRecordReader(0, 1);
      // read the record
      ArrayRecord record1 = (ArrayRecord) recordReader.read();
      // get array field data
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);
      // get map field data
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);
      // get struct field data
      Struct field2 = record1.getStruct(2);
      System.out.println("download success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
```