Code Examples
A repository of 155 code examples for BeepBeep
AverageEpisodes.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2018 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package episodes;
19 
20 import static ca.uqac.lif.cep.Connector.connect;
21 
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Pullable;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.functions.Cumulate;
26 import ca.uqac.lif.cep.functions.CumulativeFunction;
27 import ca.uqac.lif.cep.functions.TurnInto;
28 
29 import static ca.uqac.lif.cep.Connector.BOTTOM;
30 import static ca.uqac.lif.cep.Connector.INPUT;
31 import static ca.uqac.lif.cep.Connector.LEFT;
32 import static ca.uqac.lif.cep.Connector.TOP;
33 import static ca.uqac.lif.cep.Connector.OUTPUT;
34 
35 import ca.uqac.lif.cep.tmf.Filter;
36 import ca.uqac.lif.cep.tmf.Fork;
37 import ca.uqac.lif.cep.tmf.QueueSource;
38 import ca.uqac.lif.cep.tmf.ResetLast;
39 import ca.uqac.lif.cep.util.Numbers;
40 import ca.uqac.lif.cep.util.Sets;
41 
42 /**
43  * Computes the list of average values of each episode, grouped by day.
44  * For example, given this stream of pressure events:
45  * <blockquote>
46  * ✸ ↑ 151 142 ↓ ↑ 148 149 144 ↓ ✸ ↑ 150 142 ↓ ✸
47  * </blockquote>
48  * the processor should output {146.5, 147} (averages of 1st and 2nd
49  * episode of first day) followed by {146}
50  * (average of single episode of second day).
51  * Graphically, this can be represented by the following processor chain:
52  * <p>
53  * <img src="./doc-files/episodes/AvgEpisodes.png" alt="Processor chain">
54  *
55  * @author Sylvain Hallé
56  */
57 public class AverageEpisodes
58 {
59  public static void main(String[] args)
60  {
61  // Create a simple source of pressure events to illustrate the program
62  QueueSource episode_stream = new QueueSource();
63  episode_stream.setEvents(
66  new PressureEvent.Reading(151),
67  new PressureEvent.Reading(142),
70  new PressureEvent.Reading(148),
71  new PressureEvent.Reading(149),
72  new PressureEvent.Reading(144),
76  new PressureEvent.Reading(150),
77  new PressureEvent.Reading(142),
79  PressureEvent.NEW_DAY).loop(false);
80 
81  // Create a group that computes the maximum value of all
82  // readings received
83  GroupProcessor avg = new GroupProcessor(1, 1);
84  {
85  Fork f1 = new Fork(2);
86  ApplyFunction is_reading = new ApplyFunction(PressureEvent.IsReading.instance);
87  Filter filter = new Filter();
88  connect(f1, TOP, filter, LEFT);
89  connect(f1, BOTTOM, is_reading, INPUT);
90  connect(is_reading, OUTPUT, filter, BOTTOM);
91  ApplyFunction get_reading = new ApplyFunction(PressureEvent.GetValue.instance);
92  connect(filter, get_reading);
93  Fork f2 = new Fork(2);
94  connect(get_reading, f2);
95  Cumulate sum = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
96  connect(f2, TOP, sum, INPUT);
97  TurnInto one = new TurnInto(1);
98  connect(f2, BOTTOM, one, INPUT);
99  Cumulate count = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
100  connect(one, count);
101  ApplyFunction div = new ApplyFunction(Numbers.division);
102  connect(sum, OUTPUT, div, TOP);
103  connect(count, OUTPUT, div, BOTTOM);
104  avg.associateInput(INPUT, f1, INPUT);
105  avg.associateOutput(OUTPUT, div, OUTPUT);
106  avg.addProcessors(f1, is_reading, filter, get_reading, f2, sum, one, count, div);
107  }
108 
109  // Group by day
110  GroupProcessor by_day = new GroupProcessor(1, 1);
111  {
112  Fork f = new Fork(2);
113  ApplyFunction is_end = new ApplyFunction(PressureEvent.IsEpisodeEnd.instance);
114  ResetLast rst_by_day = new ResetLast(avg);
115  connect(f, TOP, rst_by_day, LEFT);
116  connect(f, BOTTOM, is_end, INPUT);
117  connect(is_end, OUTPUT, rst_by_day, BOTTOM);
118  Sets.PutIntoNew put = new Sets.PutIntoNew();
119  connect(rst_by_day, put);
120  by_day.associateInput(INPUT, f, INPUT);
121  by_day.associateOutput(OUTPUT, put, OUTPUT);
122  by_day.addProcessors(f, is_end, rst_by_day, put);
123  }
124 
125  // Reset the "max reading" at every new day event
126  Fork f = new Fork(2);
127  connect(episode_stream, f);
128  ApplyFunction is_new_day = new ApplyFunction(PressureEvent.IsNewDay.instance);
129  connect(f, BOTTOM, is_new_day, INPUT);
130  ResetLast rst_by_day = new ResetLast(by_day);
131  connect(f, TOP, rst_by_day, LEFT);
132  connect(is_new_day, OUTPUT, rst_by_day, BOTTOM);
133 
134  // Run the chain on the input source
135  Pullable p = rst_by_day.getPullableOutput();
136  while (p.hasNext())
137  {
138  System.out.println(p.next());
139  }
140  }
141 
142 }
Abstract class representing events in a pressure stream.
static final EpisodeStart EPISODE_START
Static reference to an "episode start" event.
static final EpisodeEnd EPISODE_END
Static reference to an "episode end" event.
Computes the list of average values of each episode, grouped by day.
BeepBeep function checking if an event is an instance of "episode end".
Event containing a pressure reading.
BeepBeep function checking if an event is an instance of reading.
static final NewDay NEW_DAY
Static reference to a "new day" event.
BeepBeep function checking if an event is an instance of "episode end".
BeepBeep function checking if an event is an instance of "new day".