Code Examples
A repository of 155 code examples for BeepBeep
FileDemo.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2023 Sylvain HallĂ©
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package complex;
19 
20 import static ca.uqac.lif.cep.Connector.connect;
21 
22 import ca.uqac.lif.cep.GroupProcessor;
23 import ca.uqac.lif.cep.Processor;
24 import ca.uqac.lif.cep.Pushable;
25 import ca.uqac.lif.cep.complex.RangeCep;
26 import ca.uqac.lif.cep.functions.ApplyFunction;
27 import ca.uqac.lif.cep.functions.Constant;
28 import ca.uqac.lif.cep.functions.Cumulate;
29 import ca.uqac.lif.cep.functions.CumulativeFunction;
30 import ca.uqac.lif.cep.functions.FunctionTree;
31 import ca.uqac.lif.cep.functions.StreamVariable;
32 import ca.uqac.lif.cep.io.Print.Println;
33 import ca.uqac.lif.cep.tmf.FilterOn;
34 import ca.uqac.lif.cep.tmf.Fork;
35 import ca.uqac.lif.cep.tmf.Freeze;
36 import ca.uqac.lif.cep.tmf.Insert;
37 import ca.uqac.lif.cep.tmf.SliceLast;
38 import ca.uqac.lif.cep.util.Bags;
39 import ca.uqac.lif.cep.util.Booleans;
40 import ca.uqac.lif.cep.util.Equals;
41 import ca.uqac.lif.cep.util.Lists;
42 import ca.uqac.lif.cep.util.NthElement;
43 import ca.uqac.lif.cep.util.Numbers;
44 
45 /**
46  * Creates complex events out of low-level events occurring to files.
47  * Each basic event represents an operation done on a Java iterator. It is
48  * an array made of at least two elements:
49  * <ul>
50  * <li>The name of the file being manipulated (a String)</li>
51  * <li>The name of the operation being done on this file (a String)</li>
52  * </ul>
53  * <p>
54  * The operation can be either "open", "close", "read" or "write". When the
55  * operation is "write" or "read", the array contains two more elements:
56  * <ul>
57  * <li>The offset at which bytes are written (resp. read)</li>
58  * <li>The number of bytes being written (resp. read)</li>
59  * </ul>
60  * <p>
61  * The goal is to produce a high-level stream where each event summarizes the
62  * "lifecycle" of a file. When a file is closed, a "complex"
63  * {@link FileOperation} event should be produced, summarizing the operations
64  * done on this file during the time it was open:
65  * <ul>
66  * <li>The filename</li>
67  * <li>Whether the interaction was for reading or for writing</li>
68  * <li>The number of bytes read (or written)</li>
69  * <li>Whether the operations were <em>contiguous</em>. Contiguous operations
70  * mean that the first read (resp. write) operation occurs at the start of the
71  * file (offset 0), and that the next operation starts where the previous one
72  * left off.</li>
73  * <li>The global range of bytes that was accessed by this interaction. This
74  * is calculated as the interval from the minimum value of <i>offset</i> seen
75  * in an operation, to the maximum value of <i>offset</i>+<i>length</i>.
76  * </li>
77  * </ul>
78  * <p>
79  * The pipeline allows resets, which means that if the same file is reopened
80  * at a later time, a new complex event will eventually be output for this new
81  * interaction.
82  * <p>
83  * The global pipeline is illustrated as follows:
84  * <p>
85  * <img src=./doc-files/complex/FileAccess.png" alt="Pipeline" />
86  */
87 public class FileDemo
88 {
89  public static void main(String[] args)
90  {
91  /* Create a processor that gets the filename from the first event. */
92  GroupProcessor get_filename = new GroupProcessor(1, 1) {{
93  ApplyFunction a = new ApplyFunction(new NthElement(0));
94  Freeze f = new Freeze();
95  connect(a, f);
96  addProcessors(a, f).associateInput(a).associateOutput(f);
97  }};
98 
99  /* Create a processor that gets the access mode (i.e. read or write) first
100  * such event. */
101  GroupProcessor read_or_write = new GroupProcessor(1, 1) {{
102  ApplyFunction op = new ApplyFunction(new NthElement(1));
103  FilterOn filter = new FilterOn(new FunctionTree(Booleans.or,
104  new FunctionTree(Equals.instance, StreamVariable.X, new Constant("read")),
105  new FunctionTree(Equals.instance, StreamVariable.X, new Constant("write"))
106  ));
107  Freeze f = new Freeze();
108  connect(op, filter, f);
109  addProcessors(op, filter, f).associateInput(op).associateOutput(f);
110  }};
111 
112  /* Create a processor that sums the bytes written or read. */
113  GroupProcessor total_bytes = new GroupProcessor(1, 1) {{
114  FilterOn filter = new FilterOn(new FunctionTree(Booleans.or,
115  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("read")),
116  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("write"))
117  ));
118  ApplyFunction bytes = new ApplyFunction(new FunctionTree(Numbers.numberCast, new NthElement(3)));
119  Cumulate sum = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
120  connect(filter, bytes, sum);
121  addProcessors(filter, bytes, sum).associateInput(filter).associateOutput(sum);
122  }};
123 
124  /* Create a processor that checks if all accesses are contiguous. */
125  GroupProcessor contiguous = new GroupProcessor(1, 1) {{
126  FilterOn filter = new FilterOn(new FunctionTree(Booleans.or,
127  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("read")),
128  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("write"))
129  ));
130  Fork f = new Fork();
131  connect(filter, f);
132  ApplyFunction cur_offset = new ApplyFunction(new NthElement(2));
133  connect(f, 0, cur_offset, 0);
134  ApplyFunction exp_offset = new ApplyFunction(new FunctionTree(Numbers.addition, new NthElement(2), new NthElement(3)));
135  connect(f, 1, exp_offset, 0);
136  Insert ins = new Insert(1, 0);
137  connect(exp_offset, ins);
138  ApplyFunction eq = new ApplyFunction(Equals.instance);
139  connect(cur_offset, 0, eq, 0);
140  connect(ins, 0, eq, 1);
141  Cumulate and = new Cumulate(new CumulativeFunction<Boolean>(Booleans.and));
142  connect(eq, and);
143  addProcessors(filter, f, cur_offset, exp_offset, ins, eq, and).associateInput(filter).associateOutput(and);
144  }};
145 
146  /* Create a processor that gets the range of bytes accessed. */
147  GroupProcessor byte_range = new GroupProcessor(1, 1) {{
148  FilterOn filter = new FilterOn(new FunctionTree(Booleans.or,
149  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("read")),
150  new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("write"))
151  ));
152  Fork f = new Fork();
153  connect(filter, f);
154  ApplyFunction off = new ApplyFunction(new FunctionTree(Numbers.numberCast, new NthElement(2)));
155  connect(f, 0, off, 0);
156  ApplyFunction off_len = new ApplyFunction(new FunctionTree(Numbers.addition, new NthElement(2), new NthElement(3)));
157  connect(f, 1, off_len, 0);
158  Cumulate min = new Cumulate(new CumulativeFunction<Number>(Numbers.minimum));
159  connect(off, min);
160  Cumulate max = new Cumulate(new CumulativeFunction<Number>(Numbers.maximum));
161  connect(off_len, max);
162  ApplyFunction pair = new ApplyFunction(new Bags.ToList(2));
163  connect(min, 0, pair, 0);
164  connect(max, 0, pair, 1);
165  addProcessors(filter, f, off, off_len, min, max, pair).associateInput(filter).associateOutput(pair);
166  }};
167 
168  /* Create a RangeCep processor that will aggregate information about a
169  * operations done on a file. */
170  RangeCep cep = new RangeCep(
171  new ApplyFunction(new FunctionTree(Booleans.not, new FunctionTree(Equals.instance, new FunctionTree(new NthElement(1), StreamVariable.X), new Constant("close")))),
172  new Processor[] {get_filename, read_or_write, total_bytes, contiguous, byte_range},
173  new Bags.ToArray(5)).allowRestarts(true);
174 
175  /* Create a slice processor associating one instance of the RangeCep
176  * processor to each individual instance of file in circulation. */
177  SliceLast slice = new SliceLast(new NthElement(0), cep);
178 
179  /* Connect to a printer and push some events. */
180  connect(slice, new Lists.Unpack(), new Println());
181  Pushable p = slice.getPushableInput();
182  p.push(new Object[] {"foo.txt", "open"});
183  p.push(new Object[] {"foo.txt", "read", 0, 10});
184  p.push(new Object[] {"bar.txt", "open"});
185  p.push(new Object[] {"foo.txt", "read", 9, 12});
186  p.push(new Object[] {"bar.txt", "write", 0, 33});
187  p.push(new Object[] {"bar.txt", "write", 33, 2});
188  p.push(new Object[] {"foo.txt", "read", 12, 5});
189  p.push(new Object[] {"bar.txt", "close"});
190  p.push(new Object[] {"foo.txt", "close"});
191  p.push(new Object[] {"bar.txt", "open"});
192  p.push(new Object[] {"bar.txt", "write", 10, 4});
193  p.push(new Object[] {"bar.txt", "close"});
194  }
195 }
Creates complex events out of low-level events occurring to files.
Definition: FileDemo.java:87