Code Examples
A repository of 155 code examples for BeepBeep
AverageFork.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2016 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package basic;
19 
20 import static ca.uqac.lif.cep.Connector.BOTTOM;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.LEFT;
23 import static ca.uqac.lif.cep.Connector.OUTPUT;
24 import static ca.uqac.lif.cep.Connector.RIGHT;
25 import static ca.uqac.lif.cep.Connector.TOP;
26 
27 import ca.uqac.lif.cep.Connector;
28 import ca.uqac.lif.cep.Pullable;
29 import ca.uqac.lif.cep.UtilityMethods;
30 import ca.uqac.lif.cep.functions.CumulativeFunction;
31 import ca.uqac.lif.cep.functions.ApplyFunction;
32 import ca.uqac.lif.cep.functions.Cumulate;
33 import ca.uqac.lif.cep.functions.TurnInto;
34 import ca.uqac.lif.cep.tmf.Fork;
35 import ca.uqac.lif.cep.tmf.QueueSource;
36 import ca.uqac.lif.cep.util.Numbers;
37 
38 /**
39  * Compute the cumulative average of a list of numbers. The cumulative average
40  * is the average of all the numbers processed so far.
41  * <p>
42  * This example is similar to {@link Average}, except that the second queue
43  * is replaced by a fork and a {@link ca.uqac.lif.cep.functions.TurnInto TurnInto}
44  * processor. Represented graphically, this example corresponds to the
45  * following chain of processors:
46  * <p>
47  * <img src="./doc-files/basic/AverageFork.png" alt="Processor graph">
48  * <p>
49  * The output of this program should look like this:
50  * <pre>
51  * The cumulative average is...
52  * 2.0, 4.5, 3.3333333, 4.5, 4.0, 4.6666665, 4.142857, 4.625, 4.3333335,
53 * </pre>
54  * @author Sylvain HallĂ©
55  * @difficulty Easy
56  */
57 public class AverageFork
58 {
59 
60  public static void main(String[] args)
61  {
62  /* Let us first create a source of arbitrary numbers. This is done
63  * by instantiating a {@link QueueSource} object, and giving to it
64  * an array of numbers. */
65  ///
66  QueueSource numbers = new QueueSource(1);
67  numbers.setEvents(new Object[]{2, 7, 1, 8, 2, 8, 1, 8, 2, 8,
68  4, 5, 9, 0, 4, 5, 2, 3, 5, 3, 6, 0, 2, 8, 7});
69 
70  /* We connect this source into a fork */
71  Fork fork = new Fork(2);
72  Connector.connect(numbers, fork);
73 
74  /* We pipe the output of this processor to a cumulative processor.
75  * Such a processor applies a function with two arguments: the first
76  * is the event that is being received, and the second is the output
77  * value of the cumulative function at the previous step. In our
78  * case, the function we use is addition over numbers; hence the
79  * output is the cumulative sum of all numbers received so far. */
80  Cumulate sum_proc = new Cumulate(
81  new CumulativeFunction<Number>(Numbers.addition));
82  Connector.connect(fork, TOP, sum_proc, INPUT);
83 
84  /* Now we create a source of 1s and sum it; this is done with the same
85  * process as above, but on a stream that output the value 1 all the
86  * time. This effectively creates a counter outputting 1, 2, ... */
87  TurnInto ones = new TurnInto(1);
88  Connector.connect(fork, BOTTOM, ones, INPUT);
89  Cumulate counter = new Cumulate(
90  new CumulativeFunction<Number>(Numbers.addition));
91  Connector.connect(ones, OUTPUT, counter, INPUT);
92 
93  /* Divide one trace by the other; the output is the cumulative average
94  * of all numbers seen so far. */
95  ApplyFunction division = new ApplyFunction(Numbers.division);
96  Connector.connect(sum_proc, OUTPUT, division, LEFT);
97  Connector.connect(counter, OUTPUT, division, RIGHT);
98  ///
99 
100  /* Extract the first 20 events from that pipe and print them. */
101  Pullable p = division.getPullableOutput();
102  System.out.println("The cumulative average is...");
103  for (int i = 0; i < 20; i++)
104  {
105  float value = (Float) p.pull();
106  System.out.print(value + ", ");
107  UtilityMethods.pause(1000);
108  }
109  System.out.println("done!");
110  }
111 }
Compute the cumulative average of a list of numbers.