20 import static ca.uqac.lif.cep.Connector.connect;
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Processor;
24 import ca.uqac.lif.cep.Pushable;
25 import ca.uqac.lif.cep.complex.RangeCep;
26 import ca.uqac.lif.cep.functions.ApplyFunction;
27 import ca.uqac.lif.cep.functions.Constant;
28 import ca.uqac.lif.cep.functions.Cumulate;
29 import ca.uqac.lif.cep.functions.CumulativeFunction;
30 import ca.uqac.lif.cep.functions.FunctionTree;
31 import ca.uqac.lif.cep.functions.StreamVariable;
32 import ca.uqac.lif.cep.io.Print.Println;
33 import ca.uqac.lif.cep.tmf.FilterOn;
34 import ca.uqac.lif.cep.tmf.Fork;
35 import ca.uqac.lif.cep.tmf.Freeze;
36 import ca.uqac.lif.cep.tmf.Insert;
37 import ca.uqac.lif.cep.tmf.SliceLast;
38 import ca.uqac.lif.cep.util.Bags;
39 import ca.uqac.lif.cep.util.Booleans;
40 import ca.uqac.lif.cep.util.Equals;
41 import ca.uqac.lif.cep.util.Lists;
42 import ca.uqac.lif.cep.util.NthElement;
43 import ca.uqac.lif.cep.util.Numbers;
89 public static void main(String[] args)
92 GroupProcessor get_filename =
new GroupProcessor(1, 1) {{
93 ApplyFunction a =
new ApplyFunction(
new NthElement(0));
94 Freeze f =
new Freeze();
96 addProcessors(a, f).associateInput(a).associateOutput(f);
101 GroupProcessor read_or_write =
new GroupProcessor(1, 1) {{
102 ApplyFunction op =
new ApplyFunction(
new NthElement(1));
103 FilterOn filter =
new FilterOn(
new FunctionTree(Booleans.or,
104 new FunctionTree(Equals.instance, StreamVariable.X,
new Constant(
"read")),
105 new FunctionTree(Equals.instance, StreamVariable.X,
new Constant(
"write"))
107 Freeze f =
new Freeze();
108 connect(op, filter, f);
109 addProcessors(op, filter, f).associateInput(op).associateOutput(f);
113 GroupProcessor total_bytes =
new GroupProcessor(1, 1) {{
114 FilterOn filter =
new FilterOn(
new FunctionTree(Booleans.or,
115 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"read")),
116 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"write"))
118 ApplyFunction bytes =
new ApplyFunction(
new FunctionTree(Numbers.numberCast,
new NthElement(3)));
119 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
120 connect(filter, bytes, sum);
121 addProcessors(filter, bytes, sum).associateInput(filter).associateOutput(sum);
125 GroupProcessor contiguous =
new GroupProcessor(1, 1) {{
126 FilterOn filter =
new FilterOn(
new FunctionTree(Booleans.or,
127 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"read")),
128 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"write"))
132 ApplyFunction cur_offset =
new ApplyFunction(
new NthElement(2));
133 connect(f, 0, cur_offset, 0);
134 ApplyFunction exp_offset =
new ApplyFunction(
new FunctionTree(Numbers.addition,
new NthElement(2),
new NthElement(3)));
135 connect(f, 1, exp_offset, 0);
136 Insert ins =
new Insert(1, 0);
137 connect(exp_offset, ins);
138 ApplyFunction eq =
new ApplyFunction(Equals.instance);
139 connect(cur_offset, 0, eq, 0);
140 connect(ins, 0, eq, 1);
141 Cumulate and =
new Cumulate(
new CumulativeFunction<Boolean>(Booleans.and));
143 addProcessors(filter, f, cur_offset, exp_offset, ins, eq, and).associateInput(filter).associateOutput(and);
147 GroupProcessor byte_range =
new GroupProcessor(1, 1) {{
148 FilterOn filter =
new FilterOn(
new FunctionTree(Booleans.or,
149 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"read")),
150 new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"write"))
154 ApplyFunction off =
new ApplyFunction(
new FunctionTree(Numbers.numberCast,
new NthElement(2)));
155 connect(f, 0, off, 0);
156 ApplyFunction off_len =
new ApplyFunction(
new FunctionTree(Numbers.addition,
new NthElement(2),
new NthElement(3)));
157 connect(f, 1, off_len, 0);
158 Cumulate min =
new Cumulate(
new CumulativeFunction<Number>(Numbers.minimum));
160 Cumulate max =
new Cumulate(
new CumulativeFunction<Number>(Numbers.maximum));
161 connect(off_len, max);
162 ApplyFunction pair =
new ApplyFunction(
new Bags.ToList(2));
163 connect(min, 0, pair, 0);
164 connect(max, 0, pair, 1);
165 addProcessors(filter, f, off, off_len, min, max, pair).associateInput(filter).associateOutput(pair);
170 RangeCep cep =
new RangeCep(
171 new ApplyFunction(
new FunctionTree(Booleans.not,
new FunctionTree(Equals.instance,
new FunctionTree(
new NthElement(1), StreamVariable.X),
new Constant(
"close")))),
172 new Processor[] {get_filename, read_or_write, total_bytes, contiguous, byte_range},
173 new Bags.ToArray(5)).allowRestarts(
true);
177 SliceLast slice =
new SliceLast(
new NthElement(0), cep);
180 connect(slice,
new Lists.Unpack(),
new Println());
181 Pushable p = slice.getPushableInput();
182 p.push(
new Object[] {
"foo.txt",
"open"});
183 p.push(
new Object[] {
"foo.txt",
"read", 0, 10});
184 p.push(
new Object[] {
"bar.txt",
"open"});
185 p.push(
new Object[] {
"foo.txt",
"read", 9, 12});
186 p.push(
new Object[] {
"bar.txt",
"write", 0, 33});
187 p.push(
new Object[] {
"bar.txt",
"write", 33, 2});
188 p.push(
new Object[] {
"foo.txt",
"read", 12, 5});
189 p.push(
new Object[] {
"bar.txt",
"close"});
190 p.push(
new Object[] {
"foo.txt",
"close"});
191 p.push(
new Object[] {
"bar.txt",
"open"});
192 p.push(
new Object[] {
"bar.txt",
"write", 10, 4});
193 p.push(
new Object[] {
"bar.txt",
"close"});
Creates complex events out of low-level events occurring to files.