一、很多大数据工具是基于jvm,Flink使用自主内存管理,这两者有什么区别
1.jvm存储数据密度低,它包含三个部分,对象头,实例对象,对齐填充;java高级语言,很多事不用人为去做,自动去处理,偏底层自己做,就叫低级语言;
2.FullGC会极大的影响性能,尤其为了处理大数据而开了很大的内存空间的JVM,一次GC甚至会达到分钟级;
3.OOM问题影响稳定性,JVM所有对象的大小大于JVM所分配的内存大小,发生内存溢出
4.缓存未命中,CPU计算是从CPU缓存获取的,是以缓存line加载,是连续的,而Jvm对象在堆上存储不是连续的
二、内存模型 【在1.10版本做了重大改变】
jobManage内存模型:
taskManage内存模型:在配置文件中说明配置项 Flink内存,进程内存
堆上内存,堆外内存
在YarnClusterDescriptor类中: JobManagerProcessUtils
final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
点进去找到
static JobManagerProcessSpec processSpecFromConfig(Configuration config) { return createMemoryProcessSpec(PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config)); }
点进配置 :判断不同配置进入不同的模式
public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) { if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) { // all internal memory options are configured, use these to derive total Flink and process memory return deriveProcessSpecWithExplicitInternalMemory(config); } else if (config.contains(options.getTotalFlinkMemoryOption())) { // internal memory options are not configured, total Flink memory is configured, // derive from total flink memory return deriveProcessSpecWithTotalFlinkMemory(config); } else if (config.contains(options.getTotalProcessMemoryOption())) { // total Flink memory is not configured, total process memory is configured, // derive from total process memory return deriveProcessSpecWithTotalProcessMemory(config); } return failBecauseRequiredOptionsNotConfigured();
这个地方就是给JobManage赋值
private static JobManagerFlinkMemory createJobManagerFlinkMemory( MemorySize jvmHeap, MemorySize offHeapMemory) { verifyJvmHeapSize(jvmHeap); return new JobManagerFlinkMemory(jvmHeap, offHeapMemory); }
再看TaskManage内存分配
有个申请资源的过程:ActiveResourceManage这个方法 requestNewWorker
final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
new了一个
final TaskExecutorFlinkMemory flinkMemory = new TaskExecutorFlinkMemory( frameworkHeapMemorySize, frameworkOffHeapMemorySize, workerResourceSpec.getTaskHeapSize(), workerResourceSpec.getTaskOffHeapSize(), workerResourceSpec.getNetworkMemSize(), workerResourceSpec.getManagedMemSize());
二、内存数据结构
1.内存段,最小的内存分配单元,默认32k,既可以是堆上也可以是堆外,堆上是字节数组,堆外主要是网络缓冲区,提供了对二进制数据的读写。TypeInformation封装了很多。主要使用混合的MemorySegment,堆上堆外都能用。展示一个Tuple3<Integer,Double,Person>对象的序列化,存储很紧密,使用效率高了,会对对象序列化。
2.内存页,是内存段之上的数据访问视图,使用时无需关系内存段的细节
3.buffer,task算子之间在网络数据传输使用的是buffer。使用buffer,申请和释放由flink自行管理,一个networkbuffer封装了一个memorySegment。
4.buffer池,bufferPool管理buffer,申请,释放,销毁等。
三、内存管理器
在1.10管理taskManage
在1.10版本之后管理slot级别,allocate分配资源,
四、网络传输的内存管理
每个图有他的上流下流,输入大门,输出是结果分区,每个task都有本地缓冲池,最终向网络缓冲池,网络缓冲池,可以不同节点通讯。
最后一个背压机制record,后面的task积压,层层传递到sourcetask,credit信用机制。网络是基于netty。