Cpp OTF User Guide
Some applications, such as network sniffers, need to process messages dynamically and thus have to use the Intermediate Representation to decode the messages on-the-fly (OTF). An example of using the OTF API can be found here. Doxygen documentation for the C++ OTF API can be found in the header files here. You will need to have doxygen installed to build the doc.
The C++ OTF decoder follows the design principles of the generated codec stubs.
Note: Due to the dynamic nature of OTF decoding, the stubs generated by the SBE compiler will yield greater relative performance.
The C++ OTF decoder for SBE takes a piece of data and a schema and decodes the data, well, on-the-fly. The decoder uses a reactive (Rx) style API to accomplish this. Applications subclass callback interfaces, such as `OnNext`, `OnError`, and `OnCompleted`, to be notified of decoding actions. `Listener` is a decoding engine object. It is configured with the data to decode as well as the schema. The schema is represented in a "compiled" form called Intermediate Representation, or IR.
Instances of the `Listener` class may be reused for different buffers as well as different `Ir` objects. The methods use a fluent style for composition.
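To illustrate what a fluent style means here, the following is a minimal, self-contained sketch. `MiniDecoder` and its methods are hypothetical stand-ins invented for this illustration and are not part of the SBE API. The only point is that each configuration method returns a reference to the object, so calls can be chained, and that one instance can be reconfigured for another buffer.

```cpp
#include <cstddef>

// Hypothetical miniature of a fluent, reusable decoder (not the real Listener).
class MiniDecoder
{
public:
    MiniDecoder() : data_(NULL), length_(0), formatId_(0) {}

    // Configuration methods return *this, which is what allows chaining.
    MiniDecoder &resetForDecode(const char *data, std::size_t length)
    {
        data_ = data;
        length_ = length;
        return *this;
    }

    MiniDecoder &format(int formatId)
    {
        formatId_ = formatId;
        return *this;
    }

    // Terminal call: reports how many bytes would be decoded, or 0 if the
    // decoder has not been given both a buffer and a format.
    std::size_t decode() const
    {
        return (data_ != NULL && formatId_ != 0) ? length_ : 0;
    }

private:
    const char *data_;
    std::size_t length_;
    int formatId_;
};
```

A chained call such as `decoder.resetForDecode(buf, n).format(7).decode()` reads left to right, and a later `decoder.resetForDecode(other, m).decode()` reuses the format set earlier.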
The basic usage pattern is:
- Instantiate a `Listener`. This can be on the stack or via `new`, etc.
- Set the `Ir` to use for the decoding that describes the data format.
- Pass in a pointer to the start of the data along with the length of the data in bytes.
- Subscribe callbacks for `Field`, `Group`, `Error`, and `OnCompleted` events. When called, `Listener::subscribe` initiates the decoding of the message. Thus all callbacks come from the calling thread.
This is demonstrated in the example below.
Listener listener; // instantiate a decoder (with "()" this line would declare a function instead)
Ir ir(irBuffer, irLen); // create an Ir object for the format based on Ir in buffer
listener.resetForDecode(buffer, len) // get ready to decode data located at buffer for len bytes
.ir(ir) // pass in the Ir to use
.subscribe(...); // subscribe callbacks and initiate decoding
A more advanced usage pattern is when a header is used to dispatch to a set of different formats for the data. This is accomplished using the `Listener::dispatchMessageByHeader` method. An example is below.
Listener listener;
Ir headerIr(headerIrBuffer, headerIrLen); // the Ir for the header
listener.resetForDecode(buffer, len) // get ready to decode data at buffer for len bytes
.dispatchMessageByHeader(headerIr, // the Ir of the header
irCallback) // the callback called for dispatch choices
.subscribe(...);
Decoding multiple messages in a single buffer is straightforward. Simply bump the pointer to the data by the offset of the `Listener` after it is done and reuse the decoder. The `Listener` keeps track of its current offset within the buffer, and this offset can be retrieved via `Listener::bufferOffset`.
Listener listener;
Ir headerIr(headerIrBuffer, headerIrLen);
listener.resetForDecode(buffer, len)
.dispatchMessageByHeader(headerIr, irCallback)
.subscribe(...); // decode a single message header plus message and return
// start the next pass where the previous one stopped
listener.resetForDecode(buffer + listener.bufferOffset(), len - listener.bufferOffset())
.dispatchMessageByHeader(headerIr, irCallback)
.subscribe(...); // decode the next message header plus message and return
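The two-pass example above generalizes to a loop over the whole buffer. The sketch below is self-contained and therefore does not use the `Listener` itself: `decodeOne` is a hypothetical stand-in for one decode pass, and the framing it reads (a 1-byte payload length followed by the payload) is made up for illustration. With the real API, the value returned by `decodeOne` would presumably correspond to `Listener::bufferOffset()` after a `subscribe` call.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one decode pass. Reads a made-up framing
// (1-byte payload length, then the payload) and returns the number of
// bytes consumed, or 0 if the remaining bytes hold no complete message.
std::size_t decodeOne(const char *data, std::size_t len, std::vector<std::string> &out)
{
    if (len < 1)
    {
        return 0;
    }
    const std::size_t payloadLen = static_cast<unsigned char>(data[0]);
    if (len < 1 + payloadLen)
    {
        return 0;
    }
    out.push_back(std::string(data + 1, payloadLen));
    return 1 + payloadLen;
}

// Walk the buffer message by message: after each pass, bump the start
// pointer by the bytes consumed and shrink the remaining length.
std::size_t decodeAll(const char *buffer, std::size_t len, std::vector<std::string> &out)
{
    std::size_t position = 0;
    while (position < len)
    {
        const std::size_t consumed = decodeOne(buffer + position, len - position, out);
        if (consumed == 0)
        {
            break; // truncated or invalid message: stop instead of looping forever
        }
        position += consumed;
    }
    return position; // total number of bytes decoded
}
```

Stopping when a pass consumes nothing matters in practice, since a failed decode that leaves the offset unchanged would otherwise repeat indefinitely.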
As fields and repeating groups are encountered in a message, the callbacks passed into `Listener::subscribe(OnNext *, OnError *, OnCompleted *)` will be called. Fields will be seen via the `OnNext::onNext(const Field &)` method. Group events, such as group start and end, will be seen via the `OnNext::onNext(const Group &)` method. Errors that occur will be seen via the `OnError::onError(const Error &)` method. Errors stop decoding of the current message. If a message is successfully completed, then a completion event will be seen via the `OnCompleted::onCompleted(void)` method. For `Listener::subscribe`, the `OnError` and `OnCompleted` arguments are optional and default to `NULL`. Below is a simple example.
class ExampleCallback : public OnNext, public OnError, public OnCompleted // each base must be public for &cbs to convert
{
public:
virtual int onNext(const Field &f)
{
... // handle Fields
return 0; // 0 for success and -1 for failure
}
virtual int onNext(const Group &g)
{
... // handle Group event
return 0; // 0 for success and -1 for failure
}
virtual int onError(const Error &e)
{
... // handle error
return 0; // 0 for success and -1 for failure
}
virtual int onCompleted(void)
{
... // handle completion event
return 0; // 0 for success and -1 for failure
}
};
...
ExampleCallback cbs;
// set up listener and kick off decoding with subscribe
listener.dispatchMessageByHeader(...)
.resetForDecode(...)
.subscribe(&cbs, &cbs, &cbs);
During decoding, a `Listener` will call `OnNext::onNext(const Field &)` and pass encountered fields to the application. These fields may be of varying types, including composites (or structs), enumerations, bit sets, or variable length data. All of these types may be accessed via the `Field` class.
For details of the `Field` class, see the header file here or the doxygen doc. Below is an example of accessing various field methods, types, etc. from the example.
class CarCallbacks : public OnNext
{
public:
// callback for when a field is encountered
virtual int onNext(const Field &f)
{
std::cout << "Field name=\"" << f.fieldName() << "\" id=" << f.schemaId();
if (f.isComposite())
{
std::cout << ", composite name=\"" << f.compositeName() << "\"";
}
std::cout << std::endl;
if (f.isEnum())
{
std::cout << " Enum [" << f.validValue() << "]";
printEncoding(f, 0); // print the encoding. Index is 0.
}
else if (f.isSet())
{
std::cout << " Set ";
// print the various names for the bits that are set
for (std::vector<std::string>::const_iterator it = f.choices().begin(); it != f.choices().end(); ++it)
{
std::cout << "[" << *it << "]";
}
printEncoding(f, 0); // print the encoding. Index is 0.
}
else if (f.isVariableData())
{
// index 0 is the length field type, value, etc.
// index 1 is the actual variable length data
std::cout << " Variable Data length=" << f.length(1);
char tmp[256];
f.getArray(1, tmp, 0, f.length(1)); // copy the data
std::cout << " value=\"" << std::string(tmp, f.length(1)) << "\"";
std::cout << " presence=" << presenceStr(f.presence(1));
std::cout << std::endl;
}
else // if not enum, set, or var data, then just normal encodings, but could be composite
{
for (int i = 0, size = f.numEncodings(); i < size; i++)
{
printEncoding(f, i);
}
}
return 0;
};
protected:
// print out details of an encoding
void printEncoding(const Field &f, int index)
{
std::cout << " name=\"" << f.encodingName(index) << "\" length=" << f.length(index);
switch (f.primitiveType(index))
{
case Ir::CHAR:
if (f.length(index) == 1)
{
std::cout << " type=CHAR value=\"" << (char)f.getUInt(index) << "\"";
}
else
{
char tmp[1024];
// copy data to temp array and print it out.
f.getArray(index, tmp, 0, f.length(index));
std::cout << " type=CHAR value=\"" << std::string(tmp, f.length(index)) << "\"";
}
break;
case Ir::INT8:
std::cout << " type=INT8 value=\"" << f.getInt(index) << "\"";
break;
case Ir::INT16:
std::cout << " type=INT16 value=\"" << f.getInt(index) << "\"";
break;
case Ir::INT32:
if (f.length(index) == 1)
{
std::cout << " type=INT32 value=\"" << f.getInt(index) << "\"";
}
else
{
char tmp[1024];
// copy data to temp array and print it out.
f.getArray(index, tmp, 0, f.length(index));
std::cout << " type=INT32 value=";
for (int i = 0, size = f.length(index); i < size; i++)
{
std::cout << "{" << *((int32_t *)(tmp + (sizeof(int32_t) * i))) << "}";
}
}
break;
case Ir::INT64:
std::cout << " type=INT64 value=\"" << f.getInt(index) << "\"";
break;
case Ir::UINT8:
std::cout << " type=UINT8 value=\"" << f.getUInt(index) << "\"";
break;
case Ir::UINT16:
std::cout << " type=UINT16 value=\"" << f.getUInt(index) << "\"";
break;
case Ir::UINT32:
std::cout << " type=UINT32 value=\"" << f.getUInt(index) << "\"";
break;
case Ir::UINT64:
std::cout << " type=UINT64 value=\"" << f.getUInt(index) << "\"";
break;
case Ir::FLOAT:
std::cout << " type=FLOAT value=\"" << f.getDouble(index) << "\"";
break;
case Ir::DOUBLE:
std::cout << " type=DOUBLE value=\"" << f.getDouble(index) << "\"";
break;
default:
break;
}
std::cout << " presence=" << presenceStr(f.presence(index));
std::cout << std::endl;
}
// print presence
const char *presenceStr(Ir::TokenPresence presence)
{
switch (presence)
{
case Ir::REQUIRED:
return "REQUIRED";
break;
case Ir::OPTIONAL:
return "OPTIONAL";
break;
case Ir::CONSTANT:
return "CONSTANT";
break;
default:
return "UNKNOWN";
break;
}
}
};
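The `INT32` array branch above reads each element through a pointer cast, which relies on the temporary buffer being suitably aligned and on the host byte order matching the encoded data. As a hedged alternative, the sketch below assembles each value byte by byte. It assumes the data is little-endian; `readInt32LE` and `readInt32ArrayLE` are hypothetical helper names introduced for this illustration and are not part of the OTF API.

```cpp
#include <cstddef>
#include <cstring>
#include <stdint.h>
#include <vector>

// Assemble one little-endian 32-bit integer from four bytes without
// relying on pointer alignment or on the host's byte order.
int32_t readInt32LE(const char *bytes)
{
    const unsigned char *b = reinterpret_cast<const unsigned char *>(bytes);
    const uint32_t bits = static_cast<uint32_t>(b[0]) |
        (static_cast<uint32_t>(b[1]) << 8) |
        (static_cast<uint32_t>(b[2]) << 16) |
        (static_cast<uint32_t>(b[3]) << 24);
    int32_t value;
    std::memcpy(&value, &bits, sizeof(value)); // reinterpret the bit pattern as signed
    return value;
}

// Decode `count` consecutive little-endian int32 values from a raw byte array.
std::vector<int32_t> readInt32ArrayLE(const char *bytes, std::size_t count)
{
    std::vector<int32_t> values;
    values.reserve(count);
    for (std::size_t i = 0; i < count; i++)
    {
        values.push_back(readInt32LE(bytes + i * sizeof(int32_t)));
    }
    return values;
}
```

In the callback, the bytes copied out with `getArray` could be passed to such a helper, with `count` taken from the element count reported for the encoding.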
Groups are markers in the event sequence of calls to `OnNext::onNext`. Groups contain fields. When a group starts, `OnNext::onNext(const Group &)` is called with a `Group::Event` type of `Group::START`, the name of the group, the iteration number (starting at 0), and the expected number of iterations. After that, a set of calls to `OnNext::onNext(const Field &)` should occur. A group is ended by a call to `OnNext::onNext(const Group &)` with a `Group::Event` type of `Group::END`. Nested repeating groups are handled as one would expect, with `Group::START` and `Group::END` events appearing within an existing Group sequence.
For details of the `Group` class, see the header file here or the doxygen doc. Below is an example of usage from the example.
class CarCallbacks : public OnNext
{
public:
// save reference to listener for printing offset
CarCallbacks(Listener &listener) : listener_(listener) , indent_(0) {};
// callback for when a group is encountered
virtual int onNext(const Group &g)
{
// group started
if (g.event() == Group::START)
{
std::cout << "Group name=\"" << g.name() << "\" id=\"" << g.schemaId() << "\" start (";
std::cout << g.iteration() << "/" << g.numInGroup() - 1 << "):" << "\n";
if (g.iteration() == 0) // first iteration (iteration numbers start at 0)
{
indent_++;
}
}
else if (g.event() == Group::END) // group ended
{
std::cout << "Group name=\"" << g.name() << "\" id=\"" << g.schemaId() << "\" end (";
std::cout << g.iteration() << "/" << g.numInGroup() - 1 << "):" << "\n";
if (g.iteration() == g.numInGroup() - 1)
{
indent_--;
}
}
return 0;
}
private:
Listener &listener_;
int indent_;
};
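To make the START/END bookkeeping for nested groups concrete, here is a small self-contained sketch. `GroupEvent` is a hypothetical, simplified stand-in for the information a `Group` carries (event type, name, zero-based iteration, expected count), and `DepthTracker` is invented for this illustration. It assumes, as described above, that every iteration of a group produces its own START and END pair.

```cpp
#include <string>

// Hypothetical, simplified stand-in for the data carried by a Group event.
struct GroupEvent
{
    enum Kind { START, END };
    Kind kind;
    std::string name;
    int iteration;  // zero-based iteration number
    int numInGroup; // expected number of iterations
};

// Tracks nesting depth: a group is entered on the START of its first
// iteration and left on the END of its last iteration.
class DepthTracker
{
public:
    DepthTracker() : depth_(0), maxDepth_(0) {}

    void onGroup(const GroupEvent &g)
    {
        if (g.kind == GroupEvent::START && g.iteration == 0)
        {
            depth_++;
            if (depth_ > maxDepth_)
            {
                maxDepth_ = depth_;
            }
        }
        else if (g.kind == GroupEvent::END && g.iteration == g.numInGroup - 1)
        {
            depth_--;
        }
    }

    int depth() const { return depth_; }
    int maxDepth() const { return maxDepth_; }

private:
    int depth_;
    int maxDepth_;
};
```

Pairing the increment with iteration 0 and the decrement with the last iteration keeps the depth balanced even for groups with a single entry.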
Decoding of a message stops when an error is encountered, such as the length of the buffer being too short, or when the message is completed successfully. The former is signaled via the `OnError::onError` method and the latter via the `OnCompleted::onCompleted` method. An example of usage taken from the example is below.
class CarCallbacks : public OnError, public OnCompleted
{
public:
// save reference to listener for printing offset
CarCallbacks(Listener &listener) : listener_(listener) , indent_(0) {};
// callback for when an error is encountered
virtual int onError(const Error &e)
{
std::cout << "Error " << e.message() << " at offset " << listener_.bufferOffset() << "\n";
return 0;
};
// callback for when decoding is completed
virtual int onCompleted()
{
std::cout << "Completed" << "\n";
return 0;
};
private:
Listener &listener_;
int indent_;
};
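The ordering of these events can be sketched with a toy engine. Everything in the `sketch` namespace below is a simplified stand-in written for this illustration: the interfaces take strings instead of `Field`, `Group`, and `Error` objects, and the rule that an empty token counts as a decoding error is invented. What it is meant to show, under those assumptions, is that an error ends the pass without a completion event, that a clean pass ends with exactly one completion event, and that the error and completion handlers may be left as `NULL`.

```cpp
#include <cstddef>
#include <string>
#include <vector>

namespace sketch
{
// Simplified stand-ins for the callback interfaces.
struct OnNext { virtual ~OnNext() {} virtual int onNext(const std::string &field) = 0; };
struct OnError { virtual ~OnError() {} virtual int onError(const std::string &message) = 0; };
struct OnCompleted { virtual ~OnCompleted() {} virtual int onCompleted() = 0; };

// Toy engine: delivers each token as a field; an empty token is an error.
// All callbacks are invoked synchronously, on the caller's thread.
int subscribe(const std::vector<std::string> &tokens, OnNext *onNext,
    OnError *onError = NULL, OnCompleted *onCompleted = NULL)
{
    for (std::size_t i = 0; i < tokens.size(); i++)
    {
        if (tokens[i].empty())
        {
            if (onError != NULL)
            {
                onError->onError("empty token");
            }
            return -1; // an error stops decoding of the current message
        }
        onNext->onNext(tokens[i]);
    }
    if (onCompleted != NULL)
    {
        onCompleted->onCompleted();
    }
    return 0;
}

// One object may implement all three interfaces.
class Recorder : public OnNext, public OnError, public OnCompleted
{
public:
    std::vector<std::string> log;
    virtual int onNext(const std::string &field) { log.push_back("field:" + field); return 0; }
    virtual int onError(const std::string &message) { log.push_back("error:" + message); return 0; }
    virtual int onCompleted() { log.push_back("completed"); return 0; }
};
}
```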
A convenience collection object for IR is provided via the `IrCollection` class. This class can read in a serialized IR file, created via `SbeTool`, and provides it ready to go for decoding automatically. An example of usage is below. For more details, please see the header or the doxygen documentation.
// class to encapsulate Ir repository as well as Ir callback for dispatch
class IrRepo : public IrCollection, public Ir::Callback
{
public:
// save a reference to the Listener so we can print out the offset
IrRepo(Listener &listener) : listener_(listener) {};
virtual Ir *irForTemplateId(const int templateId, const int templateVersion)
{
std::cout << "Message lookup id=" << templateId << " version " << templateVersion << " offset " << listener_.bufferOffset() << std::endl;
// lookup in IrCollection the IR for the template ID and version
return (Ir *)IrCollection::message(templateId, templateVersion);
};
private:
Listener &listener_;
};
Listener listener;
IrRepo repo(listener);
CarCallbacks carCbs(listener);
// load IR from .sbeir file
if (repo.loadFromFile(irFilename) < 0)
{
std::cout << "could not load IR" << std::endl;
exit(-1);
}
// load data for header + message into a buffer
// set up listener and kick off decoding with subscribe
listener.dispatchMessageByHeader(repo.header(), &repo)
.resetForDecode(buffer, length)
.subscribe(&carCbs, &carCbs, &carCbs);
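The dispatch callback above amounts to a lookup keyed by template ID and version. As a rough, self-contained illustration of that idea, the sketch below keeps formats in a `std::map` keyed on the pair. `MessageFormat` and `FormatRepo` are hypothetical names made up for this example; how `IrCollection` stores its entries internally is not described here, so this is not a description of its implementation.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a message format description.
struct MessageFormat
{
    std::string name;
};

// Minimal repository keyed by (templateId, templateVersion).
class FormatRepo
{
public:
    void add(int templateId, int templateVersion, const std::string &name)
    {
        MessageFormat format;
        format.name = name;
        formats_[std::make_pair(templateId, templateVersion)] = format;
    }

    // Returns NULL when no format is registered for the given pair, so the
    // caller can treat an unknown message as an error.
    const MessageFormat *find(int templateId, int templateVersion) const
    {
        std::map<std::pair<int, int>, MessageFormat>::const_iterator it =
            formats_.find(std::make_pair(templateId, templateVersion));
        return it == formats_.end() ? NULL : &it->second;
    }

private:
    std::map<std::pair<int, int>, MessageFormat> formats_;
};
```

Keying on both values lets two versions of the same template coexist, which is what a dispatch callback needs when a stream mixes schema versions.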