3 import ca.uqac.lif.cep.Connector;
4 import ca.uqac.lif.cep.GroupProcessor;
5 import ca.uqac.lif.cep.Pullable;
6 import ca.uqac.lif.cep.functions.ApplyFunction;
7 import ca.uqac.lif.cep.tmf.Fork;
8 import ca.uqac.lif.cep.tmf.QueueSource;
9 import ca.uqac.lif.cep.tmf.Trim;
10 import ca.uqac.lif.cep.util.Numbers;
26 public static void main(String[] args)
29 QueueSource source =
new QueueSource().setEvents(1, 2, 3, 4, 5, 6);
32 GroupProcessor group =
new GroupProcessor(1, 1);
34 Fork fork =
new Fork(2);
35 ApplyFunction add =
new ApplyFunction(Numbers.addition);
36 Connector.connect(fork, 0, add, 0);
37 Trim trim =
new Trim(1);
38 Connector.connect(fork, 1, trim, 0);
39 Connector.connect(trim, 0, add, 1);
40 group.addProcessors(fork, trim, add);
41 group.associateInput(0, fork, 0);
42 group.associateOutput(0, add, 0);
46 Connector.connect(source, group);
50 Pullable p = group.getPullableOutput();
51 for (
int i = 0; i < 6; i++)
53 float x = (Float) p.pull();
54 System.out.println(
"The event is: " + x);
Encapsulate a chain of processors into a Group.
static void main(String[] args)