20 import ca.uqac.lif.cep.Connector;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.OUTPUT;
23 import ca.uqac.lif.cep.GroupProcessor;
24 import ca.uqac.lif.cep.Pullable;
25 import ca.uqac.lif.cep.functions.Constant;
26 import ca.uqac.lif.cep.functions.Cumulate;
27 import ca.uqac.lif.cep.functions.CumulativeFunction;
28 import ca.uqac.lif.cep.functions.Function;
29 import ca.uqac.lif.cep.functions.ApplyFunction;
30 import ca.uqac.lif.cep.functions.IdentityFunction;
31 import ca.uqac.lif.cep.functions.TurnInto;
32 import ca.uqac.lif.cep.util.Maps.Values;
33 import ca.uqac.lif.cep.util.Numbers;
34 import ca.uqac.lif.cep.util.Bags.RunOn;
35 import ca.uqac.lif.cep.tmf.QueueSource;
36 import ca.uqac.lif.cep.tmf.Slice;
75 public static void main(String[] args)
78 QueueSource source =
new QueueSource();
79 source.setEvents(1, 6, 4, 3, 2, 1, 9);
83 Function slice_fct =
new IdentityFunction(1);
87 GroupProcessor counter =
new GroupProcessor(1, 1);
89 TurnInto to_one =
new TurnInto(
new Constant(1));
90 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
91 Connector.connect(to_one, sum);
92 counter.addProcessors(to_one, sum);
93 counter.associateInput(INPUT, to_one, INPUT);
94 counter.associateOutput(OUTPUT, sum, OUTPUT);
99 Slice slicer =
new Slice(slice_fct, counter);
100 Connector.connect(source, slicer);
105 ApplyFunction map_values =
new ApplyFunction(Values.instance);
106 Connector.connect(slicer, map_values);
112 RunOn max =
new RunOn(
new Cumulate(
new CumulativeFunction<Number>(Numbers.maximum)));
113 Connector.connect(map_values, max);
116 Pullable p = max.getPullableOutput();
117 for (
int i = 0; i < 10; i++)
120 System.out.println(o);
Apply an aggregation function on the output of a slicer.