Code Examples
A repository of 155 code examples for BeepBeep
Monitoring.java
1 package misc.temperature;
2 
3 import static ca.uqac.lif.cep.Connector.connect;
4 
5 import static ca.uqac.lif.cep.Connector.BOTTOM;
6 import static ca.uqac.lif.cep.Connector.INPUT;
7 import static ca.uqac.lif.cep.Connector.OUTPUT;
8 import static ca.uqac.lif.cep.Connector.TOP;
9 
10 import ca.uqac.lif.cep.GroupProcessor;
11 import ca.uqac.lif.cep.UniformProcessor;
12 import ca.uqac.lif.cep.functions.ApplyFunction;
13 import ca.uqac.lif.cep.functions.Constant;
14 import ca.uqac.lif.cep.functions.Cumulate;
15 import ca.uqac.lif.cep.functions.CumulativeFunction;
16 import ca.uqac.lif.cep.functions.FunctionLambda;
17 import ca.uqac.lif.cep.functions.FunctionTree;
18 import ca.uqac.lif.cep.functions.StreamVariable;
19 import ca.uqac.lif.cep.functions.TurnInto;
20 import ca.uqac.lif.cep.io.Print;
21 import ca.uqac.lif.cep.tmf.Filter;
22 import ca.uqac.lif.cep.tmf.FilterOn;
23 import ca.uqac.lif.cep.tmf.Fork;
24 import ca.uqac.lif.cep.tmf.Pump;
25 import ca.uqac.lif.cep.tmf.SliceLast;
26 import ca.uqac.lif.cep.tmf.Window;
27 import ca.uqac.lif.cep.util.Bags.RunOn;
28 import ca.uqac.lif.cep.util.Booleans;
29 import ca.uqac.lif.cep.util.InstanceOf;
30 import ca.uqac.lif.cep.util.Lists;
31 import ca.uqac.lif.cep.util.NthElement;
32 import ca.uqac.lif.cep.util.Numbers;
34 
35 public class Monitoring
36 {
37  /* These are the same constants as in the original example */
38  private static final double TEMPERATURE_THRESHOLD = 100;
39  private static final int MAX_RACK_ID = 10;
40  private static final long PAUSE = 100;
41  private static final double TEMPERATURE_RATIO = 0.5;
42  private static final double POWER_STD = 10;
43  private static final double POWER_MEAN = 100;
44  private static final double TEMP_STD = 20;
45  private static final double TEMP_MEAN = 80;
46 
47  /* Additional parameters for our example */
48  private static final int M = 2;
49  private static final int N = 2;
50 
51  public static void main(String[] args)
52  {
53  FunctionLambda get_rack_id = new FunctionLambda((Object o) -> ((MonitoringEvent) o).getRackID()).setReturnType(Integer.class);
54  FunctionLambda get_temperature = new FunctionLambda((Object o) -> ((TemperatureEvent) o).getTemperature()).setReturnType(Double.class);
55  FunctionLambda get_timestamp = new FunctionLambda((Object o) -> ((MonitoringEvent) o).getTimestamp()).setReturnType(Float.class);
56 
57  MonitoringEventSource source = new MonitoringEventSource(MAX_RACK_ID, PAUSE, TEMPERATURE_RATIO, POWER_STD, POWER_MEAN, TEMP_STD, TEMP_MEAN);
58  Pump pump = new Pump();
59  FilterOn temp_events = new FilterOn(new InstanceOf(TemperatureEvent.class));
60  GroupProcessor warning = new GroupProcessor(1, 1);
61  {
62  Window w = new Window(new Lists.PutInto(), N);
63  Fork w_f = new Fork(3);
64  connect(w, w_f);
65  GroupProcessor all_above = new GroupProcessor(1, 1);
66  {
67  ApplyFunction above = new ApplyFunction(new FunctionLambda((Object o) -> ((TemperatureEvent) o).getTemperature() > TEMPERATURE_THRESHOLD).setReturnType(Boolean.class));
68  Cumulate all = new Cumulate(new CumulativeFunction<Boolean>(Booleans.and));
69  connect(above, all);
70  all_above.associateInput(INPUT, above, INPUT);
71  all_above.associateOutput(OUTPUT, all, OUTPUT);
72  all_above.addProcessors(all, above);
73  }
74  RunOn l_all_above = new RunOn(all_above);
75  connect(w_f, 0, l_all_above, INPUT);
76  ApplyFunction condition = new ApplyFunction(new FunctionTree(Booleans.and,
77  new FunctionTree(Numbers.isLessThan,
78  new FunctionTree(Numbers.subtraction,
79  new FunctionTree(get_timestamp, new FunctionTree(new NthElement(N - 1), StreamVariable.Y)),
80  new FunctionTree(get_timestamp, new FunctionTree(new NthElement(N - 1), StreamVariable.Y))),
81  new Constant(10)),
82  StreamVariable.X));
83  connect(l_all_above, OUTPUT, condition, 0);
84  connect(w_f, 1, condition, 1);
85  Filter filter = new Filter();
86  connect(w_f, 2, filter, TOP);
87  connect(condition, OUTPUT, filter, BOTTOM);
88  GroupProcessor get_avg_temp = new GroupProcessor(1, 1);
89  {
90  ApplyFunction get_temp = new ApplyFunction(get_temperature);
91  Fork f = new Fork();
92  connect(get_temp, f);
93  Cumulate sum = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
94  connect(f, TOP, sum, INPUT);
95  TurnInto one = new TurnInto(1);
96  connect(f, BOTTOM, one, INPUT);
97  Cumulate sum_one = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
98  connect(one, sum_one);
99  ApplyFunction div = new ApplyFunction(Numbers.division);
100  connect(sum, OUTPUT, div, TOP);
101  connect(sum_one, OUTPUT, div, BOTTOM);
102  get_avg_temp.addProcessors(get_temp, f, sum, one, sum_one, div);
103  get_avg_temp.associateInput(INPUT, get_temp, INPUT);
104  get_avg_temp.associateOutput(OUTPUT, div, OUTPUT);
105  }
106  RunOn avg = new RunOn(get_avg_temp);
107  connect(filter, avg);
108  CreateWarning create_warning = new CreateWarning();
109  connect(avg, create_warning);
110  warning.addProcessors(w, w_f, l_all_above, condition, filter, get_avg_temp, create_warning);
111  warning.associateInput(INPUT, w, INPUT);
112  warning.associateOutput(OUTPUT, create_warning, OUTPUT);
113  }
114  SliceLast slice1 = new SliceLast(new FunctionLambda(
115  (Object o) -> ((TemperatureEvent) o).getRackID()).setReturnType(Integer.class),
116  warning);
117  Fork fork1 = new Fork();
118  connect(source, pump, temp_events, slice1);
119  Print print1 = new Print();
120  connect(slice1, print1);
121  pump.run();
122  }
123 
124  protected static class CreateWarning extends UniformProcessor
125  {
126  public CreateWarning()
127  {
128  super(1, 1);
129  }
130 
131  @Override
132  protected boolean compute(Object[] inputs, Object[] outputs)
133  {
134  int slice_id = ((Number) getContext("sliceID")).intValue();
135  outputs[0] = new TemperatureWarning(slice_id, ((Number) inputs[0]).doubleValue());
136  return true;
137  }
138 
139  @Override
140  public CreateWarning duplicate(boolean with_state)
141  {
142  return new CreateWarning();
143  }
144  }
145 
146 
147 
148 }
A collection of examples taken from other stream processing software, and redone with BeepBeep...
A temperature warning message.
Parent class of all the input events in this example.
Sub-class of monitoring event containing a temperature reading.