20 import static ca.uqac.lif.cep.Connector.connect;
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Pullable;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.functions.Cumulate;
26 import ca.uqac.lif.cep.functions.CumulativeFunction;
28 import static ca.uqac.lif.cep.Connector.BOTTOM;
29 import static ca.uqac.lif.cep.Connector.INPUT;
30 import static ca.uqac.lif.cep.Connector.LEFT;
31 import static ca.uqac.lif.cep.Connector.TOP;
32 import static ca.uqac.lif.cep.Connector.OUTPUT;
34 import ca.uqac.lif.cep.tmf.Filter;
35 import ca.uqac.lif.cep.tmf.Fork;
36 import ca.uqac.lif.cep.tmf.QueueSource;
37 import ca.uqac.lif.cep.tmf.ResetLast;
38 import ca.uqac.lif.cep.util.Numbers;
56 public static void main(String[] args)
59 QueueSource episode_stream =
new QueueSource();
60 episode_stream.setEvents(
80 GroupProcessor max_of_days =
new GroupProcessor(1, 1);
84 Filter filter =
new Filter();
85 connect(f, TOP, filter, LEFT);
86 connect(f, BOTTOM, is_reading, INPUT);
87 connect(is_reading, OUTPUT, filter, BOTTOM);
89 connect(filter, get_reading);
90 Cumulate max =
new Cumulate(
new CumulativeFunction<Number>(Numbers.maximum));
91 connect(get_reading, max);
92 max_of_days.associateInput(INPUT, f, INPUT);
93 max_of_days.associateOutput(OUTPUT, max, OUTPUT);
94 max_of_days.addProcessors(f, is_reading, filter, get_reading, max);
99 connect(episode_stream, f);
101 connect(f, BOTTOM, is_new_day, INPUT);
102 ResetLast reset =
new ResetLast(max_of_days);
103 connect(f, TOP, reset, LEFT);
104 connect(is_new_day, OUTPUT, reset, BOTTOM);
107 Pullable p = reset.getPullableOutput();
110 System.out.println(p.next());
Abstract class representing events in a pressure stream.
static final EpisodeStart EPISODE_START
Static reference to an "episode start" event.
static final EpisodeEnd EPISODE_END
Static reference to an "episode end" event.
Computes the maximum pressure reading over all episodes of each day.
Event containing a pressure reading.
BeepBeep function checking if an event is an instance of a pressure reading.
static final NewDay NEW_DAY
Static reference to a "new day" event.
BeepBeep function checking if an event is an instance of "episode end".
BeepBeep function checking if an event is an instance of "new day".