Code Examples
A repository of 155 code examples for BeepBeep
TwinPrimesA.java
1 /*
2  BeepBeep, an event stream processor
3  Copyright (C) 2008-2017 Sylvain Hallé
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU Lesser General Public License as published
7  by the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package network.httppush.twinprimes;
19 
20 import java.math.BigInteger;
21 
25 
26 import static ca.uqac.lif.cep.Connector.INPUT;
27 import static ca.uqac.lif.cep.Connector.LEFT;
28 import static ca.uqac.lif.cep.Connector.RIGHT;
29 import static ca.uqac.lif.cep.Connector.OUTPUT;
30 import ca.uqac.lif.cep.Connector;
31 import ca.uqac.lif.cep.ProcessorException;
32 import ca.uqac.lif.cep.UtilityMethods;
33 import ca.uqac.lif.cep.functions.CumulativeFunction;
34 import ca.uqac.lif.cep.functions.ApplyFunction;
35 import ca.uqac.lif.cep.functions.Cumulate;
36 import ca.uqac.lif.cep.http.HttpUpstreamGateway;
37 import ca.uqac.lif.cep.io.Print;
38 import ca.uqac.lif.cep.tmf.Filter;
39 import ca.uqac.lif.cep.tmf.Fork;
40 import ca.uqac.lif.cep.tmf.Pump;
41 import ca.uqac.lif.cep.tmf.QueueSource;
42 
43 /**
44  * The code for Machine A in the twin prime example.
45  * @author Sylvain HallĂ©
46  */
47 public class TwinPrimesA
48 {
49  public static void main(String[] args) throws ProcessorException
50  {
51  ///
52  /* The URL where prime numbers will be pushed downstream. Change
53  * this string to correspond to Machine B's address and port. */
54  String push_url = "http://localhost:12312/bigprime";
55 
56  /* The first processor is a source that will push the BigInteger
57  * "2" repeatedly. */
58  QueueSource source = new QueueSource().addEvent(new BigInteger("2"));
59 
60  /* We the connect a pump to that source. */
61  Pump pump = new Pump(500);
62  Connector.connect(source, pump);
63 
64  /* The second processor is a simple counter. We will feed it with the
65  * BigInteger "2" repeatedly, and it will return the cumulatve sum
66  * of those "2" as its output. Since the start value of BigIntegerAdd
67  * is one, the resulting sequence is made of all odd numbers. */
68  Cumulate counter = new Cumulate(
69  new CumulativeFunction<BigInteger>(BigIntegerAdd.instance));
70  Connector.connect(pump, counter);
71 
72  /* The events output from the counter are duplicated along two paths. */
73  Fork fork1 = new Fork(2);
74  Connector.connect(counter, fork1);
75 
76  /* Along the first path, the numbers are checked for primality. */
77  ApplyFunction prime_check = new ApplyFunction(IsPrime.instance);
78  Connector.connect(fork1, LEFT, prime_check, INPUT);
79 
80  /* Along the second path, we feed a filter and use the primality
81  * verdict as the filtering condition. What comes out of the filter
82  * are only prime numbers. */
83  Filter filter = new Filter();
84  Connector.connect(fork1, RIGHT, filter, LEFT);
85  Connector.connect(prime_check, OUTPUT, filter, RIGHT);
86 
87  /* We fork the output of the filter, just so that we can
88  * print what comes out of it. */
89  Fork fork2 = new Fork(2);
90  Connector.connect(filter, fork2);
91  Print print = new Print();
92  Connector.connect(fork2, LEFT, print, INPUT);
93 
94  /* We convert BigIntegers to Strings, and push them across the network
95  * to Machine B using the HttpUpstreamGateway. */
96  ApplyFunction int_to_string =
97  new ApplyFunction(BigIntegerToString.instance);
98  HttpUpstreamGateway up_gateway = new HttpUpstreamGateway(push_url);
99  Connector.connect(fork2, RIGHT, int_to_string, INPUT);
100  Connector.connect(int_to_string, up_gateway);
101  ///
102 
103  /* All set! We are ready to start the gateway, and repeatedly push
104  * primes to Machine B. */
105  System.out.println("This is Machine A. Press Enter to start pushing numbers to Machine B.");
106  UtilityMethods.readLine();
107  System.out.println("Let's go! Pushing prime numbers to " + push_url);
108  System.out.println("Press Ctrl+C to stop.");
109  up_gateway.start();
110  pump.start();
111  }
112 }
Send events from one processor to another over a network.
Contains a few utility functions for manipulating Java's BigInteger objects.
Use HTTP gateways in push mode.
The code for Machine A in the twin prime example.
Compute twin primes by distributing the computation across two machines over a network.