Code Examples
A repository of 155 code examples for BeepBeep
WindowAverage.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2018 Sylvain HallĂ©
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package basic;
19 
20 import static ca.uqac.lif.cep.Connector.BOTTOM;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.LEFT;
23 import static ca.uqac.lif.cep.Connector.OUTPUT;
24 import static ca.uqac.lif.cep.Connector.RIGHT;
25 import static ca.uqac.lif.cep.Connector.TOP;
26 
27 import ca.uqac.lif.cep.Connector;
28 import ca.uqac.lif.cep.GroupProcessor;
29 import ca.uqac.lif.cep.Pullable;
30 import ca.uqac.lif.cep.UtilityMethods;
31 import ca.uqac.lif.cep.functions.CumulativeFunction;
32 import ca.uqac.lif.cep.functions.ApplyFunction;
33 import ca.uqac.lif.cep.functions.Cumulate;
34 import ca.uqac.lif.cep.functions.TurnInto;
35 import ca.uqac.lif.cep.tmf.Fork;
36 import ca.uqac.lif.cep.tmf.QueueSource;
37 import ca.uqac.lif.cep.tmf.Window;
38 import ca.uqac.lif.cep.util.Numbers;
39 
40 /**
41  * Compute the cumulative average of a list of numbers over a sliding window.
42  * Represented graphically, this example corresponds to the
43  * following chain of processors:
44  * <p>
45  * <img src="./doc-files/basic/WindowAverage.png" alt="Processor graph">
46  * @author Sylvain HallĂ©
47  * @difficulty Easy
48  */
49 public class WindowAverage
50 {
51  public static void main(String[] args)
52  {
53  /* Let us first create a source of arbitrary numbers. This is done
54  * by instantiating a {@link QueueSource} object, and giving to it
55  * an array of numbers. */
56  ///
57  QueueSource numbers = new QueueSource(1);
58  numbers.setEvents(new Object[]{2, 7, 1, 8, 2, 8, 1, 8, 2, 8,
59  4, 5, 9, 0, 4, 5, 2, 3, 5, 3, 6, 0, 2, 8, 7});
60 
61  /* We connect this source into a fork */
62  GroupProcessor group = new GroupProcessor(1, 1);
63  {
64  Fork fork = new Fork(2);
65  Cumulate sum_proc = new Cumulate(
66  new CumulativeFunction<Number>(Numbers.addition));
67  Connector.connect(fork, TOP, sum_proc, INPUT);
68 
69  /* Now we create a source of 1s and sum it; this is done with the same
70  * process as above, but on a stream that output the value 1 all the
71  * time. This effectively creates a counter outputting 1, 2, ... */
72  TurnInto ones = new TurnInto(1);
73  Connector.connect(fork, BOTTOM, ones, INPUT);
74  Cumulate counter = new Cumulate(
75  new CumulativeFunction<Number>(Numbers.addition));
76  Connector.connect(ones, OUTPUT, counter, INPUT);
77 
78  /* Divide one trace by the other; the output is the cumulative average
79  * of all numbers seen so far. */
80  ApplyFunction division = new ApplyFunction(Numbers.division);
81  Connector.connect(sum_proc, OUTPUT, division, LEFT);
82  Connector.connect(counter, OUTPUT, division, RIGHT);
83  group.addProcessors(fork, sum_proc, ones, counter, division);
84  group.associateInput(0, fork, 0);
85  group.associateOutput(0, division, 0);
86  }
87 
88  Window win = new Window(group, 3);
89  Connector.connect(numbers, win);
90  ///
91 
92  /* Extract the first 20 events from that pipe and print them. */
93  Pullable p = win.getPullableOutput();
94  System.out.println("The cumulative average is...");
95  for (int i = 0; i < 20; i++)
96  {
97  float value = (Float) p.pull();
98  System.out.print(value + ", ");
99  UtilityMethods.pause(1000);
100  }
101  System.out.println("done!");
102  }
103 }
Compute the cumulative average of a list of numbers over a sliding window.