Room-是如何做到響應式的?
前言
因為我們常用的Rxjava,所以這裡會結合RxRoom做分析,所以需要你有Rxjava相關的知識儲備。
完整原始碼參見JavaSample/app/src/main/java/com/example/android/observability/persistence" target="_blank" rel="nofollow,noindex">googlesamples ,可以自己跑一下,體驗一下。
簡單用法
我在閱讀一份程式碼時最喜歡的入手點就是先看怎麼用,從用法去往前推導。
看下面的簡單程式碼,也就是所謂的三大元件。
//首先是實體類對應到資料庫也就是一個表 @Entity(tableName = "users") public class User { @PrimaryKey @ColumnInfo(name = "userid") private String mId; @ColumnInfo(name = "username") private String mUserName; ... } //資料操作類 @Dao public interface UserDao { @Query("SELECT * FROM users LIMIT 1") Flowable<User> getUser(); @Insert(onConflict = OnConflictStrategy.REPLACE) void insertUser(User user); ... } //Database @Database( entities = {User.class,UserGroup.class}, version = 1) public abstract class UsersDatabase extends RoomDatabase { private static volatile UsersDatabase INSTANCE; public abstract UserDao userDao(); public static UsersDatabase getInstance(Context context) { if (INSTANCE == null) { synchronized (UsersDatabase.class) { if (INSTANCE == null) { INSTANCE = Room.databaseBuilder( context.getApplicationContext(), UsersDatabase.class, "Sample.db") .build(); } } } return INSTANCE; } }
定義好上面的3個東西,我們就能直接用來增刪改查
了。
val database = UsersDatabase.getInstance(this) //插入一條資料 val user = User("kingty") database.userDao().insertUser(user) //監聽一個查詢的實時變化 database.userDao().getUser().subscribe { Log.d("TAG", "table changed => " + it.userName + " => " + it.id) }
用法非常簡單,做一些註解,它就自動幫你完成了DAO
中的所有操作,並且還可以監聽資料庫的變化實時更新資料。這一切看起來比較夢幻。那麼問題來了,它是怎麼做到這一切的?
怎麼通過註解就可以操作資料庫了?
其實ORM
的本質就是用一些手段把你的資料類
和操作
變成SQL語句
而已。而這一切無非就是兩種手段,編譯時做(apt)或者執行時做(reflect)。出於效率考慮現在大部分都是編譯時做。編譯以下,所以我們來翻一翻build
目錄,看一下Room給我們生成了一些什麼東西。
-
build
-
generate
-
source
-
apt
-
com.kingty.roomtest
- UserDao_Impl.java
- UsersDatabase_Impl.java
-
com.kingty.roomtest
-
apt
-
source
-
generate
就發現在上面這個目錄下生成了兩個類,這裡我們不深究這裡是怎麼生成這兩個類的,說起來可以說兩天兩夜。其實是我也不懂。有興趣的同學應該早就知道了,沒興趣的我說不說也無所謂了。總之就是,編譯的時候通過我們剛才那些個註解給我們生成了真正的實現類,幫助我們完成了我們想要的操作。
來讓我們看一下里面都生成了一些什麼?
先看一下UserDao_Impl.java
這個類做了什麼?
x@SuppressWarnings("unchecked") public final class UserDao_Impl implements UserDao { private final RoomDatabase __db; private final EntityInsertionAdapter __insertionAdapterOfUser; private final SharedSQLiteStatement __preparedStmtOfDeleteAllUsers; public UserDao_Impl(RoomDatabase __db) { this.__db = __db; this.__insertionAdapterOfUser = new EntityInsertionAdapter<User>(__db) { @Override public String createQuery() { return "INSERT OR REPLACE INTO `users`(`userid`,`username`) VALUES (?,?)"; } @Override public void bind(SupportSQLiteStatement stmt, User value) { if (value.getId() == null) { stmt.bindNull(1); } else { stmt.bindString(1, value.getId()); } if (value.getUserName() == null) { stmt.bindNull(2); } else { stmt.bindString(2, value.getUserName()); } } }; this.__preparedStmtOfDeleteAllUsers = new SharedSQLiteStatement(__db) { @Override public String createQuery() { final String _query = "DELETE FROM Users"; return _query; } }; } @Override public void insertUser(User user) { __db.beginTransaction(); try { __insertionAdapterOfUser.insert(user); __db.setTransactionSuccessful(); } finally { __db.endTransaction(); } } @Override public void deleteAllUsers() { final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAllUsers.acquire(); __db.beginTransaction(); try { _stmt.executeUpdateDelete(); __db.setTransactionSuccessful(); } finally { __db.endTransaction(); __preparedStmtOfDeleteAllUsers.release(_stmt); } } @Override public Flowable<User> getUser() { final String _sql = "SELECT * FROM users LIMIT 1"; final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0); return RxRoom.createFlowable(__db, new String[]{"users"}, new Callable<User>() { @Override public User call() throws Exception { final Cursor _cursor = DBUtil.query(__db, _statement, false); try { final int _cursorIndexOfMId = _cursor.getColumnIndexOrThrow("userid"); final int _cursorIndexOfMUserName = _cursor.getColumnIndexOrThrow("username"); final User _result; if(_cursor.moveToFirst()) { final String _tmpMId; _tmpMId = _cursor.getString(_cursorIndexOfMId); final String _tmpMUserName; _tmpMUserName = _cursor.getString(_cursorIndexOfMUserName); _result = new User(_tmpMId,_tmpMUserName); } else { _result = null; } return _result; } finally { _cursor.close(); } } @Override protected void finalize() { _statement.release(); } }); } }
首先,在初始化UserDao_Impl
的時候通過EntityInsertionAdapter
幫你建立了插入資料的adapter
,裡面有插入資料的sql模板,和繫結資料的方法。通俗一點說就是在這裡幫你拼好了插入資料的SQL語句。接下來就是幫你實現了你在UserDao
中定義的介面。
增刪改
的套路大概類似,都是__db.beginTransaction();
然後執行拼好的SQL語句,然後__db.endTransaction();
注意此DB非彼DB,這個DB是封裝過的RoomDatabase。後面我們會著重講一下這個beginTransaction
和endTransaction
,他們還是比較重要的一個環節。
重要環節一
。
查
的套路就不一樣了,為什麼它不一樣,public Flowable<User> getUser()
這個方法明顯看起來大坨一些,這就是它為什麼不一樣。簡單閱讀一下,它其實利用RxRoom建立了一個Flowable
,其中有3個引數我們注意一下,第一個是__db
也就是database,第二個是new String[]{"users"}
,是一個表名的陣列,第3個就是查詢完成之後組裝成User
的回撥。特別注意的是第二個引數,這個表名的陣列的作用。這是
重要環節二
。
再看看UsersDatabase_Impl.java
這個類的生成了什麼?
@SuppressWarnings("unchecked") public final class UsersDatabase_Impl extends UsersDatabase { private volatile UserDao _userDao; @Override protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) { final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) { @Override public void createAllTables(SupportSQLiteDatabase _db) { _db.execSQL("CREATE TABLE IF NOT EXISTS `users` (`userid` TEXT NOT NULL, `username` TEXT, PRIMARY KEY(`userid`))"); _db.execSQL("CREATE TABLE IF NOT EXISTS `usergroups` (`userGroupId` TEXT NOT NULL, `groupName` TEXT, PRIMARY KEY(`userGroupId`))"); _db.execSQL("CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)"); _db.execSQL("INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, \"8890a9730e4846f27da03382221fc877\")"); } @Override public void dropAllTables(SupportSQLiteDatabase _db) { _db.execSQL("DROP TABLE IF EXISTS `users`"); _db.execSQL("DROP TABLE IF EXISTS `usergroups`"); } @Override protected void onCreate(SupportSQLiteDatabase _db) { if (mCallbacks != null) { for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) { mCallbacks.get(_i).onCreate(_db); } } } @Override public void onOpen(SupportSQLiteDatabase _db) { mDatabase = _db; internalInitInvalidationTracker(_db); if (mCallbacks != null) { for (int _i = 0, _size = mCallbacks.size(); _i < _size; _i++) { mCallbacks.get(_i).onOpen(_db); } } } @Override public void onPreMigrate(SupportSQLiteDatabase _db) { DBUtil.dropFtsSyncTriggers(_db); } @Override public void onPostMigrate(SupportSQLiteDatabase _db) { } @Override protected void validateMigration(SupportSQLiteDatabase _db) { final HashMap<String, TableInfo.Column> _columnsUsers = new HashMap<String, TableInfo.Column>(2); _columnsUsers.put("userid", new TableInfo.Column("userid", "TEXT", true, 1)); _columnsUsers.put("username", new TableInfo.Column("username", "TEXT", false, 0)); final HashSet<TableInfo.ForeignKey> _foreignKeysUsers = new HashSet<TableInfo.ForeignKey>(0); final HashSet<TableInfo.Index> _indicesUsers = new HashSet<TableInfo.Index>(0); final TableInfo _infoUsers = new TableInfo("users", _columnsUsers, _foreignKeysUsers, _indicesUsers); final TableInfo _existingUsers = TableInfo.read(_db, "users"); if (! _infoUsers.equals(_existingUsers)) { throw new IllegalStateException("Migration didn't properly handle users(com.kingty.roomtest.User).\n" + " Expected:\n" + _infoUsers + "\n" + " Found:\n" + _existingUsers); } final HashMap<String, TableInfo.Column> _columnsUsergroups = new HashMap<String, TableInfo.Column>(2); _columnsUsergroups.put("userGroupId", new TableInfo.Column("userGroupId", "TEXT", true, 1)); _columnsUsergroups.put("groupName", new TableInfo.Column("groupName", "TEXT", false, 0)); final HashSet<TableInfo.ForeignKey> _foreignKeysUsergroups = new HashSet<TableInfo.ForeignKey>(0); final HashSet<TableInfo.Index> _indicesUsergroups = new HashSet<TableInfo.Index>(0); final TableInfo _infoUsergroups = new TableInfo("usergroups", _columnsUsergroups, _foreignKeysUsergroups, _indicesUsergroups); final TableInfo _existingUsergroups = TableInfo.read(_db, "usergroups"); if (! _infoUsergroups.equals(_existingUsergroups)) { throw new IllegalStateException("Migration didn't properly handle usergroups(com.kingty.roomtest.UserGroup).\n" + " Expected:\n" + _infoUsergroups + "\n" + " Found:\n" + _existingUsergroups); } } }, "8890a9730e4846f27da03382221fc877", "1fdb937160bfb054175cfe5daf922b3b"); final SupportSQLiteOpenHelper.Configuration _sqliteConfig = SupportSQLiteOpenHelper.Configuration.builder(configuration.context) .name(configuration.name) .callback(_openCallback) .build(); final SupportSQLiteOpenHelper _helper = configuration.sqliteOpenHelperFactory.create(_sqliteConfig); return _helper; } @Override protected InvalidationTracker createInvalidationTracker() { final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0); HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0); return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups"); } @Override public void clearAllTables() { super.assertNotMainThread(); final SupportSQLiteDatabase _db = super.getOpenHelper().getWritableDatabase(); try { super.beginTransaction(); _db.execSQL("DELETE FROM `users`"); _db.execSQL("DELETE FROM `usergroups`"); super.setTransactionSuccessful(); } finally { super.endTransaction(); _db.query("PRAGMA wal_checkpoint(FULL)").close(); if (!_db.inTransaction()) { _db.execSQL("VACUUM"); } } } @Override public UserDao userDao() { if (_userDao != null) { return _userDao; } else { synchronized(this) { if(_userDao == null) { _userDao = new UserDao_Impl(this); } return _userDao; } } } }
這個類中邏輯比較清晰。首先是建立了一個SupportSQLiteOpenHelper
來幫你拼了一些必要的SQL語句,比如create table,drop table,open和migrate遷移資料等等。後面還有一個初始化真正的DAO實現類UserDao_Impl
和刪除相關的表資料的方法clearAllTables
這些都是一些比較好理解的。然後我們會看到一個我們不好理解的方法createInvalidationTracker ()
,這個是用來做什麼的?我們先把這個疑問留下,叫做重要環節三
正式初略閱讀Room的原始碼
下面我們提出幾個問題:
-
在上面生成的程式碼中我們看到操作的執行都是通過一個叫
RoomDatabase
的_db
來做的,那RoomDatabase
是什麼? -
重要環節一,在執行前後
beginTransaction
和endTransaction
做了什麼? -
重要環節二,建立
Flowable
的時候做了什麼,為什麼需要table names
? -
重要環節三,
createInvalidationTracker()
是做什麼用的? - 最後,串聯起上面的問題,當一個表發生更改,監聽一個查詢的實時變化是怎麼做到的?
帶著這幾個問題,我們大體的去閱讀一下原始碼。閱讀過程中我們有一個原則,就是先不要特別在意細節,先捋通大概的邏輯流程。如果你對細節感興趣,再去扣細節。
在專案中加以下引用
implementation 'androidx.room:room-runtime:2.1.0-alpha01' annotationProcessor 'androidx.room:room-compiler:2.1.0-alpha01' implementation 'androidx.room:room-rxjava2:2.1.0-alpha01'
編譯之後我們可以在External Libraries
目錄下看到以下幾個包:
- androidx.room:room-commom
- androidx.room:room-runtime
- androidx.room:room-rxjava
- androidx.sqlite:sqlite
- androidx.sqlite:sqlite-framework
我先簡單的介紹下這幾個包大概是做什麼的。
androidx.sqlite:sqlite
這個包主要是重新定義了一層SQLite的Support介面。
androidx.sqlite:sqlite-framework
這個包主要是利用原有的android的Sqlite
相關的API實現了上面定義的介面。
這兩個包主要是對原有的API做了一層代理封裝,我的理解是便於擴充套件。因此我們在看Room
程式碼的時候這部分程式碼大概瀏覽一下就OK,不必深究。
androidx.room:room-commom
包中定義了一些公共的屬性,和我們用到的所有的註解。
androidx.room:room-runtime
是我們需要主要閱讀的邏輯所在的包,Room的核心邏輯都在這個包中。
androidx.room:room-rxjava
當我們需要返回一個Rx包裝過的結果的時候,需要這個包。裡面就是一個重要類RxRoom.java
用來把Query包裝成一個可觀察的物件。
下面我們帶著上面的問題來看一下程式碼。
RoomDatabase
是做什麼的?
程式碼太長就不全貼,我們看一下它持有的成員變數
protected volatile SupportSQLiteDatabase mDatabase; private Executor mQueryExecutor; private SupportSQLiteOpenHelper mOpenHelper; private final InvalidationTracker mInvalidationTracker; private boolean mAllowMainThreadQueries; boolean mWriteAheadLoggingEnabled;
它其實是對資料庫的進一步封裝,利用真正的SupportSQLiteDatabase
和你UsersDatabase_Impl
自動生成的createOpenHelper()
提供的SupportSQLiteOpenHelper
來操作資料庫。進一步封裝了Transcation
並封裝了一些其他的邏輯。
beginTransaction
和endTransaction
做了什麼?
實際上這也是上一個問題的一部分,先看看beginTransaction
的程式碼
/** * Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}. */ public void beginTransaction() { assertNotMainThread();//禁止主執行緒執行 SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();//拿到真正的資料庫物件 mInvalidationTracker.syncTriggers(database);//?? database.beginTransaction();//開啟事務 }
從上面的程式碼來看其他3句都非常好理解,正常的資料庫事務流程,但是在開啟事務之前做了一個操作mInvalidationTracker.syncTriggers(database);
我們先不忙解釋這個是什麼意思。我們在看看endTransaction
/** * Wrapper for {@link SupportSQLiteDatabase#endTransaction()}. */ public void endTransaction() { mOpenHelper.getWritableDatabase().endTransaction();//結束事務 if (!inTransaction()) { // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last // endTransaction call to do it. mInvalidationTracker.refreshVersionsAsync(); } }
第一句是正常的結束事務的語句,但是結束之後等待最後一個事務結束,會做一個操作mInvalidationTracker.refreshVersionsAsync();
也就是說在開啟事務之前,結束事務之後都呼叫了InvalidationTracker做了一些邏輯,再結合上面的第四個問題,重要環節三createInvalidationTracker()是做什麼用的?
,一切問題都指向了InvalidationTracker
。
InvalidationTracker
這個類是什麼作用,我們先從上面的第四個問題看起。
createInvalidationTracker()
是做什麼用的?
下面是在自動生成的UsersDatabase_Impl.java
類中的方法
@Override protected InvalidationTracker createInvalidationTracker() { final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0); HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0); return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "users","usergroups"); }
在RoomDatabase在被初始化的時候呼叫這個方法賦值給成員變數
/** * Creates a RoomDatabase. * <p> * You cannot create an instance of a database, instead, you should acquire it via * {@link Room#databaseBuilder(Context, Class, String)} or * {@link Room#inMemoryDatabaseBuilder(Context, Class)}. */ public RoomDatabase() { mInvalidationTracker = createInvalidationTracker(); }
因此回答上面的問題就是createInvalidationTracker()
給RoomDatabase提供了一個mInvalidationTracker例項。
mInvalidationTracker起的作用是什麼?
我們來看一下RoomDatabase.java
中的呼叫流程。
1.初始化 public RoomDatabase() { mInvalidationTracker = createInvalidationTracker(); } 2.init中呼叫mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name); @CallSuper public void init(@NonNull DatabaseConfiguration configuration) { mOpenHelper = createOpenHelper(configuration); boolean wal = false; if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN) { wal = configuration.journalMode == JournalMode.WRITE_AHEAD_LOGGING; mOpenHelper.setWriteAheadLoggingEnabled(wal); } mCallbacks = configuration.callbacks; mQueryExecutor = configuration.queryExecutor; mAllowMainThreadQueries = configuration.allowMainThreadQueries; mWriteAheadLoggingEnabled = wal; if (configuration.multiInstanceInvalidation) { mInvalidationTracker.startMultiInstanceInvalidation(configuration.context, configuration.name); } } 3.每次Transaction 開始之前呼叫mInvalidationTracker.syncTriggers /** * Wrapper for {@link SupportSQLiteDatabase#beginTransaction()}. */ public void beginTransaction() { assertNotMainThread(); SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase(); mInvalidationTracker.syncTriggers(database); database.beginTransaction(); } 4. 最後一個Transaction 結束之後呼叫mInvalidationTracker.refreshVersionsAsync /** * Wrapper for {@link SupportSQLiteDatabase#endTransaction()}. */ public void endTransaction() { mOpenHelper.getWritableDatabase().endTransaction(); if (!inTransaction()) { // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last // endTransaction call to do it. mInvalidationTracker.refreshVersionsAsync(); } } 5,close資料庫的時候mInvalidationTracker.stopMultiInstanceInvalidation();與第2步對應。 /** * Closes the database if it is already open. */ public void close() { if (isOpen()) { try { mCloseLock.lock(); mInvalidationTracker.stopMultiInstanceInvalidation(); mOpenHelper.close(); } finally { mCloseLock.unlock(); } } }
上面就是InvalidationTracker在RoomDatabase中的整個生命週期中的呼叫情況。從程式碼上來看它其實是在track整個資料的更改情況,因為它在每個transcation前後做了一些呼叫。結合上面最後的一個問題當一個表發生更改,監聽一個查詢的實時變化是怎麼做到的
。大概可以猜測出來這個類的主要作用是來保證資料發生更改的時候,保證可以通知到這個表上其他的Query。
怎麼樣實現的監聽?
我們發現上面還有一個問題我們還沒有提到重要環節二,建立Flowable的時候做了什麼,為什麼需要table names?
我們從這裡入手講起。先看下面RxRoom
中的程式碼
public static Flowable<Object> createFlowable(final RoomDatabase database, final String... tableNames) { return Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(final FlowableEmitter<Object> emitter) throws Exception { final InvalidationTracker.Observer observer = new InvalidationTracker.Observer( tableNames) { @Override public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) { if (!emitter.isCancelled()) { emitter.onNext(NOTHING); } } }; if (!emitter.isCancelled()) { database.getInvalidationTracker().addObserver(observer); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { database.getInvalidationTracker().removeObserver(observer); } })); } // emit once to avoid missing any data and also easy chaining if (!emitter.isCancelled()) { emitter.onNext(NOTHING); } } }, BackpressureStrategy.LATEST); } /** * Helper method used by generated code to bind a Callable such that it will be run in * our disk io thread and will automatically block null values since RxJava2 does not like null. * * @hide */ @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) public static <T> Flowable<T> createFlowable(final RoomDatabase database, final String[] tableNames, final Callable<T> callable) { Scheduler scheduler = Schedulers.from(database.getQueryExecutor()); final Maybe<T> maybe = Maybe.fromCallable(callable); return createFlowable(database, tableNames) .observeOn(scheduler) .flatMapMaybe(new Function<Object, MaybeSource<T>>() { @Override public MaybeSource<T> apply(Object o) throws Exception { return maybe; } }); }
在上面生成的UserDao_Impl.java
類中getUser()
這個方法中呼叫的createFlowable
這個方法,也就是上面的第二個方法,它實際上呼叫的上面的第一個方法flatmap到這個本次的查詢。也就是說只要第一個方法中的Flowable發射一次資料,那麼這個查詢就會執行一次,並返回結果(也就是執行這個callable)。這裡應該就能看出一點端倪,其實第一個方法就是創建出來一個觀察這個表變化的觀察者InvalidationTracker.Observer
並把它新增到InvalidationTracker的觀察者列表中去,因為一個表肯定不止一個觀察者,所有的Query應該都需要觀察表的更改。也就是上面的這行程式碼database.getInvalidationTracker().addObserver(observer);
到這裡RxRoom
這個類的使命就完成了,他就是這樣一個簡單的功能,後面你也不需要再關心它。
InvalidationTracker.Observer
是一個靜態類,就注意一下其中的一個方法
/** * Called when one of the observed tables is invalidated in the database. * * @param tables A set of invalidated tables. This is useful when the observer targets *multiple tables and you want to know which table is invalidated. This will *be names of underlying tables when you are observing views. public abstract void onInvalidated(@NonNull Set<String> tables);
從備註上已經寫的很清楚了,就是表發生更改狀態的時候會呼叫這個方法,emitter
就會發射資料,通知Query
去requery.
我們著重看一下addObserver
這個方法幹了什麼?
@WorkerThread public void addObserver(@NonNull Observer observer) { final String[] tableNames = resolveViews(observer.mTables); int[] tableIds = new int[tableNames.length]; final int size = tableNames.length; long[] versions = new long[tableNames.length]; // TODO sync versions ? for (int i = 0; i < size; i++) { Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US)); if (tableId == null) { throw new IllegalArgumentException("There is no table with name " + tableNames[i]); } tableIds[i] = tableId; versions[i] = mMaxVersion; } ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames, versions); ObserverWrapper currentObserver; synchronized (mObserverMap) { currentObserver = mObserverMap.putIfAbsent(observer, wrapper); } if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) { syncTriggers(); } }
首先對Observer
做了一層包裝,主要就是包裝了當表發生變化的時候通過各種方式去通知也就是執行mObserver.onInvalidated(invalidatedTables);
,接下來,把包裝後的wrapper放進map裡。然後在滿足特定條件下會執行syncTriggers();
這個似曾相識,在上面RoomDatabase
開始一個事務之前也執行這個方法。我們來仔細看看這個方法做了什麼。
void syncTriggers(SupportSQLiteDatabase database) { if (database.inTransaction()) { // we won't run this inside another transaction. return; } try { // This method runs in a while loop because while changes are synced to db, another // runnable may be skipped. If we cause it to skip, we need to do its work. while (true) { Lock closeLock = mDatabase.getCloseLock(); closeLock.lock(); try { // there is a potential race condition where another mSyncTriggers runnable // can start running right after we get the tables list to sync. final int[] tablesToSync = mObservedTableTracker.getTablesToSync(); if (tablesToSync == null) { return; } final int limit = tablesToSync.length; try { database.beginTransaction(); for (int tableId = 0; tableId < limit; tableId++) { switch (tablesToSync[tableId]) { case ObservedTableTracker.ADD: startTrackingTable(database, tableId); break; case ObservedTableTracker.REMOVE: stopTrackingTable(database, tableId); break; } } database.setTransactionSuccessful(); } finally { database.endTransaction(); } mObservedTableTracker.onSyncCompleted(); } finally { closeLock.unlock(); } } } catch (IllegalStateException | SQLiteException exception) { // may happen if db is closed. just log. Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?", exception); } }
這個方法看起來很長,其實是在做一件事.ObservedTableTracker
維護了一個需要被觀察的表的列表,就是發現有新的表需要被觀察就執行startTrackingTable(database, tableId);
,有表不需要被觀察了就執行stopTrackingTable(database, tableId);
。
繼續往下看,看看這兩個方法做了什麼?
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) { final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]); StringBuilder stringBuilder = new StringBuilder(); for (String trigger : TRIGGERS) { stringBuilder.setLength(0); stringBuilder.append("DROP TRIGGER IF EXISTS "); appendTriggerName(stringBuilder, tableName, trigger); writableDb.execSQL(stringBuilder.toString()); } } private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) { final String tableName = mShadowTableLookup.get(tableId, mTableNames[tableId]); StringBuilder stringBuilder = new StringBuilder(); for (String trigger : TRIGGERS) { stringBuilder.setLength(0); stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS "); appendTriggerName(stringBuilder, tableName, trigger); stringBuilder.append(" AFTER ") .append(trigger) .append(" ON `") .append(tableName) .append("` BEGIN INSERT OR REPLACE INTO ") .append(UPDATE_TABLE_NAME) .append(" VALUES(null, ") .append(tableId) .append("); END"); writableDb.execSQL(stringBuilder.toString()); } }
插曲: InvalidationTracker自己維護了一個叫room_table_modification_log
的表,有兩個欄位,一個是version
它是自增的,還有一個是table_id,是被觀察的表的標識。
其實就是當需要去觀察一個表的時候startTrackingTable ()
就在資料庫上建立了三個資料庫的Trigger
。關於Trigger
是什麼這是資料庫基礎知識,請自備。也就是說,只要在這個表上發生了插入修改或者刪除,就會往room_table_modification_log
表裡面插入一條資料INSERT OR REPLACE INTO room_table_modification_log VALUES(null, table_id)
。
當不需要觀察一個表的時候,就通過stopTrackingTable
把這三個Trigger
刪除掉。
以上就是我們在建立一個Query做的事情。
我們先對建立一個Query的流程做一個小的總結:
- 通過自動生成的程式碼建立一個Flowable
- RxRoom會根據這個Flowable建立一個InvalidationTracker.Observer
- InvalidationTracker把這個Observer加到自己的觀察列表中
- 如果之前沒有人觀察過這個表,會去建立這個表上修改的Trigger
到這裡,我們似乎應該有一點頭緒了,既然每次有資料更新的時候就會往這個表中插入一條資料,那在每一個Trascation
結束之後去查這個表就應該可以知道哪些表上的Query可以更新。所以我們回到上面的RoomDatabase
中看看endTrasction
之後的mInvalidationTracker.refreshVersionsAsync();
到底做了什麼?
/** * Enqueues a task to refresh the list of updated tables. * <p> * This method is automatically called when {@link RoomDatabase#endTransaction()} is called but * if you have another connection to the database or directly use {@link * SupportSQLiteDatabase}, you may need to call this manually. */ @SuppressWarnings("WeakerAccess") public void refreshVersionsAsync() { // TODO we should consider doing this sync instead of async. if (mPendingRefresh.compareAndSet(false, true)) { mDatabase.getQueryExecutor().execute(mRefreshRunnable); } } @VisibleForTesting Runnable mRefreshRunnable = new Runnable() { @Override public void run() { final Lock closeLock = mDatabase.getCloseLock(); boolean hasUpdatedTable = false; try { closeLock.lock(); if (!ensureInitialization()) { return; } if (!mPendingRefresh.compareAndSet(true, false)) { // no pending refresh return; } if (mDatabase.inTransaction()) { // current thread is in a transaction. when it ends, it will invoke // refreshRunnable again. mPendingRefresh is left as false on purpose // so that the last transaction can flip it on again. return; } mCleanupStatement.executeUpdateDelete(); mQueryArgs[0] = mMaxVersion; if (mDatabase.mWriteAheadLoggingEnabled) { // This transaction has to be on the underlying DB rather than the RoomDatabase // in order to avoid a recursive loop after endTransaction. SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase(); try { db.beginTransaction(); hasUpdatedTable = checkUpdatedTable(); db.setTransactionSuccessful(); } finally { db.endTransaction(); } } else { hasUpdatedTable = checkUpdatedTable(); } } catch (IllegalStateException | SQLiteException exception) { // may happen if db is closed. just log. Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?", exception); } finally { closeLock.unlock(); } if (hasUpdatedTable) { synchronized (mObserverMap) { for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) { entry.getValue().notifyByTableVersions(mTableVersions); } } } } private boolean checkUpdatedTable() { boolean hasUpdatedTable = false; Cursor cursor = mDatabase.query(SELECT_UPDATED_TABLES_SQL, mQueryArgs); //noinspection TryFinallyCanBeTryWithResources try { while (cursor.moveToNext()) { final long version = cursor.getLong(0); final int tableId = cursor.getInt(1); mTableVersions[tableId] = version; hasUpdatedTable = true; // result is ordered so we can safely do this assignment mMaxVersion = version; } } finally { cursor.close(); } return hasUpdatedTable; } } static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME + " WHERE " + VERSION_COLUMN_NAME + "> ? ORDER BY " + VERSION_COLUMN_NAME + " ASC;";
它實際上是執行了mRefreshRunnable
的,這個runnerable的邏輯非常清晰,先做一些邊界檢測,然後去checkUpdatedTable
,看有沒有用表在變化,怎麼檢測。看上面的sql語句,就是去查room_table_modification_log
中相同的table_id的version,如果有大於之前儲存的maxversion的資料,說明有新的修改。然後呼叫ObserverWrapper 中的notifyByTableVersions
去通知表上的觀察者。
這也就回到了上面最後一個問題當一個表發生更改,監聽一個查詢的實時變化是怎麼做到的?
。
MultiInstanceInvalidation
到這裡我們還漏了一點沒有講到。那就是剛才說InvalidationTracker在RoomDatabase中的整個生命週期中的呼叫情況的時候還有初始化的時候和關閉資料庫的時候執行了
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,configuration.name); 和 mInvalidationTracker.stopMultiInstanceInvalidation();
因為我們在引用中不可能永遠是單標上的查詢。也就是說我們一個查詢可能是連表的查詢,那麼這個查詢的更新就會依賴於多個表的觀察操作。這就引出了框架中的一個經典的CS結構的兩個類MultiInstanceInvalidationClient
,MultiInstanceInvalidationService
在初始化RoomDatabase
的時候我們會開啟一個Client也就是startMultiInstanceInvalidation
,其實就是建立了有一個Client
void startMultiInstanceInvalidation(Context context, String name) { mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this, mDatabase.getQueryExecutor()); }
看一下Client初始化的過程
MultiInstanceInvalidationClient(Context context, String name, InvalidationTracker invalidationTracker, Executor executor) { mContext = context.getApplicationContext(); mName = name; mInvalidationTracker = invalidationTracker; mExecutor = executor; mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) { @Override public void onInvalidated(@NonNull Set<String> tables) { if (mStopped.get()) { return; } try { mService.broadcastInvalidation(mClientId, tables.toArray(new String[0])); } catch (RemoteException e) { Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e); } } @Override boolean isRemote() { return true; } }; Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class); mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE); }
從上面來看,其實在建立RoomDatabase
的時候建立Client的時候,我們就也建立了一個InvalidationTracker.Observer
,並且新增進InvalidationTracker
的觀察列表,當這個表發生更新的時候會通過服務端ServicebroadcastInvalidation
方法去通知客戶端Client。
@SuppressWarnings("WeakerAccess") final Runnable mSetUpRunnable = new Runnable() { @Override public void run() { try { final IMultiInstanceInvalidationService service = mService; if (service != null) { mClientId = service.registerCallback(mCallback, mName); mInvalidationTracker.addObserver(mObserver); } } catch (RemoteException e) { Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e); } } };
而每個Client在Setup的時候會去service.registerCallback
final IMultiInstanceInvalidationCallback mCallback = new IMultiInstanceInvalidationCallback.Stub() { @Override public void onInvalidation(final String[] tables) { mExecutor.execute(new Runnable() { @Override public void run() { mInvalidationTracker.notifyObserversByTableNames(tables); } }); } };
這個callback
就是是說收到broadcastInvalidation
的資訊的時候會去執行。
這個流程就是在多個RoomDatabase
之間是如何溝通的,也就是說在其他的RoomDatabase
也修改了你這個表,那是如何通知到你發生改變的。
小結
到這裡我們在整體上把這個Room是如何做到響應式的做了一個框架的解析。基本上也已經瀏覽了整個Room的核心程式碼。當然其中還有很多的細節,如果感興趣可以自己去好好讀一下。因為可能我也不太清楚。我也是初略的讀了一下做了一些自己的分析。肯定有理解不對的地方。大家閱讀過程中請辯證看待,多多指正。