1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
| public class TopNStreamJob {
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(200L);
env.setParallelism(1);
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port");
final DataStreamSource<String> dataStreamSource = env.socketTextStream(host, port);
final SingleOutputStreamOperator<SensorReading> dataStream = dataStreamSource.map(line -> { String[] fields = line.split(",");
return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2])); });
final WatermarkStrategy<SensorReading> watermarkStrategy = WatermarkStrategy .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp() * 1000); final SingleOutputStreamOperator<SensorReading> eventDataStream = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") { }; final WindowedStream<SensorReading, String, TimeWindow> windowStream = eventDataStream.keyBy(sensorReading -> sensorReading.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(15))) .allowedLateness(Time.minutes(1)) .sideOutputLateData(outputTag);
SingleOutputStreamOperator<TopSensor> avgProcessWindow = windowStream.aggregate(new MyAggrgationFunction(), new MyProcessWindowFunction());
final SingleOutputStreamOperator<String> topStream = avgProcessWindow.keyBy(topSensor -> topSensor.getWinEnd()).process(new TopSensorKeyedProcessFunction());
avgProcessWindow.getSideOutput(outputTag).print("late");
topStream.print("top");
env.execute("Flink TopN Job"); }
private static class MyProcessWindowFunction extends ProcessWindowFunction<Double, TopSensor, String, TimeWindow> {
@Override public void process(String key, Context context, Iterable<Double> avgReadings, Collector<TopSensor> out) { Double avgValue = avgReadings.iterator().next();
out.collect(new TopSensor(key, context.window().getStart(), context.window().getEnd(), avgValue)); } }
private static class MyAggrgationFunction implements AggregateFunction<SensorReading, AvgValue, Double> {
@Override public AvgValue createAccumulator() { return new AvgValue(); }
@Override public AvgValue add(SensorReading value, AvgValue accumulator) {
accumulator.aggregate(value.getTemperature());
return accumulator; }
@Override public Double getResult(AvgValue accumulator) { return accumulator.avg(); }
@Override public AvgValue merge(AvgValue a, AvgValue b) { return a.merge(b); } }
private static class TopSensorKeyedProcessFunction extends KeyedProcessFunction<Long, TopSensor, String> {
private ListState<TopSensor> sensorList;
@Override public void open(Configuration parameters) throws Exception {
sensorList = getRuntimeContext().getListState(new ListStateDescriptor<TopSensor>("sensor-state", TopSensor.class)); }
@Override public void processElement(TopSensor value, Context ctx, Collector<String> out) throws Exception { sensorList.add(value);
ctx.timerService().registerEventTimeTimer(value.getWinEnd() + 1); }
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
List<TopSensor> topList = new ArrayList<>();
for (TopSensor sensor : sensorList.get()) { topList.add(sensor); }
sensorList.clear();
topList.sort(new Comparator<TopSensor>() { @Override public int compare(TopSensor o1, TopSensor o2) { return o2.getAvgVlaue().compareTo(o1.getAvgVlaue()); } });
StringBuilder result = new StringBuilder(); result.append("\n====================================\n"); result.append("时间: ").append(timestamp - 1).append("\n");
for (int i = 0; i < topList.size() - 1; i++) {
if (i >= 3) { break; }
TopSensor sensor = topList.get(i);
result.append("No").append(i).append(":") .append(" sensorId=").append(sensor.getId()) .append(" 平均值=").append(sensor.getAvgVlaue()) .append("\n"); } result.append("====================================\n\n");
out.collect(result.toString()); } } }
|