Apache Flink
是一个在无界和有界数据流上进行有状态计算的框架。由于许多流应用程序被设计为在最短的停机时间内连续运行,流处理器必须提供出色的故障恢复,以及在应用程序运行时监视和维护应用程序的工具。
Apache Flink非常关注流处理的操作方面。在这里,我们将解释Flink的故障恢复机制,并介绍其用于管理和监督运行中的应用程序的特性。
Flink
+ClickHouse
主要特点:
指标实现支持 sql 描述,以前的方案使用是 storm 的程序,通过 stormsql 实现,包括 flinksql,这些内容对于 UDF 支持相对有限,但是现在这套 Flink+ClickHouse 基本上可以把分析师提的指标通过 sql 实现。
指标的上下线互不影响,这个主要是解决上边提到的关于 Flink 任务消费了 topic 以后,假如用户提出新的指标的时候,是启动新任务还是要不断修改的问题。
数据可回溯,方便异常排查,这个就类似上边提到的假如我的日活掉了,需要知道哪些指标的口径的逻辑掉了、哪个上报的数据掉了,如 cmd 掉了还是数据流 kafka 掉了还是用户上报的时候指标没有上报导致的日活掉了。假如单纯的 flink 的话,只是会计算出那个指标掉了,是没办法回溯的。
计算快,一个周期内完成所有的指标计算,现在的 horizon 曲线可能是几百上千,需要在五分钟之内或者十分钟之内,把所有分时、累时、以及维度下降的指标全部计算出来。
支持实时流,分部署部署,运维简单。
如何学习?通过Flink+ClickHouse 玩转企业级实时大数据开发课程你将很好掌握以上特点。
Step 1 Flink安装
要能够运行Flink,唯一的要求是安装一个可工作的Java 8或11。你可以通过发出以下命令来检查Java的正确安装:
java -version $ tar -xzf flink-1.13.0-bin-scala_2.11.tgz $ cd flink-1.13.0-bin-scala_2.11
Step 2: Start a Cluster
Flink ships with a single bash script to start a local cluster.
$ ./bin/start-cluster.sh
启动集群。
在主机上启动独立进程。
启动主机上的任务执行程序守护进程。
Step 3: Submit a Job
Flink的发布附带了一些示例Jobs。您可以快速地将其中一个应用程序部署到正在运行的集群中。
$ ./bin/flink run examples/streaming/WordCount.jar $ tail log/flink-*-taskexecutor-*.out (to,1) (be,1) (or,1) (not,1) (to,2) (be,2)
此外,您可以检查Flink的Web UI来监视Cluster和运行Job的状态。
Step 4: Stop the Cluster #
完成后,可以快速停止集群和所有正在运行的组件。
$ ./bin/stop-cluster.sh
Flink 程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。集合初始创建自 source(例如读取文件、kafka 主题,或本地内存中的集合)。结果通过 sink 返回,例如,它可以将数据写入(分布式)文件,或标准输出(例如命令行终端)。Flink 程序可以在多种环境中运行,独立运行或嵌入到其他程序中。可以在本地 JVM 中执行,也可以在多台机器的集群上执行。
针对有界和无界两种数据 source 类型,你可以使用 DataSet API 来编写批处理程序或使用 DataStream API 来编写流处理程序。本篇指南将介绍这两种 API 通用的基本概念
/** * clickhouse方言 */ public class ClickHouseJDBCDialect implements JDBCDialect { private static final long serialVersionUID = 1L; @Override public boolean canHandle(String url) { return url.startsWith("jdbc:clickhouse:"); } @Override public Optional<String> defaultDriverName() { return Optional.of("ru.yandex.clickhouse.ClickHouseDriver"); } @Override public String quoteIdentifier(String identifier) { return "`" + identifier + "`"; } @Override public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { return Optional.of(getInsertIntoStatement(tableName, fieldNames)); } @Override public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) { return null; } }
。。。