18 package mining.trenddistance;
20 import static ca.uqac.lif.cep.Connector.INPUT;
21 import static ca.uqac.lif.cep.Connector.OUTPUT;
23 import java.util.HashSet;
26 import org.apache.commons.math3.ml.clustering.DoublePoint;
27 import org.apache.commons.math3.ml.distance.EuclideanDistance;
29 import ca.uqac.lif.cep.Connector;
30 import ca.uqac.lif.cep.GroupProcessor;
31 import ca.uqac.lif.cep.Pullable;
32 import ca.uqac.lif.cep.functions.StreamVariable;
33 import ca.uqac.lif.cep.functions.CumulativeFunction;
34 import ca.uqac.lif.cep.functions.Cumulate;
35 import ca.uqac.lif.cep.functions.ApplyFunction;
36 import ca.uqac.lif.cep.functions.FunctionTree;
37 import ca.uqac.lif.cep.functions.IdentityFunction;
38 import ca.uqac.lif.cep.functions.TurnInto;
39 import ca.uqac.lif.cep.io.ReadStringStream;
40 import ca.uqac.lif.cep.peg.TrendDistance;
41 import ca.uqac.lif.cep.peg.ml.DistanceToClosest;
42 import ca.uqac.lif.cep.peg.ml.DoublePointCast;
43 import ca.uqac.lif.cep.peg.MapDistance.ToValueArray;
44 import ca.uqac.lif.cep.peg.Normalize;
45 import ca.uqac.lif.cep.tmf.Slice;
46 import ca.uqac.lif.cep.util.Numbers;
47 import ca.uqac.lif.cep.util.FindPattern;
115 public static void main(String[] args)
117 ReadStringStream reader =
new ReadStringStream(
SymbolDistributionClusters.class.getResourceAsStream(
"SymbolDistribution-AB.txt"));
118 FindPattern feeder =
new FindPattern(
"(.*?),");
119 Connector.connect(reader, feeder);
122 GroupProcessor vector =
new GroupProcessor(1, 1);
124 GroupProcessor counter =
new GroupProcessor(1, 1);
126 TurnInto one =
new TurnInto(1);
127 counter.associateInput(INPUT, one, INPUT);
128 Cumulate sum_one =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
129 Connector.connect(one, sum_one);
130 counter.associateOutput(OUTPUT, sum_one, OUTPUT);
131 counter.addProcessors(one, sum_one);
133 Slice slicer =
new Slice(
new IdentityFunction(1), counter);
134 ApplyFunction to_normalized_vector =
new ApplyFunction(
135 new FunctionTree(DoublePointCast.instance,
136 new FunctionTree(Normalize.instance,
137 new FunctionTree(ToValueArray.instance, StreamVariable.X))));
138 Connector.connect(slicer, to_normalized_vector);
139 vector.associateInput(INPUT, slicer, INPUT);
140 vector.associateOutput(OUTPUT, to_normalized_vector, OUTPUT);
141 vector.addProcessors(slicer, to_normalized_vector);
143 Connector.connect(feeder, vector);
144 Set<DoublePoint> pattern =
new HashSet<DoublePoint>();
145 pattern.add(
new DoublePoint(
new double[]{0.7, 0.3}));
146 pattern.add(
new DoublePoint(
new double[]{0.3, 0.7}));
147 TrendDistance<Set<DoublePoint>,Set<DoublePoint>,Number> alarm =
new TrendDistance<Set<DoublePoint>,Set<DoublePoint>,Number>(pattern, 9, vector,
new FunctionTree(Numbers.absoluteValue,
148 new FunctionTree(
new DistanceToClosest(
new EuclideanDistance()), StreamVariable.X, StreamVariable.Y)), 0.25, Numbers.isLessThan);
149 Connector.connect(feeder, alarm);
150 Pullable p = alarm.getPullableOutput();
152 for (
int i = 0; b && i < 10; i++)
154 b = (Boolean) p.pull();
155 System.out.println(b);
Trend distance based on the statistical distribution of symbols in a stream.