Skip to content

Writing your own Pipe

Nelson Tavares de Sousa edited this page Sep 18, 2015 · 1 revision

Writing your own Pipe

If the pipe implementations we provide (see available pipe implementations) do not suit your requirements, you can implement one by your own.

You only need to implement a pipe and a corresponding pipe factory.

Implementing a Pipe

A pipe must conform to the interface IPipe that defines methods for handling two distinct entities: elements and signals.

An element is the entity that is consumed and processed from one stage and sent to another stage.

A signal is triggered from the first stage in the analysis and passed through all other ones. For example, when starting an analysis, a start signal is triggered so that each stage can initialize itself before beginning to consume input and to produce output elements. To save you some work, we provide an AbstractPipe that already implements a few methods.

Below, we sketch the implementation of SingleElementPipe that can store one single element at a time. It extends AbstractPipe and provides the implementation to add and remove one element. Moreover, it offers an implementation to react on signals.

public class SingleElementPipe<T> extends AbstractPipe<T> {

	private T element;

	@Override
	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
		sourcePort.setPipe(this);
		targetPort.setPipe(this);
		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
	}

	@Override
	public boolean add(final T element) {
		this.element = element;
		return true;
	}

	@Override
	public T removeLast() {
		T temp = this.element;
		this.element = null;
		return temp;
	}

	@Override
	public boolean isEmpty() {
		return this.element == null;
	}

	@Override
	public T readLast() {
		return this.element;
	}

	@Override
	public int size() {
		return (this.element == null) ? 0 : 1;
	}

	@Override
	public void setSignal(final Signal signal) {
		if (this.getTargetPort() != null) {
			this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
		}
	}

}

Implementing a Pipe Factory

What Base Interface Should I Implement?

A new pipe factory needs to implement the interface IPipeFactory that describes

  • how to create the particular pipe implementation and
  • when to use it, i.e, in which situations. Below, we show the pipe factory for the SingleElementPipe.
public class SingleElementPipeFactory implements IPipeFactory {

	@Override
	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
		return this.create(sourcePort, targetPort, 1);
	}

	/**
	 * Hint: The capacity for this pipe implementation is ignored.
	 * <p>
	 * {@inheritDoc}
	 */
	@Override
	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
		return new SingleElementPipe(sourcePort, targetPort);
	}

	@Override
	public ThreadCommunication getThreadCommunication() {
		return ThreadCommunication.INTRA;
	}

	@Override
	public PipeOrdering getOrdering() {
		return PipeOrdering.ARBITRARY;
	}

	@Override
	public boolean isGrowable() {
		return false;
	}

}

If you have written a new pipe factory, you need to register it to the framework. Create a new file called pipe-factories.conf, add it to the classpath, and insert a new line with the pipe factory's fully qualified name. An example pipe-factories.conf is shown below.

teetime.framework.pipe.SingleElementPipeFactory
teetime.framework.pipe.OrderedGrowableArrayPipeFactory
teetime.framework.pipe.UnorderedGrowablePipeFactory
teetime.framework.pipe.SpScPipeFactory

Why Do I Need to Provide a Pipe Factory?

A pipe factory allows the framework user to define only the requirements on the pipe he or she wants to create. Consider the following two pipe instantiations:

IPipe pipe = new SpScPipe();
IPipe pipe = PipeFactoryRegistry.INSTANCE.create(ThreadCommunication.INTER, PipeOrdering.QUEUE, false, 1);

The first line shows a direct instantiation of a particular pipe implementation whose semantics is not obivious. It could be thread-safe or thread-unsafe; it could be ordered like a queue or like a stack; perhaps it could only contain one single element at a time which would lead the order question ad absurdum.

Thus, we follow another pipe instantiation approach that focus on the qualitative questions we have just asked ourselves for the SpScPipe implementation. Consider the second line of the example above. We now do not instantiate a particular pipe implementation, but invoke the create method of the PipeFactoryRegistry singleton instance. We pass the requirements we have on our desired pipe and get an instance of a registered pipe implementation that conforms to these requirements.

Of course, we now do not know anymore what pipe implementation is chosen, but to be honest we do not want to know at all. We only want to get a pipe instance that implements our requirements.

Clone this wiki locally