摘要:该项目是DAYU平台的数据开发(DLF),数据开发中一个重要的功能就是ETL(数据清洗)。ETL由源端到目的端,中间的业务逻辑一般由用户自己编写的SQL模板实现,velocity是其中涉及的一种模板语言。
Velocity模板语言的基本使用代码如下:
在ETL业务中,Velocity模板的输出是用户的ETL SQL语句集,相当于.sql文件。这里官方提供的api需要传入一个java.io.Writer类的对象用于存储模板的生成的SQL语句集。然后,这些语句集会根据我们的业务做SQL语句的拆分,逐个执行。
java.io.Writer类是一个抽象类,在JDK1.8中有多种实现,包括但不仅限于以下几种:
由于云环境对用户文件读写创建等权限的安全性要求比较苛刻,因此,我们使用了java.io.StringWriter,其底层是StringBuffer对象,StringBuffer底层是char数组。
#set($iAMVariable = 'good!') #set($person.password = '123') Welcome ${name} to velocity.com today is ${date} #foreach($one in $list) $one #end Name: ${person.name} Password: ${person.password}
package com.xlf; import org.apache.velocity.Template; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.VelocityEngine; import org.apache.velocity.runtime.RuntimeConstants; import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; import java.io.StringWriter; import java.util.ArrayList; import java.util.Date; import java.util.List; public class HelloVelocity { public static void main(String[] args) { // 初始化模板引擎 VelocityEngine ve = new VelocityEngine(); ve.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); ve.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); ve.init(); // 获取模板文件 Template template = ve.getTemplate("Hellovelocity.vm"); VelocityContext ctx = new VelocityContext(); // 设置变量 ctx.put("name", "velocity"); ctx.put("date", (new Date())); List temp = new ArrayList(); temp.add("Hey"); temp.add("Volecity!"); ctx.put("list", temp); Person person = new Person(); ctx.put("person", person); // 输出 StringWriter sw = new StringWriter(); template.merge(ctx, sw); System.out.println(sw.toString()); } }
(文件字数超出博客限制,稍后在附件中给出~~)
模板文件本身就379kb不算大,关键在于其中定义了一个包含90000多个元素的String数组,数组的每个元素都是”1”,然后写了79层嵌套循环,循环的每一层都是遍历该String数组;最内层循环调用了一次:
show table;
这意味着这个模板将生成包含96372的79次方个SQL语句,其中每一个SQL语句都是:
show table;
将如此巨大的字符量填充进StringWriter对象里面,至少需要10的380多次方GB的内存空间,这几乎是不现实的。因此OOM溢出是必然的。
package com.xlf; import org.apache.velocity.Template; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.VelocityEngine; import org.apache.velocity.runtime.RuntimeConstants; import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; import java.io.StringWriter; public class BigVelocity { public static void main(String[] args) { // 初始化模板引擎 VelocityEngine ve = new VelocityEngine(); ve.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); ve.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); ve.init(); // 获取模板文件 Template template = ve.getTemplate("BigVelocity.template.vm"); VelocityContext ctx = new VelocityContext(); StringWriter sw = new StringWriter(); template.merge(ctx, sw); } }
控制台输出
Velocity模板生成的结果写入StringWriter对象中,如前面分析,其底层是一个char数组。直接产生OOM的代码在于java.util.Array.copyOf()函数:
package com.xlf; import java.io.StringWriter; public class StringWriterOOMTest { public static void main(String[] args) { System.out.println("The maximum value of Integer is: " + Integer.MAX_VALUE); StringWriter sw = new StringWriter(); int count = 0; for (int i = 0; i < 100000; i++) { for (int j = 0; j < 100000; j++) { sw.write("This will cause OOM\n"); System.out.println("sw.getBuffer().length(): " + sw.getBuffer().length() + ", count: " + (++count)); } } } }
环境:JDK8 + Windows10台式机 + 32GB内存 + 1TB SSD + i7-8700
如果你的硬件配置不充分,请勿轻易尝试!
StringWriterOOMTest运行时的整个进程内存大小在Windows任务管理器中达10300多MB时,程序停止。
char数组元素最大值不会超过Integer.MAX_VALUE,回事非常接近的一个值,我这里相差20多。网上搜索了一番,比较靠谱的说法是:确实比Integer.MAX_VALUE小一点,不会等于Integer.MAX_VALUE,是因为char[]对象还有一些别的空间占用,比如对象头,应该说是这些空间加起来不能超过Integer.MAX_VALUE。如果有读者感兴趣,可以自行探索下别的类型数组的元素个数。我这里也算是一点拙见,抛砖引玉。
通过上面一系列重现与分析,我们知道了OOM的根本原因是模板文件渲染而成的StringWriter对象过大。具体表现在:
前面分析过,出于安全的原因,我们只能用StringWriter对象去接收模板渲染结果的输出。不能用文件。所以只能在StringWriter本身去做文章进行改进了:
继承StringWriter类,重写其write方法为:
StringWriter sw = new StringWriter() { public void write(String str) { int length = this.getBuffer().length() + str.length(); // 限制大小为10MB if (length > 10 * 1024 * 1024) { this.getBuffer().delete(0, this.getBuffer().length()); throw new RuntimeException("Velocity template size exceeds limit!"); } this.getBuffer().append(str); } };
其他代码保持不变
package com.xlf; import org.apache.velocity.Template; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.VelocityEngine; import org.apache.velocity.runtime.RuntimeConstants; import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; import java.io.StringWriter; public class BigVelocitySolution { public static void main(String[] args) { // 初始化模板引擎 VelocityEngine ve = new VelocityEngine(); ve.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); ve.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); ve.init(); // 获取模板文件 Template template = ve.getTemplate("BigVelocity.template.vm"); VelocityContext ctx = new VelocityContext(); StringWriter sw = new StringWriter() { public void write(String str) { int length = this.getBuffer().length() + str.length(); // 限制大小为10MB if (length > 10 * 1024 * 1024) { this.getBuffer().delete(0, this.getBuffer().length()); throw new RuntimeException("Velocity template size exceeds limit!"); } this.getBuffer().append(str); } }; template.merge(ctx, sw); } }
如果velocity模板渲染后的sql语句集大小在允许的范围内,这些语句集会根据我们的业务做SQL语句的拆分,逐句执行。
在后续逐句执行sql语句的过程中,每一句sql都是调用的周边服务(DLI,OBS,MySql等)去执行的,结果每次都会返回给我们的作业开发调度服务(DLF)后台。我们的DLF平台支持及时停止作业的功能,也就是说假如这个作业在调度过程中要执行10000条SQL,我要在中途停止不执行后面的SQL了——这样的功能是支持的。
在修改上面提到OOM那个bug并通过测试后,测试同学发现我们的作业无法停止下来,换句话说,我们作业所在的java线程无法停止。
一番debug与代码深入研读之后,发现我们项目中确实是调用了对应的线程对象的interrupt方法thread.interrupt();去终止线程的。
那么为什么调用了interrupt方法依旧无法终止线程?
package com.xlf; public class TestForInterruptedException { public static void main(String[] args) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < 10; i++) { sb.append("show tables;\n"); } int i = 0; for (String str : sb.toString().split("\n")) { if (i > 4) { Thread.currentThread().interrupt(); System.out.println(i + " after interrupt"); } System.out.println(str); System.out.println(i++); } } }
TestForInterruptedException.main函数中做的事情足够简单,先产生一个大一点的字符串,拆分成10段小字符串,for循环中逐段打印小字符串;并企图从第5段(初始段为0)开始,去终止线程。结果发现线程并没有终止!
这是怎么回事?为什么调用了线程的interrupt方法并没有终止线程?或者说是因为jvm需要一点时间去响应这个方法?其实并非如此,感兴趣的同学可以把循环次数加的更大一些,在循环开始几次就进行interrupt,你会发现结果还是这样。
经过一番探索,线程终止的方法无外乎两种:
要终止线程,目前JDK中可行的做法有:
这两个做法都需要后续做相应处理比如去break循环,return方法或者抛出异常等等。
线程终止原因一般来讲有两种:
package com.xlf; public class ExplicitlyCatchExceptionAndDoNotThrow { public static void main(String[] args) throws Exception { boolean flag = true; System.out.println("Main started!"); try { throw new InterruptedException(); } catch (InterruptedException exception) { System.out.println("InterruptedException is caught!"); } System.out.println("Main doesn't stop!"); try { throw new Throwable(); } catch (Throwable throwable) { System.out.println("Throwable is caught!"); } System.out.println("Main is still here!"); if (flag) { throw new Exception("Main is dead!"); } System.out.println("You'll never see this!"); } }
这个测试验证了前面关于线程异常终止的结论:
线程执行中抛出Throwable对象且不被显式捕获,JVM会终止线程。
线程执行中需要手动终止,最好的做法就是设置标识位(可以是interrupt也可以是自己定义的),然后及时捕获标识位并抛出异常,在业务逻辑的最后去捕获异常并做一些收尾的清理动作:比如统计任务执行失败成功的比例,或者关闭某些流等等。这样,程序的执行就兼顾到了正常与异常的情况并得到了优雅的处理。
package com.xlf; public class TerminateThreadGracefully { public static void main(String[] args) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < 10; i++) { sb.append("show tables;\n"); } int i = 0; try { for (String str : sb.toString().split("\n")) { if (i > 4) { Thread.currentThread().interrupt(); if (Thread.currentThread().isInterrupted()) { throw new InterruptedException(); } System.out.println(i + " after interrupt"); } System.out.println(str); System.out.println(i++); } } catch (InterruptedException exception) { // TODO:此处可能做一些清理工作 System.out.println(Thread.currentThread().isInterrupted()); } System.out.println("Thread main stops normally!"); } }
我们项目中确实是调用了对应的线程对象的interrupt方法thread.interrupt();去终止线程的。
那么为什么线程不能相应中断标识位并终止呢?
回到我们项目的业务逻辑:
整个job分为模板读取、渲染以及SQL执行三个阶段,一般而言前两个阶段时间会比较快。在后续逐句执行sql语句的过程中,每一句sql都是调用的周边服务(DLI,OBS,MySql等)去执行的,结果每次都会返回给我们的作业开发调度服务(DLF)后台。我们的DLF平台支持及时停止作业的功能,也就是说假如这个作业在调度过程中要执行10000条SQL,我要在中途停止不执行后面的SQL了——这样的功能是支持的。
因此问题就出在了SQL执行的过程。经过多次debug发现:在SQL执行过程中需要每次都往OBS(华为自研,第三方包)中写log,该过程不可略去。调用该线程对象的interrupt方法thread.interrupt(),interrupt标识位最早被OBS底层用到的java.util.concurrent. CountDownLatch类的await()方法捕获到,重置标识位并抛出异常,然后在一层层往上抛的时候被转变成了别的异常类型,而且不能根据最终抛的异常类型去判断是否是由于我们手动终止job引起的。
对于第三方包OBS根据自己的底层逻辑去处理CountDownLatch抛的异常,这本无可厚非。但是我们的程序终止不了!为了达到终止线程的做法,我在其中加入了一个自定义的标志变量,当调用thread.interrupt()的时候去设置变量的状态,并在几个关键点比如OBS写log之后去判断我的自定义标识位的状态,如果状态改变了就抛出RuntimeException(可以不被捕获,最小化改动代码)。并且为了能重用线程池里的线程对象,在每次job开始的地方去从重置这一自定义标识位。最终达到了优雅手动终止job的目的。
这一部分的源码涉及项目细节就不贴出来了,但是相关的逻辑前面已经代码展示过。
在线程中运行过程中定义的普通的局部变量,非ThreadLocal型,一般而言会随着线程结束而得到回收。我所遇到的现象是上面的那个线程无法停止的bug解决之后,线程停下来了,但是在linux上运行top命令相应进程内存占用还是很高。
在如下试验中
设置jvm参数为:
-Xms100m -Xmx200m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
其意义在于:
限制jvm初始内存为100M,最大堆内存为200M。并在jvm发生垃圾回收时及时打印详细的GC信息以及时间戳。而我的代码里要做的事情就是重现jvm内存不够而不得不发生垃圾回收。同时观察操作系统层面该java进程的内存占用。
package com.xlf; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class SystemMemoryOccupiedAndReleaseTest { public static void main(String[] args) { try { System.out.println("start"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r); } }, new ThreadPoolExecutor.AbortPolicy()); try { System.out.println("(executor已初始化):"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread t1 = new Thread(new Runnable() { { System.out.println("t1 已经初始化"); } @Override public void run() { byte[] b = new byte[100 * 1024 * 1024]; System.out.println("t1分配了100M空间给数组"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException("t1 stop"); } System.out.println("t1 stop"); } }, "t1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread t2 = new Thread(new Runnable() { { System.out.println("t2 已经初始化"); } @Override public void run() { byte[] b = new byte[100 * 1024 * 1024]; System.out.println("t2分配了100M空间给数组"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException("t2 stop"); } System.out.println("t2 stop"); } }, "t2"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread t3 = new Thread(new Runnable() { { System.out.println("t3 已经初始化"); } @Override public void run() { byte[] b = new byte[100 * 1024 * 1024]; System.out.println("t3分配了100M空间给数组"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException("t3 stop"); } System.out.println("t3 stop"); } }, "t3"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } executor.execute(t1); System.out.println("t1 executed!"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } executor.execute(t2); System.out.println("t2 executed!"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } executor.execute(t3); System.out.println("t3 executed!"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("jmap -histo:live pid by cmd:"); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("After jmap!"); // You may run jmap -heap pid using cmd here // executor.shutdown(); } }
上述代码里我先定义了三个Thread对象,这三个对象都是在run()方法里分配了100M大小的char[],然后线程休眠(sleep)5秒。然后new一个线程池,并将这三个线程对象依次交给线程池去execute。线程池每两次execute之间相隔10秒,这是为了给足时间给上一个线程跑完并让jvm去回收这部分内存(200M的最大堆内存,一个线程对象要占用100多M,要跑下一个线程必然会发生GC),这样就能把GC信息打印下来便于观察。最后等到三个线程都执行完毕sleep一段时间(大概20秒),让我有时间手动在cmd执行jmap -histo live pid,该命令会强制触发FullGC,jmap命令之后你也可以试着执行jmap -heap pid,该命令不会触发gc,但是可以看下整个jvm堆的占用详情.
在jmp -histo:live执行之前进程在操作系统内存占用:
执行jmp -histo:live之后
执行jmap -heap pid的结果:
t1分配了100M空间给数组之后,t2结束:
内存占用:107042K,总可用堆空间大小:166400K
无法给t2分配100M,触发FullGC:
103650K->1036K(98304K)
t2分配了100M空间给数组之后,t2结束:
内存占用:104461K,总可用堆空间大小:166400K
无法给t3分配100M,触发FullGC:
103532K->1037K(123904K)
t3分配了100M空间给数组之后,t3结束.
jmap -histo:live pid by cmd:
103565K->997K(123904K)
最后jmap -heap pid结果中堆大小也是123M。
这一过程中,操作系统层面jvm进程内存占用不会超过122M,jmap -histo:live pid触发FullGC之后维持在87M左右(反复几次试验都是这个结果)
那么为什么jvm的堆栈信息大小与资源管理器对应的不一致呢?
这个问题在网上搜了一圈,结论如下:
提交内存指的是程序要求系统为程序运行的最低大小,如果得不到满足,就会出现内存不足的提示。
工作集内存才是程序真正占用的内存,而工作集内存=可共享内存+专用内存
可共享内存的用处是当你打开更多更大的软件时,或者进行内存整理时,这一部分会被分给其他软件,所以这一块算是为程序运行预留下来的内存专用内存,专用内存指的是目前程序运行独占的内存,这一块和可共享内存不一样,无论目前系统内存多么紧张,这块专用内存是不会主动给其他程序腾出空间的
所以总结一下就是,任务管理器显示的内存,实际上是显示的程序的专用内存而程序真正占用的内存,是工作集内存
上面两张图能对的上:
如下两张图“勉强”能对的上:
但是和jmap触发gc之后的堆内存123904K还有点差距,这部分差距不大,暂时网上找不到比较靠谱的回答,笔者猜想可能这一部分用的是别的进程的可共享内存。我去linux上试了一把,也有这个内存算不准的问题。这个问题留待下次填坑吧~~
附上本文中描述的所有代码以及对应资源文件,供大家参考学习!也欢迎大家评论提问!
VelocityExperiment.zip 19.40KB