20 import java.util.ArrayDeque;
22 import ca.uqac.lif.bullwinkle.BnfParser.InvalidGrammarException;
23 import ca.uqac.lif.bullwinkle.Builds;
24 import ca.uqac.lif.cep.Connector;
25 import ca.uqac.lif.cep.Processor;
26 import ca.uqac.lif.cep.Pullable;
27 import ca.uqac.lif.cep.dsl.GroupProcessorBuilder;
28 import ca.uqac.lif.cep.tmf.CountDecimate;
29 import ca.uqac.lif.cep.tmf.Filter;
30 import ca.uqac.lif.cep.tmf.Passthrough;
31 import ca.uqac.lif.cep.tmf.QueueSource;
32 import ca.uqac.lif.cep.tmf.Trim;
42 setGrammar(
"<proc> := <trim> | <decim> | <filter> | <stream> ;\n" 43 +
"<trim> := TRIM <num> FROM ( <proc> );\n" 44 +
"<decim> := KEEP ONE EVERY <num> FROM ( <proc> );\n" 45 +
"<filter> := FILTER ( <proc> ) WITH ( <proc> );\n" 46 +
"<stream> := INPUT <num> ;\n" 47 +
"<num> := ^[0-9]+;");
49 catch (InvalidGrammarException e)
57 @Builds(rule=
"<trim>", pop=
true, clean=
true)
58 public Trim handleTrim(Object ... parts)
60 Integer n = Integer.parseInt((String) parts[0]);
61 Processor p = (Processor) parts[1];
62 Trim trim =
new Trim(n);
63 Connector.connect(p, trim);
70 @Builds(rule=
"<decim>", pop=
true, clean=
true)
71 public CountDecimate handleDecimate(Object ... parts)
73 Integer n = Integer.parseInt((String) parts[0]);
74 Processor p = (Processor) parts[1];
75 CountDecimate dec =
new CountDecimate(n);
76 Connector.connect(p, dec);
83 @Builds(rule=
"<filter>", pop=
true, clean=
true)
84 public Filter handleFilter(Object ... parts)
86 Processor p1 = (Processor) parts[0];
87 Processor p2 = (Processor) parts[1];
88 Filter filter =
new Filter();
89 Connector.connect(p1, 0, filter, 0);
90 Connector.connect(p2, 0, filter, 1);
97 @Builds(rule=
"<stream>")
98 public void handleStream(ArrayDeque<Object> stack)
100 Integer n = Integer.parseInt((String) stack.pop());
102 Passthrough p = forkInput(n);
108 public static void main(String[] args)
throws ca.uqac.lif.bullwinkle.ParseTreeObjectBuilder.BuildException
112 System.out.println(
"First query");
113 Processor proc = builder.build(
114 "KEEP ONE EVERY 2 FROM (INPUT 0)");
115 QueueSource src =
new QueueSource().setEvents(0, 1, 2, 3, 4, 5, 6, 8);
116 Connector.connect(src, proc);
117 Pullable pul1 = proc.getPullableOutput();
118 for (
int i = 0; i < 5; i++)
119 System.out.println(pul1.pull());
122 System.out.println(
"Second query");
124 Processor proc = builder.build(
125 "KEEP ONE EVERY 2 FROM (TRIM 3 FROM (INPUT 0))");
126 QueueSource src =
new QueueSource().setEvents(0, 1, 2, 3, 4, 5, 6, 8);
127 Connector.connect(src, proc);
128 Pullable pul1 = proc.getPullableOutput();
129 for (
int i = 0; i < 5; i++)
130 System.out.println(pul1.pull());
134 System.out.println(
"Third query");
135 Processor proc = builder.build(
"FILTER (INPUT 0) WITH (INPUT 1)");
136 QueueSource src0 =
new QueueSource().setEvents(0, 1, 2, 3, 4, 5, 6, 8);
137 QueueSource src1 =
new QueueSource().setEvents(
true,
false,
false,
true,
true,
false);
138 Connector.connect(src0, 0, proc, 0);
139 Connector.connect(src1, 0, proc, 1);
140 Pullable pul1 = proc.getPullableOutput();
141 for (
int i = 0; i < 5; i++)
142 System.out.println(pul1.pull());