Code Examples
A repository of 155 code examples for BeepBeep
PackerExample.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2017 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package network.httppush;
19 
20 import java.util.LinkedList;
21 
22 import ca.uqac.lif.cep.Connector;
23 import ca.uqac.lif.cep.ProcessorException;
24 import ca.uqac.lif.cep.UtilityMethods;
25 import ca.uqac.lif.cep.functions.CumulativeFunction;
26 import ca.uqac.lif.cep.functions.ApplyFunction;
27 import ca.uqac.lif.cep.functions.Cumulate;
28 import ca.uqac.lif.cep.http.HttpDownstreamGateway;
29 import ca.uqac.lif.cep.http.HttpUpstreamGateway;
30 import ca.uqac.lif.cep.io.Print;
31 import ca.uqac.lif.cep.serialization.JsonDeserializeString;
32 import ca.uqac.lif.cep.serialization.JsonSerializeString;
33 import ca.uqac.lif.cep.tmf.QueueSource;
34 import ca.uqac.lif.cep.tmf.TimeDecimate;
35 import ca.uqac.lif.cep.util.Lists.TimePack;
36 import ca.uqac.lif.cep.util.Lists.Unpack;
37 import ca.uqac.lif.cep.util.Numbers;
38 import ca.uqac.lif.jerrydog.RequestCallback.Method;
39 
40 /**
41  * Use a packer to send events in batch and reduce the number of HTTP
42  * requests.
43  * <p>
44  * <img
45  * src="./doc-files/network/httppush/PackerExample.png"
46  * alt="Processor graph">
47  * <p>
48  * In this example, we send a relatively fast flow of events (about 1,000
49  * per second) from one host to another using HTTP gateways
50  * (see {@link PushLocalSerialize} for an explanation of gateways). If we used
51  * the {@link HttpUpstreamGateway} directly, this would result in one
52  * HTTP request-response cycle for each event to be pushed over the network.
53  * However, each request has an associated overhead, which places an upper
54  * bound on the number of requests per second that can be sent.
55  * <p>
56  * The {@link TimePack} processor can be used to reduce the number of
57  * such requests. The packer accumulates events for a predetermined amount
58  * of time (say, one second), and outputs all accumulated events as a single
59  * <code>List</code> object when the time interval is expired. When coupled
60  * with a JSON serializer (to transform that list into a JSON string) and
61  * an {@link HttpUpstreamGateway}, this results in fewer HTTP requests, a
62  * smaller overhead, and hence an increased throughput.
63  * <p>
64  * Setting the packer to a relatively long interval (e.g. 1 second) will
65  * be such that a single HTTP request will contain a batch of about 1,000
66  * events at a time. The output of the program should look like this:
67  * <pre>
68  * Pushing 1000 events per second. Hit Ctrl+C to end.
69  * 1,717,1574,2432,3280,...
70  * </pre>
71  * In contrast, setting the packer to a short time interval
72  * (e.g. 2 milliseconds) will result in the packer making much smaller
73  * bundles, which in turn will make the HTTP gateway send more HTTP requests.
74  * The output of the program should look like this:
75  * <pre>
76  * Pushing 1000 events per second. Hit Ctrl+C to end.
77  * 1,393,830,1375,1966,...
78  * </pre>
79  * Since every number corresponds to the cumulative number of events received
80  * at every passing second, one can see that sending too many HTTP requests
81  * results in a slower throughput than packing events and sending them in bulk
82  * periodically.
83  *
84  * @author Sylvain Hallé
85  */
86 public class PackerExample
87 {
88  @SuppressWarnings("rawtypes")
89  public static void main(String[] args) throws ProcessorException, InterruptedException
90  {
91  /* Let us assume that "Machine A" produces a stream of increasing
92  * integers. */
93  QueueSource source = new QueueSource();
94  source.addEvent(1);
95  Cumulate sum = new Cumulate(new CumulativeFunction<Number>(Numbers.addition));
96  Connector.connect(source, sum);
97 
98  /* Connect the output to a packer, and give it a time interval
99  * 1 second. */
100  TimePack packer = new TimePack();
101  packer.setInterval(1000);
102  Connector.connect(sum, packer);
103 
104  /* Connect the output of the packer to a serializer, and the serializer
105  * to an {@link HttpUpstreamGateway}. */
106  ApplyFunction serialize = new ApplyFunction(new JsonSerializeString());
107  Connector.connect(packer, serialize);
108  HttpUpstreamGateway up_gateway = new HttpUpstreamGateway("http://localhost:12149/push");
109  Connector.connect(serialize, up_gateway);
110 
111  /* We now move on to Machine B. We create in a row an
112  * {@link HttpDownstreamGateway}, a deserializser that will convert the
113  * string back into a list of events, and an unpacker that will push
114  * each element of the list as individual events. */
115  HttpDownstreamGateway dn_gateway = new HttpDownstreamGateway(12149, "/push", Method.POST);
116  ApplyFunction deserialize = new ApplyFunction(new JsonDeserializeString<LinkedList>(LinkedList.class));
117  Connector.connect(dn_gateway, deserialize);
118  Unpack unpacker = new Unpack();
119  Connector.connect(deserialize, unpacker);
120 
121  /* We only keep one event about every second, using the TimeDecimate
122  * processor, and print it to the screen. */
123  TimeDecimate every_second = new TimeDecimate(1000000000);
124  Connector.connect(unpacker, every_second);
125  Print print = new Print();
126  //Connector.connect(unpacker, print);
127  Connector.connect(every_second, print);
128 
129  /* We start the gateways and the packer. */
130  packer.start();
131  up_gateway.start();
132  dn_gateway.start();
133 
134  /* We are now ready to push events and see what happens. This loop sends
135  * about 100 events per second. */
136  System.out.println("Pushing 1000 events per second. Hit Ctrl+C to end.");
137  for (long n = 0; n < 10000; n++)
138  {
139  source.push();
140  UtilityMethods.pause(1);
141  }
142 
143  /* Stop the servers to free the TCP ports on your machine. */
144  up_gateway.stop();
145  dn_gateway.stop();
146 
147  /* That's all folks! */
148  }
149 }
Use a packer to send events in batch and reduce the number of HTTP requests.