简介
Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。
Flink SQL的编程模型
创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
1、在内部目录中注册一个Table
2、注册一个外部目录
3、执行SQL查询
4、注册一个用户自定义函数(标量、表及聚合)
5、将DataStream或者DataSet转换成Table
6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。
TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
在目录中注册表
TableEnvironment允许通过各种源来注册一个表:
1、一个已存在的Table对象,通常是Table API或者SQL查询的结果
Table projTable = tableEnv.scan(“X”).select(…);
2、TableSource,可以访问外部数据如文件、数据库或者消息系统
TableSource csvSource = new CsvTableSource(“/path/to/file”, …);
3、DataStream或者DataSet程序中的DataStream或者DataSet
//将DataSet转换为Table
Table table= tableEnv.fromDataSet(tableset);
注册TableSink
注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],……):
1
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
1 | 2、 String[] fieldNames = {"a", "b", "c"}; |
实战案例一
基于Flink SQL的WordCount:
1 | public class WordCountSQL { |
输出如下:
1 | WC TOM 1 |
实战案例二
本例稍微复杂,首先读取一个文件中的内容进行统计,并写入到另外一个文件中:
1 | public class SQLTest { |
以上所有代码,大家在公众号回复Flink
即可下载,可以直接本地运行,方便大家调试