3 import static ca.uqac.lif.cep.Connector.BOTTOM;
4 import static ca.uqac.lif.cep.Connector.INPUT;
5 import static ca.uqac.lif.cep.Connector.LEFT;
6 import static ca.uqac.lif.cep.Connector.OUTPUT;
7 import static ca.uqac.lif.cep.Connector.RIGHT;
8 import static ca.uqac.lif.cep.Connector.TOP;
10 import java.util.ArrayDeque;
11 import java.util.ArrayList;
12 import java.util.List;
14 import ca.uqac.lif.bullwinkle.BnfParser.InvalidGrammarException;
15 import ca.uqac.lif.bullwinkle.Builds;
16 import ca.uqac.lif.cep.Connector;
17 import ca.uqac.lif.cep.Processor;
18 import ca.uqac.lif.cep.Pullable;
19 import ca.uqac.lif.cep.dsl.GroupProcessorBuilder;
20 import ca.uqac.lif.cep.functions.ApplyFunction;
21 import ca.uqac.lif.cep.functions.Constant;
22 import ca.uqac.lif.cep.functions.Cumulate;
23 import ca.uqac.lif.cep.functions.CumulativeFunction;
24 import ca.uqac.lif.cep.functions.Function;
25 import ca.uqac.lif.cep.functions.FunctionTree;
26 import ca.uqac.lif.cep.functions.StreamVariable;
27 import ca.uqac.lif.cep.functions.TurnInto;
28 import ca.uqac.lif.cep.tmf.CountDecimate;
29 import ca.uqac.lif.cep.tmf.Filter;
30 import ca.uqac.lif.cep.tmf.Fork;
31 import ca.uqac.lif.cep.tmf.Passthrough;
32 import ca.uqac.lif.cep.tmf.QueueSource;
33 import ca.uqac.lif.cep.tmf.Trim;
34 import ca.uqac.lif.cep.util.Numbers;
45 catch (InvalidGrammarException e)
/**
 * Handler for the {@code <trim>} grammar rule: creates a {@link Trim}
 * processor and pipes an upstream processor into it.
 * <p>
 * {@code parts[0]} is a string parsed as the integer given to the
 * {@code Trim} constructor; {@code parts[1]} is the upstream processor.
 * NOTE(review): the method's braces and return statement are not visible
 * in this excerpt; presumably the connected {@code Trim} is returned --
 * confirm.
 *
 * @param parts the operands of the rule, in the order described above
 * @throws NumberFormatException if {@code parts[0]} is not a valid integer
 */
52 @Builds(rule=
"<trim>", pop=
true, clean=
true)
53 public Trim handleTrim(Object ... parts)
55 Integer n = Integer.parseInt((String) parts[0]);
56 Processor p = (Processor) parts[1];
57 Trim trim =
new Trim(n);
// Pipe the upstream processor into the new Trim
58 Connector.connect(p, trim);
/**
 * Handler for the {@code <decim>} grammar rule: creates a
 * {@link CountDecimate} processor and pipes an upstream processor into it.
 * <p>
 * {@code parts[0]} is a string parsed as the integer given to the
 * {@code CountDecimate} constructor; {@code parts[1]} is the upstream
 * processor.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the connected {@code CountDecimate} is returned -- confirm.
 *
 * @param parts the operands of the rule, in the order described above
 * @throws NumberFormatException if {@code parts[0]} is not a valid integer
 */
65 @Builds(rule=
"<decim>", pop=
true, clean=
true)
66 public CountDecimate handleDecimate(Object ... parts)
68 Integer n = Integer.parseInt((String) parts[0]);
69 Processor p = (Processor) parts[1];
70 CountDecimate dec =
new CountDecimate(n);
// Pipe the upstream processor into the new CountDecimate
71 Connector.connect(p, dec);
/**
 * Handler for the {@code <filter>} grammar rule: creates a {@link Filter}
 * and connects two upstream processors to it.
 * <p>
 * Output 0 of {@code parts[0]} is connected to input 0 of the filter, and
 * output 0 of {@code parts[1]} to input 1.
 * NOTE(review): presumably input 0 carries the events and input 1 the
 * Boolean condition, as in the "FILTER ... WITH ..." query used in
 * {@code main} -- confirm against the Filter documentation. The return
 * statement is not visible in this excerpt.
 *
 * @param parts the two upstream processors, in the order described above
 */
78 @Builds(rule=
"<filter>", pop=
true, clean=
true)
79 public Filter handleFilter(Object ... parts)
81 Processor p1 = (Processor) parts[0];
82 Processor p2 = (Processor) parts[1];
83 Filter filter =
new Filter();
84 Connector.connect(p1, 0, filter, 0);
85 Connector.connect(p2, 0, filter, 1);
/**
 * Handler for the {@code <stream>} grammar rule. Pops a string from the
 * parse stack, parses it as an integer and passes that number to
 * {@code forkInput}, which yields a {@link Passthrough}.
 * NOTE(review): {@code forkInput} is declared outside this excerpt, and the
 * statements following the call are not visible; presumably the
 * {@code Passthrough} is pushed back on the stack -- confirm.
 *
 * @param stack the parse stack, manipulated directly by this handler
 * @throws NumberFormatException if the popped string is not a valid integer
 */
92 @Builds(rule=
"<stream>")
93 public void handleStream(ArrayDeque<Object> stack)
95 Integer n = Integer.parseInt((String) stack.pop());
97 Passthrough p = forkInput(n);
/**
 * Handler for the {@code <apply>} grammar rule: wraps a {@link Function}
 * into an {@link ApplyFunction} and connects a list of upstream processors
 * to it.
 * <p>
 * {@code parts[0]} is the function; {@code parts[1]} is a list of
 * processors. A single processor is piped directly into the
 * {@code ApplyFunction}; with two processors, the first goes to input 0
 * and the second to input 1.
 * NOTE(review): no branch is visible for lists of any other size, and the
 * return statement is not visible in this excerpt -- confirm how those
 * cases are handled.
 *
 * @param parts the function followed by the list of upstream processors
 */
// The cast of parts[1] to List<Processor> below is unchecked
103 @SuppressWarnings(
"unchecked")
105 @Builds(rule=
"<apply>", pop=
true, clean=
true)
106 public Processor handleApply(Object ... parts)
108 Function f = (Function) parts[0];
109 ApplyFunction af =
new ApplyFunction(f);
110 List<Processor> list = (List<Processor>) parts[1];
// Unary case: a single upstream processor
111 if (list.size() == 1)
113 Connector.connect(list.get(0), af);
// Binary case: one upstream processor per input pipe
115 else if (list.size() == 2)
117 Connector.connect(list.get(0), 0, af, 0);
118 Connector.connect(list.get(1), 0, af, 1);
/**
 * Handler for the {@code <add>} grammar rule. Pops two functions from the
 * parse stack and pushes a {@link FunctionTree} applying
 * {@code Numbers.addition} to them.
 * <p>
 * The first function popped ({@code f2}) becomes the second argument of the
 * tree, and the second one popped ({@code f1}) becomes the first argument.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
125 @Builds(rule=
"<add>")
126 public void handleAdd(ArrayDeque<Object> stack)
128 Function f2 = (Function) stack.pop();
129 Function f1 = (Function) stack.pop();
131 stack.push(
new FunctionTree(Numbers.addition, f1, f2));
/**
 * Handler for the {@code <sbt>} grammar rule. Pops two functions from the
 * parse stack and pushes a {@link FunctionTree} applying
 * {@code Numbers.subtraction} to them.
 * <p>
 * The first function popped ({@code f2}) becomes the second argument of the
 * tree, and the second one popped ({@code f1}) becomes the first argument,
 * so the tree is built as {@code f1 - f2}.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
135 @Builds(rule=
"<sbt>")
136 public void handleSbt(ArrayDeque<Object> stack)
138 Function f2 = (Function) stack.pop();
139 Function f1 = (Function) stack.pop();
141 stack.push(
new FunctionTree(Numbers.subtraction, f1, f2));
/**
 * Pops two functions from the parse stack and pushes a
 * {@link FunctionTree} applying {@code Numbers.isLessThan} to them.
 * <p>
 * The first function popped ({@code f2}) becomes the second argument of the
 * tree, and the second one popped ({@code f1}) becomes the first argument.
 * NOTE(review): unlike the sibling handlers, no {@code @Builds} annotation
 * is visible on this method in this excerpt, although the query in
 * {@code main} uses the {@code LT} operator. Verify that the method is
 * bound to its grammar rule in the actual file.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
145 public void handleLt(ArrayDeque<Object> stack)
147 Function f2 = (Function) stack.pop();
148 Function f1 = (Function) stack.pop();
150 stack.push(
new FunctionTree(Numbers.isLessThan, f1, f2));
/**
 * Handler for the {@code <abs>} grammar rule. Pops one function from the
 * parse stack and pushes a {@link FunctionTree} applying
 * {@code Numbers.absoluteValue} to it.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
153 @Builds(rule=
"<abs>")
154 public void handleAbs(ArrayDeque<Object> stack)
156 Function f1 = (Function) stack.pop();
158 stack.push(
new FunctionTree(Numbers.absoluteValue, f1));
/**
 * Handler for the {@code <cons>} grammar rule. Pops a string from the
 * parse stack, parses it as an integer and pushes a {@link Constant}
 * holding that value.
 *
 * @param stack the parse stack, manipulated directly by this handler
 * @throws NumberFormatException if the popped string is not a valid integer
 */
161 @Builds(rule=
"<cons>")
162 public void handleCons(ArrayDeque<Object> stack)
164 stack.push(
new Constant(Integer.parseInt((String) stack.pop())));
/**
 * Handler for the {@code <svar>} grammar rule. Pops a variable name from
 * the parse stack and pushes the matching {@link StreamVariable}:
 * {@code StreamVariable.X} for "X", {@code StreamVariable.Y} for "Y".
 * <p>
 * In the lines visible here, any other name pushes nothing, so the popped
 * token is simply consumed.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
168 @Builds(rule=
"<svar>")
169 public void handleStreamVariable(ArrayDeque<Object> stack)
171 String var_name = (String) stack.pop();
// compareTo(...) == 0 holds exactly when the two strings are equal
172 if (var_name.compareTo(
"X") == 0)
173 stack.push(StreamVariable.X);
174 if (var_name.compareTo(
"Y") == 0)
175 stack.push(StreamVariable.Y);
/**
 * Handler for the {@code <proclist>} grammar rule. Collects the processors
 * found on top of the parse stack into a list.
 * <p>
 * A first processor is popped and added to the list. If the element then
 * on top of the stack is the string "AND", a second processor is popped and
 * added as well.
 * NOTE(review): the statements between the "AND" test and the second
 * {@code add}, and whatever follows it, are not visible in this excerpt.
 * Presumably the "AND" token is popped before the second processor and the
 * list is pushed back on the stack -- confirm. Also check the resulting
 * order of the list against what {@code handleApply} expects, since
 * elements are added in pop order.
 *
 * @param stack the parse stack, manipulated directly by this handler
 */
180 @Builds(rule=
"<proclist>")
181 public void handleProcList(ArrayDeque<Object> stack)
183 List<Processor> list =
new ArrayList<Processor>();
185 list.add((Processor) stack.pop());
// Only look for a second processor when the next token is the AND keyword
187 if (stack.peek() instanceof String &&
188 ((String) stack.peek()).compareTo(
"AND") == 0)
192 list.add((Processor) stack.pop());
/**
 * Handler for the {@code <avg>} grammar rule: builds a chain of processors
 * that divides the cumulative sum of a stream by a cumulative count of its
 * events.
 * <p>
 * The upstream processor {@code parts[0]} is piped into a two-way
 * {@link Fork}. The top branch goes to a {@link Cumulate} over
 * {@code Numbers.addition}. The bottom branch goes through a
 * {@code TurnInto(1)} and then another {@code Cumulate} over addition.
 * Both results feed an {@link ApplyFunction} over {@code Numbers.division},
 * with the sum on the left input and the counter on the right input.
 * NOTE(review): {@code add(...)} is declared outside this excerpt;
 * presumably it registers the five processors with the enclosing builder
 * or group -- confirm. The return statement is not visible here either.
 *
 * @param parts the operands of the rule; {@code parts[0]} is the upstream
 *        processor
 */
200 @Builds(rule=
"<avg>", pop=
true, clean=
true)
201 public Processor handleAvg(Object ... parts)
203 Fork fork =
new Fork(2);
204 Connector.connect((Processor) parts[0], fork);
// Top branch: cumulative sum of the input
205 Cumulate sum_proc =
new Cumulate(
206 new CumulativeFunction<Number>(Numbers.addition));
207 Connector.connect(fork, TOP, sum_proc, INPUT);
// Bottom branch: a TurnInto(1) whose output is summed by the counter below
208 TurnInto ones =
new TurnInto(1);
209 Connector.connect(fork, BOTTOM, ones, INPUT);
210 Cumulate counter =
new Cumulate(
211 new CumulativeFunction<Number>(Numbers.addition));
212 Connector.connect(ones, OUTPUT, counter, INPUT);
// Division: sum on the left input, counter on the right input
213 ApplyFunction division =
new ApplyFunction(Numbers.division);
214 Connector.connect(sum_proc, OUTPUT, division, LEFT);
215 Connector.connect(counter, OUTPUT, division, RIGHT);
216 add(fork, sum_proc, ones, counter, division);
/**
 * Demonstration entry point. In the lines visible here, it prints the
 * heading "Second query", asks {@code builder} to build a processor from a
 * textual query, connects two {@link QueueSource}s to inputs 0 and 1 of
 * that processor, then pulls five events from its output and prints each
 * one.
 * NOTE(review): the statements before the heading are not visible in this
 * excerpt, including the declaration of {@code builder} and presumably a
 * first query.
 *
 * @param args command-line arguments; not used in the visible lines
 * @throws ca.uqac.lif.bullwinkle.ParseTreeObjectBuilder.BuildException
 *         declared by the method; presumably raised when a query cannot be
 *         built -- confirm
 */
221 public static void main(String[] args)
throws ca.uqac.lif.bullwinkle.ParseTreeObjectBuilder.BuildException
238 System.out.println(
"Second query");
240 Processor proc = builder.build(
"APPLY + X Y ON (FILTER (INPUT 0) WITH (APPLY LT X 0 ON (INPUT 0))) AND (INPUT 1)");
// One source of events for each of the two inputs referenced by the query
241 QueueSource src0 =
new QueueSource().setEvents(0, -1, 2, -3, -4);
242 QueueSource src1 =
new QueueSource().setEvents(5, 6, 8, 2, 5);
243 Connector.connect(src0, 0, proc, 0);
244 Connector.connect(src1, 0, proc, 1);
245 Pullable pul1 = proc.getPullableOutput();
// Pull and print five output events
246 for (
int i = 0; i < 5; i++)
247 System.out.println(pul1.pull());