此引擎与Apache Kafka结合使用。
Kafka 特性:
老版Kafka集成表引擎参数格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
新版Kafka集成表引擎参数格式:
Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic1,topic2', kafka_group_name = 'group1', kafka_format = 'JSONEachRow', kafka_row_delimiter = '\n', kafka_schema = '', kafka_num_consumers = 2
必要参数:
kafka_broker_list
– 以逗号分隔的 brokers 列表 (localhost:9092
)。kafka_topic_list
– topic 列表 (my_topic
)。kafka_group_name
– Kafka 消费组名称 (group1
)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。kafka_format
– 消息体格式。使用与 SQL 部分的 FORMAT
函数相同表示方法,例如 JSONEachRow
。可选参数:
kafka_row_delimiter
- 每个消息体(记录)之间的分隔符。kafka_schema
– 如果解析格式需要一个 schema 时,此参数必填。kafka_num_consumers
– 单个表的消费者数量。默认值是:1
,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT
语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT
语句,受支持的输出格式可用于格式化SELECT
语句的返回结果,或者通过INSERT
写入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
格式 | 输入 | 输出 |
---|---|---|
[TabSeparated] | ✔ | ✔ |
[TabSeparatedRaw] | ✔ | ✔ |
[TabSeparatedWithNames] | ✔ | ✔ |
[TabSeparatedWithNamesAndTypes] | ✔ | ✔ |
[Template] | ✔ | ✔ |
[TemplateIgnoreSpaces] | ✔ | ✗ |
[CSV] | ✔ | ✔ |
[CSVWithNames] | ✔ | ✔ |
[CustomSeparated] | ✔ | ✔ |
[Values] | ✔ | ✔ |
[Vertical] | ✗ | ✔ |
[JSON] | ✗ | ✔ |
[JSONAsString] | ✔ | ✗ |
[JSONStrings] | ✗ | ✔ |
[JSONCompact] | ✗ | ✔ |
[JSONCompactStrings] | ✗ | ✔ |
[JSONEachRow] | ✔ | ✔ |
[JSONEachRowWithProgress] | ✗ | ✔ |
[JSONStringsEachRow] | ✔ | ✔ |
[JSONStringsEachRowWithProgress] | ✗ | ✔ |
[JSONCompactEachRow] | ✔ | ✔ |
[JSONCompactEachRowWithNamesAndTypes] | ✔ | ✔ |
[JSONCompactStringsEachRow] | ✔ | ✔ |
[JSONCompactStringsEachRowWithNamesAndTypes] | ✔ | ✔ |
[TSKV] | ✔ | ✔ |
[Pretty] | ✗ | ✔ |
[PrettyCompact] | ✗ | ✔ |
[PrettyCompactMonoBlock] | ✗ | ✔ |
[PrettyNoEscapes] | ✗ | ✔ |
[PrettySpace] | ✗ | ✔ |
[Protobuf] | ✔ | ✔ |
[ProtobufSingle] | ✔ | ✔ |
[Avro] | ✔ | ✔ |
[AvroConfluent] | ✔ | ✗ |
[Parquet] | ✔ | ✔ |
[Arrow] | ✔ | ✔ |
[ArrowStream] | ✔ | ✔ |
[ORC] | ✔ | ✔ |
[RowBinary] | ✔ | ✔ |
[RowBinaryWithNamesAndTypes] | ✔ | ✔ |
[Native] | ✔ | ✔ |
[Null] | ✗ | ✔ |
[XML] | ✗ | ✔ |
[CapnProto] | ✔ | ✗ |
[LineAsString] | ✔ | ✗ |
[Regexp] | ✔ | ✗ |
[RawBLOB] | ✔ | ✔ |
示例:
CREATE TABLE queue ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow'); SELECT * FROM queue LIMIT 5; CREATE TABLE queue2 ( timestamp UInt64, level String, message String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic', kafka_group_name = 'group1', kafka_format = 'JSONEachRow', kafka_num_consumers = 4; CREATE TABLE queue2 ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1') SETTINGS kafka_format = 'JSONEachRow', kafka_num_consumers = 4;
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。
SELECT
查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:
当 MATERIALIZED VIEW
添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT
将数据转换为所需要的格式。
示例:
CREATE TABLE queue ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow'); CREATE TABLE daily ( day Date, level String, total UInt64 ) ENGINE = SummingMergeTree(day, (day, level), 8192); CREATE MATERIALIZED VIEW consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM queue GROUP BY day, level; SELECT level, sum(total) FROM daily GROUP BY level;
为了提高性能,接受的消息被分组为max_insert_block_size
大小的块。如果未在stream_flush_interval_ms
毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
DETACH TABLE consumer; ATTACH TABLE consumer;
如果使用 ALTER
更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
与 GraphiteMergeTree
类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka
) 和 主题级别 (kafka_*
)。首先应用全局配置,然后应用主题级配置(如果存在)。
<!-- Global configuration options for all tables of Kafka engine type --> <kafka> <debug>cgrp</debug> <auto_offset_reset>smallest</auto_offset_reset> </kafka> <!-- Configuration specific for topic "logs" --> <kafka_logs> <retry_backoff_ms>250</retry_backoff_ms> <fetch_min_bytes>100000</fetch_min_bytes> </kafka_logs>
在ClickHouse
配置中使用下划线 (_
) ,并不是使用点 (.
)。例如,check.crcs=true
将是 <check_crcs>true</check_crcs>
。
对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
示例:
<!-- Kerberos-aware Kafka --> <kafka> <security_protocol>SASL_PLAINTEXT</security_protocol> <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab> <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal> </kafka>
_topic
– Kafka 主题。_key
– 信息的键。_offset
– 消息的偏移量。_timestamp
– 消息的时间戳。_timestamp_ms
– 消息的时间戳(毫秒)。_partition
– Kafka 主题的分区。ClickHouse经典中文文档分享