1 package misc.temperature;
3 import static ca.uqac.lif.cep.Connector.connect;
5 import static ca.uqac.lif.cep.Connector.BOTTOM;
6 import static ca.uqac.lif.cep.Connector.INPUT;
7 import static ca.uqac.lif.cep.Connector.OUTPUT;
8 import static ca.uqac.lif.cep.Connector.TOP;
10 import ca.uqac.lif.cep.GroupProcessor;
11 import ca.uqac.lif.cep.UniformProcessor;
12 import ca.uqac.lif.cep.functions.ApplyFunction;
13 import ca.uqac.lif.cep.functions.Constant;
14 import ca.uqac.lif.cep.functions.Cumulate;
15 import ca.uqac.lif.cep.functions.CumulativeFunction;
16 import ca.uqac.lif.cep.functions.FunctionLambda;
17 import ca.uqac.lif.cep.functions.FunctionTree;
18 import ca.uqac.lif.cep.functions.StreamVariable;
19 import ca.uqac.lif.cep.functions.TurnInto;
20 import ca.uqac.lif.cep.io.Print;
21 import ca.uqac.lif.cep.tmf.Filter;
22 import ca.uqac.lif.cep.tmf.FilterOn;
23 import ca.uqac.lif.cep.tmf.Fork;
24 import ca.uqac.lif.cep.tmf.Pump;
25 import ca.uqac.lif.cep.tmf.SliceLast;
26 import ca.uqac.lif.cep.tmf.Window;
27 import ca.uqac.lif.cep.util.Bags.RunOn;
28 import ca.uqac.lif.cep.util.Booleans;
29 import ca.uqac.lif.cep.util.InstanceOf;
30 import ca.uqac.lif.cep.util.Lists;
31 import ca.uqac.lif.cep.util.NthElement;
32 import ca.uqac.lif.cep.util.Numbers;
// Temperature value that each reading is compared against (strictly greater than) in main.
38 private static final double TEMPERATURE_THRESHOLD = 100;
// The constants from MAX_RACK_ID to TEMP_MEAN are not referenced in the visible part of
// this file; presumably they parameterize the event source -- TODO confirm.
39 private static final int MAX_RACK_ID = 10;
40 private static final long PAUSE = 100;
41 private static final double TEMPERATURE_RATIO = 0.5;
42 private static final double POWER_STD = 10;
43 private static final double POWER_MEAN = 100;
44 private static final double TEMP_STD = 20;
45 private static final double TEMP_MEAN = 80;
// M is not referenced in the visible part of this file.
48 private static final int M = 2;
// Width given to the Window processor in main; N - 1 is also the index passed to NthElement.
49 private static final int N = 2;
/**
 * Assembles the processor chain of this example: temperature events are
 * selected from a source, grouped by rack ID with a SliceLast, and the
 * result is sent to a Print processor. A "warning" group processor is also
 * built, which evaluates windows of N events and feeds a CreateWarning
 * processor.
 * <p>
 * NOTE(review): this listing omits several lines of the original file (for
 * instance the declarations of {@code f} and {@code source}); the notes
 * below flag the places where the visible code is incomplete.
 *
 * @param args command-line arguments; not used in the visible code
 */
51 public static void main(String[] args)
// Accessor functions: each casts the incoming object and reads one field,
// declaring the type of the value it returns.
53 FunctionLambda get_rack_id =
new FunctionLambda((Object o) -> ((
MonitoringEvent) o).getRackID()).setReturnType(Integer.class);
54 FunctionLambda get_temperature =
new FunctionLambda((Object o) -> ((TemperatureEvent) o).getTemperature()).setReturnType(Double.class);
55 FunctionLambda get_timestamp =
new FunctionLambda((Object o) -> ((
MonitoringEvent) o).getTimestamp()).setReturnType(Float.class);
58 Pump pump =
new Pump();
// Filter whose condition is "is an instance of TemperatureEvent".
59 FilterOn temp_events =
new FilterOn(
new InstanceOf(TemperatureEvent.class));
// Group processor (one input, one output) producing the warnings.
60 GroupProcessor warning =
new GroupProcessor(1, 1);
// Window of width N applying Lists.PutInto; presumably each window is
// emitted as a list of its N events -- TODO confirm.
62 Window w =
new Window(
new Lists.PutInto(), N);
// Three copies of the window output: one for the "all above threshold"
// check, one for the condition, one for the filter's data input.
63 Fork w_f =
new Fork(3);
// Sub-chain that turns each event into "temperature > threshold" and
// accumulates these Booleans with a logical AND.
65 GroupProcessor all_above =
new GroupProcessor(1, 1);
67 ApplyFunction above =
new ApplyFunction(
new FunctionLambda((Object o) -> ((TemperatureEvent) o).getTemperature() > TEMPERATURE_THRESHOLD).setReturnType(Boolean.class));
68 Cumulate all =
new Cumulate(
new CumulativeFunction<Boolean>(Booleans.and));
// NOTE(review): no connection from "above" to "all" appears in this
// listing -- confirm it exists in the full source.
70 all_above.associateInput(INPUT, above, INPUT);
71 all_above.associateOutput(OUTPUT, all, OUTPUT);
72 all_above.addProcessors(all, above);
// Runs the all_above chain on the content of each window.
74 RunOn l_all_above =
new RunOn(all_above);
75 connect(w_f, 0, l_all_above, INPUT);
// Condition combining a Boolean AND with a "less than" test on a
// difference of timestamps taken from the second input (Y).
// NOTE(review): both operands of the subtraction select element N - 1, so
// the difference as written is always zero -- presumably one of them
// should select a different element; confirm. The end of this expression
// is not visible in the listing.
76 ApplyFunction condition =
new ApplyFunction(
new FunctionTree(Booleans.and,
77 new FunctionTree(Numbers.isLessThan,
78 new FunctionTree(Numbers.subtraction,
79 new FunctionTree(get_timestamp,
new FunctionTree(
new NthElement(N - 1), StreamVariable.Y)),
80 new FunctionTree(get_timestamp,
new FunctionTree(
new NthElement(N - 1), StreamVariable.Y))),
// First input of the condition: result of the all_above check; second
// input: the window itself.
83 connect(l_all_above, OUTPUT, condition, 0);
84 connect(w_f, 1, condition, 1);
// The filter receives the window on its top input and the condition's
// verdict on its bottom input.
85 Filter filter =
new Filter();
86 connect(w_f, 2, filter, TOP);
87 connect(condition, OUTPUT, filter, BOTTOM);
// Sub-chain dividing a cumulative sum of temperatures by a cumulative
// count of events (each event turned into 1), i.e. a running average.
88 GroupProcessor get_avg_temp =
new GroupProcessor(1, 1);
90 ApplyFunction get_temp =
new ApplyFunction(get_temperature);
// NOTE(review): "f" is not declared in the visible listing; presumably a
// two-output Fork fed by get_temp -- confirm.
93 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
94 connect(f, TOP, sum, INPUT);
95 TurnInto one =
new TurnInto(1);
96 connect(f, BOTTOM, one, INPUT);
97 Cumulate sum_one =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
98 connect(one, sum_one);
99 ApplyFunction div =
new ApplyFunction(Numbers.division);
100 connect(sum, OUTPUT, div, TOP);
101 connect(sum_one, OUTPUT, div, BOTTOM);
102 get_avg_temp.addProcessors(get_temp, f, sum, one, sum_one, div);
103 get_avg_temp.associateInput(INPUT, get_temp, INPUT);
104 get_avg_temp.associateOutput(OUTPUT, div, OUTPUT);
// Runs the averaging chain on each window let through by the filter, and
// hands the result to CreateWarning.
106 RunOn avg =
new RunOn(get_avg_temp);
107 connect(filter, avg);
108 CreateWarning create_warning =
new CreateWarning();
109 connect(avg, create_warning);
// NOTE(review): the list below contains get_avg_temp rather than "avg",
// the processor that is actually connected between filter and
// create_warning -- confirm this is intended. A connection from w to w_f
// is also not visible in this listing.
110 warning.addProcessors(w, w_f, l_all_above, condition, filter, get_avg_temp, create_warning);
111 warning.associateInput(INPUT, w, INPUT);
112 warning.associateOutput(OUTPUT, create_warning, OUTPUT);
// Slice keyed on the rack ID of each temperature event. The remaining
// constructor arguments are not visible in the listing.
114 SliceLast slice1 =
new SliceLast(
new FunctionLambda(
115 (Object o) -> ((TemperatureEvent) o).getRackID()).setReturnType(Integer.class),
// fork1 is not used in the visible lines.
117 Fork fork1 =
new Fork();
// Main chain: source -> pump -> temperature filter -> slice -> print.
// NOTE(review): "source" is not declared in the visible listing.
118 connect(source, pump, temp_events, slice1);
119 Print print1 =
new Print();
120 connect(slice1, print1);
/**
 * Uniform processor placed at the end of the warning chain in main, where it
 * receives the output of the averaging step.
 * <p>
 * NOTE(review): the constructor body and most of {@code compute} are not
 * visible in this listing, so what the processor outputs cannot be stated
 * from here; presumably it builds a temperature warning from its input and
 * the slice ID -- confirm against the full source.
 */
124 protected static class CreateWarning
extends UniformProcessor
126 public CreateWarning()
132 protected boolean compute(Object[] inputs, Object[] outputs)
// Reads the "sliceID" entry of the processor's context as an int.
134 int slice_id = ((Number) getContext(
"sliceID")).intValue();
// Returns a fresh instance; the with_state argument is not used in the
// visible code.
140 public CreateWarning duplicate(
boolean with_state)
142 return new CreateWarning();
A collection of examples taken from other stream processing software, and redone with BeepBeep...
A temperature warning message.
Parent class of all the input events in this example.
Sub-class of monitoring event containing a temperature reading.