0.10.8-alpha
ca.uqac.lif.cep.GroupProcessor Class Reference

Encapsulates a chain of processors as if it were a single one. More...

Classes

class  CopyCrawler
 Crawler that creates copies (clones) of whatever it encounters on its way and re-pipes processors as in the original group. More...
 
class  ProcessorAssociation
 Tuple made of a number and a processor. More...
 
class  ProxyPullable
 
class  ProxyPushable
 

Public Member Functions

 GroupProcessor (int in_arity, int out_arity)
 Crate a group processor. More...
 
GroupProcessor notifySources (boolean b)
 Sets the processor to notify the QueueSource objects in the group to push an event when a call to push is made on the group. More...
 
EventTracker getInnerTracker ()
 Gets the tracker instance for the processors contained in this group. More...
 
synchronized GroupProcessor addProcessor (Processor p)
 Adds a processor to the group. More...
 
synchronized GroupProcessor addProcessors (Processor ... procs)
 Adds multiple processors to the group. More...
 
synchronized GroupProcessor associateInput (int i, Processor p, int j)
 Declares that the i-th input of the group is linked to the j-th input of processor p More...
 
synchronized GroupProcessor associateOutput (int i, Processor p, int j)
 Declares that the i-th output of the group is linked to the j-th output of processor p. More...
 
synchronized ProxyPushable getPushableInput (int index)
 
synchronized Pullable getPullableOutput (int index)
 
final synchronized void setPullableInput (int i, Pullable p)
 
final synchronized void setPushableOutputAssociation (int i, Processor p, int j)
 
final synchronized void setPushableOutput (int i, Pushable p)
 
final synchronized void setPullableInputAssociation (int i, Processor p, int j)
 
final synchronized void setPushableInput (int i, Pushable p)
 Sets an input pushable for this processor. More...
 
final synchronized void setPullableOutput (int i, Pullable p)
 Sets an output pullable for this processor. More...
 
final synchronized Pushable getPushableOutput (int index)
 
final synchronized Pullable getPullableInput (int index)
 
synchronized Map< Integer, ProcessorcloneInto (GroupProcessor group, boolean with_state)
 Clones the contents of the current GroupProcessor into a new group. More...
 
synchronized GroupProcessor duplicate (boolean with_state)
 Duplicates an object. More...
 
synchronized void setContext (Context context)
 Adds a complete context to this object. More...
 
synchronized void setContext (String key, Object value)
 Adds an object to the object's context. More...
 
synchronized void start ()
 
synchronized void stop ()
 
Processor getAssociatedInput (int index)
 Gets the processor associated to the i-th input of the group. More...
 
int getGroupInputIndex (int id, int pipe_index)
 Gets the index of the group's input pipe associated to the inner processor with given ID and pipe index. More...
 
int getAssociatedInputIndex (int index)
 Gets the input stream index of the processor associated to the i-th input of the group. More...
 
Processor getAssociatedOutput (int index)
 Gets the processor associated to the i-th output of the group. More...
 
int getAssociatedOutputIndex (int index)
 Gets the output stream index of the processor associated to the i-th output of the group. More...
 
boolean onEndOfTrace (Queue< Object[]> outputs)
 
Object printState ()
 
GroupProcessor readState (Object o) throws ProcessorException
 
void reset ()
 
final Processor setEventTracker (EventTracker tracker)
 
Object getState ()
 Gets the token corresponding to the processor's internal state. More...
 
- Public Member Functions inherited from ca.uqac.lif.cep.Processor
 Processor (int in_arity, int out_arity)
 Initializes a processor. More...
 
final synchronized Object getContext (String key)
 Retrieves an object from the processor's context. More...
 
synchronized Context getContext ()
 Gets the context associated to this object. More...
 
synchronized void setContext (String key, Object value)
 Adds an object to the object's context. More...
 
synchronized void setContext (Context context)
 Adds a complete context to this object. More...
 
final int hashCode ()
 Implementation of hashCode() specific to processors. More...
 
final boolean equals (Object o)
 Implementation of equals() specific to processors. More...
 
final int getId ()
 Fetches the processor instance's unique ID. More...
 
synchronized void reset ()
 Resets the processor. More...
 
abstract Pushable getPushableInput (int index)
 Returns the Pushable corresponding to the processor's i-th input trace. More...
 
final synchronized Pushable getPushableInput ()
 Returns the Pushable corresponding to the processor's first input trace. More...
 
abstract Pullable getPullableOutput (int index)
 Returns the Pullable corresponding to the processor's i-th output trace. More...
 
final synchronized Pullable getPullableOutput ()
 Returns the Pullable corresponding to the processor's first output trace. More...
 
synchronized void setPullableInput (int i, Pullable p)
 Assigns a Pullable to the processor's i-th input. More...
 
synchronized Pullable getPullableInput (int i)
 Returns the Pullable corresponding to the processor's i-th input. More...
 
synchronized void setPushableOutput (int i, Pushable p)
 Assigns a Pushable to the processor's i-th output. More...
 
synchronized Pushable getPushableOutput (int i)
 Retrieves the Pushable associated to the processor's i-th output. More...
 
final int getInputArity ()
 Returns the processor's input arity. More...
 
final int getOutputArity ()
 Returns the processor's output arity. More...
 
void duplicateInto (Processor p)
 Copies the contents and state of the current processor into another. More...
 
final Set< Class<?> > getInputType (int index)
 Gets the type of events the processor accepts for its i-th input trace. More...
 
void getInputTypesFor (Set< Class<?>> classes, int index)
 Populates the set of classes accepted by the processor for its i-th input. More...
 
Class<?> getOutputType (int index)
 Returns the type of the events produced by the processor for its i-th output. More...
 
void start ()
 Starts the processor. More...
 
void stop ()
 Stops the processor. More...
 
final EventTracker getEventTracker ()
 Gets the instance of event tracker associated to this processor. More...
 
Processor setEventTracker (EventTracker tracker)
 Associates an event tracker to this processor. More...
 
void associateToInput (int in_stream_index, int in_stream_pos, int out_stream_index, int out_stream_pos)
 Associates an input event to an output event. More...
 
void associateTo (NodeFunction f, int out_stream_index, int out_stream_pos)
 Associates a node function to a particular event of processor's output stream. More...
 
void associateToOutput (int in_stream_index, int in_stream_pos, int out_stream_index, int out_stream_pos)
 Associates an input event to an output event. More...
 
final int getInputCount ()
 Gets the number of event fronts received so far by this processor. More...
 
final int getOutputCount ()
 Gets the number of event fronts produced so far by this processor. More...
 
final Object print (ObjectPrinter<?> printer) throws ProcessorException
 Prints the contents of this processor into an object printer. More...
 
final Processor read (ObjectReader<?> reader, Object o) throws ProcessorException
 Reads the content of a processor from a serialized object. More...
 
final Processor duplicate ()
 Duplicates an object and sets it to its initial state. More...
 
void copyInputQueue (int index, Collection< Object > to)
 Copies the content of one of the processor's input queue to a collection. More...
 
void copyOutputQueue (int index, Collection< Object > to)
 Copies the content of one of the processor's output queue to a collection. More...
 
abstract Processor duplicate (boolean with_state)
 Duplicates an object. More...
 
Processor or (Processor p)
 Connects the first output pipe of this processor to the first input pipe of another processor. More...
 
Processor or (Pushable p)
 Connects the output at index 0 of the current processor to the input of another processor. More...
 
Pushable getAt (int index)
 Gets the Pushable object corresponding to the processor's input pipe for a given index. More...
 

Protected Member Functions

 GroupProcessor ()
 No-args constructor. More...
 
synchronized void associateEndpoints (GroupProcessor group, Map< Integer, Processor > new_procs)
 Associates the endpoints of a new GroupProcessor like the ones in the current group. More...
 
- Protected Member Functions inherited from ca.uqac.lif.cep.Processor
boolean allNotifiedEndOfTrace ()
 Determines if all the upstream pushables have sent the end of trace notification. More...
 
final Context newContext ()
 Creates a new empty context map. More...
 
boolean onEndOfTrace (Queue< Object[]> outputs) throws ProcessorException
 Allows to describe a specific behavior when the trace of input fronts has reached its end. More...
 
Object printState ()
 Produces an object that represents the state of the current processor. More...
 
Processor readState (Object o)
 Reads the state of a processor and uses it to create a new instance. More...
 

Static Protected Member Functions

static synchronized Processor copyProcessor (Processor p, boolean with_state)
 Creates a copy of a processor. More...
 
- Static Protected Member Functions inherited from ca.uqac.lif.cep.Processor
static void getLeaves (ProvenanceNode root, List< ProvenanceNode > leaves)
 Accumulates the leaves of a provenance tree in a list. More...
 

Protected Attributes

EventTracker m_innerTracker
 An inner event tracker for the group. More...
 
- Protected Attributes inherited from ca.uqac.lif.cep.Processor
int m_inputArity
 The processor's input arity, i.e. More...
 
int m_outputArity
 The processor's output arity, i.e. More...
 
transient Queue< Object > [] m_inputQueues
 An array of input event queues. More...
 
transient EventTracker m_eventTracker = null
 An object that keeps track of the relationship between input and output events. More...
 
transient Queue< Object > [] m_outputQueues
 An array of output event queues. More...
 
transient Pullable [] m_inputPullables
 An array of Pullables, one for each input trace this processor receives. More...
 
transient Pushable [] m_outputPushables
 An array of Pushables, one for each output trace this processor produces. More...
 
int m_inputCount = 0
 A counter incremented upon each input front processed. More...
 
int m_outputCount = 0
 A counter incremented upon each output front processed. More...
 
Context m_context = null
 The context in which the processor is instantiated. More...
 
boolean [] m_hasBeenNotifiedOfEndOfTrace
 Indicates whether the processor has been notified of the end of trace or not. More...
 

Additional Inherited Members

- Static Public Member Functions inherited from ca.uqac.lif.cep.Processor
static boolean allNull (Object[] v)
 Checks if all objects in the array are null. More...
 
static Queue< Object[]> getEmptyQueue ()
 Gets an instance of an empty event queue. More...
 
static void startAll (Processor ... procs)
 Starts all processors given as an argument. More...
 
static void stopAll (Processor ... procs)
 Stops all processors given as an argument. More...
 
static List< ProvenanceNode > getLeaves (ProvenanceNode root)
 Gets the leaves of a provenance tree. More...
 
- Static Public Attributes inherited from ca.uqac.lif.cep.Processor
static final transient String s_versionString = "0.10.5"
 A string used to identify the program's version. More...
 
static final transient int MAX_PULL_RETRIES = 10000000
 Number of times the Pullable#hasNext() method tries to produce an output from the input before giving up. More...
 

Detailed Description

Encapsulates a chain of processors as if it were a single one.

Author
Sylvain Hallé
Since
0.1

Definition at line 41 of file GroupProcessor.java.

Constructor & Destructor Documentation

◆ GroupProcessor() [1/2]

ca.uqac.lif.cep.GroupProcessor.GroupProcessor ( int  in_arity,
int  out_arity 
)

Crate a group processor.

Parameters
in_arityThe input arity
out_arityThe output arity

Definition at line 96 of file GroupProcessor.java.

◆ GroupProcessor() [2/2]

ca.uqac.lif.cep.GroupProcessor.GroupProcessor ( )
protected

No-args constructor.

Used only for serialization and deserialization.

Definition at line 111 of file GroupProcessor.java.

Member Function Documentation

◆ addProcessor()

synchronized GroupProcessor ca.uqac.lif.cep.GroupProcessor.addProcessor ( Processor  p)

Adds a processor to the group.

Parameters
pThe processor to add
Returns
A reference to the current group processor

Definition at line 188 of file GroupProcessor.java.

◆ addProcessors()

synchronized GroupProcessor ca.uqac.lif.cep.GroupProcessor.addProcessors ( Processor ...  procs)

Adds multiple processors to the group.

Parameters
procsThe processors to add
Returns
A reference to the current group processor

Definition at line 209 of file GroupProcessor.java.

◆ associateEndpoints()

synchronized void ca.uqac.lif.cep.GroupProcessor.associateEndpoints ( GroupProcessor  group,
Map< Integer, Processor new_procs 
)
protected

Associates the endpoints of a new GroupProcessor like the ones in the current group.

Parameters
groupThe new group
new_procsAn association between processor IDs and processors

Definition at line 413 of file GroupProcessor.java.

◆ associateInput()

synchronized GroupProcessor ca.uqac.lif.cep.GroupProcessor.associateInput ( int  i,
Processor  p,
int  j 
)

Declares that the i-th input of the group is linked to the j-th input of processor p

Parameters
iThe number of the input of the group
pThe processor to connect to
jThe number of the input of processor p
Returns
A reference to the current group processor

Definition at line 238 of file GroupProcessor.java.

◆ associateOutput()

synchronized GroupProcessor ca.uqac.lif.cep.GroupProcessor.associateOutput ( int  i,
Processor  p,
int  j 
)

Declares that the i-th output of the group is linked to the j-th output of processor p.

Parameters
iThe number of the output of the group
pThe processor to connect to
jThe number of the output of processor p
Returns
A reference to the current group processor

Definition at line 257 of file GroupProcessor.java.

◆ cloneInto()

synchronized Map<Integer, Processor> ca.uqac.lif.cep.GroupProcessor.cloneInto ( GroupProcessor  group,
boolean  with_state 
)

Clones the contents of the current GroupProcessor into a new group.

Parameters
groupThe GroupProcessor to clone into. When the method is called, it is expected to be empty.
with_stateIt set to true, each processor in the new group has the same events in its input/output buffers as in the original. Otherwise, the queues are empty.
Returns
An association between IDs and the new processors that have been put into the group

Definition at line 369 of file GroupProcessor.java.

◆ copyProcessor()

static synchronized Processor ca.uqac.lif.cep.GroupProcessor.copyProcessor ( Processor  p,
boolean  with_state 
)
staticprotected

Creates a copy of a processor.

Parameters
pThe processor to copy. Nothing is changed on this processor.
with_stateIf set to true, the new copy has the same events in its input/output buffers as the original. Otherwise, the queues are empty.
Returns
The new processor

Definition at line 445 of file GroupProcessor.java.

◆ duplicate()

synchronized GroupProcessor ca.uqac.lif.cep.GroupProcessor.duplicate ( boolean  with_state)

Duplicates an object.

Optionally, set the object into the same state as the source object.

Parameters
with_stateSet to true to replicate the object's state, false to create a new copy in the initial state.
Returns
Another object

Implements ca.uqac.lif.cep.Duplicable.

Definition at line 465 of file GroupProcessor.java.

◆ getAssociatedInput()

Processor ca.uqac.lif.cep.GroupProcessor.getAssociatedInput ( int  index)

Gets the processor associated to the i-th input of the group.

Parameters
indexThe index
Returns
The processor, or null if no processor is associated to this index

Definition at line 819 of file GroupProcessor.java.

◆ getAssociatedInputIndex()

int ca.uqac.lif.cep.GroupProcessor.getAssociatedInputIndex ( int  index)

Gets the input stream index of the processor associated to the i-th input of the group.

Parameters
indexThe index
Returns
The index, or -1 if no processor is associated to this index

Definition at line 858 of file GroupProcessor.java.

◆ getAssociatedOutput()

Processor ca.uqac.lif.cep.GroupProcessor.getAssociatedOutput ( int  index)

Gets the processor associated to the i-th output of the group.

Parameters
indexThe index
Returns
The processor, or null if no processor is associated to this index

Definition at line 875 of file GroupProcessor.java.

◆ getAssociatedOutputIndex()

int ca.uqac.lif.cep.GroupProcessor.getAssociatedOutputIndex ( int  index)

Gets the output stream index of the processor associated to the i-th output of the group.

Parameters
indexThe index
Returns
The index, or -1 if no processor is associated to this index

Definition at line 893 of file GroupProcessor.java.

◆ getGroupInputIndex()

int ca.uqac.lif.cep.GroupProcessor.getGroupInputIndex ( int  id,
int  pipe_index 
)

Gets the index of the group's input pipe associated to the inner processor with given ID and pipe index.

Parameters
idThe ID of the inner processor
pipe_indexThe input pipe index of the inner processor
Returns
The index of the group's input pipe, or -1 if no such association exists

Definition at line 836 of file GroupProcessor.java.

◆ getInnerTracker()

EventTracker ca.uqac.lif.cep.GroupProcessor.getInnerTracker ( )

Gets the tracker instance for the processors contained in this group.

Returns
The tracker instance, or null if no inner tracker is set.
Since
0.11

Definition at line 135 of file GroupProcessor.java.

◆ getPullableInput()

final synchronized Pullable ca.uqac.lif.cep.GroupProcessor.getPullableInput ( int  index)

Definition at line 346 of file GroupProcessor.java.

◆ getPullableOutput()

synchronized Pullable ca.uqac.lif.cep.GroupProcessor.getPullableOutput ( int  index)

Definition at line 271 of file GroupProcessor.java.

◆ getPushableInput()

synchronized ProxyPushable ca.uqac.lif.cep.GroupProcessor.getPushableInput ( int  index)

Definition at line 265 of file GroupProcessor.java.

◆ getPushableOutput()

final synchronized Pushable ca.uqac.lif.cep.GroupProcessor.getPushableOutput ( int  index)

Definition at line 335 of file GroupProcessor.java.

◆ getState()

Object ca.uqac.lif.cep.GroupProcessor.getState ( )

Gets the token corresponding to the processor's internal state.

Returns
The token
Exceptions
UnsupportedOperationExceptionThrown if the internal state cannot be provided for some reason

Implements ca.uqac.lif.cep.Stateful.

Definition at line 1053 of file GroupProcessor.java.

◆ notifySources()

GroupProcessor ca.uqac.lif.cep.GroupProcessor.notifySources ( boolean  b)

Sets the processor to notify the QueueSource objects in the group to push an event when a call to push is made on the group.

Parameters
bSet to true to notify the sources
Returns
This group processor

Definition at line 124 of file GroupProcessor.java.

◆ onEndOfTrace()

boolean ca.uqac.lif.cep.GroupProcessor.onEndOfTrace ( Queue< Object[]>  outputs)

Definition at line 903 of file GroupProcessor.java.

◆ printState()

Object ca.uqac.lif.cep.GroupProcessor.printState ( )
Since
0.10.2

Definition at line 912 of file GroupProcessor.java.

◆ readState()

GroupProcessor ca.uqac.lif.cep.GroupProcessor.readState ( Object  o) throws ProcessorException
Since
0.10.2

Definition at line 956 of file GroupProcessor.java.

◆ reset()

void ca.uqac.lif.cep.GroupProcessor.reset ( )

Definition at line 1020 of file GroupProcessor.java.

◆ setContext() [1/2]

synchronized void ca.uqac.lif.cep.GroupProcessor.setContext ( Context  context)

Adds a complete context to this object.

Parameters
contextThe context to add

Implements ca.uqac.lif.cep.Contextualizable.

Definition at line 537 of file GroupProcessor.java.

◆ setContext() [2/2]

synchronized void ca.uqac.lif.cep.GroupProcessor.setContext ( String  key,
Object  value 
)

Adds an object to the object's context.

Parameters
keyThe key associated to that object
valueThe object

Implements ca.uqac.lif.cep.Contextualizable.

Definition at line 547 of file GroupProcessor.java.

◆ setEventTracker()

final Processor ca.uqac.lif.cep.GroupProcessor.setEventTracker ( EventTracker  tracker)

Definition at line 1034 of file GroupProcessor.java.

◆ setPullableInput()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPullableInput ( int  i,
Pullable  p 
)

Definition at line 277 of file GroupProcessor.java.

◆ setPullableInputAssociation()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPullableInputAssociation ( int  i,
Processor  p,
int  j 
)

Definition at line 295 of file GroupProcessor.java.

◆ setPullableOutput()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPullableOutput ( int  i,
Pullable  p 
)

Sets an output pullable for this processor.

Parameters
iThe index of the pullable
pThe pullable

Definition at line 322 of file GroupProcessor.java.

◆ setPushableInput()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPushableInput ( int  i,
Pushable  p 
)

Sets an input pushable for this processor.

Parameters
iThe position
pThe pushable

Definition at line 305 of file GroupProcessor.java.

◆ setPushableOutput()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPushableOutput ( int  i,
Pushable  p 
)

Definition at line 289 of file GroupProcessor.java.

◆ setPushableOutputAssociation()

final synchronized void ca.uqac.lif.cep.GroupProcessor.setPushableOutputAssociation ( int  i,
Processor  p,
int  j 
)

Definition at line 283 of file GroupProcessor.java.

◆ start()

synchronized void ca.uqac.lif.cep.GroupProcessor.start ( )

Definition at line 792 of file GroupProcessor.java.

◆ stop()

synchronized void ca.uqac.lif.cep.GroupProcessor.stop ( )

Definition at line 802 of file GroupProcessor.java.

Member Data Documentation

◆ m_innerTracker

EventTracker ca.uqac.lif.cep.GroupProcessor.m_innerTracker
protected

An inner event tracker for the group.

Definition at line 86 of file GroupProcessor.java.


The documentation for this class was generated from the following file: