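Flink State Management
Stateful Operators
A stateless operator task looks at each event on its own and derives the output directly from the current input, without depending on any other data. map, filter, and flatMap are stateless operators.
A stateful operator task needs other data in addition to the current element to compute its result. Aggregation operators and window operators, for example, are stateful.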
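The difference can be shown without Flink at all. The sketch below is plain Java, and the class and method names are hypothetical. A stateless transformation like `map` produces each output from the current element alone. A stateful one, such as a running sum, must carry something from one element to the next, and that carried value is exactly what Flink calls state.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink involved) contrasting a stateless and a stateful transformation.
class StatelessVsStateful {

    // Stateless, like map: each output depends only on the current element.
    static List<Integer> mapDouble(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int x : input) {
            out.add(x * 2);
        }
        return out;
    }

    // Stateful, like an aggregation: each output depends on the current element and
    // on everything seen before, so the operator has to remember the running sum.
    static List<Integer> runningSum(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        int sum = 0; // the "state" carried from one element to the next
        for (int x : input) {
            sum += x;
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(3, 1, 4);
        System.out.println(mapDouble(input));  // [6, 2, 8]
        System.out.println(runningSum(input)); // [3, 4, 8]
    }
}
```

In a distributed job that `sum` variable is what must survive failures and be moved between machines, which is why Flink manages it for you.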
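Types of State
Managed state is managed by Flink as a whole: storage, access, fault recovery, and redistribution when the parallelism changes are all handled by Flink, and we only need to call its API. Managed state is divided into keyed state and operator state.
Raw state is user-defined: it amounts to a block of memory that we manage ourselves, including implementing its serialization and fault recovery.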
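To make "implement the serialization yourself" concrete, here is a plain-Java sketch of the work raw state leaves to user code: turning an in-memory structure into bytes and back. It does not use any Flink API; the class name and the byte layout (entry count followed by key/value pairs) are arbitrary choices made for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of raw state: user code decides the byte format.
// Layout used here: [int size] then size x [UTF key][int value].
class RawStateSketch {

    static byte[] serialize(Map<String, Integer> counts) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
        } // closing flushes everything into the byte array
        return bytes.toByteArray();
    }

    static Map<String, Integer> deserialize(byte[] data) throws IOException {
        Map<String, Integer> counts = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                counts.put(in.readUTF(), in.readInt());
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> before = new HashMap<>(Map.of("s1", 3, "s2", 5));
        Map<String, Integer> after = deserialize(serialize(before));
        System.out.println(after.equals(before)); // true
    }
}
```

With managed state none of this is written by hand: the state descriptor's type information (for example `Types.INT`) tells Flink how to serialize the values.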
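Keyed State (Key-Partitioned State)
Keyed state is the core of Flink state management. It is scoped to a key: after keyBy, every key gets its own independent instance of the state, and while an element is being processed only the state belonging to that element's key is visible. Keyed state is therefore available only on a keyed stream (after keyBy); state used without keyBy is operator state.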
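A simplified way to picture keyed state is a map from key to value that the operator consults using the current element's key. The plain-Java model below is an assumption for illustration (it is not how Flink stores state internally, and `KeyedStateModel` is a hypothetical name). It mirrors the value-state alarm example that follows, with one difference: it skips the comparison for a key's first reading, whereas that example treats a missing previous value as 0.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of keyed value state (not Flink's implementation): one slot per key.
class KeyedStateModel {

    // key (sensor id) -> last water level seen for that key
    private final Map<String, Integer> lastVcByKey = new HashMap<>();

    // Returns an alarm message, or null if there is nothing to report.
    String process(String id, int vc) {
        Integer last = lastVcByKey.get(id); // like lastVcState.value() for the current key
        lastVcByKey.put(id, vc);            // like lastVcState.update(vc) for the current key
        if (last != null && Math.abs(last - vc) > 10) {
            return id + ": " + last + " -> " + vc;
        }
        return null;
    }

    public static void main(String[] args) {
        KeyedStateModel model = new KeyedStateModel();
        System.out.println(model.process("s1", 1));  // null (first reading of s1)
        System.out.println(model.process("s2", 30)); // null (s2 has its own, still empty, slot)
        System.out.println(model.process("s1", 20)); // s1: 1 -> 20
    }
}
```

Note that the jump from 1 (s1) to 30 (s2) raises no alarm: the two readings belong to different keys and never see each other's state.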
Value State
Example: for each sensor, check the water level (vc); if two consecutive readings differ by more than 10, output an alarm.
// TODO: for each sensor, output an alarm if two consecutive water levels differ by more than 10
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("8.140.207.120", 8081)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
            );
    sensorDs.keyBy(WaterSensor::getId)
            .process(
                    new KeyedProcessFunction<String, WaterSensor, String>() {
                        // TODO 1. Declare a state that holds the previous water level (one value per key)
                        ValueState<Integer> lastVcState;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            // TODO 2. Initialize the state in open()
                            lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
                        }
                        @Override
                        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                            // 1. Read the previous water level of this key (null for the key's first record, treated as 0)
                            Integer stored = lastVcState.value();
                            int lastVc = stored == null ? 0 : stored;
                            // 2. Alarm if the absolute difference exceeds 10
                            if (Math.abs(lastVc - value.getVc()) > 10) {
                                out.collect("Sensor=" + value.getId() + ", current water level=" + value.getVc() + ", previous water level=" + lastVc + ", difference exceeds 10");
                            }
                            // 3. Save the current water level for the next comparison
                            lastVcState.update(value.getVc());
                        }
                    }
            )
            .print();
    env.execute();
}
List State
Example: for each sensor, output its three highest water levels
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("8.140.207.120", 8081)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
            );
    // TODO: for each sensor, output its three highest water levels
    sensorDs.keyBy(WaterSensor::getId)
            .process(
                    new KeyedProcessFunction<String, WaterSensor, String>() {
                        // TODO 1. Declare a list state for the water levels; only the 3 highest need to be kept
                        ListState<Integer> vcListState;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                        }
                        @Override
                        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                            // 1. Add each incoming value to the list state
                            vcListState.add(value.getVc());
                            // 2. Copy everything in the list state into a local list for sorting
                            List<Integer> vcList = new ArrayList<>();
                            for (Integer vc : vcListState.get()) {
                                vcList.add(vc);
                            }
                            // 3. Sort in descending order
                            vcList.sort((o1, o2) -> Integer.compare(o2, o1));
                            // 4. Keep only the 3 largest; the state held at most 3 values before this add,
                            //    so at most one element (index 3) has to be dropped
                            if (vcList.size() > 3) {
                                vcList.remove(3);
                            }
                            out.collect("Sensor id=" + value.getId() + ", top 3 water levels=" + vcList);
                            // 5. Replace the state with the values that were kept
                            vcListState.update(vcList);
                        }
                    }
            ).print();
    env.execute();
}
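The sort-and-trim step is ordinary Java and can be pulled out of the Flink function so it can be unit-tested without a cluster. The helper below is a hypothetical refactoring, not part of the original example; `topN` is an invented name. Using `subList` with `Math.min` keeps it correct for any list size, instead of relying on the "at most one extra element" invariant that `remove(3)` depends on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the sort-and-trim step of the list-state example as plain Java.
class TopN {

    // Returns the n largest values in descending order.
    static List<Integer> topN(Iterable<Integer> values, int n) {
        List<Integer> list = new ArrayList<>();
        for (Integer v : values) {
            list.add(v);
        }
        list.sort((a, b) -> Integer.compare(b, a)); // descending
        // Copy the sublist so the result does not stay a view of the temporary list
        return new ArrayList<>(list.subList(0, Math.min(n, list.size())));
    }

    public static void main(String[] args) {
        List<Integer> kept = new ArrayList<>(); // plays the role of vcListState
        for (int vc : new int[] {5, 1, 9, 7}) {
            kept.add(vc);         // like vcListState.add(...)
            kept = topN(kept, 3); // like vcListState.update(...)
            System.out.println(kept);
        }
    }
}
```

Running `main` prints `[5]`, `[5, 1]`, `[9, 5, 1]`, `[9, 7, 5]`, the same sequence the Flink job would emit for one sensor receiving those readings.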
Map State
Example: for each sensor, count how many times each water level occurs
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("8.140.207.120", 8081)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
            );
    // TODO: for each sensor, count how many times each water level occurs
    sensorDs.keyBy(WaterSensor::getId)
            .process(
                    new KeyedProcessFunction<String, WaterSensor, String>() {
                        // Per key: water level -> number of occurrences
                        MapState<Integer, Integer> vcCountMapState;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            vcCountMapState = getRuntimeContext()
                                    .getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
                        }
                        @Override
                        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                            // 1. If the map state already contains this water level as a key, increment its count
                            Integer vc = value.getVc();
                            if (vcCountMapState.contains(vc)) {
                                Integer count = vcCountMapState.get(vc);
                                vcCountMapState.put(vc, count + 1);
                            } else {
                                // Otherwise initialize the count to 1
                                vcCountMapState.put(vc, 1);
                            }
                            // 2. Iterate over the map state and output every key-value pair
                            StringBuilder outStr = new StringBuilder();
                            outStr.append("Sensor id=").append(value.getId()).append("\n");
                            for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                outStr.append(vcCount).append("\n");
                            }
                            outStr.append("==========================\n");
                            out.collect(outStr.toString());
                        }
                    }
            ).print();
    env.execute();
}
Reducing State
Example: output the running sum of water levels for each sensor
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("8.140.207.120", 8081)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
            );
    // TODO: output the running sum of water levels for each sensor
    sensorDs.keyBy(WaterSensor::getId)
            .process(
                    new KeyedProcessFunction<String, WaterSensor, String>() {
                        // Reducing state: every add() is combined with the stored value using Integer::sum
                        ReducingState<Integer> vcSumReducingState;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            vcSumReducingState = getRuntimeContext()
                                    .getReducingState(new ReducingStateDescriptor<Integer>("vcSumReducingState", Integer::sum, Types.INT));
                        }
                        @Override
                        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                            vcSumReducingState.add(value.getVc());
                            Integer vcSum = vcSumReducingState.get();
                            out.collect("Sensor id=" + value.getId() + ", sum of water levels=" + vcSum);
                        }
                    }
            ).print();
    env.execute();
}
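Conceptually, reducing state stores a single value and folds every `add()` into it with the reduce function; `get()` returns the folded value, or `null` while the state is still empty. The plain-Java model below sketches just those semantics under that assumption; `ReducingStateModel` is a hypothetical class, not Flink's `ReducingState`. Because the function is a `BinaryOperator<T>`, the input type and the stored type must be the same, which is the main restriction compared with aggregating state.

```java
import java.util.function.BinaryOperator;

// Simplified model of reducing-state semantics (not Flink's implementation).
class ReducingStateModel<T> {
    private final BinaryOperator<T> reduceFunction;
    private T value; // null until the first add(), like an empty state

    ReducingStateModel(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Fold the new element into the single stored value
    void add(T element) {
        value = (value == null) ? element : reduceFunction.apply(value, element);
    }

    T get() {
        return value;
    }

    public static void main(String[] args) {
        ReducingStateModel<Integer> sum = new ReducingStateModel<>(Integer::sum);
        for (int vc : new int[] {1, 2, 3}) {
            sum.add(vc);
            System.out.println(sum.get()); // 1, then 3, then 6
        }
    }
}
```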
Aggregating State
Example: output the average water level of each sensor
sensorDs.keyBy(WaterSensor::getId)
        .process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Input: Integer water level; accumulator: (sum, count); output: Double average
                    AggregatingState<Integer, Double> vcAvgAggregatingState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcAvgAggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAvgAggregatingState",
                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                            @Override
                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                return Tuple2.of(0, 0);
                                            }
                                            @Override
                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                // f0 = running sum, f1 = number of elements
                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                            }
                                            @Override
                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                return accumulator.f0 * 1D / accumulator.f1;
                                            }
                                            @Override
                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                                // Combine two partial accumulators instead of returning null
                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                            }
                                        },
                                        Types.TUPLE(Types.INT, Types.INT)));
                    }
                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // Add the water level to the aggregating state
                        vcAvgAggregatingState.add(value.getVc());
                        // Read the current result (the average) from the aggregating state
                        Double vcAvg = vcAvgAggregatingState.get();
                        out.collect("Sensor id=" + value.getId() + ", average water level=" + vcAvg);
                    }
                }
        ).print();
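The four methods of the `AggregateFunction` above form a small contract that can be exercised in plain Java. The sketch below reimplements the same average calculation without Flink; `AverageAggregation` and `AvgAccumulator` are hypothetical names, and the `long` fields are a design choice to avoid overflow on long-running sums. `merge` is mainly needed when partial accumulators are combined (for example when windows are merged), which is why it should return a real combined accumulator rather than `null`.

```java
// Plain-Java model of the AggregateFunction contract used above (not Flink code).
class AverageAggregation {

    // Accumulator: running sum and element count
    static final class AvgAccumulator {
        final long sum;
        final long count;

        AvgAccumulator(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    static AvgAccumulator createAccumulator() {
        return new AvgAccumulator(0, 0);
    }

    static AvgAccumulator add(int value, AvgAccumulator acc) {
        return new AvgAccumulator(acc.sum + value, acc.count + 1);
    }

    // Guard against 0/0, which would otherwise produce NaN
    static double getResult(AvgAccumulator acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    // Combine two partial accumulators; must not return null
    static AvgAccumulator merge(AvgAccumulator a, AvgAccumulator b) {
        return new AvgAccumulator(a.sum + b.sum, a.count + b.count);
    }

    public static void main(String[] args) {
        AvgAccumulator acc = createAccumulator();
        for (int vc : new int[] {10, 20, 30}) {
            acc = add(vc, acc);
        }
        System.out.println(getResult(acc)); // 20.0
        AvgAccumulator other = add(60, createAccumulator());
        System.out.println(getResult(merge(acc, other))); // 30.0
    }
}
```

The accumulator type (sum and count) differs from both the input (`int`) and the output (`double`), which is exactly what aggregating state allows and reducing state does not.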
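State Time-to-Live (TTL)
You can configure a time-to-live (TTL) for keyed state; once the TTL has elapsed, the state value is considered expired and is cleaned up automatically. By default, an expired value is removed when it is read and, if the state backend supports it, also garbage-collected in the background. TTL is currently measured in processing time.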
sensorDs.keyBy(WaterSensor::getId)
        .process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVcState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // TODO 1. Build a StateTtlConfig
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                // Expire 5 s after the last refresh
                                .newBuilder(Time.seconds(5))
                                // The expiration timestamp is refreshed when the state is created and on every write (not on reads)
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // Never return an expired value, even if it has not been cleaned up yet
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
                        // TODO 2. Enable TTL on the state descriptor
                        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
                        stateDescriptor.enableTimeToLive(stateTtlConfig);
                        lastVcState = getRuntimeContext().getState(stateDescriptor);
                    }
                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // null if nothing was written for this key yet, or if the value has expired
                        Integer lastVc = lastVcState.value();
                        out.collect("key=" + value.getId() + ", state value=" + lastVc);
                        // Update the state value (this also refreshes its TTL)
                        lastVcState.update(value.getVc());
                    }
                }
        )
        .print();
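The interplay of `OnCreateAndWrite` and `NeverReturnExpired` can be modeled in a few lines of plain Java. The sketch below is a simplified assumption of the behavior configured above, not Flink's implementation; `TtlValueStateModel` is a hypothetical name and the injected clock stands in for processing time. It shows the practical consequence: reading the value does not extend its lifetime, only writing does.

```java
import java.util.function.LongSupplier;

// Simplified model of a value state with TTL (not Flink's implementation).
// OnCreateAndWrite: only update() refreshes the timestamp.
// NeverReturnExpired: value() returns null once the TTL has elapsed.
class TtlValueStateModel<T> {
    private final long ttlMillis;
    private final LongSupplier clock; // stands in for processing time
    private T value;
    private long lastWriteMillis;

    TtlValueStateModel(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Writing refreshes the expiration timestamp
    void update(T newValue) {
        value = newValue;
        lastWriteMillis = clock.getAsLong();
    }

    // Reading does NOT refresh the timestamp; an expired entry is dropped on read
    T value() {
        if (value != null && clock.getAsLong() - lastWriteMillis >= ttlMillis) {
            value = null;
        }
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlValueStateModel<Integer> state = new TtlValueStateModel<>(5_000, () -> now[0]);
        state.update(7);
        now[0] = 4_999;
        System.out.println(state.value()); // 7
        now[0] = 5_000;
        System.out.println(state.value()); // null: the read at 4999 ms did not extend the TTL
    }
}
```

With `UpdateType.OnReadAndWrite` instead, the read at 4999 ms would have refreshed the timestamp and the value would still be visible at 5000 ms.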
Operator State
List State
Example: count the elements processed in a map operator (each parallel subtask keeps its own count)
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    env.socketTextStream("8.140.207.120", 8081)
            .map(new MyCountMapFunction())
            .print();
    env.execute();
}
// TODO 1. Implement CheckpointedFunction
public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
    // Local variable holding this subtask's count
    private Long count = 0L;
    private ListState<Long> state;
    @Override
    public Long map(String value) throws Exception {
        return ++count;
    }
    /**
     * TODO 2. Persist the local variable by copying it into operator state.
     * Called whenever a checkpoint is taken, so only when checkpointing is enabled.
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.out.println("snapshotState");
        // 2.1 Clear the operator state
        state.clear();
        // 2.2 Add the local variable to the operator state
        state.add(count);
    }
    /**
     * TODO 3. Initialize the local variable from the state; called once per subtask
     * @param context the context for initializing the operator
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("initializeState");
        // 3.1 Obtain the operator state
        state = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
        /*
         * Difference between list and union list operator state: how the state is
         * redistributed when the parallelism changes
         * 1. list: the entries are spread round-robin across the new parallel subtasks
         * 2. union list: the entries of all old subtasks are merged into one complete list,
         *    which is broadcast to every new subtask
         * The union list state below is obtained only to illustrate the API; it is not used.
         */
        context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<Long>("state2", Types.LONG));
        // 3.2 When restoring, copy the data from operator state into the local variable
        if (context.isRestored()) {
            for (Long c : state.get()) {
                count += c;
            }
        }
    }
}
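The list vs. union list comment can be made concrete with a small plain-Java model of redistribution after a parallelism change. This is a simplified assumption, not Flink's repartitioning code: even-split deals the pooled entries out round-robin, as the comment describes, though Flink's exact assignment order may differ, and `OperatorStateRedistribution` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of operator list-state redistribution (not Flink's implementation).
class OperatorStateRedistribution {

    // "list": pool all entries of the old subtasks and deal them out round-robin
    static List<List<Long>> evenSplit(List<List<Long>> oldStates, int newParallelism) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<Long> subtaskState : oldStates) {
            for (Long entry : subtaskState) {
                result.get(next).add(entry);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    // "union list": every new subtask receives the complete merged list
    static List<List<Long>> union(List<List<Long>> oldStates, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> subtaskState : oldStates) {
            all.addAll(subtaskState);
        }
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks, each having snapshotted its own count, as MyCountMapFunction does
        List<List<Long>> old = List.of(List.of(4L), List.of(6L));
        System.out.println(evenSplit(old, 3)); // [[4], [6], []]
        System.out.println(union(old, 3));     // [[4, 6], [4, 6], [4, 6]]
    }
}
```

This also suggests why the example restores from plain list state: with even-split the restored counts still add up to 10 across all subtasks, while with union list state each subtask would sum to 10 and the job as a whole would report 30.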
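Broadcast State
Sometimes we want every parallel subtask of an operator to hold the same "global state" for uniform configuration or rules. Every element in every partition then accesses the same state, as if the state had been "broadcast" to all partitions. This special kind of operator state is called broadcast state.
Because the broadcast state is identical on every parallel subtask, scaling up only requires copying it to the new subtasks. When scaling down, the surplus subtasks can simply be dropped together with their state: every copy is identical, so nothing is lost.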
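The two properties described above (one identical copy per subtask, and trivial rescaling) can be sketched with a plain-Java model. It is a simplified assumption rather than Flink's implementation; `BroadcastStateModel`, `onConfig`, and `exceedsThreshold` are hypothetical names chosen to mirror `processBroadcastElement` and `processElement` in the example that follows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of broadcast state (not Flink's implementation).
class BroadcastStateModel {
    // One copy of the broadcast map per parallel subtask
    private final List<Map<String, Integer>> copies = new ArrayList<>();

    BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    // Like processBroadcastElement: every subtask receives the element and applies the same write
    void onConfig(String key, int value) {
        for (Map<String, Integer> copy : copies) {
            copy.put(key, value);
        }
    }

    // Like processElement on one subtask: read-only lookup in that subtask's own copy
    boolean exceedsThreshold(int subtask, int vc) {
        return vc > copies.get(subtask).getOrDefault("threshold", 0);
    }

    // Scaling up: all copies are identical, so a new subtask can start from a copy of any of them
    void addSubtask() {
        copies.add(new HashMap<>(copies.get(0)));
    }

    // Scaling down: dropping a copy loses nothing, because the others hold the same data
    void removeSubtask() {
        copies.remove(copies.size() - 1);
    }

    public static void main(String[] args) {
        BroadcastStateModel model = new BroadcastStateModel(2);
        model.onConfig("threshold", 10);
        System.out.println(model.exceedsThreshold(0, 11)); // true
        model.addSubtask();
        System.out.println(model.exceedsThreshold(2, 11)); // true: the new subtask has the threshold too
        model.removeSubtask();
        model.removeSubtask();
        System.out.println(model.exceedsThreshold(0, 10)); // false
    }
}
```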
Example: send an alarm when the water level exceeds a threshold; the threshold can be changed at runtime
// TODO: send an alarm when the water level exceeds a threshold; the threshold can be changed at runtime
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    // Data stream
    SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("8.140.207.120", 8081)
            .map(new WaterSensorMapFunction());
    // Configuration stream, used to broadcast the threshold
    DataStreamSource<String> configDs = env.socketTextStream("8.140.207.120", 33333);
    // TODO 1. Broadcast the configuration stream
    MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
    BroadcastStream<String> configBS = configDs.broadcast(broadcastMapState);
    // TODO 2. Connect the data stream with the broadcast configuration stream
    BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDs.connect(configBS);
    // TODO 3. Call process
    sensorBCS.process(
            new BroadcastProcessFunction<WaterSensor, String, String>() {
                // Processing method for the data stream
                @Override
                public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // TODO 5. Get the broadcast state from the context and read from it (read-only, cannot be modified here)
                    ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                    Integer threshold = broadcastState.get("threshold");
                    // Fall back to 0 if no threshold has been broadcast yet
                    threshold = (threshold == null ? 0 : threshold);
                    if (value.getVc() > threshold) {
                        out.collect("Water level " + value.getVc() + " exceeds threshold " + threshold);
                    }
                }
                // Processing method for the broadcast configuration stream
                @Override
                public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
                    // TODO 4. Get the broadcast state from the context and write to it
                    BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                    broadcastState.put("threshold", Integer.valueOf(value.trim()));
                }
            }
    ).print();
    env.execute();
}
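State Backends
In Flink, how state is stored, accessed, and maintained is determined by a pluggable component called the state backend. The state backend mainly decides how and where local state is stored.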
HashMapStateBackend
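Keeps state as objects on the JVM heap, so reads and writes are very fast. However, the state size is limited by the memory available in the cluster, and an application whose state keeps growing may exhaust it. This is Flink's default state backend.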
RocksDBStateBackend
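Stores state in an embedded RocksDB database on the local disk of each TaskManager, which makes it suitable for very large state. Reads and writes are slower because every access has to serialize or deserialize the data, but the state size is limited only by the available local disk space rather than by memory. (Since Flink 1.13 this backend is named EmbeddedRocksDBStateBackend.)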
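The essential cost difference between the two backends can be illustrated with a deliberately simplified plain-Java model; it is not how either backend is implemented. A heap store keeps live objects, so a read is a map lookup that returns the very same object. A serialized store keeps bytes, so every write serializes and every read deserializes into a fresh copy. `BackendModel`, `HeapStore`, and `SerializedStore` are hypothetical names, and Java serialization is used only for brevity (Flink uses its own type serializers).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Very simplified model of heap vs. serialized state storage (not Flink code).
class BackendModel {

    // Heap-style: stores object references, no (de)serialization on access
    static final class HeapStore<V> {
        private final Map<String, V> objects = new HashMap<>();
        void put(String key, V value) { objects.put(key, value); }
        V get(String key) { return objects.get(key); }
    }

    // RocksDB-style: stores bytes, so every put/get pays for (de)serialization
    static final class SerializedStore<V extends Serializable> {
        private final Map<String, byte[]> bytes = new HashMap<>();

        void put(String key, V value) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            bytes.put(key, buffer.toByteArray());
        }

        @SuppressWarnings("unchecked")
        V get(String key) throws IOException, ClassNotFoundException {
            byte[] data = bytes.get(key);
            if (data == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (V) in.readObject();
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        ArrayList<Integer> levels = new ArrayList<>(List.of(1, 2));
        HeapStore<ArrayList<Integer>> heap = new HeapStore<>();
        heap.put("s1", levels);
        System.out.println(heap.get("s1") == levels); // true: the same object
        SerializedStore<ArrayList<Integer>> serialized = new SerializedStore<>();
        serialized.put("s1", levels);
        ArrayList<Integer> copy = serialized.get("s1");
        System.out.println(copy == levels);      // false: a freshly deserialized copy
        System.out.println(copy.equals(levels)); // true: same contents
    }
}
```

Because the serialized form can live outside the JVM heap (in RocksDB's case, largely on disk), it is not bounded by heap size, at the price of that extra work on every access.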