netty原始碼分析(28)- PooledByteBufAllocator分析
上一節分析了UnpooledByteBufAllocator
,包括了堆內堆外記憶體是如何分配的,底層是如何獲取資料內容的。
本節分析分析PooledByteBufAllocator
,看看它是怎麼做Pooled
型別的記憶體管理的。
-
入口
PooledByteBufAllocator#newHeapBuffer()
和PooledByteBufAllocator#newDirectBuffer()
,堆內記憶體和堆外記憶體分配的模式都比較固定
PoolThreadCache 與 arena
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { //拿到執行緒區域性快取 PoolThreadCache cache = threadCache.get(); //拿到heapArena PoolArena<byte[]> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { //使用heapArena分配記憶體 buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { //拿到執行緒區域性快取 PoolThreadCache cache = threadCache.get(); //拿到directArena PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { //使用directArena分配記憶體 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
-
跟蹤threadCache.get()
呼叫的是FastThreadLocal#get()
方法。那麼其實threadCache
也是一個FastThreadLocal
,可以看成是jdk的ThreadLocal
,只不過換了一種更加快速的實現方法。get
方法主要是呼叫了初始化方法initialize
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //呼叫初始化方法 V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; }
private final PoolThreadLocalCache threadCache;
initialValue()
方法的邏輯如下
-
從預先準備好的
heapArenas
和directArenas
中獲取最少使用的arena
-
使用獲取到的
arena
為引數,例項化一個PoolThreadCache
並返回
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { private final boolean useCacheForAllThreads; PoolThreadLocalCache(boolean useCacheForAllThreads) { this.useCacheForAllThreads = useCacheForAllThreads; } @Override protected synchronized PoolThreadCache initialValue() { /** * arena翻譯成競技場,關於記憶體非配的邏輯都在這個競技場中進行分配 */ //獲取heapArena:從heapArenas堆內競技場中拿出使用最少的一個arena final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); //獲取directArena:從directArena堆內競技場中拿出使用最少的一個arena final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { //建立PoolThreadCache:該Cache最終被一個執行緒使用 //通過heapArena和directArena維護兩大塊記憶體:堆和堆外記憶體 //通過tinyCacheSize,smallCacheSize,normalCacheSize維護ByteBuf快取列表維護反覆使用的記憶體塊 return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } //省略程式碼...... }
檢視PoolThreadCache
其維護了兩種型別的記憶體分配策略,一種是上述通過持有heapArena
和directArena
,另一種是通過維護tiny
,small
,normal
對應的快取列表來維護反覆使用的記憶體。
final class PoolThreadCache {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // Strategy 1: arena-based management via the heap and direct arenas
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Strategy 2: cache lists for the tiny, small and normal size classes
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        // Hold the heapArena and directArena picked for this thread (arena-style management)
        this.heapArena = heapArena;
        this.directArena = directArena;
        // Build the size-class cache lists from tinyCacheSize/smallCacheSize/normalCacheSize
        // and store them in the member fields
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            // Normalized (size-class) cache queues
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            // Normalized (size-class) cache queues
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            // Normalized (size-class) cache queues
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            // MemoryRegionCache is the object that actually holds the cached memory
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // Each MemoryRegionCache (tiny/small/normal) is a queue for one
                // memory size (one "spec")
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            // MemoryRegionCache is the object that actually holds the cached memory
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                // Each MemoryRegionCache (tiny/small/normal) is a queue for one
                // memory size (one "spec")
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

    // ......
}
通過檢視分配快取的方法PoolThreadCache#createSubPageCaches()
可以發現具體維護的快取列表物件MemoryRegionCache
實際上是維護了一個Queue&lt;Entry&lt;T&gt;&gt; queue
也就是佇列。
private abstract static class MemoryRegionCache<T> { private final int size; private final Queue<Entry<T>> queue; private final SizeClass sizeClass; private int allocations; MemoryRegionCache(int size, SizeClass sizeClass) { //做一個簡單的規格化 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //持有這種規格的快取佇列 queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; } ...... }
-
關於準備好的記憶體競技場
heapArena
和directArena
被PooledByteBufAllocator
持有,在例項化分配器的時候被初始化。
private final PoolArena<byte[]>[] heapArenas; private final PoolArena<ByteBuffer>[] directArenas; //三種快取列表長度 private final int tinyCacheSize; private final int smallCacheSize; private final int normalCacheSize;
跟蹤初始化的過程可以發現,其實heapArena
和directArena
都是一個PoolArena[]
,其內部分別定義了兩個內部類PoolArena.HeapArena
和PoolArena.DirectArena
分別表示堆內記憶體競技場和堆外記憶體競技場。
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                              boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

    checkPositiveOrZero(nHeapArena, "nHeapArena");
    checkPositiveOrZero(nDirectArena, "nDirectArena");

    checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
    if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
        throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
    }

    // Alignment must be a power of two (x & -x == x only for powers of two)
    if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
        throw new IllegalArgumentException("directMemoryCacheAlignment: "
                + directMemoryCacheAlignment + " (expected: power of two)");
    }

    int pageShifts = validateAndCalculatePageShifts(pageSize);

    // Build the two PoolArena arrays used for allocation: heapArenas and directArenas
    if (nHeapArena > 0) {
        // Create the heap arena array (really a PoolArena[])
        // nHeapArena: the array size
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            // Heap side: the PoolArena[] holds HeapArena instances
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    if (nDirectArena > 0) {
        // Create the direct arena array (really a PoolArena[])
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            // Direct side: the PoolArena[] holds DirectArena instances
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }

    metric = new PooledByteBufAllocatorMetric(this);
}
private static <T> PoolArena<T>[] newArenaArray(int size) { //建立PoolArena陣列 return new PoolArena[size]; }
初始化記憶體競技場陣列的大小的預設值為defaultMinNumArena
,即2倍的cpu核心數,執行時每個執行緒可獨享一個arena,記憶體分配的時候就不用加鎖了
public PooledByteBufAllocator(boolean preferDirect) { this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER); }
//2倍cpu核心數,預設建立這個數量大小的Arena陣列 // (這個數字和建立NioEventLoop陣列的數量一致,每個執行緒都可以由一個獨享的arena,這個陣列中的arena其實在分配記憶體的時候是不用加鎖的) final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2; final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER; DEFAULT_NUM_HEAP_ARENA = Math.max(0, SystemPropertyUtil.getInt( "io.netty.allocator.numHeapArenas", (int) Math.min( defaultMinNumArena, runtime.maxMemory() / defaultChunkSize / 2 / 3))); DEFAULT_NUM_DIRECT_ARENA = Math.max(0, SystemPropertyUtil.getInt( "io.netty.allocator.numDirectArenas", (int) Math.min( defaultMinNumArena, PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
-
整體分配架構,如圖
假設初始化了4個
NioEventLoop
也就是4個執行緒的陣列,預設cpu核心數為2。那麼記憶體分配器PooledByteBufAllocator
持有的arena
數量也是4個。建立一個ByteBuf的過程如下:
-
首先,通過
PoolThreadCache
去拿到一個對應的arena
物件。那麼PoolThreadCache
的作用就是通過ThreadLoad
的方式把記憶體分配器PooledByteBufAllocator
持有的arena
陣列中其中的一個arena(最少使用的)
塞到PoolThreadCache
的一個成員變數裡面。 -
然後,當每個執行緒通過它(
threadCache
)去呼叫get
方法的時候,會拿到它底層的一個arena
,也就是第一個執行緒拿到第一個,第二個執行緒拿到第二個以此類推。這樣可以把執行緒和arena
進行一個繫結 -
PoolThreadCache
除了可以直接在arena
管理的這塊記憶體進行記憶體分配,還可在它底層維護的一個ByteBuf快取列表裡進行記憶體分配。在PooledByteBufAllocator
中持有tinyCacheSize
,smallCacheSize
,normalCacheSize
,分配記憶體時呼叫threadCache.get();
的時候例項化PoolThreadCache
作為它的構造方法引數傳入,建立了對應的快取列表。