3 import java.io.InputStream;
6 import ca.uqac.lif.cep.Connector;
7 import ca.uqac.lif.cep.GroupProcessor;
8 import ca.uqac.lif.cep.Pullable;
9 import ca.uqac.lif.cep.functions.ApplyFunction;
10 import ca.uqac.lif.cep.functions.FunctionTree;
11 import ca.uqac.lif.cep.functions.StreamVariable;
12 import ca.uqac.lif.cep.io.ReadLines;
13 import ca.uqac.lif.cep.tmf.Slice;
14 import ca.uqac.lif.cep.tuples.FetchAttribute;
15 import ca.uqac.lif.cep.tuples.TupleFeeder;
16 import ca.uqac.lif.cep.util.Lists;
17 import ca.uqac.lif.cep.util.Sets;
18 import ca.uqac.lif.cep.util.Strings;
22 @SuppressWarnings(
"unchecked")
23 public static void main(String[] args)
25 InputStream is =
Hashtags.class.getResourceAsStream(
"arf.csv");
26 ReadLines reader =
new ReadLines(is);
27 TupleFeeder
tuples =
new TupleFeeder();
28 Connector.connect(reader, tuples);
29 GroupProcessor hts =
new GroupProcessor(1, 1);
31 ApplyFunction htags =
new ApplyFunction(
new FunctionTree(
32 new Strings.FindRegex(
"#([_\\d\\w]*)"),
33 new FunctionTree(
new FetchAttribute(
"text"), StreamVariable.X)));
34 Lists.Unpack unpack =
new Lists.Unpack();
35 Sets.PutInto
set =
new Sets.PutInto();
36 Connector.connect(htags, unpack,
set);
37 hts.addProcessors(htags, unpack,
set);
38 hts.associateInput(0, htags, 0);
39 hts.associateOutput(0,
set, 0);
41 Slice slice =
new Slice(
new FetchAttribute(
"author"), hts);
42 Connector.connect(tuples, slice);
43 Pullable p = slice.getPullableOutput();
44 Map<Object,Object> map = null;
47 map = (Map<Object,Object>) p.next();
49 System.out.println(map);