18 package network.httppush;
20 import java.util.LinkedList;
22 import ca.uqac.lif.cep.Connector;
23 import ca.uqac.lif.cep.ProcessorException;
24 import ca.uqac.lif.cep.UtilityMethods;
25 import ca.uqac.lif.cep.functions.CumulativeFunction;
26 import ca.uqac.lif.cep.functions.ApplyFunction;
27 import ca.uqac.lif.cep.functions.Cumulate;
28 import ca.uqac.lif.cep.http.HttpDownstreamGateway;
29 import ca.uqac.lif.cep.http.HttpUpstreamGateway;
30 import ca.uqac.lif.cep.io.Print;
31 import ca.uqac.lif.cep.serialization.JsonDeserializeString;
32 import ca.uqac.lif.cep.serialization.JsonSerializeString;
33 import ca.uqac.lif.cep.tmf.QueueSource;
34 import ca.uqac.lif.cep.tmf.TimeDecimate;
35 import ca.uqac.lif.cep.util.Lists.TimePack;
36 import ca.uqac.lif.cep.util.Lists.Unpack;
37 import ca.uqac.lif.cep.util.Numbers;
38 import ca.uqac.lif.jerrydog.RequestCallback.Method;
88 @SuppressWarnings(
"rawtypes")
89 public static void main(String[] args)
throws ProcessorException, InterruptedException
93 QueueSource source =
new QueueSource();
95 Cumulate sum =
new Cumulate(
new CumulativeFunction<Number>(Numbers.addition));
96 Connector.connect(source, sum);
100 TimePack packer =
new TimePack();
101 packer.setInterval(1000);
102 Connector.connect(sum, packer);
106 ApplyFunction serialize =
new ApplyFunction(
new JsonSerializeString());
107 Connector.connect(packer, serialize);
108 HttpUpstreamGateway up_gateway =
new HttpUpstreamGateway(
"http://localhost:12149/push");
109 Connector.connect(serialize, up_gateway);
115 HttpDownstreamGateway dn_gateway =
new HttpDownstreamGateway(12149,
"/push", Method.POST);
116 ApplyFunction deserialize =
new ApplyFunction(
new JsonDeserializeString<LinkedList>(LinkedList.class));
117 Connector.connect(dn_gateway, deserialize);
118 Unpack unpacker =
new Unpack();
119 Connector.connect(deserialize, unpacker);
123 TimeDecimate every_second =
new TimeDecimate(1000000000);
124 Connector.connect(unpacker, every_second);
125 Print print =
new Print();
127 Connector.connect(every_second, print);
136 System.out.println(
"Pushing 1000 events per second. Hit Ctrl+C to end.");
137 for (
long n = 0; n < 10000; n++)
140 UtilityMethods.pause(1);
Use a packer to send events in batch and reduce the number of HTTP requests.