Code Examples
A repository of 155 code examples for BeepBeep
Engine.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2017 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package artimon;
19 
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.FileNotFoundException;
23 
24 import ca.uqac.lif.cep.Connector;
25 import ca.uqac.lif.cep.GroupProcessor;
26 import ca.uqac.lif.cep.Pullable;
27 import static ca.uqac.lif.cep.Connector.BOTTOM;
28 import static ca.uqac.lif.cep.Connector.INPUT;
29 import static ca.uqac.lif.cep.Connector.OUTPUT;
30 import static ca.uqac.lif.cep.Connector.TOP;
31 import ca.uqac.lif.cep.Connector.ConnectorException;
32 import ca.uqac.lif.cep.functions.StreamVariable;
33 import ca.uqac.lif.cep.functions.Constant;
34 import ca.uqac.lif.cep.functions.Cumulate;
35 import ca.uqac.lif.cep.functions.CumulativeFunction;
36 import ca.uqac.lif.cep.functions.ApplyFunction;
37 import ca.uqac.lif.cep.functions.FunctionTree;
38 import ca.uqac.lif.cep.io.ReadLines;
39 import ca.uqac.lif.cep.util.Booleans;
40 import ca.uqac.lif.cep.util.Equals;
41 import ca.uqac.lif.cep.util.NthElement;
42 import ca.uqac.lif.cep.util.Numbers;
43 import ca.uqac.lif.cep.util.Strings;
44 import ca.uqac.lif.cep.tmf.Filter;
45 import ca.uqac.lif.cep.tmf.Fork;
46 import ca.uqac.lif.cep.tmf.Slice;
47 import ca.uqac.lif.cep.tmf.Trim;
48 
49 /**
50  * Compute data about the execution of a hybrid car engine. In this example,
51  * sensors collect data about the execution of a car engine, such as
52  * electrical power remaining, fuel remaining, speed and distance travelled.
53  * The execution of the engine is broken down into extended periods of time
54  * called "cycles", identified in the input data by a cycle number.
55  * <p>The processor chain in this example computes, for each cycle, the
56  * <em>recovery rate</em>. This corresponds to the sum of all datapoints
57  * in each cycle where electrical power is negative. The chain of processors
58  * that performs this computation can be illustrated as follows:
59  * <p>
60  * <img src="./doc-files/artimon/Engine.png" alt="Processor chain">
61  * @author Sylvain HallĂ© (for the BeepBeep processor chain)
62  * @author Nicolas Rapin (for the original example)
63  */
64 @SuppressWarnings("unused")
65 public class Engine
66 {
67  /* Let us define constants for each column index */
68  static final int TIME = 0, NO_CYCLE = 1, DISTANCE = 2, PUISSANCE_ELEC = 3,
69  SOC_ELEC = 4, PUISSANCE_FUEL = 5, SOC_FUEL = 6, SPEED = 7;
70 
71  /* A constant used in the computation, called "cbat" */
72  static final int CBAT = 16021800;
73 
74  public static void main(String[] args) throws FileNotFoundException
75  {
76  /* We put the filename in a variable */
77  String filename = "simple.csv";
78 
79  /* We create a line reader which will read from an input stream. In the
80  * present case, the stream is obtained from the file we specified. */
81  ReadLines reader = new ReadLines(Engine.class.getResourceAsStream(filename));
82 
83  /* The first line of the file is a comment line, so we ignore it */
84  Trim first_line = new Trim(1);
85  Connector.connect(reader, first_line);
86 
87  /* We create an array feeder, which will read individual lines of text
88  * and turn them into an array of primitive values. */
89  ApplyFunction array_feeder = new ApplyFunction(new Strings.SplitString(";"));
90  Connector.connect(first_line, array_feeder);
91 
92  /* Let us fork the stream of input tuples in two parts */
93  Fork fork = new Fork(2);
94  Connector.connect(array_feeder, fork);
95 
96  /* We create a sub-processor that will add all values of the
97  * "puissance_elec" attribute, but only if it is negative. */
98  GroupProcessor add_negative_pe = new GroupProcessor(1, 1);
99  {
100  ApplyFunction get_pe = new ApplyFunction(new NthElement(PUISSANCE_ELEC));
101  Fork anp_fork = new Fork(2);
102  Connector.connect(get_pe, anp_fork);
103  ApplyFunction is_negative = new ApplyFunction(
104  new FunctionTree(Numbers.isLessThan,
105  StreamVariable.X,
106  Constant.ZERO));
107  Filter filter = new Filter();
108  Connector.connect(anp_fork, 0, filter, TOP);
109  Connector.connect(anp_fork, 1, is_negative, INPUT);
110  Connector.connect(is_negative, OUTPUT, filter, BOTTOM);
111  Cumulate sum_of_negatives = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
112  Connector.connect(filter, sum_of_negatives);
113  ApplyFunction divide = new ApplyFunction(new FunctionTree(Numbers.division, StreamVariable.X, new Constant(CBAT)));
114  Connector.connect(sum_of_negatives, divide);
115  add_negative_pe.addProcessors(get_pe, anp_fork, is_negative, filter, sum_of_negatives, divide);
116  add_negative_pe.associateInput(INPUT, get_pe, INPUT);
117  add_negative_pe.associateOutput(OUTPUT, divide, OUTPUT);
118  }
119 
120  /* We create a slicer according to the cycle number, and connect it to
121  * the first output of the fork. */
122  Slice slicer = new Slice(new NthElement(NO_CYCLE), add_negative_pe);
123  Connector.connect(fork, TOP, slicer, INPUT);
124 
125  /* In the second path, we determine when a cycle ends. This
126  * is done by checking if two successive events have the same value
127  * for their "cycle" attribute. */
128  ApplyFunction get_cycle_nb = new ApplyFunction(new NthElement(NO_CYCLE));
129  Connector.connect(fork, BOTTOM, get_cycle_nb, INPUT);
130  Fork cycle_fork = new Fork(2);
131  Connector.connect(get_cycle_nb, cycle_fork);
132  Trim cycle_trim = new Trim(1);
133  Connector.connect(cycle_fork, BOTTOM, cycle_trim, INPUT);
134  ApplyFunction cycle_change = new ApplyFunction(Equals.instance);
135  Connector.connect(cycle_fork, 0, cycle_change, TOP);
136  Connector.connect(cycle_trim, OUTPUT, cycle_change, BOTTOM);
137  ApplyFunction not = new ApplyFunction(Booleans.not);
138  Connector.connect(cycle_change, not);
139 
140  /* We merge the two paths into a filter. */
141  Filter out_if_cycle_changes = new Filter();
142  Connector.connect(slicer, OUTPUT, out_if_cycle_changes, TOP);
143  Connector.connect(not, OUTPUT, out_if_cycle_changes, BOTTOM);
144 
145  /* Let's pull events! */
146  Pullable p = out_if_cycle_changes.getPullableOutput();
147  long start = System.currentTimeMillis();
148  while (true)
149  {
150  Object o = p.next();
151  if (o == null)
152  break;
153  System.out.println(o);
154  }
155  long end = System.currentTimeMillis();
156  System.out.println((end - start) / 1000);
157  }
158 }
Compute data about the execution of a hybrid car engine.
Definition: Engine.java:65