本文主要是介绍1.11 flink读取本地文件例子以及细节,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
两个细节
- 可以指定文件或目录
- 可以指定读取模式一次性或持续性检测
代码例子
PROCESS_ONCE模式
public class FileToPrint {public static void main(final String[] args) throws Exception {StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();// 创建TextInputFormatTextInputFormat textInputFormat = new TextInputFormat(null);// 可以过滤文件textInputFormat.setFilesFilter(new FilePathFilter() {@Overridepublic boolean filterPath(Path path) {return path.getName().startsWith("2");//过滤掉2开头的文件}});//指定对应的文件或者文件目录下的文件,读取对应文件内容,读取完就结束(FileProcessingMode.PROCESS_ONCE)String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
// String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";streamEnv.readFile(textInputFormat, filePath,FileProcessingMode.PROCESS_ONCE, 100).print();streamEnv.execute("packaged");}
}
PROCESS_CONTINUOUSLY模式
public class FileToPrint {public static void main(final String[] args) throws Exception {StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();// 创建TextInputFormatTextInputFormat textInputFormat = new TextInputFormat(null);// 可以过滤文件textInputFormat.setFilesFilter(new FilePathFilter() {@Overridepublic boolean filterPath(Path path) {return path.getName().startsWith("2");//过滤掉2开头的文件}});// interval间隔时间到了会检查文件是否做了修改,有修改则促发重新获取所有的数据String file = "C:\\Users\\xuyin\\Desktop\\新建文件夹";
// String filePath = "C:\\Users\\xuyin\\Desktop\\新建文件夹\\1.txt";streamEnv.readFile(textInputFormat, file, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000).print();streamEnv.execute("packaged");}
}
问题1
TextInputFormat构造方法中的filePath参数作用是什么?目测没什么用,因为看源码readFile的参数也有一个filePath
而最终还是会set到inputFormat的filePath,所以我的例子在构建TextInputFormat时传入的null是可以正常运行的
问题2
如何实现文件append数据不重新获取所有内容,而是从上次的位置开始读取呢?
flink没有这么做,应该是考虑到文件数据有可能存在update而不是append,所以自己的业务是单纯的append,则需要自己实现对应功能
这篇关于1.11 flink读取本地文件例子以及细节的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!