Code Examples
A repository of 155 code examples for BeepBeep
PushMachineB.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2017 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package network.httppush;
19 
20 import java.util.Scanner;
21 
22 import ca.uqac.lif.cep.Connector;
23 import ca.uqac.lif.cep.ProcessorException;
24 import ca.uqac.lif.cep.functions.ApplyFunction;
25 import ca.uqac.lif.cep.http.HttpDownstreamGateway;
26 import ca.uqac.lif.cep.io.Print;
27 import ca.uqac.lif.cep.serialization.JsonDeserializeString;
28 import ca.uqac.lif.jerrydog.RequestCallback.Method;
29 
30 /**
31  * This is the same example as {@link PushLocalSerialize}, but with the
32  * "Machine A" and "Machine B" parts of the chain split into two independent
33  * programs, with some user interaction added. You are advised to first
34  * go through the {@link PushLocalSerialize} example.
35  * This file corresponds to Machine B.
36  * <p>
37  * You can actually run this example on two physical machines if you
38  * wish. Simply start {@link PushMachineA} and {@link PushMachineB} on
39  * two computers, and use the IP address of Machine B in the URL you give
40  * to Machine A.
41  *
42  * @author Sylvain Hallé
43  */
44 @SuppressWarnings("resource")
45 public class PushMachineB
46 {
47  public static void main(String[] args) throws ProcessorException, InterruptedException
48  {
49  // Ask a few infos about the remote end of the process
50  System.out.println("Hello, I am Machine B (downstream).");
51  System.out.print("Enter the port I should listen to: ");
52  Scanner sc = new Scanner(System.in);
53  int port = sc.nextInt();
54 
55  // Instantiate and pipe the processors
56  HttpDownstreamGateway dn_gateway = new HttpDownstreamGateway(port, "/push", Method.POST);
57  ApplyFunction deserialize = new ApplyFunction(new JsonDeserializeString<CompoundObject>(CompoundObject.class));
58  Print print = new Print();
59  Connector.connect(dn_gateway, deserialize);
60  Connector.connect(deserialize, print);
61 
62  // Start the gateways so it can listen to requests
63  dn_gateway.start();
64 
65  // Push events on one end
66  System.out.println("The received objects will be displayed below. Press q to quit.");
67  for (;;)
68  {
69  String line = sc.nextLine();
70  if (line.startsWith("q") || line.startsWith("Q"))
71  break;
72  }
73  // Stop the gateway
74  dn_gateway.stop();
75  sc.close();
76  }
77 
78  /**
79  * A dummy object used to test serialization
80  */
81  protected static class CompoundObject
82  {
83  int a;
84  String b;
85  CompoundObject c;
86 
87  protected CompoundObject()
88  {
89  super();
90  }
91 
92  public CompoundObject(int a, String b, CompoundObject c)
93  {
94  super();
95  this.a = a;
96  this.b = b;
97  this.c = c;
98  }
99 
100  @Override
101  public boolean equals(Object o)
102  {
103  if (o == null || !(o instanceof CompoundObject))
104  {
105  return false;
106  }
107  CompoundObject co = (CompoundObject) o;
108  if (this.a != co.a || this.b.compareTo(co.b) != 0)
109  {
110  return false;
111  }
112  if ((this.c == null && co.c == null) || (this.c != null && co.c != null && this.c.equals(co.c)))
113  {
114  return true;
115  }
116  return false;
117  }
118 
119  @Override
120  public String toString()
121  {
122  return "a = " + a + ", b = " + b + ", c = (" + c + ")";
123  }
124  }
125 
126 }
This is the same example as PushLocalSerialize, but with the "Machine A" and "Machine B" parts of the...