Code Examples
A repository of 155 code examples for BeepBeep
ComplexProcessorBuilder.java
1 package dsl;
2 
3 import static ca.uqac.lif.cep.Connector.BOTTOM;
4 import static ca.uqac.lif.cep.Connector.INPUT;
5 import static ca.uqac.lif.cep.Connector.LEFT;
6 import static ca.uqac.lif.cep.Connector.OUTPUT;
7 import static ca.uqac.lif.cep.Connector.RIGHT;
8 import static ca.uqac.lif.cep.Connector.TOP;
9 
10 import java.util.ArrayDeque;
11 import java.util.ArrayList;
12 import java.util.List;
13 
14 import ca.uqac.lif.bullwinkle.BnfParser.InvalidGrammarException;
15 import ca.uqac.lif.bullwinkle.Builds;
16 import ca.uqac.lif.cep.Connector;
17 import ca.uqac.lif.cep.Processor;
18 import ca.uqac.lif.cep.Pullable;
19 import ca.uqac.lif.cep.dsl.GroupProcessorBuilder;
20 import ca.uqac.lif.cep.functions.ApplyFunction;
21 import ca.uqac.lif.cep.functions.Constant;
22 import ca.uqac.lif.cep.functions.Cumulate;
23 import ca.uqac.lif.cep.functions.CumulativeFunction;
24 import ca.uqac.lif.cep.functions.Function;
25 import ca.uqac.lif.cep.functions.FunctionTree;
26 import ca.uqac.lif.cep.functions.StreamVariable;
27 import ca.uqac.lif.cep.functions.TurnInto;
28 import ca.uqac.lif.cep.tmf.CountDecimate;
29 import ca.uqac.lif.cep.tmf.Filter;
30 import ca.uqac.lif.cep.tmf.Fork;
31 import ca.uqac.lif.cep.tmf.Passthrough;
32 import ca.uqac.lif.cep.tmf.QueueSource;
33 import ca.uqac.lif.cep.tmf.Trim;
34 import ca.uqac.lif.cep.util.Numbers;
35 
36 public class ComplexProcessorBuilder extends GroupProcessorBuilder
37 {
/**
 * Creates a builder and gives it the grammar read from the
 * {@code complex.bnf} resource, looked up relative to this class.
 */
public ComplexProcessorBuilder()
{
  super();
  try
  {
    setGrammar(ComplexProcessorBuilder.class.getResourceAsStream("complex.bnf"));
  }
  catch (InvalidGrammarException ignored)
  {
    // Deliberately ignored, as in the original ("Do nothing"): the
    // constructor never fails on an invalid grammar.
    // NOTE(review): the builder is presumably unusable afterwards, and a
    // missing resource yields a null stream -- confirm how setGrammar
    // reacts before relying on this.
  }
}
50 
51  //!
52  @Builds(rule="<trim>", pop=true, clean=true)
53  public Trim handleTrim(Object ... parts)
54  {
55  Integer n = Integer.parseInt((String) parts[0]);
56  Processor p = (Processor) parts[1];
57  Trim trim = new Trim(n);
58  Connector.connect(p, trim);
59  add(trim);
60  return trim;
61  }
62  //!
63 
64  //*
65  @Builds(rule="<decim>", pop=true, clean=true)
66  public CountDecimate handleDecimate(Object ... parts)
67  {
68  Integer n = Integer.parseInt((String) parts[0]);
69  Processor p = (Processor) parts[1];
70  CountDecimate dec = new CountDecimate(n);
71  Connector.connect(p, dec);
72  add(dec);
73  return dec;
74  }
75  //*
76 
77  //@
78  @Builds(rule="<filter>", pop=true, clean=true)
79  public Filter handleFilter(Object ... parts)
80  {
81  Processor p1 = (Processor) parts[0];
82  Processor p2 = (Processor) parts[1];
83  Filter filter = new Filter();
84  Connector.connect(p1, 0, filter, 0);
85  Connector.connect(p2, 0, filter, 1);
86  add(filter);
87  return filter;
88  }
89  //@
90 
91  //.
92  @Builds(rule="<stream>")
93  public void handleStream(ArrayDeque<Object> stack)
94  {
95  Integer n = Integer.parseInt((String) stack.pop());
96  stack.pop(); // INPUT
97  Passthrough p = forkInput(n);
98  add(p);
99  stack.push(p);
100  }
101  //.
102 
103  @SuppressWarnings("unchecked")
104  ///
105  @Builds(rule="<apply>", pop=true, clean=true)
106  public Processor handleApply(Object ... parts)
107  {
108  Function f = (Function) parts[0];
109  ApplyFunction af = new ApplyFunction(f);
110  List<Processor> list = (List<Processor>) parts[1];
111  if (list.size() == 1)
112  {
113  Connector.connect(list.get(0), af);
114  }
115  else if (list.size() == 2)
116  {
117  Connector.connect(list.get(0), 0, af, 0);
118  Connector.connect(list.get(1), 0, af, 1);
119  }
120  add(af);
121  return af;
122  }
123  ///
124 
125  @Builds(rule="<add>")
126  public void handleAdd(ArrayDeque<Object> stack)
127  {
128  Function f2 = (Function) stack.pop();
129  Function f1 = (Function) stack.pop();
130  stack.pop(); // To remove the "+" symbol
131  stack.push(new FunctionTree(Numbers.addition, f1, f2));
132  }
133 
134 
135  @Builds(rule="<sbt>")
136  public void handleSbt(ArrayDeque<Object> stack)
137  {
138  Function f2 = (Function) stack.pop();
139  Function f1 = (Function) stack.pop();
140  stack.pop(); // To remove the "-" symbol
141  stack.push(new FunctionTree(Numbers.subtraction, f1, f2));
142  }
143 
144  @Builds(rule="<lt>")
145  public void handleLt(ArrayDeque<Object> stack)
146  {
147  Function f2 = (Function) stack.pop();
148  Function f1 = (Function) stack.pop();
149  stack.pop(); // To remove the "LT" symbol
150  stack.push(new FunctionTree(Numbers.isLessThan, f1, f2));
151  }
152 
153  @Builds(rule="<abs>")
154  public void handleAbs(ArrayDeque<Object> stack)
155  {
156  Function f1 = (Function) stack.pop();
157  stack.pop(); // To remove the "ABS" symbol
158  stack.push(new FunctionTree(Numbers.absoluteValue, f1));
159  }
160 
161  @Builds(rule="<cons>")
162  public void handleCons(ArrayDeque<Object> stack)
163  {
164  stack.push(new Constant(Integer.parseInt((String) stack.pop())));
165  }
166 
167  //s
168  @Builds(rule="<svar>")
169  public void handleStreamVariable(ArrayDeque<Object> stack)
170  {
171  String var_name = (String) stack.pop();
172  if (var_name.compareTo("X") == 0)
173  stack.push(StreamVariable.X);
174  if (var_name.compareTo("Y") == 0)
175  stack.push(StreamVariable.Y);
176  }
177  //s
178 
179  //l
180  @Builds(rule="<proclist>")
181  public void handleProcList(ArrayDeque<Object> stack)
182  {
183  List<Processor> list = new ArrayList<Processor>();
184  stack.pop(); // (
185  list.add((Processor) stack.pop());
186  stack.pop(); // )
187  if (stack.peek() instanceof String &&
188  ((String) stack.peek()).compareTo("AND") == 0)
189  {
190  stack.pop(); // AND
191  stack.pop(); // (
192  list.add((Processor) stack.pop());
193  stack.pop(); // )
194  }
195  stack.push(list);
196  }
197  //l
198 
199  //a
200  @Builds(rule="<avg>", pop=true, clean=true)
201  public Processor handleAvg(Object ... parts)
202  {
203  Fork fork = new Fork(2);
204  Connector.connect((Processor) parts[0], fork);
205  Cumulate sum_proc = new Cumulate(
206  new CumulativeFunction<Number>(Numbers.addition));
207  Connector.connect(fork, TOP, sum_proc, INPUT);
208  TurnInto ones = new TurnInto(1);
209  Connector.connect(fork, BOTTOM, ones, INPUT);
210  Cumulate counter = new Cumulate(
211  new CumulativeFunction<Number>(Numbers.addition));
212  Connector.connect(ones, OUTPUT, counter, INPUT);
213  ApplyFunction division = new ApplyFunction(Numbers.division);
214  Connector.connect(sum_proc, OUTPUT, division, LEFT);
215  Connector.connect(counter, OUTPUT, division, RIGHT);
216  add(fork, sum_proc, ones, counter, division);
217  return division;
218  }
219  //a
220 
221  public static void main(String[] args) throws ca.uqac.lif.bullwinkle.ParseTreeObjectBuilder.BuildException
222  {
224  /*{
225  System.out.println("First query");
226  //*
227  Processor proc = builder.build("APPLY ABS - X Y ON (INPUT 0) AND (INPUT 1)");
228  QueueSource src0 = new QueueSource().setEvents(0, 1, 2, 3, 4);
229  QueueSource src1 = new QueueSource().setEvents(5, 6, 8, 2, 5);
230  Connector.connect(src0, 0, proc, 0);
231  Connector.connect(src1, 0, proc, 1);
232  Pullable pul1 = proc.getPullableOutput();
233  for (int i = 0; i < 5; i++)
234  System.out.println(pul1.pull());
235  //*
236  }*/
237  {
238  System.out.println("Second query");
239  //*
240  Processor proc = builder.build("APPLY + X Y ON (FILTER (INPUT 0) WITH (APPLY LT X 0 ON (INPUT 0))) AND (INPUT 1)");
241  QueueSource src0 = new QueueSource().setEvents(0, -1, 2, -3, -4);
242  QueueSource src1 = new QueueSource().setEvents(5, 6, 8, 2, 5);
243  Connector.connect(src0, 0, proc, 0);
244  Connector.connect(src1, 0, proc, 1);
245  Pullable pul1 = proc.getPullableOutput();
246  for (int i = 0; i < 5; i++)
247  System.out.println(pul1.pull());
248  //*
249  }
250  }
251 
252 }