0.11.4
ca.uqac.lif.cep.SynchronousProcessor Class Reference (abstract)

Detailed Description

Performs a computation on input events to produce output events.

This is the direct descendant of Processor, and probably the one you'll want to inherit from when creating your own processors. While Processor takes care of input and output queues, SynchronousProcessor also implements Pullables and Pushables. These take care of collecting input events, waiting until a new event has been received from every input trace before triggering the computation, pulling and buffering events from all outputs when any of the Pullables is called, etc.

The only thing that is left undefined is what to do when new input events have been received from all input traces. This is the task of abstract method compute(Object[], Queue), which descendants of this class must implement.
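The division of labour described above can be illustrated with a stand-alone sketch that does not depend on the library. The classes `MiniSynchronous` and `MiniAdder` and the methods `push` and `pull` are hypothetical names made up for this example. They only mimic the idea of buffering events per input and calling `compute` once a complete front is available, and should not be read as the library's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for illustration only; NOT the library's SynchronousProcessor.
abstract class MiniSynchronous {
  private final List<Queue<Object>> inputQueues = new ArrayList<>();
  private final Queue<Object[]> outputFronts = new ArrayDeque<>();

  MiniSynchronous(int inArity) {
    for (int i = 0; i < inArity; i++) {
      inputQueues.add(new ArrayDeque<>());
    }
  }

  // Buffers one event on the given input, then calls compute() as long as
  // every input queue holds at least one event (a complete front).
  void push(int index, Object event) {
    inputQueues.get(index).add(event);
    while (allInputsReady()) {
      Object[] front = new Object[inputQueues.size()];
      for (int i = 0; i < front.length; i++) {
        front[i] = inputQueues.get(i).remove();
      }
      compute(front, outputFronts);
    }
  }

  private boolean allInputsReady() {
    for (Queue<Object> q : inputQueues) {
      if (q.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  // Returns the next buffered output front, or null if none is available.
  Object[] pull() {
    return outputFronts.poll();
  }

  // The only part a descendant has to write.
  protected abstract boolean compute(Object[] inputs, Queue<Object[]> outputs);
}

// A two-input processor that adds its inputs (assumed to be Integers).
class MiniAdder extends MiniSynchronous {
  MiniAdder() {
    super(2);
  }

  @Override
  protected boolean compute(Object[] inputs, Queue<Object[]> outputs) {
    int sum = (Integer) inputs[0] + (Integer) inputs[1];
    outputs.add(new Object[] { sum });
    return true;
  }
}
```

In this sketch, pushing an event on input 0 alone produces nothing; an output front only appears once input 1 has also received an event.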

In early versions of the library, this class was called SingleProcessor.

Author
Sylvain Hallé
Since
0.1

Definition at line 50 of file SynchronousProcessor.java.

Classes

class  InputPushable
 Implementation of a Pushable for a single processor. More...
class  OutputPullable
 Implementation of a Pullable for a single processor. More...

Public Member Functions

 SynchronousProcessor (int in_arity, int out_arity)
 Initializes a processor.
Pushable getPushableInput (int index)
 Returns the Pushable corresponding to the processor's i-th input trace.
Pullable getPullableOutput (int index)
 Returns the Pullable corresponding to the processor's i-th output trace.
final SynchronousProcessor setEventTracker (EventTracker tracker)
 Associates an event tracker to this processor.
Public Member Functions inherited from ca.uqac.lif.cep.Processor
 Processor (int in_arity, int out_arity)
 Initializes a processor.
final Object getContext (String key)
 Retrieves an object from the processor's context.
Context getContext ()
 Gets the context associated to this object.
void setContext (String key, Object value)
 Adds an object to the object's context.
void setContext (Context context)
 Adds a complete context to this object.
final int hashCode ()
 Implementation of hashCode() specific to processors.
final boolean equals (Object o)
 Implementation of equals() specific to processors.
final int getId ()
 Fetches the processor instance's unique ID.
void reset ()
 Resets the processor.
final Pushable getPushableInput ()
 Returns the Pushable corresponding to the processor's first input trace.
final Pullable getPullableOutput ()
 Returns the Pullable corresponding to the processor's first output trace.
void setPullableInput (int i, Pullable p)
 Assigns a Pullable to the processor's i-th input.
Pullable getPullableInput (int i)
 Returns the Pullable corresponding to the processor's i-th input.
void setPushableOutput (int i, Pushable p)
 Assigns a Pushable to the processor's i-th output.
Pushable getPushableOutput (int i)
 Retrieves the Pushable associated to the processor's i-th output.
final int getInputArity ()
 Returns the processor's input arity.
final int getOutputArity ()
 Returns the processor's output arity.
void duplicateInto (Processor p)
 Copies the contents and state of the current processor into another.
final Set< Class<?> > getInputType (int index)
 Gets the type of events the processor accepts for its i-th input trace.
void getInputTypesFor (Set< Class<?> > classes, int index)
 Populates the set of classes accepted by the processor for its i-th input.
Class<?> getOutputType (int index)
 Returns the type of the events produced by the processor for its i-th output.
void start ()
 Starts the processor.
void stop ()
 Stops the processor.
final EventTracker getEventTracker ()
 Gets the instance of event tracker associated to this processor.
void associateToInput (int in_stream_index, int in_stream_pos, int out_stream_index, int out_stream_pos)
 Associates an input event to an output event.
void associateTo (NodeFunction f, int out_stream_index, int out_stream_pos)
 Associates a node function to a particular event of the processor's output stream.
void associateToOutput (int in_stream_index, int in_stream_pos, int out_stream_index, int out_stream_pos)
 Associates an input event to an output event.
final int getInputCount ()
 Gets the number of event fronts received so far by this processor.
final int getOutputCount ()
 Gets the number of event fronts produced so far by this processor.
final Object print (ObjectPrinter<?> printer) throws ProcessorException
 Prints the contents of this processor into an object printer.
final Processor read (ObjectReader<?> reader, Object o) throws ProcessorException
 Reads the content of a processor from a serialized object.
final Processor duplicate ()
 Duplicates an object and sets it to its initial state.
void copyInputQueue (int index, Collection< Object > to)
 Copies the content of one of the processor's input queues to a collection.
void copyOutputQueue (int index, Collection< Object > to)
 Copies the content of one of the processor's output queues to a collection.
abstract Processor duplicate (boolean with_state)
 Duplicates an object.
Processor or (Processor p)
 Connects the first output pipe of this processor to the first input pipe of another processor.
Processor or (CallAfterConnect c)
 Operates similarly to or(Processor), but also calls a method after the connection has been established.
Processor or (SelectedInputPipe p)
 Connects the output at index 0 of the current processor to the input of another processor.
PipeSelector getAt (int index)
 Gets the PipeSelector object corresponding to the processor's input or output pipe for a given index.
Pushable rightShift (int index)
Pullable leftShift (int index)
Queue< Object > getInputQueue (int index)
 Gets the content of the processor's input queue at a given index.
Queue< Object > getOutputQueue (int index)
 Gets the content of the processor's output queue at a given index.

Protected Member Functions

abstract boolean compute (Object[] inputs, Queue< Object[]> outputs)
 Computes one or more output events from its input events.
Protected Member Functions inherited from ca.uqac.lif.cep.Processor
boolean allNotifiedEndOfTrace ()
 Determines if all the upstream pushables have sent the end of trace notification.
final Context newContext ()
 Creates a new empty context map.
boolean onEndOfTrace (Queue< Object[]> outputs) throws ProcessorException
 Allows a processor to define a specific behavior for when the trace of input fronts has reached its end.
Object printState ()
 Produces an object that represents the state of the current processor.
Processor readState (Object o)
 Reads the state of a processor and uses it to create a new instance.

Protected Attributes

final transient Queue< Object[]> m_tempQueue
 A queue object that will be passed to the compute(Object[], Queue) method.
final transient Pushable[] m_inputPushables
 An array of input pushables.
transient Pullable[] m_outputPullables
 An array of output pullables.
Protected Attributes inherited from ca.uqac.lif.cep.Processor
int m_inputArity
 The processor's input arity.
int m_outputArity
 The processor's output arity.
transient Queue< Object >[] m_inputQueues
 An array of input event queues.
transient EventTracker m_eventTracker = null
 An object that keeps track of the relationship between input and output events.
transient Queue< Object >[] m_outputQueues
 An array of output event queues.
transient Pullable[] m_inputPullables
 An array of Pullables, one for each input trace this processor receives.
transient Pushable[] m_outputPushables
 An array of Pushables, one for each output trace this processor produces.
int m_inputCount = 0
 A counter incremented upon each input front processed.
int m_outputCount = 0
 A counter incremented upon each output front processed.
Context m_context = null
 The context in which the processor is instantiated.
boolean[] m_hasBeenNotifiedOfEndOfTrace
 Indicates whether the processor has been notified of the end of trace or not.
boolean m_notifiedEndOfTraceDownstream
 Indicates whether the processor has notified the end of the trace to the downstream processors it is connected to.

Additional Inherited Members

Static Public Member Functions inherited from ca.uqac.lif.cep.Processor
static boolean allNull (Object[] v)
 Checks if all objects in the array are null.
static Queue< Object[]> getEmptyQueue ()
 Gets an instance of an empty event queue.
static void startAll (Processor ... procs)
 Starts all processors given as an argument.
static void stopAll (Processor ... procs)
 Stops all processors given as an argument.
static List< ProvenanceNode > getLeaves (ProvenanceNode root)
 Gets the leaves of a provenance tree.
Static Public Attributes inherited from ca.uqac.lif.cep.Processor
static final transient String s_versionString = "0.11.2"
 A string used to identify the program's version.
static final transient int MAX_PULL_RETRIES = 10000000
 Number of times the Pullable#hasNext() method tries to produce an output from the input before giving up.
Static Protected Member Functions inherited from ca.uqac.lif.cep.Processor
static void getLeaves (ProvenanceNode root, List< ProvenanceNode > leaves)
 Accumulates the leaves of a provenance tree in a list.

Constructor & Destructor Documentation

◆ SynchronousProcessor()

ca.uqac.lif.cep.SynchronousProcessor.SynchronousProcessor ( int in_arity,
int out_arity )

Initializes a processor.

Parameters
in_arity: The input arity
out_arity: The output arity

Definition at line 76 of file SynchronousProcessor.java.

Member Function Documentation

◆ compute()

abstract boolean ca.uqac.lif.cep.SynchronousProcessor.compute ( Object[] inputs,
Queue< Object[]> outputs )
abstract protected

Computes one or more output events from its input events.

Parameters
inputs: An array of input events; its length corresponds to the processor's input arity
outputs: A queue of arrays of objects. The processor should push arrays into this queue for every output front it produces. The size of each array should be equal to the processor's output arity, although this is not enforced.
Returns
true if this processor may output other events in the future, false otherwise

Reimplemented in ca.uqac.lif.cep.Adder, ca.uqac.lif.cep.Doubler, ca.uqac.lif.cep.io.HttpGet, ca.uqac.lif.cep.io.Print, ca.uqac.lif.cep.io.ReadInputStream, ca.uqac.lif.cep.io.ReadLines, ca.uqac.lif.cep.io.ReadStringStream, ca.uqac.lif.cep.io.ReadTokens, ca.uqac.lif.cep.io.SpliceSource, ca.uqac.lif.cep.io.WriteOutputStream, ca.uqac.lif.cep.io.WriteToFile, ca.uqac.lif.cep.tmf.AbstractSlice, ca.uqac.lif.cep.tmf.BlackHole, ca.uqac.lif.cep.tmf.CallbackSink, ca.uqac.lif.cep.tmf.Decimate, ca.uqac.lif.cep.tmf.DetectEnd, ca.uqac.lif.cep.tmf.Filter, ca.uqac.lif.cep.tmf.FilterOn, ca.uqac.lif.cep.tmf.Insert, ca.uqac.lif.cep.tmf.KeepLast, ca.uqac.lif.cep.tmf.Pad, ca.uqac.lif.cep.tmf.Prefix, ca.uqac.lif.cep.tmf.QueueSink, ca.uqac.lif.cep.tmf.QueueSource, ca.uqac.lif.cep.tmf.QueueSourceBatch, ca.uqac.lif.cep.tmf.ResetLast, ca.uqac.lif.cep.tmf.SimpleFilter, ca.uqac.lif.cep.tmf.SinkLast, ca.uqac.lif.cep.tmf.Splice, ca.uqac.lif.cep.tmf.Stutter, ca.uqac.lif.cep.tmf.Trim, ca.uqac.lif.cep.tmf.VariableStutter, ca.uqac.lif.cep.tmf.Window, ca.uqac.lif.cep.tmf.WindowFunction, ca.uqac.lif.cep.UniformProcessor, ca.uqac.lif.cep.util.Bags.RunOn, ca.uqac.lif.cep.util.FindPattern, ca.uqac.lif.cep.util.Lists.Pack, ca.uqac.lif.cep.util.Lists.TimePack, ca.uqac.lif.cep.util.Lists.Unpack, and ca.uqac.lif.cep.util.Multiset.PutInto.
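As an illustration of the contract documented for compute(Object[], Queue), the following stand-alone sketch shows a method with the same shape that produces several output fronts from a single input front. The class `RepeatSketch` and its `times` field are hypothetical and exist only for this example; the code does not use the library and is not taken from any of the classes listed above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration; not code from the library.
class RepeatSketch {
  private final int times;

  RepeatSketch(int times) {
    this.times = times;
  }

  // Same shape as compute(Object[], Queue): one input front in,
  // zero or more output fronts added to the queue.
  boolean compute(Object[] inputs, Queue<Object[]> outputs) {
    for (int i = 0; i < times; i++) {
      // The output arity is 1 in this sketch, so each front is an array of length 1.
      outputs.add(new Object[] { inputs[0] });
    }
    // true signals that more output may be produced on later calls.
    return true;
  }

  public static void main(String[] args) {
    Queue<Object[]> out = new ArrayDeque<>();
    new RepeatSketch(3).compute(new Object[] { "a" }, out);
    System.out.println(out.size()); // prints 3
  }
}
```

Each array added to the queue is one output front. Keeping its length equal to the output arity is the caller's responsibility since, as noted above, the size is not enforced.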

◆ getPullableOutput()

Pullable ca.uqac.lif.cep.SynchronousProcessor.getPullableOutput ( int index)

Returns the Pullable corresponding to the processor's i-th output trace.

Parameters
index: The index. Should be between 0 and the processor's output arity - 1 (since indices start at 0).
Returns
The pullable if the index is within the appropriate range, null otherwise.

Reimplemented from ca.uqac.lif.cep.Processor.

Reimplemented in ca.uqac.lif.cep.io.ReadInputStream, ca.uqac.lif.cep.UniformProcessor, and ca.uqac.lif.cep.util.Lists.TimePack.

Definition at line 95 of file SynchronousProcessor.java.
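The index contract described for getPullableOutput(int) (valid indices from 0 to the output arity - 1, null otherwise) can be sketched as a simple bounds-checked lookup. The class `OutputLookupSketch`, its `getOutput` method and the placeholder strings are assumptions made for this example; the library's own method returns Pullable objects and its internals may differ.

```java
// Hypothetical illustration of the documented contract; not the library's code.
class OutputLookupSketch {
  private final Object[] outputs;

  OutputLookupSketch(int outArity) {
    outputs = new Object[outArity];
    for (int i = 0; i < outArity; i++) {
      outputs[i] = "pullable-" + i; // placeholder standing in for a Pullable
    }
  }

  // Valid indices run from 0 to outArity - 1; anything else yields null.
  Object getOutput(int index) {
    if (index < 0 || index >= outputs.length) {
      return null;
    }
    return outputs[index];
  }
}
```

Under this contract, a caller that computes indices dynamically should check the result for null before using it.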

◆ getPushableInput()

Pushable ca.uqac.lif.cep.SynchronousProcessor.getPushableInput ( int index)

Returns the Pushable corresponding to the processor's i-th input trace.

Parameters
index: The index. Should be between 0 and the processor's input arity - 1 (since indices start at 0).
Returns
The pushable if the index is within the appropriate range. Outside of the range,

Reimplemented from ca.uqac.lif.cep.Processor.

Reimplemented in ca.uqac.lif.cep.UniformProcessor.

Definition at line 85 of file SynchronousProcessor.java.

◆ setEventTracker()

final SynchronousProcessor ca.uqac.lif.cep.SynchronousProcessor.setEventTracker ( EventTracker tracker)

Associates an event tracker to this processor.

Parameters
tracker: The event tracker, or null to remove the association to an existing tracker
Returns
This processor

Reimplemented from ca.uqac.lif.cep.Processor.

Definition at line 583 of file SynchronousProcessor.java.

Member Data Documentation

◆ m_inputPushables

final transient Pushable [] ca.uqac.lif.cep.SynchronousProcessor.m_inputPushables
protected

An array of input pushables.

Definition at line 61 of file SynchronousProcessor.java.

◆ m_outputPullables

transient Pullable [] ca.uqac.lif.cep.SynchronousProcessor.m_outputPullables
protected

An array of output pullables.

Definition at line 66 of file SynchronousProcessor.java.

◆ m_tempQueue

final transient Queue<Object[]> ca.uqac.lif.cep.SynchronousProcessor.m_tempQueue
protected

A queue object that will be passed to the compute(Object[], Queue) method.

Definition at line 56 of file SynchronousProcessor.java.


The documentation for this class was generated from the following file: