IteratorX - 极简的 JDBC/File 读取器
Apache-2.0
跨平台
Java
软件简介
IteratorX 是一个极简主义的 jdbc/file reader。
参见:
maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx
使用:
1. Reader: JdbcReader, FileReader
超简单,从 jdbc 或 file 中读取数据到 JSONObject 中。
1.1. JdbcReader:读取 jdbc
// create jdbc reader
final JdbcReader jdbcReader = new JdbcReader(
new JdbcDataSourceBuilder().setUrl("jdbc:postgresql://10.23.112.2:3333/dbname")
.setUser("username").setPassword("password").build());
// fetch by iterable
for (final JSONObject item : jdbcReader.read("select * from tablename")) {
System.err.println(item);
}
// fetch all into one collection
final Collection items = jdbcReader.readAll("select * from tablename where type = ?", param);
for (final JSONObject item : items) {
System.err.println(item);
}
1.2. FileReader:读取 file
// create file reader
final FileReader fileReader = new FileReader();
// fetch by iterable
for (final JSONObject item : fileReader.read(new File("data.json"), "utf-8")) {
System.err.println(item);
}
// fetch all into one collection
final Collection items = fileReader.readAll(new File("data.json"), "utf-8");
for (final JSONObject item : items) {
System.err.println(item);
}
2. Parallels: Threads, Flink, RxJava
简单易用的 多线程处理。
2.1. Threads: 使用 ThreadPool 并行处理
// process each item parallelly using thread pool
Threads.from(jdbcReader.read("select * from tablename")).forEach(item -> {
System.err.println(item);
});
// process batch data parallelly
Threads.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
for (final JSONObject item : items) {
System.err.println(item);
}
});
2.2. Flink: 使用 Flink 并行处理
// process each item parallelly using Flink engine
Flink.from(jdbcReader.read("select * from tablename")).forEach(item -> {
System.err.println(item);
});
// process batch data parallelly
Flink.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
for (final JSONObject item : items) {
System.err.println(item);
}
});
// use DataSet directly to enable all Flink power
Flink.from(jdbcReader.read("select * from tablename")).dataSet().distinct().count();
2.3. RxJava: 使用 RxJava 并行处理
// process each item parallely using RxJava engine
RxJava.from(jdbcReader.read("select * from tablename")).forEach(item -> {
System.err.println(item);
});
// process batch data parallely
RxJava.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
for (final JSONObject item : items) {
System.err.println(item);
}
});
// use Observable directly
RxJava.from(jdbcReader.read("select * from tablename")).observable().distinct().count();
Bug:使用 RxJava 的时候,程序结束了停不下来,谁能帮忙解决吗?
参见:
maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx