Flink JobManager and TaskManager memory configuration: a source code analysis

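Flink memory allocation

Memory composition

Let's start with how the official documentation describes Flink's memory setup. The total process memory of a Flink JVM process consists of the memory used by the Flink application (total Flink memory) and the memory used by the JVM that runs Flink. Total Flink memory includes JVM heap memory and off-heap memory, and off-heap memory in turn comprises direct memory and native memory.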

[Figure: memory composition of a Flink process]

The simplest way to configure memory for a Flink process is to set either one of the following two options:

| Component | TaskManager option | JobManager option |
| --- | --- | --- |
| Total Flink memory | taskmanager.memory.flink.size [1] | jobmanager.memory.flink.size [2] |
| Total process memory | taskmanager.memory.process.size [3] | jobmanager.memory.process.size [4] |

For more on Flink memory and how to configure it, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup.html

Source code walkthrough

TaskManager memory composition

Let's look at how org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec is put together:


          
    @VisibleForTesting
    public TaskExecutorProcessSpec(
        CPUResource cpuCores,
        MemorySize frameworkHeapSize,
        MemorySize frameworkOffHeapSize,
        MemorySize taskHeapSize,
        MemorySize taskOffHeapSize,
        MemorySize networkMemSize,
        MemorySize managedMemorySize,
        MemorySize jvmMetaspaceSize,
        MemorySize jvmOverheadSize) {

        this(
            cpuCores,
            new TaskExecutorFlinkMemory(
                frameworkHeapSize,
                frameworkOffHeapSize,
                taskHeapSize,
                taskOffHeapSize,
                networkMemSize,
                managedMemorySize),
            new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize));
    }

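The fields of TaskExecutorProcessSpec cover all of the TaskManager's memory configuration. Each is described below.

1. cpuCores

The number of CPU cores that each TaskManager occupies.

2. TaskExecutorFlinkMemory

The main configuration items related to TaskExecutorFlinkMemory are:

• frameworkHeapSize: set by taskmanager.memory.framework.heap.size, default 128mb. The framework heap memory of the TaskExecutor, that is, JVM heap reserved for the TaskExecutor framework. It is not allocated to task slots.
• frameworkOffHeapSize: set by taskmanager.memory.framework.off-heap.size, default 128mb. The framework off-heap memory of the TaskExecutor, that is, off-heap memory (JVM direct memory and native memory) reserved for the TaskExecutor framework. It is not allocated to task slots. The configured value is counted in full when Flink calculates the JVM max direct memory size parameter.
• taskHeapSize: set by taskmanager.memory.task.heap.size. The task heap memory of the TaskExecutor, that is, JVM heap reserved for tasks. If not specified, it is derived as total Flink memory minus framework heap memory, framework off-heap memory, task off-heap memory, managed memory and network memory.
• taskOffHeapSize: set by taskmanager.memory.task.off-heap.size. The task off-heap memory of the TaskExecutor, that is, off-heap memory (JVM direct memory and native memory) reserved for tasks. The configured value is counted in full when Flink calculates the JVM max direct memory size parameter.
• totalFlinkMemorySize: set by taskmanager.memory.flink.size. The total Flink memory of the TaskExecutor. It covers all memory a TaskExecutor consumes except JVM metaspace and JVM overhead, and consists of framework heap memory, framework off-heap memory, task heap memory, task off-heap memory, managed memory and network memory. See also taskmanager.memory.process.size, which configures total process memory.
• networkMemSize: in the fine-grained derivation path analysed later, if taskmanager.memory.flink.size is configured, networkMemSize is (totalFlinkMemorySize - (frameworkHeapMemorySize + frameworkOffHeapMemorySize + taskHeapMemorySize + taskOffHeapMemorySize + managedMemorySize)). If taskmanager.memory.flink.size is not configured, networkMemSize is computed either from the legacy options taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size, or from the taskmanager.memory.network.* options (fraction, min and max).
• managedMemorySize: set by taskmanager.memory.managed.size. The managed memory of the TaskExecutor, that is, off-heap memory managed by the memory manager and used for sorting, hash tables, caching of intermediate results and the RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of memory segments, or reserve bytes from it and keep their usage within that boundary. If unspecified, it is derived as the configured fraction of total Flink memory.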
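The relationship between these components can be sketched in plain Java. This is a minimal illustration and not Flink code: the class name `FlinkMemorySketch` and the sizes in `example()` are hypothetical, chosen only so that the six components add up to a round number.

```java
/** Illustrative stand-in for the six sizes held by TaskExecutorFlinkMemory (all in bytes). */
final class FlinkMemorySketch {
    static final long MB = 1024L * 1024L;

    final long frameworkHeap;
    final long frameworkOffHeap;
    final long taskHeap;
    final long taskOffHeap;
    final long network;
    final long managed;

    FlinkMemorySketch(long frameworkHeap, long frameworkOffHeap, long taskHeap,
                      long taskOffHeap, long network, long managed) {
        this.frameworkHeap = frameworkHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.network = network;
        this.managed = managed;
    }

    /** Total Flink memory is the sum of all six components. */
    long totalFlinkMemory() {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap + network + managed;
    }

    /** Hypothetical sizing, used only for illustration. */
    static FlinkMemorySketch example() {
        return new FlinkMemorySketch(128 * MB, 128 * MB, 384 * MB, 0L, 128 * MB, 512 * MB);
    }

    public static void main(String[] args) {
        System.out.println(example().totalFlinkMemory() / MB + " MB");
    }
}
```

JVM metaspace and JVM overhead are deliberately absent here: they belong to total process memory, not to total Flink memory.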

3. JvmMetaspaceAndOverhead

• metaspace: set by taskmanager.memory.jvm-metaspace.size, default 256mb. The JVM metaspace size of the TaskExecutor.
• overhead: controlled by taskmanager.memory.jvm-overhead.fraction (default 0.1), taskmanager.memory.jvm-overhead.max (default 1gb) and taskmanager.memory.jvm-overhead.min (default 192mb). The memory reserved for JVM overhead.

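4. More options

Official documentation: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-managed-fraction

5. Relationship between the TaskExecutor memory components

[Figure: relationship between the TaskExecutor memory components]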

TaskManager memory configuration flow

Start from the following test code as the entry point:


          
    final MemorySize totalProcessMemorySize = MemorySize.parse("2048m");
    @SuppressWarnings("deprecation")
    final ConfigOption<MemorySize> legacyOption = TaskManagerOptions.TOTAL_PROCESS_MEMORY;
    Configuration conf = new Configuration();
    conf.set(legacyOption, totalProcessMemorySize);
    TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(conf);
    assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));

All TaskManager memory settings end up in a TaskExecutorProcessSpec object, which is produced by TaskExecutorProcessUtils.processSpecFromConfig. Here is that method:


          
    /**
     * Derives the memory settings from the configuration.
     *
     * @param config the Flink configuration
     * @return the resulting TaskExecutor process spec
     */
    public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
        return createMemoryProcessSpec(config, PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
    }

PROCESS_MEMORY_UTILS is defined as follows:


          
    /**
     * All configuration options. Declared first, because a static initializer
     * may not refer to a static field that is declared later.
     */
    static final ProcessMemoryOptions TM_PROCESS_MEMORY_OPTIONS = new ProcessMemoryOptions(
        Arrays.asList(TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.MANAGED_MEMORY_SIZE),
        TaskManagerOptions.TOTAL_FLINK_MEMORY,
        TaskManagerOptions.TOTAL_PROCESS_MEMORY,
        new JvmMetaspaceAndOverheadOptions(
            TaskManagerOptions.JVM_METASPACE,
            TaskManagerOptions.JVM_OVERHEAD_MIN,
            TaskManagerOptions.JVM_OVERHEAD_MAX,
            TaskManagerOptions.JVM_OVERHEAD_FRACTION));

    /**
     * Creates the ProcessMemoryUtils object.
     */
    private static final ProcessMemoryUtils<TaskExecutorFlinkMemory> PROCESS_MEMORY_UTILS = new ProcessMemoryUtils<>(
        TM_PROCESS_MEMORY_OPTIONS,
        new TaskExecutorFlinkMemoryUtils());

As the code shows, ProcessMemoryUtils is given a ProcessMemoryOptions and a TaskExecutorFlinkMemoryUtils.

Next, look at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#memoryProcessSpecFromConfig:


          
    /**
     * Derives the process memory settings.
     *
     * @param config the Flink configuration
     * @return the derived process memory spec
     */
    public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
        // the config contains both taskmanager.memory.task.heap.size and taskmanager.memory.managed.size
        if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
            // all internal memory options are configured, use these to derive total Flink and process memory
            return deriveProcessSpecWithExplicitInternalMemory(config);
        } else if (config.contains(options.getTotalFlinkMemoryOption())) {
            // internal memory options are not configured, total Flink memory is configured,
            // derive from total flink memory
            return deriveProcessSpecWithTotalFlinkMemory(config);
        } else if (config.contains(options.getTotalProcessMemoryOption())) {
            // total Flink memory is not configured, total process memory is configured,
            // derive from total process memory
            return deriveProcessSpecWithTotalProcessMemory(config);
        }
        return failBecauseRequiredOptionsNotConfigured();
    }

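Memory is derived in one of three ways, checked in this order:

1. If the user configuration contains every option returned by options.getRequiredFineGrainedOptions(), deriveProcessSpecWithExplicitInternalMemory builds the CommonProcessMemorySpec.
2. Otherwise, if the user configuration contains total Flink memory, deriveProcessSpecWithTotalFlinkMemory builds the CommonProcessMemorySpec.
3. Otherwise, if the user configuration contains total process memory, deriveProcessSpecWithTotalProcessMemory builds the CommonProcessMemorySpec.

If none of these applies, failBecauseRequiredOptionsNotConfigured is called.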
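The precedence of these three branches can be illustrated with a small stand-alone sketch. It is not Flink code: `DerivationModeSketch`, its `Mode` enum and the use of a plain `Set<String>` of configured keys in place of Flink's `Configuration` are assumptions made for illustration. Only the option keys are taken from the text above.

```java
import java.util.Set;

/** Illustrative re-statement of the branch order in memoryProcessSpecFromConfig. */
final class DerivationModeSketch {
    enum Mode { EXPLICIT_INTERNAL_MEMORY, TOTAL_FLINK_MEMORY, TOTAL_PROCESS_MEMORY }

    static Mode select(Set<String> configuredKeys) {
        // Fine-grained path: task heap AND managed memory must both be set.
        if (configuredKeys.contains("taskmanager.memory.task.heap.size")
                && configuredKeys.contains("taskmanager.memory.managed.size")) {
            return Mode.EXPLICIT_INTERNAL_MEMORY;
        } else if (configuredKeys.contains("taskmanager.memory.flink.size")) {
            return Mode.TOTAL_FLINK_MEMORY;
        } else if (configuredKeys.contains("taskmanager.memory.process.size")) {
            return Mode.TOTAL_PROCESS_MEMORY;
        }
        // Stand-in for failBecauseRequiredOptionsNotConfigured().
        throw new IllegalArgumentException("No required memory option is configured");
    }

    public static void main(String[] args) {
        System.out.println(select(Set.of("taskmanager.memory.process.size")));
    }
}
```

Note that setting only one of the two fine-grained options does not select the first branch. The configuration then falls through to the total Flink memory or total process memory branch.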

The deriveProcessSpecWithExplicitInternalMemory method


          
    private CommonProcessMemorySpec<FM> deriveProcessSpecWithExplicitInternalMemory(Configuration config) {
        // load Flink's internal memory settings, e.g. totalFlinkMemorySize, frameworkHeapMemorySize,
        // frameworkOffHeapMemorySize, taskHeapMemorySize, taskOffHeapMemorySize, managedMemorySize
        FM flinkInternalMemory = flinkMemoryUtils.deriveFromRequiredFineGrainedOptions(config);
        // derive JVM metaspace and JVM overhead from total Flink memory
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
            config,
            flinkInternalMemory.getTotalFlinkMemorySize());
        return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

Loading Flink's internal memory

The code of org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils#deriveFromRequiredFineGrainedOptions is:


          
    @Override
    public TaskExecutorFlinkMemory deriveFromRequiredFineGrainedOptions(Configuration config) {
        final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
        final MemorySize managedMemorySize = getManagedMemorySize(config);
        final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
        final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
        final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);
        final MemorySize networkMemorySize;
        final MemorySize totalFlinkExcludeNetworkMemorySize =
            frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
        // total Flink memory is configured
        if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
            // derive network memory from total flink memory, and check against network min/max
            final MemorySize totalFlinkMemorySize = getTotalFlinkMemorySize(config);
            if (totalFlinkExcludeNetworkMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException(
                    "Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString()
                        + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString()
                        + "), Task Heap Memory (" + taskHeapMemorySize.toHumanReadableString()
                        + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString()
                        + ") and Managed Memory (" + managedMemorySize.toHumanReadableString()
                        + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
            sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(config, networkMemorySize, totalFlinkMemorySize);
        } else {
            // derive network memory from network configs
            // with the legacy network options, read taskmanager.network.numberOfBuffers and multiply it
            // by the page size (taskmanager.memory.segment-size, default 32k)
            networkMemorySize = isUsingLegacyNetworkConfigs(config) ? getNetworkMemorySizeWithLegacyConfig(config) :
                // otherwise derive it from taskmanager.memory.network.fraction and the network min/max
                deriveNetworkMemoryWithInverseFraction(config, totalFlinkExcludeNetworkMemorySize);
        }

        final TaskExecutorFlinkMemory flinkInternalMemory = new TaskExecutorFlinkMemory(
            frameworkHeapMemorySize,
            frameworkOffHeapMemorySize,
            taskHeapMemorySize,
            taskOffHeapMemorySize,
            networkMemorySize,
            managedMemorySize);
        sanityCheckTotalFlinkMemory(config, flinkInternalMemory);

        return flinkInternalMemory;
    }

totalFlinkMemorySize, frameworkHeapMemorySize, frameworkOffHeapMemorySize, taskHeapMemorySize, taskOffHeapMemorySize and managedMemorySize were all described earlier. The code above adds the concrete loading steps and the validation logic.
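The two ways network memory is obtained in this path (ignoring the legacy numberOfBuffers case) can be sketched as follows. This is a simplified illustration, not Flink code: `NetworkMemorySketch` and its method names are hypothetical, plain `long` byte counts replace `MemorySize`, and the bounds passed in `main` (fraction 0.1, min 64mb, max 1gb) are assumed values that should be checked against your Flink version. Rounding may also differ from Flink's `MemorySize.multiply`.

```java
/** Illustrative sketch of how network memory is derived in the fine-grained path. */
final class NetworkMemorySketch {
    static final long MB = 1024L * 1024L;

    /** Total Flink memory configured: network memory is whatever is left over. */
    static long fromTotal(long totalFlink, long sumOfOtherComponents) {
        if (sumOfOtherComponents > totalFlink) {
            throw new IllegalArgumentException("Components exceed configured total Flink memory");
        }
        return totalFlink - sumOfOtherComponents;
    }

    /** Total Flink memory not configured: scale the other components, then clamp to [min, max]. */
    static long fromInverseFraction(long sumOfOtherComponents, double fraction, long min, long max) {
        if (fraction < 0 || fraction >= 1) {
            throw new IllegalArgumentException("fraction must be in [0, 1)");
        }
        long derived = (long) (sumOfOtherComponents * (fraction / (1 - fraction)));
        return Math.max(min, Math.min(max, derived));
    }

    public static void main(String[] args) {
        long others = (128 + 128 + 384 + 0 + 512) * MB; // hypothetical component sizes
        System.out.println(fromTotal(1280 * MB, others) / MB + " MB (remainder)");
        System.out.println(fromInverseFraction(others, 0.1, 64 * MB, 1024 * MB) + " bytes (inverse fraction)");
    }
}
```

In the first case Flink additionally checks the remainder against the network min/max (sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory). That check is omitted from the sketch.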

JvmMetaspaceAndOverhead

The code of org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory is:


          
    public JvmMetaspaceAndOverhead deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(
            Configuration config,
            MemorySize totalFlinkMemorySize) {
        // JVM metaspace size, 256M by default
        MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
        // sum of total Flink memory and JVM metaspace
        MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
        if (config.contains(options.getTotalProcessMemoryOption())) {
            // total process memory is configured: overhead is the remainder,
            // totalProcessMemorySize - (totalFlinkMemorySize + jvmMetaspaceSize)
            MemorySize jvmOverheadSize = deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(config, totalFlinkMemorySize);
            jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
        } else {
            // derive from the configured fraction
            MemorySize jvmOverheadSize = deriveWithInverseFraction(
                "jvm overhead memory",
                totalFlinkAndJvmMetaspaceSize,
                getJvmOverheadRangeFraction(config));
            jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
            // validate against the process memory size so that memory is not exceeded
            sanityCheckTotalProcessMemory(config, totalFlinkMemorySize, jvmMetaspaceAndOverhead);
        }
        return jvmMetaspaceAndOverhead;
    }

If total process memory is configured, the JVM overhead is computed like this:


          
    private MemorySize deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(
            Configuration config,
            MemorySize totalFlinkMemorySize) {
        // total process memory
        MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
        // JVM metaspace size
        MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
        // total Flink memory plus JVM metaspace
        MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
        if (totalProcessMemorySize.getBytes() < totalFlinkAndJvmMetaspaceSize.getBytes()) {
            throw new IllegalConfigurationException(
                "The configured Total Process Memory size (%s) is less than the sum of the derived "
                    + "Total Flink Memory size (%s) and the configured or default JVM Metaspace size  (%s).",
                totalProcessMemorySize.toHumanReadableString(),
                totalFlinkMemorySize.toHumanReadableString(),
                jvmMetaspaceSize.toHumanReadableString());
        }
        // total process memory minus totalFlinkAndJvmMetaspaceSize
        MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
        // validate the JVM overhead against its min and max
        sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
        return jvmOverheadSize;
    }

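If total process memory is not configured, the JVM overhead is derived backwards with the method below. totalFlinkAndJvmMetaspaceSize accounts for a share of 1 - fraction of the whole process memory, so totalFlinkAndJvmMetaspaceSize / (1 - fraction) equals overheadSize / fraction: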


          
    public static MemorySize deriveWithInverseFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
        checkArgument(rangeFraction.getFraction() < 1);
        MemorySize relative = base.multiply(rangeFraction.getFraction() / (1 - rangeFraction.getFraction()));
        return capToMinMax(memoryDescription, relative, rangeFraction);
    }

    private static MemorySize capToMinMax(
            String memoryDescription,
            MemorySize relative,
            RangeFraction rangeFraction) {
        long size = relative.getBytes();
        if (size > rangeFraction.getMaxSize().getBytes()) {
            LOG.info(
                "The derived from fraction {} ({}) is greater than its max value {}, max value will be used instead",
                memoryDescription,
                relative.toHumanReadableString(),
                rangeFraction.getMaxSize().toHumanReadableString());
            size = rangeFraction.getMaxSize().getBytes();
        } else if (size < rangeFraction.getMinSize().getBytes()) {
            LOG.info(
                "The derived from fraction {} ({}) is less than its min value {}, min value will be used instead",
                memoryDescription,
                relative.toHumanReadableString(),
                rangeFraction.getMinSize().toHumanReadableString());
            size = rangeFraction.getMinSize().getBytes();
        }
        return new MemorySize(size);
    }

The JVM overhead is computed and validated here mainly from TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.JVM_OVERHEAD_MIN, TaskManagerOptions.JVM_OVERHEAD_MAX and TaskManagerOptions.JVM_OVERHEAD_FRACTION.
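To see where the factor fraction / (1 - fraction) in deriveWithInverseFraction comes from, the derivation can be written out. The symbols are shorthand introduced here, not names from the Flink source: $P$ is total process memory, $B$ is total Flink memory plus JVM metaspace (the `base` argument), $O$ is the JVM overhead and $f$ is the overhead fraction. The worked numbers assume the defaults quoted earlier (fraction 0.1, min 192mb, metaspace 256mb) and a hypothetical total Flink memory of 1280mb.

```latex
\begin{aligned}
P &= B + O, \qquad O = f \cdot P \\
\Rightarrow\; O &= f\,(B + O) \\
\Rightarrow\; O\,(1 - f) &= f \cdot B \\
\Rightarrow\; O &= B \cdot \frac{f}{1 - f}, \qquad 0 \le f < 1 \\[6pt]
B &= 1280\,\text{MB} + 256\,\text{MB} = 1536\,\text{MB}, \qquad f = 0.1 \\
O &= 1536\,\text{MB} \cdot \frac{0.1}{0.9} \approx 170.7\,\text{MB} \;<\; 192\,\text{MB} \\
\Rightarrow\; O_{\text{final}} &= 192\,\text{MB} \quad \text{(raised to the minimum by capToMinMax)}
\end{aligned}
```

The condition $f < 1$ corresponds to the checkArgument call at the top of the method. Without it the denominator could be zero or negative.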

The deriveProcessSpecWithTotalFlinkMemory method


          
    private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalFlinkMemory(Configuration config) {
        // load total Flink memory from the configuration
        MemorySize totalFlinkMemorySize = getMemorySizeFromConfig(config, options.getTotalFlinkMemoryOption());
        // load Flink's internal memory settings, e.g. frameworkHeapMemorySize, frameworkOffHeapMemorySize,
        // taskHeapMemorySize, taskOffHeapMemorySize, managedMemorySize
        FM flinkInternalMemory = flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, totalFlinkMemorySize);
        return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

Deriving Flink's internal memory allocation

The code is in org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils#deriveFromTotalFlinkMemory:


          
    @Override
    public TaskExecutorFlinkMemory deriveFromTotalFlinkMemory(
        final Configuration config,
        final MemorySize totalFlinkMemorySize) {
        final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
        final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
        final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);

        final MemorySize taskHeapMemorySize;
        final MemorySize networkMemorySize;
        final MemorySize managedMemorySize;

        if (isTaskHeapMemorySizeExplicitlyConfigured(config)) {
            // task heap memory is configured,
            // derive managed memory first, leave the remaining to network memory and check against network min/max
            taskHeapMemorySize = getTaskHeapMemorySize(config);
            managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
            final MemorySize totalFlinkExcludeNetworkMemorySize =
                frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
            if (totalFlinkExcludeNetworkMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException(
                    "Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString()
                        + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString()
                        + "), Task Heap Memory (" + taskHeapMemorySize.toHumanReadableString()
                        + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString()
                        + ") and Managed Memory (" + managedMemorySize.toHumanReadableString()
                        + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
            sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(config, networkMemorySize, totalFlinkMemorySize);
        } else {
            // task heap memory is not configured
            // derive managed memory and network memory, leave the remaining to task heap memory
            managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);

            networkMemorySize = isUsingLegacyNetworkConfigs(config) ? getNetworkMemorySizeWithLegacyConfig(config) :
                deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
            final MemorySize totalFlinkExcludeTaskHeapMemorySize =
                frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
            if (totalFlinkExcludeTaskHeapMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException(
                    "Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString()
                        + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString()
                        + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString()
                        + "), Managed Memory (" + managedMemorySize.toHumanReadableString()
                        + ") and Network Memory (" + networkMemorySize.toHumanReadableString()
                        + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
        }

        final TaskExecutorFlinkMemory flinkInternalMemory = new TaskExecutorFlinkMemory(
            frameworkHeapMemorySize,
            frameworkOffHeapMemorySize,
            taskHeapMemorySize,
            taskOffHeapMemorySize,
            networkMemorySize,
            managedMemorySize);
        sanityCheckTotalFlinkMemory(config, flinkInternalMemory);

        return flinkInternalMemory;
    }

Compared with the previous method, this one also handles the case where options such as task heap memory are not configured.
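As a worked example of the branch where task heap memory is not configured, the sketch below derives managed and network memory from fractions and gives the remainder to the task heap. It is an illustration only: `TaskHeapRemainderSketch` is a hypothetical name, the fractions and bounds (managed fraction 0.4, network fraction 0.1 within 64mb to 1gb, framework heap and off-heap 128mb each, task off-heap 0) are assumed defaults to check against your Flink version, and the legacy network options are ignored.

```java
/** Illustrative sketch: task heap as the remainder of total Flink memory. */
final class TaskHeapRemainderSketch {
    static final long MB = 1024L * 1024L;

    static long clamp(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }

    static long deriveTaskHeap(long totalFlink) {
        long frameworkHeap = 128 * MB;     // assumed default
        long frameworkOffHeap = 128 * MB;  // assumed default
        long taskOffHeap = 0L;             // assumed default
        long managed = (long) (totalFlink * 0.4);                             // fraction of total Flink memory
        long network = clamp((long) (totalFlink * 0.1), 64 * MB, 1024 * MB);  // fraction, then min/max

        long others = frameworkHeap + frameworkOffHeap + taskOffHeap + managed + network;
        if (others > totalFlink) {
            throw new IllegalArgumentException("Components exceed configured total Flink memory");
        }
        return totalFlink - others;
    }

    public static void main(String[] args) {
        System.out.println(deriveTaskHeap(1280 * MB) / MB + " MB");
    }
}
```

With 1280 MB of total Flink memory, managed memory comes to 512 MB and network memory to 128 MB, which leaves 1280 - 128 - 128 - 0 - 512 - 128 = 384 MB for the task heap.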

Deriving JVM metaspace and JVM overhead

This is again done by org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory, with the same call logic as in deriveProcessSpecWithExplicitInternalMemory above.

The deriveProcessSpecWithTotalProcessMemory method


          
    private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalProcessMemory(Configuration config) {
        // total process memory
        MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
        // derive JVM metaspace and JVM overhead from total process memory
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
            deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, totalProcessMemorySize);
        // total Flink memory = total process memory - (JVM metaspace + JVM overhead)
        MemorySize totalFlinkMemorySize = totalProcessMemorySize.subtract(
            jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
        // load Flink's internal memory settings, e.g. frameworkHeapMemorySize, frameworkOffHeapMemorySize,
        // taskHeapMemorySize, taskOffHeapMemorySize, managedMemorySize
        FM flinkInternalMemory = flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
        return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

Deriving JVM metaspace and JVM overhead

The code of org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils#deriveJvmMetaspaceAndOverheadWithTotalProcessMemory:


          
    private JvmMetaspaceAndOverhead deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(
            Configuration config,
            MemorySize totalProcessMemorySize) {
        // memory needed for JVM metaspace and JVM overhead
        // JVM metaspace
        MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption());
        // JVM overhead
        MemorySize jvmOverheadSize = deriveWithFraction(
            "jvm overhead memory",
            totalProcessMemorySize,
            getJvmOverheadRangeFraction(config));
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);

        if (jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize().getBytes() > totalProcessMemorySize.getBytes()) {
            throw new IllegalConfigurationException(
                "Sum of configured JVM Metaspace (" + jvmMetaspaceAndOverhead.getMetaspace().toHumanReadableString()
                    + ") and JVM Overhead (" + jvmMetaspaceAndOverhead.getOverhead().toHumanReadableString()
                    + ") exceed configured Total Process Memory (" + totalProcessMemorySize.toHumanReadableString() + ").");
        }

        return jvmMetaspaceAndOverhead;
    }

The method to note here is deriveWithFraction:


          
    public static MemorySize deriveWithFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
        MemorySize relative = base.multiply(rangeFraction.getFraction());
        return capToMinMax(memoryDescription, relative, rangeFraction);
    }

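The calculation differs slightly from the one above: jvmOverheadSize = totalProcessMemorySize * rangeFraction.getFraction(), and the result is then capped to the min/max range by capToMinMax.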
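Putting this path together, a worked example shows how total Flink memory falls out of total process memory. The sketch is illustrative rather than Flink code: `ProcessToFlinkSketch` and its methods are hypothetical, byte counts are plain `long` values, and the inputs in `main` (process size 1728mb, metaspace 256mb, overhead fraction 0.1 within 192mb to 1gb) combine a hypothetical process size with the defaults quoted earlier in this article.

```java
/** Illustrative sketch: total process memory -> JVM overhead -> total Flink memory. */
final class ProcessToFlinkSketch {
    static final long MB = 1024L * 1024L;

    /** Direct fraction of the process size, capped to [min, max] (mirrors deriveWithFraction). */
    static long overheadWithFraction(long totalProcess, double fraction, long min, long max) {
        long derived = (long) (totalProcess * fraction);
        return Math.max(min, Math.min(max, derived));
    }

    /** What remains for Flink after metaspace and overhead are taken out. */
    static long totalFlink(long totalProcess, long metaspace, long overhead) {
        if (metaspace + overhead > totalProcess) {
            throw new IllegalArgumentException("Metaspace and overhead exceed total process memory");
        }
        return totalProcess - metaspace - overhead;
    }

    public static void main(String[] args) {
        long process = 1728 * MB;
        long overhead = overheadWithFraction(process, 0.1, 192 * MB, 1024 * MB);
        System.out.println("overhead = " + overhead / MB + " MB");
        System.out.println("total Flink = " + totalFlink(process, 256 * MB, overhead) / MB + " MB");
    }
}
```

Here 10% of 1728 MB is about 172.8 MB, which is below the 192 MB minimum, so the overhead is raised to 192 MB and 1728 - 256 - 192 = 1280 MB is left as total Flink memory.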

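Deriving Flink's internal memory

The code is in org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils#deriveFromTotalFlinkMemory, which was explained above.

JobManager memory configuration

The entry point for JobManager memory configuration is JobManagerProcessUtils.processSpecFromConfig. It loads the configuration in much the same way as the TaskManager, so a detailed analysis is left for later. For now, here is the structure of org.apache.flink.runtime.jobmanager.JobManagerProcessSpec:

[Figure: structure of JobManagerProcessSpec]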

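Summary

So how should memory be configured in practice? You can set the options in flink-conf.yaml, or set them on the Configuration when submitting a job, using the options defined in JobManagerOptions and TaskManagerOptions. The material below comes from the official Flink documentation [5] and is only collected here alongside this article. Refer to the original documentation for the full explanation.

How to configure

The simplest way to configure memory for a Flink process is to set either one of the following two options:

| Component | TaskManager option | JobManager option |
| --- | --- | --- |
| Total Flink memory | taskmanager.memory.flink.size [6] | jobmanager.memory.flink.size [7] |
| Total process memory | taskmanager.memory.process.size [8] | jobmanager.memory.process.size [9] |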

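Note: for local execution, see the TaskManager [10] and JobManager [11] documentation respectively.

Flink sizes the remaining memory components automatically, based on default values or other configuration options. For more detail on each component, see the TaskManager [12] and JobManager [13] documentation respectively.

For standalone deployments [14], configuring total Flink memory is the better choice if you want to state how much memory Flink itself may use. Total Flink memory is further divided into JVM heap and off-heap memory. For details, see how to configure memory for standalone deployments [15].

Configuring total process memory states how much memory the Flink JVM process uses in total. For containerized deployments this corresponds to the size of the requested container. For details, see how to configure memory for containers [16] (Kubernetes [17], Yarn [18] or Mesos [19]).

Memory can also be configured by setting specific internal components of total Flink memory. Which components need to be set differs per process. For details, see the TaskManager [20] and JobManager [21] documentation respectively.

Note: at least one of the three approaches above must be used (except for local execution), otherwise Flink fails to start. In other words, you must explicitly configure one of the following options (or option combinations), none of which has a default value:

| TaskManager | JobManager |
| --- | --- |
| taskmanager.memory.flink.size [22] | jobmanager.memory.flink.size [23] |
| taskmanager.memory.process.size [24] | jobmanager.memory.process.size [25] |
| taskmanager.memory.task.heap.size [26] and taskmanager.memory.managed.size [27] | jobmanager.memory.heap.size [28] |

Note: setting both total process memory and total Flink memory is not recommended. It can lead to conflicting memory configuration and a failed deployment. When configuring other memory components in addition, also watch out for possible conflicts.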

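JVM parameters

When a Flink process starts, it explicitly sets the following JVM parameters based on the configured and automatically derived sizes of the memory components:

| JVM parameter | TaskManager value | JobManager value |
| --- | --- | --- |
| -Xmx and -Xms | Framework heap memory + task heap memory | JVM heap memory |
| -XX:MaxDirectMemorySize (always set for the TaskManager, see the note below for the JobManager) | Framework off-heap memory + task off-heap memory (*) + network memory | Off-heap memory |
| -XX:MaxMetaspaceSize | JVM metaspace | JVM metaspace |

(*) Note that off-heap memory also includes native (non-direct) memory used by user code.

The JobManager sets a JVM direct memory limit only when jobmanager.memory.enable-jvm-direct-memory-limit [29] is set to true.

For how to configure the memory components involved, see also the detailed memory models of the TaskManager [30] and JobManager [31].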
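The TaskManager column of the table can be turned into a small sketch that builds the JVM argument string from component sizes. It is illustrative only: `JvmArgsSketch` and `taskManagerJvmArgs` are hypothetical names, sizes are plain byte counts, and the exact string Flink's startup scripts produce may be formatted differently.

```java
/** Illustrative sketch: TaskManager JVM arguments derived from memory component sizes (bytes). */
final class JvmArgsSketch {
    static String taskManagerJvmArgs(long frameworkHeap, long taskHeap,
                                     long frameworkOffHeap, long taskOffHeap,
                                     long network, long metaspace) {
        long heap = frameworkHeap + taskHeap;                      // -Xmx and -Xms
        long direct = frameworkOffHeap + taskOffHeap + network;    // -XX:MaxDirectMemorySize
        return "-Xmx" + heap
            + " -Xms" + heap
            + " -XX:MaxDirectMemorySize=" + direct
            + " -XX:MaxMetaspaceSize=" + metaspace;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Hypothetical component sizes.
        System.out.println(taskManagerJvmArgs(128 * mb, 384 * mb, 128 * mb, 0L, 128 * mb, 256 * mb));
    }
}
```

Managed memory and JVM overhead appear in none of these arguments, which is consistent with the table above listing only heap, direct memory and metaspace limits.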

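Capped fractionated components

This section describes how to configure the following memory components. Each can be set as a fraction of a total and is additionally bounded by a minimum and a maximum value.

• JVM overhead: a fixed fraction of total process memory
• Network memory: a fixed fraction of total Flink memory (TaskManager only)

For how to configure these components, see also the detailed memory models of the TaskManager [32] and JobManager [33].

The size of these components must lie within the corresponding minimum and maximum, otherwise Flink fails to start. The minimum and maximum have default values and can also be set through their own options. For example, if only the following are configured:

• Total process memory = 1000 MB
• JVM overhead min = 64 MB
• JVM overhead max = 128 MB
• JVM overhead fraction = 0.1

then the JVM overhead will be 1000 MB x 0.1 = 100 MB, which is within the 64-128 MB range.

Setting the minimum and maximum to the same value effectively fixes the size of that component.

If the size of a component is not specified explicitly, Flink derives it from the total memory and the fraction, and the result is capped to the min/max range. For example, if only the following are configured:

• Total process memory = 1000 MB
• JVM overhead min = 128 MB
• JVM overhead max = 256 MB
• JVM overhead fraction = 0.1

then the JVM overhead will be 128 MB, because the size derived from the total and the fraction (100 MB) is below the minimum.

If the total memory and the sizes of other components are configured, Flink may also ignore the given fraction. In that case the capped fractionated component gets whatever is left of the total after all other components are subtracted. The size derived this way must still fall within the min/max range, otherwise Flink fails to start. For example, if only the following are configured:

• Total process memory = 1000 MB
• Task heap memory = 100 MB (or JVM heap memory for the JobManager)
• JVM overhead min = 64 MB
• JVM overhead max = 256 MB
• JVM overhead fraction = 0.1

All other components of total process memory have default sizes, including the default managed memory fraction of the TaskManager or the default off-heap memory of the JobManager. The JVM overhead is therefore not the fraction-based value (1000 MB x 0.1 = 100 MB) but the remainder of total process memory. That remainder must be within the 64-256 MB range, otherwise startup fails.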
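The remainder rule can be sketched as below. This is an illustration under stated assumptions, not Flink code: `OverheadRemainderSketch` is a hypothetical name, the "sum of all other components" values (800 MB and 950 MB) are made up, and rejecting an out-of-range remainder with an exception stands in for Flink refusing to start.

```java
/** Illustrative sketch: JVM overhead as the remainder of total process memory, range-checked. */
final class OverheadRemainderSketch {
    static final long MB = 1024L * 1024L;

    static long overheadFromRemainder(long totalProcess, long sumOfOtherComponents, long min, long max) {
        if (sumOfOtherComponents > totalProcess) {
            throw new IllegalArgumentException("Other components exceed total process memory");
        }
        long overhead = totalProcess - sumOfOtherComponents;
        // Unlike the fraction-based derivation, the remainder is NOT clamped: it must already fit.
        if (overhead < min || overhead > max) {
            throw new IllegalArgumentException("Derived JVM overhead is outside its min/max range");
        }
        return overhead;
    }

    public static void main(String[] args) {
        // Hypothetical: all other components add up to 800 MB of a 1000 MB process.
        System.out.println(overheadFromRemainder(1000 * MB, 800 * MB, 64 * MB, 256 * MB) / MB + " MB");
    }
}
```

With 800 MB taken by the other components, the remainder of 200 MB lies within 64-256 MB and is accepted. With 950 MB taken, the remainder of 50 MB would be below the minimum and the configuration would be rejected.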

Further reading

https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup.html
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_migration.html
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-managed-fraction

References

[1] taskmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-flink-size
[2] jobmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-flink-size
[3] taskmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-process-size
[4] jobmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-process-size
[5] Official Flink documentation: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup.html
[6] taskmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-flink-size
[7] jobmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-flink-size
[8] taskmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-process-size
[9] jobmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-process-size
[10] TaskManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html#local-execution
[11] JobManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_jobmanager.html#local-execution
[12] TaskManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html
[13] JobManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_jobmanager.html
[14] Standalone deployment: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/deployment/cluster_setup.html
[15] How to configure memory for standalone deployments: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_tuning.html#configure-memory-for-standalone-deployment
[16] How to configure memory for containers: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_tuning.html#configure-memory-for-containers
[17] Kubernetes: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/deployment/kubernetes.html
[18] Yarn: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/deployment/yarn_setup.html
[19] Mesos: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/deployment/mesos.html
[20] TaskManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html#configure-heap-and-managed-memory
[21] JobManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_jobmanager.html#configure-jvm-heap
[22] taskmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-flink-size
[23] jobmanager.memory.flink.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-flink-size
[24] taskmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-process-size
[25] jobmanager.memory.process.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-process-size
[26] taskmanager.memory.task.heap.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-task-heap-size
[27] taskmanager.memory.managed.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#taskmanager-memory-managed-size
[28] jobmanager.memory.heap.size: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-heap-size
[29] jobmanager.memory.enable-jvm-direct-memory-limit: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#jobmanager-memory-enable-jvm-direct-memory-limit
[30] TaskManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html#detailed-memory-model
[31] JobManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_jobmanager.html#detailed-configuration
[32] TaskManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_tm.html#detailed-memory-model
[33] JobManager: https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/memory/mem_setup_jobmanager.html#detailed-configuration
