netty原始碼分析(29)- directArena分配direct記憶體的流程
上一節研究了PooledByteBufAllocator
分配記憶體的前兩個步驟,通過ThreadLocal
的方式拿到PoolThreadCache
之後,獲取對應的Arena
。那麼之後就是Arena
具體分配記憶體的步驟,正是本節研究學習的內容。
-
入口
PooledByteBufAllocator#newDirectBuffer()
方法中有如下程式碼:
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- 可以看到分配的過程如下:
PooledByteBuf cache
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { //拿到PooledByteBuf物件,僅僅是一個物件 PooledByteBuf<T> buf = newByteBuf(maxCapacity); //從cache種分配記憶體,並初始化buf種記憶體地址相關的屬性 allocate(cache, buf, reqCapacity); return buf; }
-
先看第一步
newByteBuf(maxCapacity);
拿到PooledByteBuf
物件
@Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { //獲取一個PooledByteBuf return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //從帶有回收特性的物件池RECYCLER獲取一個PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf可能是從回收站拿出來的,要進行復用 buf.reuse(maxCapacity); return buf; }
Recycler
是一個基於執行緒本地堆疊的物件池。Recycler
維護了一個ThreadLocal
成員變數,用於返回一個stack
給回收處理器DefaultHandle
,該處理器通過維護這個堆疊來維護PooledUnsafeDirectByteBuf
快取。
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { //Recycler負責用回收處理器handler維護PooledUnsafeDirectByteBuf //handler底層持有一個stack作為物件池,維護物件池,handle同時負責物件回收 //儲存handler為成員變數,使用完該ByteBuf可以呼叫回收方法回收 return new PooledUnsafeDirectByteBuf(handle, 0); } };
//維護了一個`ThreadLocal`,`initialValue`方法返回一個堆疊。 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } };
-
再跟蹤
Recycler#get()
方法
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } //獲取對應的堆疊,相當一個回收站 Stack<T> stack = threadLocal.get(); //從棧頂拿出一個來DefaultHandle(回收處理器) //DefaultHandle持有一個value,其實是PooledUnsafeDirectByteBuf DefaultHandle<T> handle = stack.pop(); //沒有回收處理器,說明沒有閒置的ByteBuf if (handle == null) { //新增一個處理器 handle = stack.newHandle(); //回撥,還記得麼?該回調返回一個PooledUnsafeDirectByteBuf //讓處理器持有一個新的PooledUnsafeDirectByteBuf handle.value = newObject(handle); } //如果有,則可直接重複使用 return (T) handle.value; } public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //回撥initialize V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; } private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { //回撥 v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; } DefaultHandle<T> newHandle() { //例項化一個處理器並並且初四話成員變數,該成員變數stack從threalocal中初始化 return new DefaultHandle<T>(this); }
DefaultHandle
用stack
作為快取池維護PooledUnsafeDirectByteBuf
,同理PooledDirectByteBuf
也是一樣的。只不過例項化的物件的實現不一樣而已。
同時,處理器定義了回收的方法是將物件存回棧內,使用的時候則是從棧頂取出。
static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; //物件快取池 private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } /** * 定義回收方法,回收物件到stack * @param object */ @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } //回收:將自己存進棧中快取起來 stack.push(this); } }
-
到這我們剛剛看完第一步,到第二步重置快取內指標的時候了,獲取到
PooledUnsafeDirectByteBuf
的時候,有可能是從快取中取出來的。因此需要複用
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //從帶有回收特性的物件池RECYCLER獲取一個PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf可能是從回收站拿出來的,要進行復用 buf.reuse(maxCapacity); return buf; } final void reuse(int maxCapacity) { //重置最大容量 maxCapacity(maxCapacity); //設定引用 setRefCnt(1); //重置指標 setIndex0(0, 0); //重置標記值 discardMarks(); }
-
到這才剛剛完成分配記憶體的第一步(拿到
PooledByteBuf
物件),以上僅僅是獲取並用回收站和回收處理器管理這些物件,這些物件仍然只是一個物件,還沒有分配實際的記憶體。
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { //拿到PooledByteBuf物件,僅僅是一個物件 PooledByteBuf<T> buf = newByteBuf(maxCapacity); //從cache種分配記憶體,並初始化buf種記憶體地址相關的屬性 allocate(cache, buf, reqCapacity); return buf; }
-
跟蹤
PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
其整體分配記憶體的邏輯是根據不同規格大小的記憶體需要來的,先是tiny
和small
規格的,再是normal
規格的。分配也是先嘗試從快取中進行記憶體分配,如果分配失敗再從記憶體堆中進行記憶體分配。當然,分配出來的記憶體會和第一步拿到的PooledByteBuf
進行繫結起來。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); //不同的規格大小進行記憶體分配 /** * 分配整體邏輯(先判斷tiny和small規格的,再判斷normal規格的) * 1. 嘗試從快取上進行記憶體分配,成功則返回 * 2. 失敗則再從記憶體堆中進行分配記憶體 */ if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; boolean tiny = isTiny(normCapacity); //嘗試tiny和small規格的快取記憶體分配 if (tiny) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } final PoolSubpage<T> head = table[tableIdx]; /** * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and * {@link PoolChunk#free(long)} may modify the doubly linked list as well. */ synchronized (head) { final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity); incTinySmallAllocation(tiny); return; } } //tiny和small規格的快取記憶體分配嘗試失敗 //從記憶體堆中分配記憶體 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); } incTinySmallAllocation(tiny); return; } //normal規格 //如果分配處出來的記憶體大於一個值(chunkSize),則執行allocateHuge if (normCapacity <= chunkSize) { //從快取上進行記憶體分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } //快取沒有再從記憶體堆中分配記憶體 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); ++allocationsNormal; } } else { // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); } }