Code Examples
A repository of 155 code examples for BeepBeep
SlicerCollapse.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2017 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package basic;
19 
20 import ca.uqac.lif.cep.Connector;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.OUTPUT;
23 import ca.uqac.lif.cep.GroupProcessor;
24 import ca.uqac.lif.cep.Pullable;
25 import ca.uqac.lif.cep.functions.Constant;
26 import ca.uqac.lif.cep.functions.Cumulate;
27 import ca.uqac.lif.cep.functions.CumulativeFunction;
28 import ca.uqac.lif.cep.functions.Function;
29 import ca.uqac.lif.cep.functions.ApplyFunction;
30 import ca.uqac.lif.cep.functions.IdentityFunction;
31 import ca.uqac.lif.cep.functions.TurnInto;
32 import ca.uqac.lif.cep.util.Maps.Values;
33 import ca.uqac.lif.cep.util.Numbers;
34 import ca.uqac.lif.cep.util.Bags.RunOn;
35 import ca.uqac.lif.cep.tmf.QueueSource;
36 import ca.uqac.lif.cep.tmf.Slice;
37 
38 /**
39  * Apply an aggregation function on the output of a slicer.
40  * <p>
41  * You should first read {@link SlicerSimple}. This example starts in the
42  * same way; however, it shows how the values of each slice can be aggregated.
43  * This is done by first extracting the image (i.e. set of values in each
44  * key-value pair) of the map, and then applying a
45  * {@link ca.uqac.lif.cep.sets.CumulateOnSet CumulateOnSet} function on the
46  * resulting set.
47  * <p>
48  * In this program, the slicing function is the identity, and the processor
49  * given to the slicer is a simple counter that increments every time an event
50  * is received. Since there is one such counter for each different input
51  * event, the slicer effectively maintains the count of how many times each
52  * value has been seen in its input stream. Graphically, this can be
53  * represented as:
54  * <p>
55  * <img src="./doc-files/basic/SlicerCollapse.png" alt="Processor chain">
56  * <p>
57  * The expected output of this program is:
58  * <pre>
59  * 1.0
60  * 1.0
61  * 1.0
62  * 1.0
63  * 1.0
64  * 2.0
65  * 2.0
66  * 3.0
67  * 3.0
68  * &hellip;
69  * </pre>
70  * @author Sylvain HallĂ©
71  * @difficulty Medium
72  */
73 public class SlicerCollapse
74 {
75  public static void main(String[] args)
76  {
77  /* We first setup a stream of numbers to be used as a source */
78  QueueSource source = new QueueSource();
79  source.setEvents(1, 6, 4, 3, 2, 1, 9);
80 
81  /* The function we'll use to create slices is the identity.
82  * This will create one distinct subtrace per number .*/
83  Function slice_fct = new IdentityFunction(1);
84 
85  /* The processor chain to apply to each subtrace is a simple
86  * counter of events. */
87  GroupProcessor counter = new GroupProcessor(1, 1);
88  {
89  TurnInto to_one = new TurnInto(new Constant(1));
90  Cumulate sum = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
91  Connector.connect(to_one, sum);
92  counter.addProcessors(to_one, sum);
93  counter.associateInput(INPUT, to_one, INPUT);
94  counter.associateOutput(OUTPUT, sum, OUTPUT);
95  }
96 
97  /* Create the slicer processor, by giving it the slicing function and
98  * the processor to apply on each slide. */
99  Slice slicer = new Slice(slice_fct, counter);
100  Connector.connect(source, slicer);
101 
102  /* Extract the image of the resulting map, by applying the
103  * MapValues function. The result is a Multiset of all the objects
104  * that occur as values in the input map. */
105  ApplyFunction map_values = new ApplyFunction(Values.instance);
106  Connector.connect(slicer, map_values);
107 
108  /* Apply the CumulateOnSet processor. This processor applies a
109  * cumulative function successively on every value of the input
110  * set. Here the function is Maximum, meaning that the resulting
111  * event is the maximum of all values in the input set. */
112  RunOn max = new RunOn(new Cumulate(new CumulativeFunction<Number>(Numbers.maximum)));
113  Connector.connect(map_values, max);
114 
115  /* Let us now pull and print 10 events from the output. */
116  Pullable p = max.getPullableOutput();
117  for (int i = 0; i < 10; i++)
118  {
119  Object o = p.pull();
120  System.out.println(o);
121  }
122  }
123 }
Apply an aggregation function on the output of a slicer.