20 import static ca.uqac.lif.cep.Connector.connect;
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Pullable;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.functions.Cumulate;
26 import ca.uqac.lif.cep.functions.CumulativeFunction;
27 import ca.uqac.lif.cep.functions.TurnInto;
29 import static ca.uqac.lif.cep.Connector.BOTTOM;
30 import static ca.uqac.lif.cep.Connector.INPUT;
31 import static ca.uqac.lif.cep.Connector.LEFT;
32 import static ca.uqac.lif.cep.Connector.TOP;
33 import static ca.uqac.lif.cep.Connector.OUTPUT;
35 import ca.uqac.lif.cep.tmf.Filter;
36 import ca.uqac.lif.cep.tmf.Fork;
37 import ca.uqac.lif.cep.tmf.QueueSource;
38 import ca.uqac.lif.cep.tmf.ResetLast;
39 import ca.uqac.lif.cep.util.Numbers;
40 import ca.uqac.lif.cep.util.Sets;
/**
 * Entry point of the example. Builds a chain of processors fed by a
 * {@code QueueSource}, nests two {@code GroupProcessor}s inside
 * {@code ResetLast} processors, then pulls a single event from the outermost
 * processor and prints it to standard output.
 *
 * NOTE(review): this listing is incomplete. The arguments of
 * {@code setEvents(...)}, the declarations of {@code is_reading},
 * {@code get_reading}, {@code is_end} and {@code is_new_day}, and all braces
 * are not visible here.
 *
 * @param args command-line arguments; not referenced by any visible statement
 */
59 public static void main(String[] args)
// Source of the input events; the event list itself is not visible in this listing.
62 QueueSource episode_stream =
new QueueSource();
63 episode_stream.setEvents(
// --- Group "avg": one input, one output ---
83 GroupProcessor avg =
new GroupProcessor(1, 1);
// f1 duplicates the group's input into two branches (TOP and BOTTOM).
85 Fork f1 =
new Fork(2);
// The filter receives the events on LEFT and the output of is_reading on BOTTOM.
87 Filter filter =
new Filter();
88 connect(f1, TOP, filter, LEFT);
89 connect(f1, BOTTOM, is_reading, INPUT);
90 connect(is_reading, OUTPUT, filter, BOTTOM);
// Whatever the filter lets through is passed to get_reading, then forked again.
92 connect(filter, get_reading);
93 Fork f2 =
new Fork(2);
94 connect(get_reading, f2);
// First branch: cumulative addition of the values coming out of get_reading.
95 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
96 connect(f2, TOP, sum, INPUT);
// Second branch: each event goes through a TurnInto built with the constant 1.
97 TurnInto one =
new TurnInto(1);
98 connect(f2, BOTTOM, one, INPUT);
// NOTE(review): no visible statement connects `one` to `count`; the listing
// skips original line 100, which presumably does so -- confirm.
99 Cumulate count =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
// div takes sum on TOP and count on BOTTOM; presumably sum / count, i.e. a
// running average -- confirm operand order of Numbers.division.
101 ApplyFunction div =
new ApplyFunction(Numbers.division);
102 connect(sum, OUTPUT, div, TOP);
103 connect(count, OUTPUT, div, BOTTOM);
// Expose f1's input as the group's input and div's output as the group's output.
104 avg.associateInput(INPUT, f1, INPUT);
105 avg.associateOutput(OUTPUT, div, OUTPUT);
106 avg.addProcessors(f1, is_reading, filter, get_reading, f2, sum, one, count, div);
// --- Group "by_day": one input, one output; wraps `avg` in a ResetLast ---
110 GroupProcessor by_day =
new GroupProcessor(1, 1);
112 Fork f =
new Fork(2);
// ResetLast around avg: events arrive on LEFT, the output of is_end on BOTTOM.
114 ResetLast rst_by_day =
new ResetLast(avg);
115 connect(f, TOP, rst_by_day, LEFT);
116 connect(f, BOTTOM, is_end, INPUT);
117 connect(is_end, OUTPUT, rst_by_day, BOTTOM);
// The output of the ResetLast is piped into a Sets.PutIntoNew processor.
118 Sets.PutIntoNew put =
new Sets.PutIntoNew();
119 connect(rst_by_day, put);
120 by_day.associateInput(INPUT, f, INPUT);
121 by_day.associateOutput(OUTPUT, put, OUTPUT);
122 by_day.addProcessors(f, is_end, rst_by_day, put);
// --- Outer chain: source -> fork -> ResetLast(by_day) ---
// NOTE(review): `f` and `rst_by_day` are declared a second time below;
// presumably each stage sits in its own block scope in the original file,
// whose braces are not visible in this listing -- confirm.
126 Fork f =
new Fork(2);
127 connect(episode_stream, f);
129 connect(f, BOTTOM, is_new_day, INPUT);
// ResetLast around by_day: events arrive on LEFT, the output of is_new_day on BOTTOM.
130 ResetLast rst_by_day =
new ResetLast(by_day);
131 connect(f, TOP, rst_by_day, LEFT);
132 connect(is_new_day, OUTPUT, rst_by_day, BOTTOM);
// Pull one event from the outermost processor and print it.
135 Pullable p = rst_by_day.getPullableOutput();
138 System.out.println(p.next());
Abstract class representing events in a pressure stream.
static final EpisodeStart EPISODE_START
Static reference to an "episode start" event.
static final EpisodeEnd EPISODE_END
Static reference to an "episode end" event.
Computes the list of average values of each episode, grouped by day.
BeepBeep function checking if an event is an instance of "episode end".
Event containing a pressure reading.
BeepBeep function checking if an event is an instance of a pressure reading.
static final NewDay NEW_DAY
Static reference to a "new day" event.
BeepBeep function checking if an event is an instance of "episode end".
BeepBeep function checking if an event is an instance of "new day".