Written by: @万钰臻 @唐湘润 @赵旭阳
Contributors: @袁一博 @王明哲 @杨勇勇 @黄冰尧
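Introduction
The Kotlin/Native (K/N) memory manager and GC work much like those of mainstream virtual machines. Their main features are:
-
K/N uses its own custom memory allocator, and each thread has its own TLAB (thread-local allocation buffer).
-
The default garbage collector is a stop-the-world mark and concurrent sweep collector, and it does not split the heap into generations.
-
Only weak references are currently supported. Once the mark phase completes, the GC processes weak references and nulls out those that point to unmarked objects.
To monitor GC performance, set the following compiler option in the Gradle build script:
Code block: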
-Xruntime-logs=gc=info
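To improve GC performance, you can enable the cms collector in the Gradle build configuration. It marks live objects concurrently with the application threads, which reduces GC pause time.
Code block: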
kotlin.native.binary.gc=cms
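According to the documentation, the memory allocator is fairly mature, but GC performance is weak: the default collector stops the world, and cms has to be enabled manually. Let's look at the code.
Runtime
A captured KMP trace shows the runtime entry point:
-
The HarmonyOS linker, ld-musl-aarch64.so, loads libbenchmark.so, which is the KMP build artifact.
-
It then runs workerRoutine, the entry method of the Runtime.
The Douyin repository uses Kotlin 2.0.20. The workerRoutine code is in Worker.cpp of the kotlin-native project:
-
It first calls Kotlin_initRuntimeIfNeeded to initialize the Runtime.
-
It then calls processQueueElement in a do/while loop to process jobs, similar to a message loop.
Code block: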
void* workerRoutine(void* argument){
Worker* worker = reinterpret_cast<Worker*>(argument);
// Kotlin_initRuntimeIfNeeded calls WorkerInit that needs
// to see there's already a worker created for this thread.
::g_worker = worker;
Kotlin_initRuntimeIfNeeded();
// Only run this routine in the runnable state. The moment between this routine exiting and thread
// destructors running will be spent in the native state. `Kotlin_deinitRuntimeCallback` ensures
// that runtime deinitialization switches back to the runnable state.
kotlin::ThreadStateGuard guard(worker->memoryState(), ThreadState::kRunnable);
do {
if (worker->processQueueElement(true) == JOB_TERMINATE) break;
} while (true);
return nullptr;
}
Kotlin_initRuntimeIfNeeded calls initRuntime. Each thread has its own runtimeState variable, and checking it prevents initRuntime from running more than once per thread.
Code block:
RUNTIME_NOTHROW void Kotlin_initRuntimeIfNeeded() {
if (!isValidRuntime()) {
initRuntime();
// Register runtime deinit function at thread cleanup.
konan::onThreadExit(Kotlin_deinitRuntimeCallback, runtimeState);
}
}
THREAD_LOCAL_VARIABLE RuntimeState* runtimeState = kInvalidRuntime;
inline bool isValidRuntime() {
return ::runtimeState != kInvalidRuntime;
}
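The per-thread guard above can be sketched in isolation as follows. This is a minimal model, not the runtime's code: RuntimeStateModel, tlsRuntimeState and initRuntimeIfNeededModel are hypothetical names, nullptr plays the role of kInvalidRuntime, and deinitialization at thread exit is left out.

```cpp
#include <cassert>

// Hypothetical stand-in for the runtime's per-thread state.
struct RuntimeStateModel {
    int initCount = 0;  // how many times initialization ran on this thread
};

// One pointer per thread; nullptr plays the role of kInvalidRuntime.
thread_local RuntimeStateModel* tlsRuntimeState = nullptr;

// Idempotent per-thread initialization: only the first call on a thread allocates.
// Cleanup at thread exit is intentionally omitted in this sketch.
inline RuntimeStateModel* initRuntimeIfNeededModel() {
    if (tlsRuntimeState == nullptr) {
        tlsRuntimeState = new RuntimeStateModel();
        ++tlsRuntimeState->initCount;
    }
    assert(tlsRuntimeState != nullptr);
    return tlsRuntimeState;
}
```

Calling initRuntimeIfNeededModel() twice on the same thread returns the same pointer, and initCount stays at 1.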
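initRuntime does the following:
-
SetKonanTerminateHandler installs a terminate handler so that uncaught Kotlin exceptions can be caught.
-
Sets runtimeState.
-
initializeGlobalRuntimeIfNeeded initializes global state.
-
InitMemory initializes the thread's memory allocator.
-
WorkerInit initializes the worker.
Code block: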
RuntimeState* initRuntime(){
SetKonanTerminateHandler();
RuntimeState* result = new RuntimeState();
if (!result) return kInvalidRuntime;
::runtimeState = result;
bool firstRuntime = initializeGlobalRuntimeIfNeeded();
result->memoryState = InitMemory();
// Switch thread state because worker and globals inits require the runnable state.
// This call may block if GC requested suspending threads.
ThreadStateGuard stateGuard(result->memoryState, kotlin::ThreadState::kRunnable);
result->worker = WorkerInit(result->memoryState);
result->status = RuntimeStatus::kRunning;
return result;
}
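The figure shows the initRuntime flow. The following sections go through each step.
ExceptionHandler
SetKonanTerminateHandler goes through TerminateHandler, which calls std::set_terminate to install kotlinHandler as the handler for uncaught exceptions.
Code block:
// Use one public function to limit access to the class declaration
void SetKonanTerminateHandler() {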
TerminateHandler::install();
}
/// Use machinery like Meyers singleton to provide thread safety
TerminateHandler()
: queuedHandler_((QH)std::set_terminate(kotlinHandler)) {}
GlobalData
initializeGlobalRuntimeIfNeeded calls initGlobalMemory to initialize GlobalData, which holds the memory allocator (allocator_), the garbage collector (gc_), the thread list (threadRegistry_), and more. GlobalData is global and shared by all threads. ThreadData, analyzed below, is private to each thread.
Code block:
void kotlin::initGlobalMemory() noexcept {
mm::GlobalData::init();
}
// Global (de)initialization is undefined in C++. Use single global singleton to define it for simplicity.
class GlobalData : private Pinned {
public:
ThreadRegistry& threadRegistry() noexcept { return threadRegistry_; }
GlobalsRegistry& globalsRegistry() noexcept { return globalsRegistry_; }
SpecialRefRegistry& specialRefRegistry() noexcept { return specialRefRegistry_; }
gcScheduler::GCScheduler& gcScheduler() noexcept { return gcScheduler_; }
alloc::Allocator& allocator() noexcept { return allocator_; }
gc::GC& gc() noexcept { return gc_; }
ThreadData
InitMemory calls RegisterCurrentThread on the global ThreadRegistry described above. It creates a ThreadData and registers it in list_, so that the GC can reach the GC roots held in ThreadData. currentThreadDataNode_ is a thread-local variable, so each thread has its own copy.
Code block:
extern "C" MemoryState* InitMemory() {
mm::GlobalData::waitInitialized();
return mm::ToMemoryState(mm::ThreadRegistry::Instance().RegisterCurrentThread());
}
mm::ThreadRegistry::Node* mm::ThreadRegistry::RegisterCurrentThread() noexcept {
auto lock = list_.LockForIter();
auto* threadDataNode = list_.Emplace(konan::currentThreadId());
Node*& currentDataNode = currentThreadDataNode_;
currentDataNode = threadDataNode;
threadDataNode->Get()->gc().onThreadRegistration();
return threadDataNode;
}
// static
THREAD_LOCAL_VARIABLE mm::ThreadRegistry::Node* mm::ThreadRegistry::currentThreadDataNode_ = nullptr;
ThreadData holds threadId_, allocator_, gc_, and so on, with one instance per thread. Because allocator_ is private to each thread, this is how the TLAB is implemented.
Code block:
// `ThreadData` is supposed to be thread local singleton.
// Pin it in memory to prevent accidental copying.
class ThreadData final : private Pinned {
public:
explicit ThreadData(int threadId) noexcept :
threadId_(threadId),
globalsThreadQueue_(GlobalsRegistry::Instance()),
specialRefRegistry_(SpecialRefRegistry::instance()),
gcScheduler_(GlobalData::Instance().gcScheduler(), *this),
allocator_(GlobalData::Instance().allocator()),
gc_(GlobalData::Instance().gc(), *this),
suspensionData_(ThreadState::kNative, *this){}
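To sum up, ThreadData gives each thread its own memory allocator and GC state. The allocator is analyzed later.
WorkerInit
WorkerInit sets the Worker's thread_ to the current thread. workerRoutine runs on a new thread, thread_, that is created with pthread_create. Threads can be created from Kotlin code or from C++ code, and once a thread exists, initRuntime is called to initialize it.
Code block: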
Worker* WorkerInit(MemoryState* memoryState){
Worker* worker;
if (::g_worker != nullptr) {
worker = ::g_worker;
} else {
worker = theState()->addWorkerUnlocked(workerExceptionHandling(), nullptr, WorkerKind::kOther);
::g_worker = worker;
}
worker->setThread(pthread_self());
worker->setMemoryState(memoryState);
return worker;
}
void Worker::startEventLoop() {
kotlin::ThreadStateGuard guard(ThreadState::kNative);
pthread_create(&thread_, nullptr, workerRoutine, this);
}
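This raises a question: workerRoutine initializes the runtime on worker threads, but where does the runtime get initialized for code that does not start from workerRoutine?
CodeGenerator converts the Kotlin IR of each function to LLVM IR and, in the process, inserts a call to initRuntimeIfNeeded where it is required (the needsRuntimeInit || switchToRunnable condition below). Such functions therefore call initRuntimeIfNeeded before running their body.
Code block: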
if (needsRuntimeInit || switchToRunnable) {
check(!forbidRuntime) { "Attempt to init runtime where runtime usage is forbidden" }
call(llvm.initRuntimeIfNeeded, emptyList())
}
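That concludes the Runtime analysis. Next is the memory allocator, allocator_.
Memory allocation
K/N has three memory allocators:
-
Custom: the allocator developed by K/N itself, and the default.
-
Std: the standard library allocator, which is jemalloc on HarmonyOS.
-
Mimalloc: mimalloc, an open-source native allocator from Microsoft.
Each allocator implements an Allocator::ThreadData::Impl class. For example, CustomAllocator corresponds to the Custom allocator, which is how allocator_ is bound to a specific allocator.
Code block:
class Allocator::ThreadData::Impl : private Pinned {
public:
explicit Impl(Allocator::Impl& allocator) noexcept : alloc_(allocator.heap()) {}
alloc::CustomAllocator& alloc() noexcept { return alloc_; }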
private:
CustomAllocator alloc_;
};
ALWAYS_INLINE ObjHeader* alloc::Allocator::ThreadData::allocateObject(const TypeInfo* typeInfo) noexcept {
return impl_->alloc().CreateObject(typeInfo);
}
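We focus on the Custom allocator. Each thread has its own ThreadData, which creates its own allocator_. The allocator_ requests one page at a time from the heap (256 KiB for small and medium objects, for example) and then allocates from that page within the thread. Let's look at the code.
Memory creation
SafeAlloc in GCApi.cpp calls mmap to create virtual memory:
-
allocatedBytesCounter tracks the total number of allocated bytes.
-
OnMemoryAllocation checks whether an allocation-triggered GC is needed.
Code block:
void* SafeAlloc(uint64_t size) noexcept {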
void* memory;
bool error;
if (compiler::disableMmap()) {
memory = calloc(size, 1);
error = memory == nullptr;
} else {
#if KONAN_WINDOWS
RuntimeFail("mmap is not available on mingw");
#elif KONAN_LINUX || KONAN_OHOS
memory = mmap(nullptr, size, PROT_WRITE | PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE | MAP_NORESERVE | MAP_POPULATE, -1, 0);
error = memory == MAP_FAILED;
#else
memory = mmap(nullptr, size, PROT_WRITE | PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE | MAP_NORESERVE, -1, 0);
error = memory == MAP_FAILED;
#endif
}
if (error) {
konan::consoleErrorf("Out of memory trying to allocate %" PRIu64 "bytes: %s. Aborting.\n", size, strerror(errno));
std::abort();
}
auto previousSize = allocatedBytesCounter.fetch_add(static_cast<size_t>(size), std::memory_order_relaxed);
OnMemoryAllocation(previousSize + static_cast<size_t>(size));
return memory;
}
OnMemoryAllocation uses HeapGrowthController::boundaryForHeapSize to check whether totalAllocatedBytes has crossed a GC threshold. This is analyzed later.
Code block:
void kotlin::OnMemoryAllocation(size_t totalAllocatedBytes) noexcept {
mm::GlobalData::Instance().gcScheduler().setAllocatedBytes(totalAllocatedBytes);
}
void setAllocatedBytes(size_t bytes) noexcept {
// Still checking allocations: with a long running loop all safepoints
// might be "met", so that's the only trigger to not run out of memory.
auto boundary = heapGrowthController_.boundaryForHeapSize(bytes);
switch (boundary) {
case HeapGrowthController::MemoryBoundary::kNone:
safePoint();
return;
case HeapGrowthController::MemoryBoundary::kTrigger:
RuntimeLogDebug({kTagGC}, "Scheduling GC by allocation");
scheduleGC_.scheduleNextEpochIfNotInProgress();
return;
case HeapGrowthController::MemoryBoundary::kTarget:
RuntimeLogDebug({kTagGC}, "Scheduling GC by allocation");
auto epoch = scheduleGC_.scheduleNextEpochIfNotInProgress();
RuntimeLogWarning({kTagGC}, "Pausing the mutators");
mutatorAssists_.requestAssists(epoch);
return;
}
}
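The Custom allocator allocates memory through CreateObject and CreateArray:
-
CreateObject allocates an object. If the class (typeInfo) carries the TF_HAS_FINALIZER flag, an ExtraObjectData that holds a weak reference to the object is created, and the finalize method is called after GC. Details follow later.
-
CreateArray allocates an array.
Code block:
ObjHeader* CustomAllocator::CreateObject(const TypeInfo* typeInfo) noexcept {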
RuntimeAssert(!typeInfo->IsArray(), "Must not be an array");
auto descriptor = HeapObject::make_descriptor(typeInfo);
auto& heapObject = *descriptor.construct(Allocate(descriptor.size()));
ObjHeader* object = heapObject.header(descriptor).object();
if (typeInfo->flags_ & TF_HAS_FINALIZER) {
auto* extraObject = CreateExtraObject();
object->typeInfoOrMeta_ = reinterpret_cast<TypeInfo*>(new (extraObject) mm::ExtraObjectData(object, typeInfo));
} else {
object->typeInfoOrMeta_ = const_cast<TypeInfo*>(typeInfo);
}
return object;
}
ArrayHeader* CustomAllocator::CreateArray(const TypeInfo* typeInfo, uint32_t count) noexcept {
RuntimeAssert(typeInfo->IsArray(), "Must be an array");
auto descriptor = HeapArray::make_descriptor(typeInfo, count);
CustomAllocDebug("CustomAllocator@%p::CreateArray(%d), total size:%ld", this ,count, (long)descriptor.size());
auto& heapArray = *descriptor.construct(Allocate(descriptor.size()));
ArrayHeader* array = heapArray.header(descriptor).array();
array->typeInfoOrMeta_ = const_cast<TypeInfo*>(typeInfo);
array->count_ = count;
return array;
}
The object size is computed through HeapObject and covers three parts: ObjectData, ObjHeader, and ObjectBody.
Code block:
struct HeapObjHeader {
using descriptor = type_layout::Composite<HeapObjHeader, gc::GC::ObjectData, ObjHeader>;
struct HeapObject {
using descriptor = type_layout::Composite<HeapObject, HeapObjHeader, ObjectBody>;
The array size is computed through HeapArray and covers ObjectData, ArrayHeader, and ArrayBody.
Code block:
struct HeapArrayHeader {
using descriptor = type_layout::Composite<HeapArrayHeader, gc::GC::ObjectData, ArrayHeader>;
// Header of value type array objects. Keep layout in sync with that of object header.
struct ArrayHeader {
TypeInfo* typeInfoOrMeta_;
// Elements count. Element size is stored in instanceSize_ field of TypeInfo, negated.
uint32_t count_;
};
struct HeapArray {
using descriptor = type_layout::Composite<HeapArray, HeapArrayHeader, ArrayBody>;
The layout is shown in the figure.
Finally, the Allocate method decides which kind of page to use. The page types are analyzed below.
Code block:
uint8_t* CustomAllocator::Allocate(uint64_t size) noexcept {
RuntimeAssert(size, "CustomAllocator::Allocate cannot allocate 0 bytes");
//CustomAllocDebug("CustomAllocator::Allocate(%" PRIu64 ")", size);
uint64_t cellCount = (size + sizeof(Cell) - 1) / sizeof(Cell);
if (cellCount <= FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE) {
return AllocateInFixedBlockPage(cellCount);
} else if (cellCount > NEXT_FIT_PAGE_MAX_BLOCK_SIZE) {
return AllocateInSingleObjectPage(cellCount);
} else {
return AllocateInNextFitPage(cellCount);
}
}
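The dispatch in Allocate can be modelled as a small pure function. This is a sketch under stated assumptions: the 8-byte cell and the 128-cell limit come from the text, while kNextFitMaxCells only approximates NEXT_FIT_PAGE_MAX_BLOCK_SIZE as a full 256 KiB page and ignores the page header, so the real boundary is somewhat lower. PageKind, cellsFor and pageKindFor are hypothetical names.

```cpp
#include <cstdint>

enum class PageKind { FixedBlock, NextFit, SingleObject };

constexpr uint64_t kCellSize = 8;              // one cell is 8 bytes (from the text)
constexpr uint64_t kFixedBlockMaxCells = 128;  // FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE
// Assumption: a 256 KiB page expressed in cells, page header ignored.
constexpr uint64_t kNextFitMaxCells = 256 * 1024 / kCellSize;

// Rounds a byte size up to whole cells, as Allocate does.
constexpr uint64_t cellsFor(uint64_t sizeBytes) {
    return (sizeBytes + kCellSize - 1) / kCellSize;
}

// Mirrors the three-way branch: small -> fixed block, huge -> single object, else next fit.
constexpr PageKind pageKindFor(uint64_t sizeBytes) {
    return cellsFor(sizeBytes) <= kFixedBlockMaxCells ? PageKind::FixedBlock
         : cellsFor(sizeBytes) > kNextFitMaxCells     ? PageKind::SingleObject
                                                      : PageKind::NextFit;
}
```

Under these assumptions a 1024-byte request still lands in a FixedBlockPage (exactly 128 cells), while 1025 bytes rounds up to 129 cells and goes to a NextFitPage.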
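Small object allocation
Objects from 8 bytes to 1 KiB are small objects. FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE = 128, so when an allocation needs at most 128 cells (one cell is 8 bytes), FixedBlockPage is used. Each page is 256 KiB by default.
Code block:
FixedBlockPage* FixedBlockPage::Create(uint32_t blockSize) noexcept {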
CustomAllocInfo("FixedBlockPage::Create(%u)", blockSize);
RuntimeAssert(blockSize <= FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE, "blockSize too large for FixedBlockPage");
return new (SafeAlloc(FIXED_BLOCK_PAGE_SIZE)) FixedBlockPage(blockSize);
}
inline constexpr const size_t FIXED_BLOCK_PAGE_SIZE = (256 * KiB);
blockSize is the size of each block in the page, between 1 and 128 cells.
Code block:
FixedBlockPage::FixedBlockPage(uint32_t blockSize) noexcept : blockSize_(blockSize) {
CustomAllocInfo("FixedBlockPage(%p)::FixedBlockPage(%u)", this, blockSize);
nextFree_.first = 0;
nextFree_.last = FIXED_BLOCK_PAGE_CELL_COUNT / blockSize * blockSize;
end_ = FIXED_BLOCK_PAGE_CELL_COUNT / blockSize * blockSize;
}
TryAllocate returns one block of the page's fixed size on each call. The block size ranges from 1 to 128 cells.
Code block:
uint8_t* FixedBlockPage::TryAllocate() noexcept {
uint32_t next = nextFree_.first;
if (next < nextFree_.last) {
nextFree_.first += blockSize_;
return cells_[next].data;
}
if (next >= end_) return nullptr;
nextFree_ = cells_[next].nextFree;
memset(&cells_[next], 0, sizeof(cells_[next]));
return cells_[next].data;
}
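The idea behind a fixed-block page, bump allocation for fresh blocks plus a free list for swept ones, can be sketched as below. This is a simplified model and not the runtime's layout: the real page stores the free range inside the cells themselves, whereas this hypothetical FixedBlockPageModel keeps the links in a side vector and always reuses freed blocks first.

```cpp
#include <cstdint>
#include <vector>

constexpr uint32_t kNoBlock = UINT32_MAX;  // marks the end of the free list

// Simplified model: every block in the page has the same size, measured in cells.
class FixedBlockPageModel {
public:
    FixedBlockPageModel(uint32_t cellCount, uint32_t blockSize)
        : blockSize_(blockSize),
          end_(cellCount / blockSize * blockSize),  // drop the unusable tail
          nextLink_(cellCount, kNoBlock) {}

    // Returns the index of the block's first cell, or -1 when the page is full.
    int64_t tryAllocate() {
        if (freeHead_ != kNoBlock) {  // reuse a freed block first
            uint32_t block = freeHead_;
            freeHead_ = nextLink_[block];
            return block;
        }
        if (bump_ < end_) {           // otherwise hand out a fresh block
            uint32_t block = bump_;
            bump_ += blockSize_;
            return block;
        }
        return -1;
    }

    // Puts a block back on the free list, as a sweep would for a dead object.
    void release(uint32_t block) {
        nextLink_[block] = freeHead_;
        freeHead_ = block;
    }

private:
    uint32_t blockSize_;
    uint32_t end_;
    uint32_t bump_ = 0;
    uint32_t freeHead_ = kNoBlock;
    std::vector<uint32_t> nextLink_;
};
```

With 10 cells and a block size of 4, the model hands out blocks at cells 0 and 4, reports the page as full, and after release(0) hands out cell 0 again.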
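Medium object allocation
Objects from 1 KiB to 256 KiB are medium objects. NextFitPage also maps 256 KiB of memory, but unlike FixedBlockPage, which can only hand out blocks of one fixed cell count, a NextFitPage can hold objects with different cell counts.
Code block: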
NextFitPage* NextFitPage::Create(uint32_t cellCount) noexcept {
CustomAllocInfo("NextFitPage::Create(%u)", cellCount);
RuntimeAssert(cellCount < NEXT_FIT_PAGE_CELL_COUNT, "cellCount is too large for NextFitPage");
return new (SafeAlloc(NEXT_FIT_PAGE_SIZE)) NextFitPage(cellCount);
}
inline constexpr const size_t NEXT_FIT_PAGE_SIZE = (256 * KiB);
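The constructor sets up two header cells: cells_[0] is a zero-size sentinel, and cells_[1] describes a single free block that spans the remaining NEXT_FIT_PAGE_CELL_COUNT - 1 cells.
Code block: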
NextFitPage::NextFitPage(uint32_t cellCount) noexcept : curBlock_(cells_) {
cells_[0] = Cell(0); // Size 0 ensures any actual use would break
cells_[1] = Cell(NEXT_FIT_PAGE_CELL_COUNT - 1);
}
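Each allocation tries to carve blockSize cells (plus one header cell) out of curBlock_. If the current block is too small, UpdateCurBlock looks for another block that is large enough, and the allocation is retried.
Code block:
uint8_t* NextFitPage::TryAllocate(uint32_t blockSize) noexcept {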
CustomAllocDebug("NextFitPage@%p::TryAllocate(%u)", this, blockSize);
// +1 accounts for header, since cell->size also includes header cell
uint32_t cellsNeeded = blockSize + 1;
uint8_t* block = curBlock_->TryAllocate(cellsNeeded);
if (block) return block;
UpdateCurBlock(cellsNeeded);
return curBlock_->TryAllocate(cellsNeeded);
}
Large object allocation
SingleObjectPage holds exactly one object per page, sized to fit that object, and is used for large objects over 256 KiB.
Code block:
SingleObjectPage* SingleObjectPage::Create(uint64_t cellCount) noexcept {
CustomAllocInfo("SingleObjectPage::Create(%" PRIu64 ")", cellCount);
RuntimeAssert(cellCount > NEXT_FIT_PAGE_MAX_BLOCK_SIZE, "blockSize too small for SingleObjectPage");
uint64_t size = sizeof(SingleObjectPage) + cellCount * sizeof(uint64_t);
return new (SafeAlloc(size)) SingleObjectPage(size);
}
Finalizable objects
Regardless of the object type, if an object needs finalization, CreateObject allocates a 24-byte ExtraObjectData for it through CreateExtraObject.
ExtraObjectPage allocates 64 KiB of memory.
Code block:
ExtraObjectPage* ExtraObjectPage::Create(uint32_t ignored) noexcept {
CustomAllocInfo("ExtraObjectPage::Create()");
return new (SafeAlloc(EXTRA_OBJECT_PAGE_SIZE)) ExtraObjectPage();
}
// Optional data that's lazily allocated only for objects that need it.
class ExtraObjectData : private Pinned {
private:
// Must be first to match `TypeInfo` layout.
const TypeInfo* typeInfo_;
std::atomic<uint32_t> flags_ = 0;
std::atomic<ObjHeader*> weakReferenceOrBaseObject_;
nextFree_ points to the first free cell. The constructor links EXTRA_OBJECT_COUNT cells into a free list.
Code block:
ExtraObjectPage::ExtraObjectPage() noexcept {
nextFree_.store(cells_, std::memory_order_relaxed);
ExtraObjectCell* end = cells_ + EXTRA_OBJECT_COUNT;
for (ExtraObjectCell* cell = cells_; cell < end; cell = cell->next_.load(std::memory_order_relaxed)) {
cell->next_.store(cell + 1, std::memory_order_relaxed);
}
}
TryAllocate hands out one cell per call.
Code block:
mm::ExtraObjectData* ExtraObjectPage::TryAllocate() noexcept {
auto* next = nextFree_.load(std::memory_order_relaxed);
if (next >= cells_ + EXTRA_OBJECT_COUNT) {
return nullptr;
}
ExtraObjectCell* freeBlock = next;
nextFree_.store(freeBlock->next_.load(std::memory_order_relaxed), std::memory_order_relaxed);
CustomAllocDebug("ExtraObjectPage(%p)::TryAllocate() = %p", this, freeBlock->Data());
return freeBlock->Data();
}
FinalizerQueue stores objects that need finalization. After GC, the FinalizerQueue is traversed and each object's finalize method is called.
Code block:
class CustomAllocator {
private:
uint8_t* Allocate(uint64_t cellCount) noexcept;
uint8_t* AllocateInSingleObjectPage(uint64_t cellCount) noexcept;
uint8_t* AllocateInNextFitPage(uint32_t cellCount) noexcept;
uint8_t* AllocateInFixedBlockPage(uint32_t cellCount) noexcept;
Heap& heap_;
NextFitPage* nextFitPage_;
FixedBlockPage* fixedBlockPages_[FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE + 1];
ExtraObjectPage* extraObjectPage_;
FinalizerQueue finalizerQueue_;
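To sum up, the custom allocator has four kinds of pages: FixedBlockPage and NextFitPage serve small and medium objects, SingleObjectPage serves large objects, and ExtraObjectPage holds the extra data of objects that need finalization.
A brief summary is shown in the figure.
FixedBlockPage, SingleObjectPage, and NextFitPage all define a Sweep method that reclaims memory during GC. The different GC algorithms all call the same Sweep methods. Next, the GC.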
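GC
There are three GC implementations. The default is pmcs; cms has to be enabled manually.
-
cms marks concurrently and pauses threads only while scanning GC roots, so it performs best.
-
stms has to stop the world and pause threads, so it performs poorly.
-
pmcs, the default, can run GC on multiple threads but also stops the world.
stms is the earliest collector and cms is the newest. Let's look at each in the code.
stms
GCImpl.cpp is the interface class of a GC implementation, and every collector has to provide one. It consists of:
-
SameThreadMarkAndSweep gc_: the whole GC is carried out by SameThreadMarkAndSweep.
-
gcScheduler: the scheduling policy, analyzed later.
Code block:
class GC::Impl : private Pinned {
public:
explicit Impl(alloc::Allocator& allocator, gcScheduler::GCScheduler& gcScheduler) noexcept : gc_(allocator, gcScheduler) {}
SameThreadMarkAndSweep& gc() noexcept { return gc_; }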
private:
SameThreadMarkAndSweep gc_;
};
The SameThreadMarkAndSweep constructor creates the "GC thread", which waits on state_.waitScheduled to decide whether to call PerformFullGC. This runs in a while (true) loop, and state_ is a GCStateHolder.
Code block:
gc::SameThreadMarkAndSweep::SameThreadMarkAndSweep(alloc::Allocator& allocator, gcScheduler::GCScheduler& gcScheduler) noexcept :
allocator_(allocator), gcScheduler_(gcScheduler), finalizerProcessor_([this](int64_t epoch) noexcept {
GCHandle::getByEpoch(epoch).finalizersDone();
state_.finalized(epoch);
}) {
gcThread_ = ScopedThread(ScopedThread::attributes().name("GC thread"), [this] {
while (true) {
auto epoch = state_.waitScheduled();
if (epoch.has_value()) {
PerformFullGC(*epoch);
} else {
break;
}
}
});
}
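PerformFullGC does the following:
-
stopTheWorld suspends all threads.
-
collectRootSet collects the GC roots.
-
Mark marks live objects starting from the GC roots.
-
processWeaks processes weak references.
-
prepareForGC tells each thread's CustomAllocator to drop its page references, in preparation for the sweep.
-
heap.Sweep frees objects that are not alive.
-
resumeTheWorld resumes the threads.
-
finalizerProcessor calls the objects' finalize methods. Before that, the finalizable objects of all threads are collected.
Code block: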
void gc::SameThreadMarkAndSweep::PerformFullGC(int64_t epoch) noexcept {
stopTheWorld(gcHandle, "GC stop the world");
gc::collectRootSet<internal::MarkTraits>(gcHandle, markQueue_, [](mm::ThreadData&) { return true; });
gc::Mark<internal::MarkTraits>(gcHandle, markQueue_);
gc::processWeaks<DefaultProcessWeaksTraits>(gcHandle, mm::SpecialRefRegistry::instance());
// This should really be done by each individual thread while waiting
int threadCount = 0;
for (auto& thread : kotlin::mm::ThreadRegistry::Instance().LockForIter()) {
thread.allocator().prepareForGC();
++threadCount;
}
allocator_.prepareForGC();
// also sweeps extraObjects
auto finalizerQueue = allocator_.impl().heap().Sweep(gcHandle);
for (auto& thread : kotlin::mm::ThreadRegistry::Instance().LockForIter()) {
finalizerQueue.mergeFrom(thread.allocator().impl().alloc().ExtractFinalizerQueue());
}
finalizerQueue.mergeFrom(allocator_.impl().heap().ExtractFinalizerQueue());
resumeTheWorld(gcHandle);
finalizerProcessor_.ScheduleTasks(std::move(finalizerQueue.regular), epoch);
mainThreadFinalizerProcessor_.schedule(std::move(finalizerQueue.mainThread), epoch);
}
The overall flow is shown in the figure.
collectRootSet uses collectRootSetForThread to read the GC roots from each thread's stack and TLS, and collectRootSetGlobals to read the GC roots from statics and JNI calls. All of them end up in markQueue.
Code block:
// TODO: This needs some tests now.
template <typename Traits, typename F>
void collectRootSet(GCHandle handle, typename Traits::MarkQueue& markQueue, F&& filter) noexcept {
Traits::clear(markQueue);
for (auto& thread : mm::GlobalData::Instance().threadRegistry().LockForIter()) {
if (!filter(thread))
continue;
thread.Publish();
collectRootSetForThread<Traits>(handle, markQueue, thread);
}
collectRootSetGlobals<Traits>(handle, markQueue);
}
Mark takes live objects out of markQueue and calls processInMark to process their fields.
Code block:
template <typename Traits>
void Mark(GCHandle::GCMarkScope& markHandle, typename Traits::MarkQueue& markQueue) noexcept {
while (ObjHeader* top = Traits::tryDequeue(markQueue)) {
markHandle.addObject();
Traits::processInMark(markQueue, top);
// TODO: Consider moving it before processInMark to make the latter something of a tail call.
if (auto* extraObjectData = mm::ExtraObjectData::Get(top)) {
internal::processExtraObjectData<Traits>(markHandle, markQueue, *extraObjectData, top);
}
}
}
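Unlike Android, KMP uses static analysis to decide whether an object is allocated on the stack or on the heap.
Stack-allocated objects can be reclaimed as soon as the method returns. field->heap() tells whether a field points to a heap object; stack objects do not need to go into markQueue.
Code block:
template <typename Traits>
void processFieldInMark(void* state, ObjHeader* object, ObjHeader* field) noexcept {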
auto& markQueue = *static_cast<typename Traits::MarkQueue*>(state);
if (field->heap()) {
Traits::tryEnqueue(markQueue, field);
}
if constexpr (!Traits::kAllowHeapToStackRefs) {
if (object->heap()) {
RuntimeAssert(!field->local(), "Heap object %p references stack object %p[typeInfo=%p]", object, field, field->type_info());
}
}
}
tryEnqueue takes the object's ObjectData (as described above, the 8 bytes at the start of each object) and pushes it onto the queue with tryPush.
Code block:
static ALWAYS_INLINE bool tryEnqueue(AnyQueue& queue, ObjHeader* object) noexcept {
auto& objectData = alloc::objectDataForObject(object);
bool pushed = queue.tryPush(objectData);
return pushed;
}
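The queue is implemented as a linked list whose links are the next_ field of ObjectData. If an object's next_ is already set, the object has already been marked and the push returns immediately. During sweep, an object whose next_ is set is not freed.
Code block: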
std::optional<iterator> try_insert_after(iterator pos, reference value) noexcept {
RuntimeAssert(pos != end(), "Attempted to try_insert_after end()");
RuntimeAssert(pos != iterator(), "Attempted to try_insert_after empty iterator");
if (!trySetNext(&value, next(pos.node_))) {
return std::nullopt;
}
setNext(pos.node_, &value);
return iterator(&value);
}
void setNext(ObjectData* next) noexcept {
RuntimeAssert(next, "next cannot be nullptr");
next_.store(next, std::memory_order_relaxed);
}
bool trySetNext(ObjectData* next) noexcept {
RuntimeAssert(next, "next cannot be nullptr");
ObjectData* expected = nullptr;
return next_.compare_exchange_strong(expected, next, std::memory_order_relaxed);
}
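The trick of using the intrusive link as the mark bit can be shown with a small stand-alone model. This is a sketch, not the runtime's intrusive_forward_list: ObjectDataModel and MarkStackModel are hypothetical names, the stack itself is meant to be used by a single marker thread, and only the claim step (the compare-and-swap on next) is safe against concurrent markers.

```cpp
#include <atomic>

// Each object carries one intrusive link; a non-null link doubles as the mark bit.
struct ObjectDataModel {
    std::atomic<ObjectDataModel*> next{nullptr};
    bool marked() const { return next.load(std::memory_order_relaxed) != nullptr; }
};

class MarkStackModel {
public:
    // Pushes the object unless it is already linked into a queue (already marked).
    bool tryPush(ObjectDataModel& obj) {
        ObjectDataModel* expected = nullptr;
        // Claim the object: only one caller can move next from nullptr to non-null.
        if (!obj.next.compare_exchange_strong(expected, head_, std::memory_order_relaxed)) {
            return false;
        }
        head_ = &obj;
        return true;
    }

    // Pops an object; its link stays non-null, so the object remains marked.
    ObjectDataModel* tryPop() {
        if (head_ == &tail_) return nullptr;
        ObjectDataModel* top = head_;
        head_ = top->next.load(std::memory_order_relaxed);
        return top;
    }

private:
    ObjectDataModel tail_;            // sentinel, so the last element's link is non-null too
    ObjectDataModel* head_ = &tail_;
};
```

Because a popped object keeps its non-null link, a second tryPush on it fails, which is what prevents an object from being traversed twice in one cycle.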
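The logic is shown in the figure.
As the code shows, the stms logic is complete, but stopping the world pauses threads and hurts performance. pmcs is implemented in much the same way as stms.
Next, how cms avoids most of the stop-the-world pause.
cms
In cms, the world is stopped only while the GC roots are scanned. The main logic is in markDispatcher_.runMainInSTW.
Code block: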
void gc::ConcurrentMarkAndSweep::PerformFullGC(int64_t epoch) noexcept {
std::unique_lock mainGCLock(gcMutex);
auto gcHandle = GCHandle::create(epoch);
stopTheWorld(gcHandle, "GC stop the world #1: collect root set");
auto& scheduler = gcScheduler_;
scheduler.onGCStart();
state_.start(epoch);
markDispatcher_.runMainInSTW();
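Once completeMutatorsRootSet has collected the GC roots, resumeTheWorld resumes the threads, so the rest of the Mark phase does not pause them. Objects created during the Mark phase are all treated as live.
Code block: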
void gc::mark::ConcurrentMark::runMainInSTW() {
ParallelProcessor::Worker mainWorker(*parallelProcessor_);
// create mutator mark queues
for (auto& thread : *lockedMutatorsList_) {
thread.gc().impl().gc().mark().markQueue().construct(*parallelProcessor_);
}
completeMutatorsRootSet(mainWorker);
// global root set must be collected after all the mutator's global data have been published
collectRootSetGlobals<MarkTraits>(gcHandle(), mainWorker);
barriers::enableBarriers(gcHandle().getEpoch());
resumeTheWorld(gcHandle());
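The flow is shown in the figure.
GCScheduler
The default is adaptive mode: a "GC Timer thread" triggers GC periodically while the app is in the foreground. The period is set by config_.regularGcInterval and defaults to 10 s.
Code block:
class GCSchedulerDataAdaptive {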
public:
GCSchedulerDataAdaptive(GCSchedulerConfig& config, std::function<int64_t()> scheduleGC) noexcept :
config_(config),
scheduleGC_(std::move(scheduleGC)),
appStateTracking_(mm::GlobalData::Instance().appStateTracking()),
heapGrowthController_(config),
regularIntervalPacer_(config),
timer_("GC Timer thread", config_.regularGcInterval(), [this] {
if (appStateTracking_.state() == mm::AppStateTracking::State::kBackground) {
return;
}
if (regularIntervalPacer_.NeedsGC()) {
RuntimeLogDebug({kTagGC}, "Scheduling GC by timer");
scheduleGC_.scheduleNextEpochIfNotInProgress();
}
}) {
}
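GC can also be triggered by allocation: when boundaryForHeapSize returns kTrigger, a GC is scheduled. After SafeAlloc has mapped memory with mmap, setAllocatedBytes is called to decide whether a GC is needed.
Code block:
void setAllocatedBytes(size_t bytes) noexcept {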
auto boundary = heapGrowthController_.boundaryForHeapSize(bytes);
switch (boundary) {
case HeapGrowthController::MemoryBoundary::kNone:
return;
case HeapGrowthController::MemoryBoundary::kTrigger:
scheduleGC_.scheduleNextEpochIfNotInProgress();
return;
case HeapGrowthController::MemoryBoundary::kTarget:
auto epoch = scheduleGC_.scheduleNextEpochIfNotInProgress();
mutatorAssists_.requestAssists(epoch);
return;
}
}
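The check compares totalAllocatedBytes against two thresholds, triggerHeapBytes_ and targetHeapBytes_ (10 MiB by default): at or above triggerHeapBytes_ a GC is scheduled, and at or above targetHeapBytes_ mutator assists are requested as well when they are enabled.
Code block:
// Can be called by any thread.
MemoryBoundary boundaryForHeapSize(size_t totalAllocatedBytes) noexcept {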
if (totalAllocatedBytes >= targetHeapBytes_) {
return config_.mutatorAssists() ? MemoryBoundary::kTarget : MemoryBoundary::kTrigger;
} else if (totalAllocatedBytes >= triggerHeapBytes_) {
return MemoryBoundary::kTrigger;
} else {
return MemoryBoundary::kNone;
}
}
After each GC, updateBoundaries recomputes targetHeapBytes. It depends on heapTriggerCoefficient (0.9 by default) and targetHeapUtilization (0.5 by default), both of which can be tuned.
Code block:
// Called by the GC thread.
void updateBoundaries(size_t aliveBytes) noexcept {
if (config_.autoTune.load()) {
double targetHeapBytes = static_cast<double>(aliveBytes) / config_.targetHeapUtilization;
if (!std::isfinite(targetHeapBytes)) {
// This shouldn't happen in practice: targetHeapUtilization is in (0, 1]. But in case it does, don't touch anything.
return;
}
double minHeapBytes = static_cast<double>(config_.minHeapBytes.load(std::memory_order_relaxed));
double maxHeapBytes = static_cast<double>(config_.maxHeapBytes.load(std::memory_order_relaxed));
targetHeapBytes = std::min(std::max(targetHeapBytes, minHeapBytes), maxHeapBytes);
triggerHeapBytes_ = static_cast<size_t>(targetHeapBytes * config_.heapTriggerCoefficient.load(std::memory_order_relaxed));
config_.targetHeapBytes.store(static_cast<int64_t>(targetHeapBytes), std::memory_order_relaxed);
targetHeapBytes_ = static_cast<size_t>(targetHeapBytes);
} else {
targetHeapBytes_ = config_.targetHeapBytes.load(std::memory_order_relaxed);
}
}
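To make the arithmetic concrete, the two functions can be combined into a small single-threaded model. This is a sketch, not the runtime's HeapGrowthController: HeapGrowthModel is a hypothetical name, atomics are omitted, and the minHeapBytes and maxHeapBytes values are illustrative assumptions rather than the runtime's defaults. The 10 MiB target, 0.9 coefficient and 0.5 utilization are the defaults quoted in this article.

```cpp
#include <algorithm>
#include <cstddef>

enum class MemoryBoundary { kNone, kTrigger, kTarget };

// Hypothetical single-threaded model of the heap growth controller.
struct HeapGrowthModel {
    double targetHeapUtilization = 0.5;
    double heapTriggerCoefficient = 0.9;
    // Assumption: illustrative clamps, not the runtime defaults.
    double minHeapBytes = 5.0 * 1024 * 1024;
    double maxHeapBytes = 1024.0 * 1024 * 1024;
    size_t targetHeapBytes = 10 * 1024 * 1024;
    size_t triggerHeapBytes = 9 * 1024 * 1024;
    bool mutatorAssists = true;

    // Classifies the current heap size against the two thresholds.
    MemoryBoundary boundaryForHeapSize(size_t totalAllocatedBytes) const {
        if (totalAllocatedBytes >= targetHeapBytes) {
            return mutatorAssists ? MemoryBoundary::kTarget : MemoryBoundary::kTrigger;
        }
        if (totalAllocatedBytes >= triggerHeapBytes) return MemoryBoundary::kTrigger;
        return MemoryBoundary::kNone;
    }

    // Recomputes both thresholds from the bytes that survived the last GC.
    void updateBoundaries(size_t aliveBytes) {
        double target = static_cast<double>(aliveBytes) / targetHeapUtilization;
        target = std::min(std::max(target, minHeapBytes), maxHeapBytes);
        targetHeapBytes = static_cast<size_t>(target);
        triggerHeapBytes = static_cast<size_t>(target * heapTriggerCoefficient);
    }
};
```

For example, if 20 MiB survive a collection, the next target is 20 MiB / 0.5 = 40 MiB and the next trigger is roughly 36 MiB, so a heap that keeps more data alive is collected less often.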
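aggressive mode only triggers allocation-based GC and never triggers on a timer.
Summary of current problems
-
The std allocator uses very little memory, but in practice switching to it causes frequent allocation-triggered GCs, and it performs much worse than custom.
-
cms does not pause threads during the mark phase and performs better, but the default is pmcs.
-
GCScheduler defaults to adaptive mode, where the periodic GC (every 10 s by default) and the default heap target (10 MiB) lead to frequent GCs.
-
The GC is not generational, so every cycle traverses all objects, which is slow.
-
In the custom allocator, each thread allocates independently, comparable to a TLAB on Android. In practice, however, physical memory easily exceeds 200 MB because the heap is never compacted, so we have to implement compaction ourselves.
We optimized each of these and shipped the changes in Douyin.
Optimizations in production
Heap configuration
The updateBoundaries analysis shows that the next GC is mainly governed by targetHeapBytes. It defaults to 10 MiB, so a GC is triggered at heapTriggerCoefficient * 10 MiB = 9 MiB. After a GC, targetHeapBytes = live object bytes / targetHeapUtilization (0.5).
Code block: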
std::atomic<int64_t> regularGcIntervalMicroseconds = 10 * 1000 * 1000;
// GC will try to keep object bytes under this amount. If object bytes have
// become bigger than this value, and `mutatorAssists` are enabled the GC will
// stop the world and wait until current epoch finishes.
// Adapts after each GC epoch when `autoTune = true`.
std::atomic<int64_t> targetHeapBytes = 10 * 1024 * 1024;
// The rate at which `targetHeapBytes` changes when `autoTune = true`. Concretely: if after the collection
// `N` object bytes remain in the heap, the next `targetHeapBytes` will be `N / targetHeapUtilization` capped
// between `minHeapBytes` and `maxHeapBytes`.
std::atomic<double> targetHeapUtilization = 0.5;
// GC will be triggered when object bytes reach `heapTriggerCoefficient * targetHeapBytes`.
std::atomic<double> heapTriggerCoefficient = 0.9;
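In practice, allocation-triggered GCs fire quite often, and these variables can be tuned to reduce them. In addition, the periodic GC driven by regularGcIntervalMicroseconds (10 s) also consumes CPU while scrolling. The interval can be raised during scrolling for now, with GC later triggered by heap size instead.
Take the Toutiao Following page as an example: with the default memory parameters, GC is triggered frequently while scrolling, which lowers the frame rate.
With the default parameters, the GC interval while scrolling is only about 200 ms.
At the business layer, the parameters can be adjusted directly through the properties of kotlin.native.runtime.GC.
Raise the GC memory threshold.
After the adjustment, the GC interval while scrolling is the default 10 s.
Scroll-time GC suppression
The kotlin-native GC currently runs on a timer. If a GC happens to fire during scrolling, it may cause jank, so the runtime should not collect while scrolling. The approach is to set GC.regularGCInterval to a relatively long value, such as 1 minute, while scrolling, and to restore it when scrolling ends.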
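The save-and-restore pattern behind scroll-time suppression can be sketched with a scope guard. In the app this is done from Kotlin through GC.regularGCInterval; the C++ below only models the idea against a stand-in for the regularGcIntervalMicroseconds field shown earlier, and ScrollGcSuppressor is a hypothetical name.

```cpp
#include <atomic>
#include <cstdint>

// Stand-in for the scheduler config field shown earlier (default 10 s).
std::atomic<int64_t> regularGcIntervalMicroseconds{10 * 1000 * 1000};

// Raises the timer interval for the lifetime of the guard and restores the
// previous value when the guard goes out of scope (scrolling ended).
class ScrollGcSuppressor {
public:
    explicit ScrollGcSuppressor(int64_t scrollingIntervalUs = 60LL * 1000 * 1000)
        : saved_(regularGcIntervalMicroseconds.exchange(scrollingIntervalUs)) {}
    ~ScrollGcSuppressor() { regularGcIntervalMicroseconds.store(saved_); }

    ScrollGcSuppressor(const ScrollGcSuppressor&) = delete;
    ScrollGcSuppressor& operator=(const ScrollGcSuppressor&) = delete;

private:
    int64_t saved_;  // interval in effect before scrolling started
};
```

Tying the restore to scope exit means the interval goes back to its previous value even if the scroll handling code returns early.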
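GC configuration
The default is pmcs. Switching to cms reduces thread pause time. In most cases the pmcs stop-the-world (STW) pause is about 5 ms. To avoid dropped frames at 120 fps, a frame has to render in 8.33 ms, which leaves only about 3 ms for business logic. In our tests, scrolling through content with images dropped frames almost every time. With cms the STW pause is about 0.2 ms, more than an order of magnitude lower.
STW pause time with the default pmcs GC
STW pause time after switching to cms
In our tests, with the three optimizations above applied, the frame rate when scrolling the Toutiao profile page rose from 110 fps to 117 fps.
Memory fragmentation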
-
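Adjust the number of FixedBlockPages and the cell size. Each thread has its own fixedBlockPages_ array, which can grow to 256 KiB * 128 = 32 MiB. Because the heap is not compacted after GC, it is left with large holes. We currently set FIXED_BLOCK_PAGE_SIZE to 64 KiB and FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE to 16, so one thread uses 1 MiB.
Code block:
class CustomAllocator {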
private:
Heap& heap_;
NextFitPage* nextFitPage_;
FixedBlockPage* fixedBlockPages_[FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE + 1];
ExtraObjectPage* extraObjectPage_;
FinalizerQueue finalizerQueue_;
inline constexpr const size_t FIXED_BLOCK_PAGE_SIZE = (256 * KiB);
inline constexpr const int FIXED_BLOCK_PAGE_MAX_BLOCK_SIZE = 128;
-
Release hole memory page by page
During Sweep, memory that should be freed is only zeroed with memset. It is not returned to the system.
Code block:
bool FixedBlockPage::Sweep(GCSweepScope& sweepHandle, FinalizerQueue& finalizerQueue) noexcept {
for (uint32_t cell = 0 ; cell < end_ ; cell += blockSize_) {
// Go through the occupied cells.
for (; cell < nextFree.first ; cell += blockSize_) {
if (!SweepObject(cells_[cell].data, finalizerQueue, sweepHandle)) {
// We should null this cell out, but we will do so in batch later.
continue;
}
if (prevLive + blockSize_ < cell) {
// We found an alive cell that ended a run of swept cells or a known unoccupied range.
uint32_t prevCell = cell - blockSize_;
// Nulling in batch.
memset(&cells_[prevLive + blockSize_], 0, (prevCell - prevLive) * sizeof(FixedBlockCell));
}
}
We replaced memset with madvise to release the memory page by page.
Code block:
#ifndef KONAN_WINDOWS
static size_t kPageSize = sysconf(_SC_PAGESIZE);
#endif
void ZeroAndReleasePages(void* address, size_t length) noexcept {
#ifdef KONAN_WINDOWS
#else
if (length <= 0) {
return;
}
uint8_t* const mem_begin = reinterpret_cast<uint8_t*>(address);
uint8_t* const mem_end = mem_begin + length;
uint8_t* const page_begin = reinterpret_cast<uint8_t*>(RoundUp(reinterpret_cast<uintptr_t>(mem_begin), kPageSize));
uint8_t* const page_end = reinterpret_cast<uint8_t*>(RoundDown(reinterpret_cast<uintptr_t>(mem_end), kPageSize));
if (page_begin >= page_end) {
// No possible area to madvise.
} else {
madvise(page_begin, page_end - page_begin, MADV_DONTNEED);
}
#endif
}
//#endif
In our tests, after long scrolling on the Toutiao Following page, the fragmentation optimization reduced memory by about 200 MB.
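The page rounding in ZeroAndReleasePages decides how much of a freed range can actually be handed back, since madvise works on whole pages. A minimal sketch of that computation follows. PageRange, roundUp, roundDown and releasablePages are hypothetical names, because the RoundUp and RoundDown helpers used above are not shown in this article.

```cpp
#include <cstddef>
#include <cstdint>

// Half-open address range [begin, end); it is empty when begin >= end.
struct PageRange {
    uintptr_t begin;
    uintptr_t end;
};

constexpr uintptr_t roundUp(uintptr_t value, uintptr_t align) {
    return (value + align - 1) / align * align;
}

constexpr uintptr_t roundDown(uintptr_t value, uintptr_t align) {
    return value / align * align;
}

// Largest run of whole pages fully contained in [address, address + length).
// Partial pages at either edge may still hold live data and are left alone.
constexpr PageRange releasablePages(uintptr_t address, size_t length, uintptr_t pageSize) {
    return PageRange{roundUp(address, pageSize), roundDown(address + length, pageSize)};
}
```

With 4 KiB pages, a freed range of 10000 bytes starting at address 100 contains only one whole page, [4096, 8192), and a 200-byte range contains none, so small holes cannot be released this way.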
-
Remove the MAP_POPULATE flag from mmap
The Runtime allocates pages with mmap, as follows:
Code block:
void* SafeAlloc(uint64_t size) noexcept {
//......
#if KONAN_WINDOWS
RuntimeFail("mmap is not available on mingw");
#elif KONAN_LINUX
memory = mmap(nullptr, size, PROT_WRITE | PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE | MAP_NORESERVE | MAP_POPULATE, -1, 0);
error = memory == MAP_FAILED;
//......
}
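The call passes the MAP_POPULATE flag, whose main effect is to prefault the page tables of the mapped region.
With a plain mmap call, the system only reserves a virtual memory region in the process address space and sets up the mapping between virtual addresses and the file (or anonymous memory). It does not allocate physical memory right away. Physical memory is allocated lazily, on the first CPU access to the region, through a page fault.
With MAP_POPULATE, the system populates the page tables during the mmap call itself, and for file mappings it also triggers read-ahead on the file. Removing the flag reduces physical memory usage.
VMA reuse
During sweep, the CMS GC collects empty pages:
Code block:
T* SweepSingle(GCSweepScope& sweepHandle, T* page, AtomicStack<T>& from, AtomicStack<T>& to, FinalizerQueue& finalizerQueue) noexcept {
if (!page) {
return nullptr;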
}
do {
if (page->Sweep(sweepHandle, finalizerQueue)) {
to.Push(page);
return page;
}
empty_.Push(page);
} while ((page = from.Pop()));
return nullptr;
}
During the second STW of the next GC, the empty pages are released with munmap, which frees their physical memory:
Code block:
void PrepareForGC() noexcept {
unswept_.TransferAllFrom(std::move(ready_));
unswept_.TransferAllFrom(std::move(used_));
T* page;
// Destroy releases the VMA with munmap
while ((page = empty_.Pop())) page->Destroy();
}
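However, when there are many empty pages, this makes the STW noticeably longer and hurts performance.
So we added VMA reuse: when empty pages are collected, madvise(MADV_DONTNEED) is applied to them to release their physical memory, which greatly shortens the second STW.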
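The effect of MADV_DONTNEED on an anonymous mapping can be checked with a few lines of POSIX code. This is a sketch of the mechanism only, not the runtime's page code. It assumes Linux, where a private anonymous mapping reads back as zero after MADV_DONTNEED; other systems may behave differently. reusePageAfterDontNeed is a hypothetical name.

```cpp
#include <cstddef>
#include <cstdint>
#include <sys/mman.h>

// Maps an anonymous region, dirties it, drops its physical pages with
// MADV_DONTNEED, and reports whether the same virtual range is still usable
// and reads back as zero (Linux behaviour for private anonymous mappings).
inline bool reusePageAfterDontNeed(size_t size) {
    void* mem = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) return false;
    auto* bytes = static_cast<uint8_t*>(mem);
    bytes[0] = 42;                                     // touch: a physical page is committed
    bool ok = madvise(mem, size, MADV_DONTNEED) == 0;  // physical pages dropped, VMA kept
    ok = ok && bytes[0] == 0;                          // next touch faults in a zero page
    bytes[0] = 7;                                      // reusable without a new mmap
    ok = ok && bytes[0] == 7;
    munmap(mem, size);
    return ok;
}
```

The mapping survives the madvise call, so an empty page can be put back into use without another mmap, and the costly munmap no longer has to happen inside the STW window.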
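Generational GC
When sweep calls ObjectData::tryResetMark on a sticky (young) object, the object is marked with kStickyMark. In the next GC the object is still seen as marked, so it is neither freed nor added to the mark queue.
Code block:
bool tryResetMark() noexcept {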
if (!isSticky) {
unMarkSticky();
}
if (next() == nullptr) return false;
markUncontendedSticky();
markSticky();
return true;
}
void markSticky() noexcept {
auto nextVal = reinterpret_cast<ObjectData*>(kStickyMark);
next_.store(nextVal, std::memory_order_relaxed);
}
bool unMarkSticky() {
auto expected = reinterpret_cast<ObjectData*>(kStickyMark);
return next_.compare_exchange_strong(expected, nullptr, std::memory_order_relaxed);
}
When not in sticky mode, tryEnqueue calls unMarkSticky to clear the sticky mark so that the object is marked again.
Code block:
static ALWAYS_INLINE bool tryEnqueue(AnyQueue& queue, ObjHeader* object) noexcept {
auto& objectData = alloc::objectDataForObject(object);
if (!GC::ObjectData::isSticky) {
objectData.unMarkSticky();
}
bool pushed = queue.tryPush(objectData);
return pushed;
}
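Generational GC does not shorten the time threads are paused, but it cuts the total GC thread time by 10~30 ms. However, because memory is released later, it can also lead to higher memory usage.
Escape analysis
Static analysis determines whether a variable is allocated on the heap or on the stack. Stack-allocated objects are released as soon as the function returns. In our tests, the ratio of stack objects to heap objects was 1/8, so business code should try to increase the number of stack objects:
-
Use fewer class member variables and allocate variables inside methods where possible.
-
Use less polymorphism, to raise the chance that an object is recognized as stack-allocatable.
Memory compaction
Because stack variables do not go through a load-slot call that would update them to the new object address, two issues remain to be solved:
-
Compaction runs under STW.
-
References held in stack variables are not updated.
Part of the implementation follows. If the object has already been copied, the new address is read from the old object. Otherwise the object is copied with memcpy.
Code block: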
if (gc::isCopied(object)) {
UpdateStackRef(newObjAddr, gc::copyObj(object));
return;
}
// Use CAS to set the in-progress state across threads
if (!gc::isCopying(object)) {
gc::trySetCopyObj(object, reinterpret_cast<ObjHeader*>(gc::kObjectCopy));
} else {
// Otherwise wait for the copy to finish
while (gc::isCopying(object)) {};
if (gc::isCopied(object)) {
UpdateStackRef(newObjAddr, gc::copyObj(object));
}
return;
}
newObj = threadData->allocator().allocateObject(typeInfo);
// Prevents unsafe class publication (see KT-58995).
// Also important in case of the concurrent GC mark phase.
std::atomic_thread_fence(std::memory_order_release);
size = computeObjectSize(typeInfo);
std::memcpy(reinterpret_cast<int8_t *>(newObj) + sizeof(ObjHeader), reinterpret_cast<int8_t *>(object) + sizeof(ObjHeader),
size - sizeof(ObjHeader));
gc::trySetCopyObj(object, newObj);
UpdateStackRef(newObjAddr, newObj);
An online experiment in Douyin showed a 10% memory reduction.
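The copy-once protocol in the snippet above can be reduced to a three-state forwarding pointer. The sketch below is a simplified model, not the production code: MovableObj, copyingSentinel and relocate are hypothetical names, a plain field copy stands in for allocateObject plus memcpy, and the claim step uses a single compare-and-swap, which is a choice of this sketch.

```cpp
#include <atomic>
#include <cstdint>

// forward == nullptr: not copied yet; == copyingSentinel(): copy in progress;
// anything else: address of the relocated copy.
struct MovableObj {
    std::atomic<MovableObj*> forward{nullptr};
    int payload = 0;
};

inline MovableObj* copyingSentinel() {
    return reinterpret_cast<MovableObj*>(uintptr_t{1});
}

// Returns the relocated copy, copying at most once even if several threads race.
inline MovableObj* relocate(MovableObj* old) {
    MovableObj* expected = nullptr;
    if (old->forward.compare_exchange_strong(expected, copyingSentinel(), std::memory_order_acq_rel)) {
        MovableObj* fresh = new MovableObj();  // stands in for allocateObject(typeInfo)
        fresh->payload = old->payload;         // stands in for the memcpy of the body
        old->forward.store(fresh, std::memory_order_release);  // publish the new address
        return fresh;
    }
    // Another caller claimed the object: wait until it publishes the new address.
    MovableObj* seen = expected;
    while (seen == copyingSentinel()) {
        seen = old->forward.load(std::memory_order_acquire);
    }
    return seen;
}
```

A second call to relocate on the same object returns the address published by the first call instead of copying again, which is the property the stack-reference update relies on.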
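Future plans
-
Use LLVM stack maps for memory compaction, and change the GC from STW to concurrent.
-
Pointer compression: shrink member-variable and array-element pointers in objects from 64 bits to 32 bits, which can save 10%+ of memory.
-
Large and small objects currently share one heap. They can be moved to separate heaps to reduce the number of GCs.
[Join us] We are the Douyin Client Foundation Technology (抖音客户端基础技术) team, and we are hiring cross-platform engineers: https://job.toutiao.com/s/8-7naSVgpc4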
