Code Examples
A repository of 155 code examples for BeepBeep
PushLocalSerialize.java
/*
    BeepBeep, an event stream processor
    Copyright (C) 2008-2017 Sylvain Hallé

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package network.httppush;

import network.CompoundObject;
import ca.uqac.lif.cep.Connector;
import ca.uqac.lif.cep.ProcessorException;
import ca.uqac.lif.cep.Pushable;
import ca.uqac.lif.cep.functions.ApplyFunction;
import ca.uqac.lif.cep.http.HttpDownstreamGateway;
import ca.uqac.lif.cep.http.HttpUpstreamGateway;
import ca.uqac.lif.cep.io.Print;
import ca.uqac.lif.cep.serialization.JsonDeserializeString;
import ca.uqac.lif.cep.serialization.JsonSerializeString;
import ca.uqac.lif.jerrydog.RequestCallback.Method;

/**
 * In this example, Machine A and Machine B are actually the same host; they
 * just listen to different TCP ports on the same computer. Read this program
 * before looking at the other files in this package.
 * <p>
 * <img
 *   src="./doc-files/network/httppush/PushLocal.png"
 *   alt="Processor graph">
 * @author Sylvain Hallé
 *
 */
public class PushLocalSerialize
{
    public static void main(String[] args) throws ProcessorException, InterruptedException
    {
        ///
        /* We first set up an {@link ApplyFunction} processor that will execute
         * the function {@link JsonSerializeString} on each input event. This function
         * transforms an incoming object into a character string in the JSON format,
         * through a process called <em>serialization</em>. Under the hood, the Azrael
         * library takes care of this task. */
        ApplyFunction serialize = new ApplyFunction(new JsonSerializeString());

        /* This function processor is connected to an {@link HttpUpstreamGateway}.
         * The gateway is a processor that transmits the events it receives to another
         * machine, through HTTP requests and responses. Since the chain operates in
         * <em>push</em> mode, the gateway will be pushed character strings from
         * upstream, and will in turn push them to the outside world by sending an HTTP
         * request to a specific address. Thus, when instantiating the gateway, we must
         * tell it the URL to which the request will be sent. In this case, the URL
         * for Machine B is on the same host, on port 12144. The "/push" path is the
         * "page" on Machine B that the server will respond to. */
        HttpUpstreamGateway up_gateway =
                new HttpUpstreamGateway("http://localhost:12144/push");

        /* We now move on to Machine B, which is responsible for receiving character
         * strings and converting them back into objects. This is the mirror image of
         * what was just done. So, the first step is to create an
         * {@link HttpDownstreamGateway}. The gateway is instructed to listen for incoming
         * requests on port 12144, to respond to requests made at the page "/push", and
         * to expect these requests to use the HTTP <code>POST</code> method. */
        HttpDownstreamGateway dn_gateway =
                new HttpDownstreamGateway(12144, "/push", Method.POST);

        /* The output of this processor is a sequence of character strings, each
         * taken from the payload of an HTTP request that is received. Remember
         * that these strings are in the JSON format, and correspond to the serialized
         * content of objects. The next step is to transform these strings back into
         * objects, i.e. to <em>deserialize</em> them. This is done by an
         * {@link ApplyFunction} processor that applies the {@link JsonDeserializeString}
         * function (again, using the Azrael library in the background). Note that
         * this function must be given the class of the objects to be deserialized,
         * so that it knows what kind of object to instantiate. */
        ApplyFunction deserialize = new ApplyFunction(
                new JsonDeserializeString<CompoundObject>(
                        CompoundObject.class));

        /* Just so that we can see something, we plug a {@link Print} processor at
         * the end; it will print to the standard output whatever object it receives
         * from upstream. */
        Print print = new Print();

        /* We are now ready to pipe everything together. The interesting bit is
         * what is <em>not</em> there: notice that we do not connect
         * <code>up_gateway</code> and <code>dn_gateway</code>. Indeed, these two
         * processors do not communicate using a BeepBeep pipe like the others;
         * rather, <code>up_gateway</code> sends its events to <code>dn_gateway</code>
         * through HTTP requests (i.e., outside of BeepBeep). This is what would
         * make it possible to put the two halves of this processor chain
         * (serialize and up_gateway on one side, dn_gateway, deserialize and
         * print on the other) on two different machines. */
        Connector.connect(serialize, up_gateway);
        Connector.connect(dn_gateway, deserialize);
        Connector.connect(deserialize, print);

        /* Since the gateways are actually mini web servers, they need to
         * be launched before they can communicate. This is done by
         * calling the {@link Processor#start() start()} method on both
         * processors. Look out, as there can be only one server on a machine
         * listening to a given TCP port; if you have another instance of this
         * program already running, the call to start will throw an exception. */
        up_gateway.start();
        dn_gateway.start();
        ///

        /* We are now ready to push events and see what happens. First, we get
         * a handle on the {@link Pushable} of the very first processor of the
         * chain, <code>serialize</code> (which resides on Machine A). */
        //!
        Pushable p = serialize.getPushableInput();

        /* Let's push some dummy object. After the call to push, the standard
         * output should display the contents of that object. This is pretty
         * boring, but think about all the magic that happened in the background:
         * <ul>
         * <li>The object was converted into a JSON string</li>
         * <li>The string was sent over the network through an HTTP request to
         * another server...</li>
         * <li>converted back into an object identical to the original...</li>
         * <li>and pushed downstream to be handled by the rest of the
         * processors as usual.</li>
         * </ul>
         * And so far, we've written about 10 lines of code. */
        p.push(new CompoundObject(0, "foo", null));

        /* Sleep a little so you have time to look at the console... */
        Thread.sleep(1000);

        /* Let's push again. You know the drill. */
        p.push(new CompoundObject(0, "foo", new CompoundObject(6, "z", null)));
        //!

        /* Once everything is done, we have to stop the servers to free the
         * TCP ports on your machine. This is done by calling
         * {@link Processor#stop() stop()} on both processors. */
        up_gateway.stop();
        dn_gateway.stop();

        /* That's all folks! */
        ///
    }
}
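The comment above the calls to start() warns that only one server can listen on a given TCP port. The sketch below illustrates that constraint with plain JDK sockets rather than BeepBeep gateways; the class name PortConflictSketch and its method are hypothetical, and the exact exception a gateway throws in this situation is not shown in the example, so only the underlying socket behaviour is demonstrated.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

// Hypothetical helper, not part of BeepBeep: shows that a second listener
// cannot bind to a TCP port that is already taken.
public class PortConflictSketch
{
    /** Returns true if binding a second listener to an occupied port fails. */
    public static boolean secondBindFails()
    {
        // Port 0 asks the operating system for any free port
        try (ServerSocket first = new ServerSocket(0))
        {
            int port = first.getLocalPort();
            try (ServerSocket second = new ServerSocket(port))
            {
                // Not expected: both sockets listen on the same port
                return false;
            }
            catch (BindException e)
            {
                // Expected: the port is already in use by the first socket
                return true;
            }
        }
        catch (IOException e)
        {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println("Second bind fails: " + secondBindFails());
    }
}
```

If a previous run of the example was not stopped cleanly, the same conflict would be expected on port 12144 until that process exits.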
Send events from one processor to another over a network.
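To make the idea of sending events over HTTP more concrete, here is a minimal sketch of the mechanism using only the JDK (Java 11 or later), with no BeepBeep classes involved. It is not how the gateways are implemented; the class HttpPushSketch and its methods are hypothetical, and the receiver listens on an ephemeral port instead of 12144 to avoid conflicts. A small server plays the role of Machine B and queues the body of each POST made to "/push", while a client plays the role of Machine A.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class; not part of BeepBeep or of the examples repository.
public class HttpPushSketch
{
    /** "Machine B": queues the body of every POST request made to /push. */
    static HttpServer startReceiver(int port, BlockingQueue<String> received) throws IOException
    {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/push", exchange -> {
            try
            {
                if ("POST".equals(exchange.getRequestMethod()))
                {
                    byte[] body = exchange.getRequestBody().readAllBytes();
                    received.add(new String(body, StandardCharsets.UTF_8));
                    exchange.sendResponseHeaders(200, -1); // -1: no response body
                }
                else
                {
                    exchange.sendResponseHeaders(405, -1); // method not allowed
                }
            }
            finally
            {
                exchange.close();
            }
        });
        server.start();
        return server;
    }

    /** "Machine A": sends a string as the payload of a POST request. */
    static int send(String url, String payload) throws IOException, InterruptedException
    {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
                .build();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    /** Sends a payload to a local receiver and returns what the receiver got. */
    public static String roundTrip(String payload)
    {
        BlockingQueue<String> received = new LinkedBlockingQueue<>();
        HttpServer server = null;
        try
        {
            server = startReceiver(0, received); // 0: any free port
            int port = server.getAddress().getPort();
            send("http://localhost:" + port + "/push", payload);
            return received.poll(5, TimeUnit.SECONDS);
        }
        catch (IOException | InterruptedException e)
        {
            throw new IllegalStateException(e);
        }
        finally
        {
            if (server != null)
            {
                server.stop(0); // free the TCP port
            }
        }
    }

    public static void main(String[] args)
    {
        System.out.println(roundTrip("{\"a\":0,\"b\":\"foo\"}"));
    }
}
```

As in the BeepBeep example, the sender and the receiver share nothing but a URL, which is why the two halves could run on different machines.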
A dummy object used to show serialization.
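The source of CompoundObject is not shown here; the example only reveals that its constructor takes an integer, a string and another (possibly null) CompoundObject. Under that assumption, the following sketch shows what a serialization round trip of such a nested object could look like. The class CompoundSketch, its field names a, b and c, and the JSON layout are all hypothetical; in particular, this is not the format produced by the Azrael library, and the hand-written parser assumes non-null strings without quotes or escape sequences.

```java
import java.util.Objects;

// Hypothetical stand-in for CompoundObject; field names are assumptions.
public class CompoundSketch
{
    final int a;
    final String b;
    final CompoundSketch c; // nested object, may be null

    public CompoundSketch(int a, String b, CompoundSketch c)
    {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    /** Serializes this object (and its nested object) into a JSON string. */
    public String toJson()
    {
        return "{\"a\":" + a + ",\"b\":\"" + b + "\",\"c\":"
                + (c == null ? "null" : c.toJson()) + "}";
    }

    /** Rebuilds an object from a string produced by toJson(). */
    public static CompoundSketch fromJson(String json)
    {
        int[] pos = {0};
        CompoundSketch result = parse(json, pos);
        if (pos[0] != json.length())
        {
            throw new IllegalArgumentException("Trailing characters at " + pos[0]);
        }
        return result;
    }

    private static CompoundSketch parse(String s, int[] pos)
    {
        if (s.startsWith("null", pos[0]))
        {
            pos[0] += 4;
            return null;
        }
        expect(s, pos, "{\"a\":");
        int start = pos[0];
        while (pos[0] < s.length()
                && (s.charAt(pos[0]) == '-' || Character.isDigit(s.charAt(pos[0]))))
        {
            pos[0]++;
        }
        int a = Integer.parseInt(s.substring(start, pos[0]));
        expect(s, pos, ",\"b\":\"");
        int end = s.indexOf('"', pos[0]); // closing quote of the string value
        if (end < 0)
        {
            throw new IllegalArgumentException("Unterminated string");
        }
        String b = s.substring(pos[0], end);
        pos[0] = end;
        expect(s, pos, "\",\"c\":");
        CompoundSketch c = parse(s, pos); // recurse into the nested object
        expect(s, pos, "}");
        return new CompoundSketch(a, b, c);
    }

    private static void expect(String s, int[] pos, String token)
    {
        if (!s.startsWith(token, pos[0]))
        {
            throw new IllegalArgumentException("Expected " + token + " at " + pos[0]);
        }
        pos[0] += token.length();
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof CompoundSketch))
        {
            return false;
        }
        CompoundSketch other = (CompoundSketch) o;
        return a == other.a && b.equals(other.b) && Objects.equals(c, other.c);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(a, b, c);
    }

    public static void main(String[] args)
    {
        CompoundSketch original = new CompoundSketch(0, "foo", new CompoundSketch(6, "z", null));
        String json = original.toJson();
        System.out.println(json); // {"a":0,"b":"foo","c":{"a":6,"b":"z","c":null}}
        System.out.println(fromJson(json).equals(original)); // true
    }
}
```

The deserializer needs to know which class to instantiate, which mirrors why JsonDeserializeString is given CompoundObject.class in the example.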