我也能寫資料庫:Streaming(上)
概述
在前面兩篇中介紹了儲存和 UDF,然後就開始著手準備streaming了,開始走了些彎路,本以為需要構建起一個簡單的流系統,才能寫 streaming sql
呢,所以跑去看來幾天的flink,然後再仔細研究了calcite的原始碼後發現,其實並不用那麼麻煩,所以這個系列又能繼續了。
現在,我打算用2-3章來說說streaming。
首先streaming是對錶的一種補充,因為他代表著當前和未來的情況,而表則代表著過去。流是連續,流動的記錄的集合,與表不同,流通常不儲存再磁碟上,而是再網路上流動,在記憶體中保留的時間也很短。
但是與表類似,業務上也通常希望以基於關係代數的高階語言查詢流,根據模式進行驗證,並優化以利用可用的資源和演算法。
Calcite
的 Streaming SQL
是標準SQL的擴充套件,而不是另一種 SQL like
的語言。主要原因如下(翻譯自calcite官方文件:
-
對於任何瞭解標準SQL的人來說,流式SQL都很容易學習。
-
語義清晰,無論使用表或是流,都可以返回相同的資料。
-
可以編寫結合流和表的查詢(或者流的歷史記錄,它基本上是記憶體中的表)。
-
許多現有的工具可以生成標準SQL。
-
如果不使用stream關鍵字,則返回常規標準SQL。
介紹了一下基本概念,關於流,還由一點是必須說的,就是視窗
-
tumbling window (GROUP BY)
-
hopping window (multi GROUP BY)
-
sliding window (window functions)
-
cascading window (window functions)
對於視窗和時間的一些理解,也可以看看,我的另外一篇文章《再談Flink》
案例
好了,基礎先說到這,下面來看看程式碼吧,這次其實非常簡單,就可以完成 streaming
了,再一次強調, calcite
的 streaming sql
和 flink
及 spark
的支援不同,不是api級別上的,而是支援 stream
關鍵字來支援流
我們已經有了前面工程的積累,這樣程式碼量非常小的改動就可以完成了。
bookshopStream.json
首先,我們重新定義一個模型檔案,取名 bookshopStream.json
{ "version": "1.0", "defaultSchema": "bookshopstream", "schemas": [ { "name": "bookshopstream", "tables": [ { "name": "BOOK", "type": "custom", "factory": "com.dafei1288.calcite.stream.InMemoryStreamTableFactory", "stream": { "stream": true }, "operand": { "p1": "hello", "p2": "world" } } ] } ] }
這裡我們對 schema
並沒有過多的設定,而是直接對 tables
屬性進行了設定,將 factory
指定為 com.dafei1288.calcite.stream.InMemoryStreamTableFactory
,這類後續在細講。這裡我們將表名定義為 BOOK
,意在後續使用之前案例的 Storage
。
InMemoryStreamTableFactory
public class InMemoryStreamTableFactory implements TableFactory { @Override public Table create(SchemaPlus schema, String name, Map operand, RelDataType rowType) { System.out.println(operand); System.out.println(name); return new InMemoryStreamTable(name, Storage.getTable(name)); } }
因為在模型裡,直接指定了 TableFactory
,這個類的職責就是構建 Table
表物件,其職責,有點類似之前案例裡的 InMemorySchema
類的 public Map<String, Table> getTableMap()
方法。前文描述了過,指定了 "name": "BOOK"
,所以,在這裡程式碼執行的結果就是載入了 BOOK
表。
InMemoryStreamTable
public class InMemoryStreamTable extends InMemoryTable implements StreamableTable { public InMemoryStreamTable(String name, Storage.DummyTable it) { super(name, it); } @Override public Table stream() { System.out.println("streaming ....."); return this; } }
這裡,為了能複用之前的儲存邏輯,所以直接繼承了 InMemoryTable
,所以,這個實現,其實底層並不是一個徹底的 streaming
實現,而是和之前案例一直的記憶體實現,但是這樣就可以通過stream關鍵字,來進行sql查詢了。
測試
public class TestStreamJDBC { public static void main(String[] args) { try { Class.forName("org.apache.calcite.jdbc.Driver"); } catch (ClassNotFoundException e1) { e1.printStackTrace(); } System.setProperty("saffron.default.charset", ConversionUtil.NATIVE_UTF16_CHARSET_NAME); System.setProperty("saffron.default.nationalcharset",ConversionUtil.NATIVE_UTF16_CHARSET_NAME); System.setProperty("saffron.default.collation.name",ConversionUtil.NATIVE_UTF16_CHARSET_NAME + "$en_US"); Properties info = new Properties(); String jsonmodle = "E:\\working\\others\\寫作\\calcitetutorial\\src\\main\\resources\\bookshopStream.json"; try { Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + jsonmodle, info); CalciteConnection calciteConn = connection.unwrap(CalciteConnection.class); ResultSet result = null; Statement st = connection.createStatement(); st = connection.createStatement(); //where b.name = '資料山' result = st.executeQuery("select stream * from BOOK as b "); while(result.next()) { System.out.println(result.getString(1)+" \t "+result.getString(2)+" \t "+result.getString(3)+" \t "+result.getString(4)); } result.close(); }catch(Exception e){ e.printStackTrace(); } } }
select stream * from BOOK as b
這裡撰寫了一個簡單的SQL,並使用了 stream
關鍵字,結果如下。
{p1=hello, p2=world, modelUri=E:\working\others\寫作\calcitetutorial\src\main\resources\bookshopStream.json, baseDirectory=E:\working\others\寫作\calcitetutorial\src\main\resources} BOOK streaming ..... scan ...... 1 1 資料山 java 2 2 大關 sql 3 1 lili sql 4 3 ten c#
那麼對於一個非stream表,使用stream關鍵字,會怎麼樣呢?那麼我們會得到一個異常
ERROR: Cannot convert table 'xxx' to a stream
結尾
目前只是完成了最基礎的查詢,程式碼已提交到demo倉庫
TBD