Writing your own Stage

Nelson Tavares de Sousa edited this page Sep 18, 2015 · 1 revision

Writing a Stage/Filter

What Base Class Should I Extend?

When writing your own stage, a few abstract base classes can save you some work.

Inherit from AbstractConsumerStage if you want to write a stage that must always consume elements from its input port to make progress. This gives you a stage with pre-initialized consumer semantics and a default input port named inputPort. Example consumer stages are Counter, Cache, and Delay.
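To make the consumer semantics concrete, here is a minimal, self-contained sketch. Note that SketchConsumerStage, receive(), and runUntilInputIsEmpty() are hypothetical stand-ins written for this example; they are not TeeTime classes or methods. They only mimic the idea that a consumer stage gets each incoming element handed to execute(element) and cannot make progress without input. CountingStage is loosely modeled on the idea of a counter and is likewise an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a consumer base class (NOT the TeeTime API):
// the "framework" part takes each element from the input and passes it to execute().
abstract class SketchConsumerStage<I> {
	// plays the role of the default input port
	private final Queue<I> inputPort = new ArrayDeque<>();

	// stage-specific logic, called once per received element
	protected abstract void execute(I element);

	// puts an element into the input of this stage
	final void receive(final I element) {
		inputPort.add(element);
	}

	// consumes elements until the input is empty; without input, nothing happens
	final void runUntilInputIsEmpty() {
		I element;
		while ((element = inputPort.poll()) != null) {
			execute(element);
		}
	}
}

// A counter-like consumer: it only implements execute().
class CountingStage<T> extends SketchConsumerStage<T> {
	private int numElementsPassed;

	@Override
	protected void execute(final T element) {
		numElementsPassed++;
	}

	int getNumElementsPassed() {
		return numElementsPassed;
	}
}
```

The point of the design is that the subclass contains no loop and no port handling of its own; everything except the per-element logic lives in the base class.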

Inherit from AbstractProducerStage if you want to write a stage that does not need any input ports. This gives you a stage with pre-initialized producer semantics and a default output port named outputPort. Note: You need to call terminate() within your execute() method to signal to the framework that your producer stage has finished its job. Example producer stages are Clock and InitialElementProducer.
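The role of the terminate() call can be illustrated with a small, self-contained sketch. SketchProducerStage, runUntilTerminated(), and NumberProducer are hypothetical names invented for this example and are not part of TeeTime; the sketch assumes only what the note above states, namely that a producer's execute() is invoked repeatedly until the stage itself signals that it is done.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a producer base class (NOT the TeeTime API):
// the "framework" part calls execute() repeatedly until the stage calls terminate().
abstract class SketchProducerStage<O> {
	// plays the role of the default output port
	protected final List<O> outputPort = new ArrayList<>();
	private boolean terminated;

	// stage-specific logic; has no input element because a producer has no input port
	protected abstract void execute();

	// signals that this producer has finished its job
	protected final void terminate() {
		terminated = true;
	}

	final List<O> runUntilTerminated() {
		while (!terminated) {
			execute();
		}
		return outputPort;
	}
}

// Emits the numbers 0 .. limit-1 and then terminates itself.
class NumberProducer extends SketchProducerStage<Integer> {
	private final int limit;
	private int next;

	NumberProducer(final int limit) {
		this.limit = limit;
	}

	@Override
	protected void execute() {
		if (next < limit) {
			outputPort.add(next++);
		} else {
			terminate(); // without this call, the loop in the base class would never end
		}
	}
}
```

In this sketch, new NumberProducer(3).runUntilTerminated() returns the list [0, 1, 2]. Removing the terminate() call would turn the run into an endless loop, which is the situation the note warns about.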

Inherit from AbstractStage if you want to write a stage that has neither pre-initialized semantics nor any default input or output ports. An example stage is the Merger.

What Code Shall I Write?

You only need to implement the logic of your stage in the execute() method. If you need an input or an output port, declare it as a class attribute by using the createInputPort() or createOutputPort() method. If your stage should have an internal state, follow these instructions.

For example, consider the InstanceOfFilter below. It relays the element from its default input port (inherited from its superclass AbstractConsumerStage) to its self-declared output port if the element is an instance of the class represented by type; otherwise, it swallows the element. The logic is implemented in the execute() method.

import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;

public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> {

	private final OutputPort<O> outputPort = this.createOutputPort();

	private Class<O> type;

	public InstanceOfFilter(final Class<O> type) {
		this.type = type;
	}

	@SuppressWarnings("unchecked")
	@Override
	protected void execute(final I element) {
		if (this.type.isInstance(element)) {
			this.outputPort.send((O) element);
		} else { // swallow up the element
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
			}
		}
	}

	public Class<O> getType() {
		return this.type;
	}

	public void setType(final Class<O> type) {
		this.type = type;
	}

	public OutputPort<O> getOutputPort() {
		return this.outputPort;
	}

}
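The type check at the heart of execute() can be tried in isolation, without the framework. The following is a minimal sketch under stated assumptions: InstanceOfFilterSketch and relayInstancesOf() are hypothetical names made up for this example (not TeeTime API), and a plain list stands in for the output port. It also uses Class.cast() instead of the unchecked (O) cast, which is one possible way to avoid the @SuppressWarnings annotation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of TeeTime.
final class InstanceOfFilterSketch {

	private InstanceOfFilterSketch() {
		// utility class, not meant to be instantiated
	}

	// Relays every element that is an instance of type and swallows all others.
	static <I, O> List<O> relayInstancesOf(final Class<O> type, final List<I> elements) {
		final List<O> relayed = new ArrayList<>(); // stands in for the output port
		for (final I element : elements) {
			if (type.isInstance(element)) {
				relayed.add(type.cast(element)); // checked alternative to the (O) cast
			}
		}
		return relayed;
	}
}
```

For instance, calling relayInstancesOf(String.class, ...) on a list containing "a", 1, "b", and 2.0 returns a list with only "a" and "b". Since isInstance() returns false for null, a null element would be swallowed as well.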
