Category: Flink

Reference:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/table/filesystem/#streaming-sink

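Key point:
Streaming writes to a Hive table require checkpointing to be enabled: the sink finalizes in-progress files and commits partitions only when a checkpoint completes.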

Properties to add to the Hive table:

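-- The checkpoint interval is a Flink job option, not a table property, so set it in the Flink SQL client (or in code) instead of TBLPROPERTIES:
SET 'execution.checkpointing.interval' = '5 s';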

ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.partition-commit.trigger'='process-time');
ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.partition-commit.delay'='0 s');
ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.partition-commit.watermark-time-zone'='Asia/Shanghai');

ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.rolling-policy.file-size' = '512KB');
ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.rolling-policy.rollover-interval' = '10 s');
ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.rolling-policy.check-interval' = '10 s');

ALTER TABLE test_db.test_table SET TBLPROPERTIES ('sink.partition-commit.policy.kind' = 'metastore,success-file');
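
Hive's ALTER TABLE ... SET TBLPROPERTIES accepts a comma-separated list of key/value pairs, so the sink settings above can also be applied in one statement. The following is a sketch only, assuming the same test_db.test_table and the same values as above; the comments give my reading of each option from the Flink documentation.

```sql
-- Sketch: the sink-related properties from above, combined into a single statement.
ALTER TABLE test_db.test_table SET TBLPROPERTIES (
  -- Commit a partition based on processing time, with no extra delay.
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0 s',
  -- As far as I can tell, this option is only consulted when the trigger is 'partition-time'.
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
  -- Roll to a new file at 512KB or after 10 s, checking every 10 s.
  'sink.rolling-policy.file-size' = '512KB',
  'sink.rolling-policy.rollover-interval' = '10 s',
  'sink.rolling-policy.check-interval' = '10 s',
  -- On commit, register the partition in the metastore and write a _SUCCESS file.
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```

Values this small (512KB files, 10 s rollover) make results visible quickly in a test, but they also produce many small files; larger values are probably more appropriate for production tables.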

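Code to enable checkpointing in the program:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(5000); // checkpoint every 5 s (value is in milliseconds)
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // wait at least 500 ms between checkpoints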
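
Once checkpointing is enabled and the table properties are set, the streaming write itself is an ordinary INSERT. Below is a minimal Flink SQL sketch under several assumptions that are not in the original note: the current catalog is a HiveCatalog containing test_db.test_table, the table has columns (id BIGINT, name STRING) and is partitioned by dt STRING, and demo_source is a hypothetical datagen table used only to produce rows.

```sql
-- Hypothetical unbounded source; datagen emits one random row per second.
CREATE TEMPORARY TABLE demo_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Continuous insert into the Hive table (assumed schema: id, name, partition column dt).
-- Files are finalized and partitions committed as checkpoints complete,
-- so this assumes checkpointing is already enabled for the job.
INSERT INTO test_db.test_table
SELECT id, name, CAST(CURRENT_DATE AS STRING) AS dt
FROM demo_source;
```

If no data shows up in Hive while the job is running, the first thing to check is usually whether checkpoints are actually completing.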