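1. Getting Started
1.1 Project Setup
Create a Maven project and add the following two dependencies. flink-streaming-java provides the DataStream API, and flink-clients is needed to actually execute the job, for example when running it locally from the IDE.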
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>
</dependencies>
```
1.2 Example
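Write a streaming word-count job and use the nc (netcat) tool to simulate a data stream. socketTextStream connects to the given host and port as a client, so start a listener there first (for example, nc -lk 8081 on 8.140.207.120). Once the job is running, every line typed into nc is received by Flink immediately, and an updated count is printed for each word in it.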
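```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name is illustrative; the original snippet only showed main().
public class SocketWordCount {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Get the data source: one String per line sent by nc to this host/port
        DataStreamSource<String> inputData = env.socketTextStream("8.140.207.120", 8081);
        // 3. Process the data: split each line into (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    // Skip empty tokens produced by consecutive spaces
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            }
        });
        // Group by key: the word, i.e. the first field (f0) of the tuple
        KeyedStream<Tuple2<String, Integer>, String> kbStream = result.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        // Aggregate: rolling sum of field 1 (the count) for each word
        SingleOutputStreamOperator<Tuple2<String, Integer>> data = kbStream.sum(1);
        // 4. Output the results to stdout
        data.print();
        // 5. Execute: the pipeline above is only a plan until execute() is called
        env.execute();
    }
}
```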
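To make the output of data.print() concrete, the following sketch mimics the flatMap, keyBy and sum(1) steps in plain Java, without Flink. It is a simplified model assuming a single parallel instance: a HashMap stands in for the per-key state Flink manages, and the class RollingWordCountSketch and its process method are hypothetical names. It illustrates that sum(1) is a rolling aggregate: every incoming word produces a new record carrying that word's running total, rather than one final result at the end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of flatMap -> keyBy(f0) -> sum(1); not Flink code.
class RollingWordCountSketch {

    // Processes lines in arrival order and returns every record the print sink
    // would show, formatted like Flink's Tuple2.toString(), e.g. "(hello,2)".
    static List<String> process(List<String> lines) {
        // Stand-in for Flink's keyed state: one running count per word (assumption)
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // flatMap: split the line into words
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue; // skip blanks produced by consecutive spaces
                }
                // keyBy + sum(1): add 1 to this word's total and emit the new total
                int count = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + count + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = List.of("hello flink", "hello world");
        process(input).forEach(System.out::println);
    }
}
```

Running main prints (hello,1), (flink,1), (hello,2) and (world,1), one per line. In the real job, print() additionally prefixes each record with the subtask index (such as 2>) when parallelism is greater than 1, and records for different words may be interleaved differently because they are handled by different subtasks.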