Code Examples
A repository of 155 code examples for BeepBeep
MaxEpisode.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2018 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package episodes;
19 
20 import static ca.uqac.lif.cep.Connector.connect;
21 
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Pullable;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.functions.Cumulate;
26 import ca.uqac.lif.cep.functions.CumulativeFunction;
27 
28 import static ca.uqac.lif.cep.Connector.BOTTOM;
29 import static ca.uqac.lif.cep.Connector.INPUT;
30 import static ca.uqac.lif.cep.Connector.LEFT;
31 import static ca.uqac.lif.cep.Connector.TOP;
32 import static ca.uqac.lif.cep.Connector.OUTPUT;
33 
34 import ca.uqac.lif.cep.tmf.Filter;
35 import ca.uqac.lif.cep.tmf.Fork;
36 import ca.uqac.lif.cep.tmf.QueueSource;
37 import ca.uqac.lif.cep.tmf.ResetLast;
38 import ca.uqac.lif.cep.util.Numbers;
39 
40 /**
41  * Computes the maximum value of all episodes in each day.
42  * For example, given this stream of pressure events:
43  * <blockquote>
44  * ✸ ↑ 151 142 ↓ ↑ 149 148 144 ↓ ✸ ↑ 150 142 ↓ ✸
45  * </blockquote>
46  * the processor should output 151 (maximum value of first day) followed
47  * by 150 (maximum value of second day).
48  * Graphically, this can be represented by the following processor chain:
49  * <p>
50  * <img src="./doc-files/episodes/MaxEpisodes.png" alt="Processor chain">
51  *
52  * @author Sylvain Hallé
53  */
54 public class MaxEpisode
55 {
56  public static void main(String[] args)
57  {
58  // Create a simple source of pressure events to illustrate the program
59  QueueSource episode_stream = new QueueSource();
60  episode_stream.setEvents(
63  new PressureEvent.Reading(151),
64  new PressureEvent.Reading(142),
67  new PressureEvent.Reading(149),
68  new PressureEvent.Reading(148),
69  new PressureEvent.Reading(144),
73  new PressureEvent.Reading(150),
74  new PressureEvent.Reading(142),
76  PressureEvent.NEW_DAY).loop(false);
77 
78  // Create a group that computes the maximum value of all
79  // readings received
80  GroupProcessor max_of_days = new GroupProcessor(1, 1);
81  {
82  Fork f = new Fork(2);
83  ApplyFunction is_reading = new ApplyFunction(PressureEvent.IsReading.instance);
84  Filter filter = new Filter();
85  connect(f, TOP, filter, LEFT);
86  connect(f, BOTTOM, is_reading, INPUT);
87  connect(is_reading, OUTPUT, filter, BOTTOM);
88  ApplyFunction get_reading = new ApplyFunction(PressureEvent.GetValue.instance);
89  connect(filter, get_reading);
90  Cumulate max = new Cumulate(new CumulativeFunction<Number>(Numbers.maximum));
91  connect(get_reading, max);
92  max_of_days.associateInput(INPUT, f, INPUT);
93  max_of_days.associateOutput(OUTPUT, max, OUTPUT);
94  max_of_days.addProcessors(f, is_reading, filter, get_reading, max);
95  }
96 
97  // Reset the "max reading" at every new day event
98  Fork f = new Fork(2);
99  connect(episode_stream, f);
100  ApplyFunction is_new_day = new ApplyFunction(PressureEvent.IsNewDay.instance);
101  connect(f, BOTTOM, is_new_day, INPUT);
102  ResetLast reset = new ResetLast(max_of_days);
103  connect(f, TOP, reset, LEFT);
104  connect(is_new_day, OUTPUT, reset, BOTTOM);
105 
106  // Run the chain on the input source
107  Pullable p = reset.getPullableOutput();
108  while (p.hasNext())
109  {
110  System.out.println(p.next());
111  }
112  }
113 
114 }
Abstract class representing events in a pressure stream.
static final EpisodeStart EPISODE_START
Static reference to an "episode start" event.
static final EpisodeEnd EPISODE_END
Static reference to an "episode end" event.
Computes the maximum value of all episodes in each day.
Definition: MaxEpisode.java:54
Event containing a pressure reading.
BeepBeep function checking if an event is an instance of reading.
static final NewDay NEW_DAY
Static reference to a "new day" event.
BeepBeep function checking if an event is an instance of "episode end".
BeepBeep function checking if an event is an instance of "new day".