Category: Flink

Reference:

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/datastream/formats/text_files/ 



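Key points:
The text files format can be used in two ways:
1. Bounded reads in batch mode: read the files currently in the path, then finish.
2. Continuous reads in streaming mode: keep monitoring the directory and read new files as they appear.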

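import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Named env so that it matches the env.fromSource(...) calls in both examples.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Input directory on HDFS; the files in it are read line by line.
String path = "hdfs://master:9000/test/data_dir/";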

1. Bounded read example:
final FileSource<String> source =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
  .build();
final DataStream<String> stream =
  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

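2. Continuous read example (an alternative to example 1, so the variable names are reused):
final FileSource<String> source =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
  // Check the directory for new files once per second.
  .monitorContinuously(Duration.ofSeconds(1L))
  .build();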
final DataStream<String> stream =
  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
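
To make the difference between the two modes concrete, here is a minimal conceptual sketch in plain Java, with no Flink dependency. It is not how Flink's FileSource is implemented. The class DirectoryPoller and its method pollNewLines are hypothetical names made up for this illustration. A single call behaves like a bounded read: it returns the lines of the files present at that moment. Repeated calls behave like continuous monitoring: each pass returns only the lines of files that were not seen before.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Conceptual sketch only: hypothetical class, NOT Flink's FileSource implementation.
class DirectoryPoller {
    private final Path dir;
    // Files that were already read, so each file is processed only once.
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path dir) {
        this.dir = dir;
    }

    // One discovery pass: returns the lines of every regular file not seen before.
    List<String> pollNewLines() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(Files::isRegularFile)
                 .filter(p -> !seen.contains(p))
                 .sorted() // deterministic order within one pass
                 .forEach(fresh::add);
        }
        List<String> lines = new ArrayList<>();
        for (Path p : fresh) {
            lines.addAll(Files.readAllLines(p, StandardCharsets.UTF_8));
            seen.add(p);
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("poll-demo");
        DirectoryPoller poller = new DirectoryPoller(dir);

        Files.write(dir.resolve("a.txt"), List.of("first", "second"));
        System.out.println(poller.pollNewLines()); // prints [first, second]

        Files.write(dir.resolve("b.txt"), List.of("third"));
        System.out.println(poller.pollNewLines()); // prints [third]

        System.out.println(poller.pollNewLines()); // prints []
    }
}
```

Calling pollNewLines in a loop with a one-second sleep between passes would roughly mirror the discovery interval passed to monitorContinuously(Duration.ofSeconds(1L)) in example 2. The sketch ignores everything Flink adds on top, such as parallel splits, checkpointing and fault tolerance.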