3 import java.io.InputStream;
5 import ca.uqac.lif.cep.Connector;
6 import ca.uqac.lif.cep.Pullable;
7 import ca.uqac.lif.cep.functions.ApplyFunction;
8 import ca.uqac.lif.cep.functions.FunctionTree;
9 import ca.uqac.lif.cep.functions.StreamVariable;
10 import ca.uqac.lif.cep.graphviz.ToDot;
11 import ca.uqac.lif.cep.graphviz.UpdateGraph;
12 import ca.uqac.lif.cep.io.ReadLines;
13 import ca.uqac.lif.cep.tuples.FetchAttribute;
14 import ca.uqac.lif.cep.tuples.TupleFeeder;
15 import ca.uqac.lif.cep.util.Bags;
16 import ca.uqac.lif.cep.util.Lists;
17 import ca.uqac.lif.cep.util.Strings;
21 public static void main(String[] args)
23 InputStream is =
Relationships.class.getResourceAsStream(
"arf.csv");
24 ReadLines reader =
new ReadLines(is);
25 TupleFeeder
tuples =
new TupleFeeder();
26 Connector.connect(reader, tuples);
27 ApplyFunction pairs =
new ApplyFunction(
new FunctionTree(Bags.product,
28 new FunctionTree(
new Bags.ToSet(String.class),
29 new FunctionTree(
new FetchAttribute(
"author"), StreamVariable.X)),
30 new FunctionTree(
new Strings.FindRegex(
"@([_\\d\\w]*)"),
31 new FunctionTree(
new FetchAttribute(
"text"), StreamVariable.X))));
32 Connector.connect(tuples, pairs);
33 Lists.Unpack unpack =
new Lists.Unpack();
34 Connector.connect(pairs, unpack);
35 ApplyFunction explode =
new ApplyFunction(
new Bags.Explode(String.class, String.class));
36 Connector.connect(unpack, explode);
37 UpdateGraph graph =
new UpdateGraph();
38 ApplyFunction td =
new ApplyFunction(ToDot.instance);
39 Connector.connect(explode, graph, td);
40 Pullable p = td.getPullableOutput();
44 dot = (String) p.next();
46 System.out.println(dot);