18 package network.httppush;
21 import ca.uqac.lif.cep.Connector;
22 import ca.uqac.lif.cep.ProcessorException;
23 import ca.uqac.lif.cep.Pushable;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.http.HttpDownstreamGateway;
26 import ca.uqac.lif.cep.http.HttpUpstreamGateway;
27 import ca.uqac.lif.cep.io.Print;
28 import ca.uqac.lif.cep.serialization.JsonDeserializeString;
29 import ca.uqac.lif.cep.serialization.JsonSerializeString;
30 import ca.uqac.lif.jerrydog.RequestCallback.Method;
45 public static void main(String[] args)
throws ProcessorException, InterruptedException
53 ApplyFunction serialize =
new ApplyFunction(
new JsonSerializeString());
64 HttpUpstreamGateway up_gateway =
65 new HttpUpstreamGateway(
"http://localhost:12144/push");
73 HttpDownstreamGateway dn_gateway =
74 new HttpDownstreamGateway(12144,
"/push", Method.POST);
86 ApplyFunction deserialize =
new ApplyFunction(
87 new JsonDeserializeString<CompoundObject>(
93 Print print =
new Print();
104 Connector.connect(serialize, up_gateway);
105 Connector.connect(dn_gateway, deserialize);
106 Connector.connect(deserialize, print);
122 Pushable p = serialize.getPushableInput();
Send events from one processor to another over a network.
In this example, Machine A and Machine B are actually the same host; they just listen to different TC...
A dummy object used to show serialization.