聊聊 Flink SQL增量查询Hudi表

    作者:匿名更新于: 2022-12-09 22:46:09

      read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据。

      ​官网文档

      地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

      参数

      read.start-commit 增量查询开始时间 对于流读,如果不指定该值,默认取最新的instantTime,也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录

      read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据

      read.streaming.enabled 是否流读 默认false

      read.streaming.check-interval 流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明

      版本

      建表造数:

      Hudi 0.9.0

      Spark 2.4.5

      我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)

      查询

      Hudi 0.13.0-SNAPSHOT

      Flink 1.14.3 (增量查询)

      Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)

      建表造数

      复制

      1.  -- Spark SQL Hudi 0.9.0

      2.  create table hudi.test_flink_incremental (

      3.  id int,

      4.  name string,

      5.  price double,

      6.  ts long,

      7.  dt string

      8.  ) using hudi

      9.  partitioned by (dt)

      10.  options (

      11.  primaryKey = 'id',

      12.  preCombineField = 'ts',

      13.  type = 'cow'

      14.  );

          15.

      16.  insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');

      17.  insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');

      18.  update hudi.test_flink_incremental set name='hudi2_update' where id = 2;

      19.  insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');

      20.  insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');

      用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)

      复制

      1.  call show_commits(table => 'hudi.test_flink_incremental');

      复制

      1.  20221205152736

      2.  20221205152723

      3.  20221205152712

      4.  20221205152702

      5.  20221205152650

      Flink SQL创建Hudi内存表

      复制

      1.  CREATE TABLE test_flink_incremental (

      2.  id int PRIMARY KEY NOT ENFORCED,

      3.  name VARCHAR(10),

      4.  price double,

      5.  ts bigint,

      6.  dt VARCHAR(10)

      7.  )

      8.  PARTITIONED BY (dt)

      9.  WITH (

      10.  'connector' = 'hudi',

      11.  'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'

      12.  ); 

      建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。动态指定参数方法,在查询语句后面加上如下形式的语句

      复制

      1.  /*+

      2.  options(

      3.  'read.start-commit' = '20221205152723',

      4.  'read.end-commit'='20221205152736'

      5.  )

      6.  */

      批读

      Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

      验证是否包含起始时间和默认结束时间

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.start-commit' = '20221205152723' --起始时间对应id=3的记录

      5.  )

      6.  */

      结果包含起始时间,不指定结束时间默认读到最新的数据

      复制

      1.  id name price ts dt

      2.  4 a4 40.0 4000 dt=2022-12-26

      3.  3 a3 30.0 3000 dt=2022-11-26

      验证是否包含结束时间

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.start-commit' = '20221205152712', --起始时间对应id=2的记录

      5.  'read.end-commit'='20221205152723' --结束时间对应id=3的记录

      6.  )

      7.  */

      结果包含结束时间

      复制

      1.  id name price ts dt

      2.  3 a3 30.0 3000 dt=2022-11-26

      3.  2 hudi2_update 20.0 2000 dt=2022-11-25

      验证默认开始时间

      这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.end-commit'='20221205152712' --结束时间对应id=2的更新记录

      5.  )

      6.  */

      结果:只查询end-commit对应的记录

      复制

      1.  id name price ts dt

      2.  2 hudi2_update 20.0 2000 dt=2022-11-25

      时间旅行(查询历史记录)

      验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过FlinkSQL查询Hudi历史记录,预期结果查出id=2,name=a2

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.end-commit'='20221205152702' --结束时间对应id=2的历史记录

      5.  )

      6.  */

      结果:可以正确查询历史记录

      复制

      1.  id name price ts dt

      2.  2 a2 20.0 2000 dt=2022-11-25

      流读

      开启流读的参数:

      复制

      1.  read.streaming.enabled = true

      流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

      验证默认开始时间

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.streaming.enabled'='true',

      5.  'read.streaming.check-interval' = '4'

      6.  )

      7.  */

      结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime

      复制

      1.  id name price ts dt

      2.  4 a4 40.0 4000 dt=2022-12-26

      验证指定开始时间

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.streaming.enabled'='true',

      5.  'read.streaming.check-interval' = '4',

      6.  'read.start-commit' = '20221205152712'

      7.  )

      8.  */

      结果:

      复制

      1.  id name price ts dt

      2.  2 hudi2_update 20.0 2000 dt=2022-11-25

      3.  3 a3 30.0 3000 dt=2022-11-26

      4.  4 a4 40.0 4000 dt=2022-11-26

      如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:'read.start-commit' = '20211205152712'

      复制

      1.  select * from test_flink_incremental

      2.  /*+

      3.  options(

      4.  'read.streaming.enabled'='true',

      5.  'read.streaming.check-interval' = '4',

      6.  'read.start-commit' = '20211205152712'

      7.  )

      8.  */

      复制

      1.  id name price ts dt

      2.  1 a1 10.0 1000 dt=2022-11-25

      3.  2 hudi2_update 20.0 2000 dt=2022-11-25

      4.  3 a3 30.0 3000 dt=2022-11-26

      5.  4 a4 40.0 4000 dt=2022-11-26

      验证流读的连续性

      验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

      Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar​放到lib​下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

      先在MySQL中创建一张Sink表

      复制

      1.  -- MySQL

      2.  CREATE TABLE `test_sink` (

      3.  `id` int(11),

      4.  `name` text DEFAULT NULL,

      5.  `price` int(11),

      6.  `ts` int(11),

      7.  `dt` text DEFAULT NULL

      8.  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

      Flink中创建对应的sink表

      复制

      1.  create table test_sink (

      2.  id int,

      3.  name string,

      4.  price double,

      5.  ts bigint,

      6.  dt string

      7.  ) with (

      8.  'connector' = 'jdbc',

      9.  'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',

      10.  'username' = 'root',

      11.  'password' = 'root-123',

      12.  'table-name' = 'test_sink',

      13.  'sink.buffer-flush.max-rows' = '1'

      14.  );

      然后流式增量读取Hudi表Sink Mysql

      复制

      1.  insert into test_sink

      2.  select * from test_flink_incremental

      3.  /*+

      4.  options(

      5.  'read.streaming.enabled'='true',

      6.  'read.streaming.check-interval' = '4',

      7.  'read.start-commit' = '20221205152712'

      8.  )

      9.  */

      这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

      然后先在MySQL中验证一下历史数据的准确性

      再利用Spark SQL往source表插入两条数据

      复制

      1.  -- Spark SQL

      2.  insert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');

      3.  insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');

      我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

      发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

      最后验证一下更新的增量数据,Spark SQL更新Hudi source表

      复制

      1.  -- Spark SQL

      2.  update hudi.test_flink_incremental set name='hudi5_update' where id = 5;

      继续验证结果

      结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

      那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

      复制

      1.  -- MySQL

      2.  CREATE TABLE `test_sink` (

      3.  `id` int(11),

      4.  `name` text DEFAULT NULL,

      5.  `price` int(11),

      6.  `ts` int(11),

      7.  `dt` text DEFAULT NULL,

      8.  PRIMARY KEY (`id`)

      9.  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

      复制

      1.  -- Flink SQL

      2.  create table test_sink (

      3.  id int PRIMARY KEY NOT ENFORCED,

      4.  name string,

      5.  price double,

      6.  ts bigint,

      7.  dt string

      8.  ) with (

      9.  'connector' = 'jdbc',

      10.  'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',

      11.  'username' = 'root',

      12.  'password' = 'root-123',

      13.  'table-name' = 'test_sink',

      14.  'sink.buffer-flush.max-rows' = '1'

      15.  );

      将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

      复制

      1.  -- Spark SQL

      2.  update hudi.test_flink_incremental set name='hudi6_update' where id = 6;

      3.  insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

      可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

      来源: 伦少的博客

      >>>>>>点击进入数据库专题

数据库 更多推荐

课课家教育

未登录