18 package mining.trenddistance;
20 import static ca.uqac.lif.cep.Connector.BOTTOM;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.OUTPUT;
23 import static ca.uqac.lif.cep.Connector.TOP;
24 import ca.uqac.lif.cep.Connector;
25 import ca.uqac.lif.cep.GroupProcessor;
26 import ca.uqac.lif.cep.Pullable;
27 import ca.uqac.lif.cep.functions.CumulativeFunction;
28 import ca.uqac.lif.cep.functions.Cumulate;
29 import ca.uqac.lif.cep.functions.ApplyFunction;
30 import ca.uqac.lif.cep.functions.TurnInto;
31 import ca.uqac.lif.cep.peg.TrendDistance;
32 import ca.uqac.lif.cep.tmf.Fork;
33 import ca.uqac.lif.cep.tmf.QueueSource;
34 import ca.uqac.lif.cep.util.Numbers;
78 public static void main(String[] args)
80 GroupProcessor average =
new GroupProcessor(1, 1);
82 Fork fork =
new Fork(2);
83 average.associateInput(INPUT, fork, INPUT);
84 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
85 Connector.connect(fork, TOP, sum, INPUT);
86 TurnInto one =
new TurnInto(1);
87 Connector.connect(fork, BOTTOM, one, INPUT);
88 Cumulate sum_one =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
89 Connector.connect(one, sum_one);
90 ApplyFunction div =
new ApplyFunction(Numbers.division);
91 Connector.connect(sum, OUTPUT, div, TOP);
92 Connector.connect(sum_one, OUTPUT, div, BOTTOM);
93 average.associateOutput(OUTPUT, div, OUTPUT);
94 average.addProcessors(fork, sum, one, sum_one, div);
96 TrendDistance<Number,Number,Number> alarm =
new TrendDistance<Number,Number,Number>(6, 3, average, Numbers.division, 0.5, Numbers.isLessThan);
97 QueueSource source =
new QueueSource();
98 source.setEvents(6.1, 5.9, 6, 6.7, 6.7, 6.7);
99 Connector.connect(source, alarm);
100 Pullable p = alarm.getPullableOutput();
102 for (
int i = 0; b && i < 10; i++)
104 b = (Boolean) p.pull();
105 System.out.println(b);
Trend distance based on the average of values in a stream.