18 package mining.trenddistance;
20 import static ca.uqac.lif.cep.Connector.BOTTOM;
21 import static ca.uqac.lif.cep.Connector.INPUT;
22 import static ca.uqac.lif.cep.Connector.OUTPUT;
23 import static ca.uqac.lif.cep.Connector.TOP;
24 import ca.uqac.lif.cep.Connector;
25 import ca.uqac.lif.cep.GroupProcessor;
26 import ca.uqac.lif.cep.Pullable;
27 import ca.uqac.lif.cep.functions.StreamVariable;
28 import ca.uqac.lif.cep.functions.CumulativeFunction;
29 import ca.uqac.lif.cep.functions.Cumulate;
30 import ca.uqac.lif.cep.functions.ApplyFunction;
31 import ca.uqac.lif.cep.functions.FunctionTree;
32 import ca.uqac.lif.cep.functions.TurnInto;
33 import ca.uqac.lif.cep.peg.TrendDistance;
34 import ca.uqac.lif.cep.tmf.Fork;
35 import ca.uqac.lif.cep.tmf.QueueSource;
36 import ca.uqac.lif.cep.util.Numbers;
93 public static void main(String[] args)
95 GroupProcessor average =
new GroupProcessor(1, 1);
97 Fork fork =
new Fork(2);
98 average.associateInput(INPUT, fork, INPUT);
99 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
100 Connector.connect(fork, TOP, sum, INPUT);
101 TurnInto one =
new TurnInto(1);
102 Connector.connect(fork, BOTTOM, one, INPUT);
103 Cumulate sum_one =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
104 Connector.connect(one, sum_one);
105 ApplyFunction div =
new ApplyFunction(Numbers.division);
106 Connector.connect(sum, OUTPUT, div, TOP);
107 Connector.connect(sum_one, OUTPUT, div, BOTTOM);
108 average.associateOutput(OUTPUT, div, OUTPUT);
109 average.addProcessors(fork, sum, one, sum_one, div);
111 TrendDistance<Number,Number,Number> alarm =
new TrendDistance<Number,Number,Number>(6, 3, average,
new FunctionTree(Numbers.absoluteValue,
112 new FunctionTree(Numbers.subtraction, StreamVariable.X, StreamVariable.Y)), 0.5, Numbers.isLessThan);
113 QueueSource source =
new QueueSource();
114 source.setEvents(6.1, 5.9, 6, 6.7, 6.7, 6.7);
115 Connector.connect(source, alarm);
116 Pullable p = alarm.getPullableOutput();
118 for (
int i = 0; b && i < 10; i++)
120 b = (Boolean) p.pull();
121 System.out.println(b);
Trend distance based on the average of values in a stream.