Skip to content

Commit

Permalink
rec: add support for streaming NOD and UDR using dnstap
Browse files Browse the repository at this point in the history
  • Loading branch information
chbruyand authored and omoerbeek committed Oct 3, 2022
1 parent d7d9585 commit 9489e2b
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 28 deletions.
82 changes: 78 additions & 4 deletions pdns/pdns_recursor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,36 @@ bool isAllowNotifyForZone(DNSName qname)
return false;
}

#ifdef HAVE_FSTRM
#include "dnstap.hh"
#include "fstrm_logger.hh"

static bool isEnabledForNODs(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers)
{
if (fstreamLoggers == nullptr) {
return false;
}
for (auto& logger : *fstreamLoggers) {
if (logger->logNODs()) {
return true;
}
}
return false;
}
static bool isEnabledForUDRs(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers)
{
if (fstreamLoggers == nullptr) {
return false;
}
for (auto& logger : *fstreamLoggers) {
if (logger->logUDRs()) {
return true;
}
}
return false;
}
#endif // HAVE_FSTRM

void startDoResolve(void* p)
{
auto dc = std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
Expand Down Expand Up @@ -889,7 +919,7 @@ void startDoResolve(void* p)
}

#ifdef HAVE_FSTRM
checkFrameStreamExport(luaconfsLocal);
checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
#endif

DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.opcode);
Expand Down Expand Up @@ -1354,8 +1384,9 @@ void startDoResolve(void* p)
#ifdef NOD_ENABLED
if (g_udrEnabled) {
udr = udrCheckUniqueDNSRecord(nodlogger, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, *i);
if (!hasUDR && udr)
if (!hasUDR && udr) {
hasUDR = true;
}
}
#endif /* NOD ENABLED */

Expand All @@ -1368,8 +1399,32 @@ void startDoResolve(void* p)
}
}
}
if (needCommit)
if (needCommit) {
pw.commit();
}
#ifdef NOD_ENABLED
#ifdef HAVE_FSTRM
if (hasUDR) {
if (isEnabledForUDRs(t_nodFrameStreamServersInfo.servers)) {
struct timespec ts;
std::string str;
if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
TIMEVAL_TO_TIMESPEC(&(dc->d_kernelTimestamp), &ts);
}
else {
TIMEVAL_TO_TIMESPEC(&(dc->d_now), &ts);
}
DnstapMessage message(str, DnstapMessage::MessageType::resolver_response, SyncRes::s_serverID, &dc->d_source, &dc->d_destination, dc->d_tcp ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoUDP, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, dc->d_mdp.d_qname);

for (auto& logger : *(t_nodFrameStreamServersInfo.servers)) {
if (logger->logUDRs()) {
remoteLoggerQueueData(*logger, str);
}
}
}
}
#endif // HAVE_FSTRM
#endif // NOD_ENABLED
}
sendit:;

Expand Down Expand Up @@ -1504,6 +1559,25 @@ void startDoResolve(void* p)
if (g_nodEnabled) {
if (nodCheckNewDomain(nodlogger, dc->d_mdp.d_qname)) {
nod = true;
#ifdef HAVE_FSTRM
if (isEnabledForNODs(t_nodFrameStreamServersInfo.servers)) {
struct timespec ts;
std::string str;
if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
TIMEVAL_TO_TIMESPEC(&(dc->d_kernelTimestamp), &ts);
}
else {
TIMEVAL_TO_TIMESPEC(&(dc->d_now), &ts);
}
DnstapMessage message(str, DnstapMessage::MessageType::client_query, SyncRes::s_serverID, &dc->d_source, &dc->d_destination, dc->d_tcp ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoUDP, nullptr, 0, &ts, nullptr, dc->d_mdp.d_qname);

for (auto& logger : *(t_nodFrameStreamServersInfo.servers)) {
if (logger->logNODs()) {
remoteLoggerQueueData(*logger, str);
}
}
}
#endif // HAVE_FSTRM
}
}
#endif /* NOD_ENABLED */
Expand Down Expand Up @@ -1938,7 +2012,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
#ifdef HAVE_FSTRM
checkFrameStreamExport(luaconfsLocal);
checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
#endif
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
Expand Down
41 changes: 41 additions & 0 deletions pdns/rec-lua-conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ static void parseFrameStreamOptions(boost::optional<frameStreamOptions_t> vars,
if (vars->count("logResponses")) {
config.logResponses = boost::get<bool>((*vars)["logResponses"]);
}
if (vars->count("logNODs")) {
config.logNODs = boost::get<bool>((*vars)["logNODs"]);
}
if (vars->count("logUDRs")) {
config.logUDRs = boost::get<bool>((*vars)["logUDRs"]);
}

if (vars->count("bufferHint")) {
config.bufferHint = boost::get<uint64_t>((*vars)["bufferHint"]);
Expand Down Expand Up @@ -740,6 +746,41 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
lci.d_slog->info(Logr::Error, "Only one dnstapFrameStreamServer() directive can be configured", "existing", Logging::Loggable(lci.frameStreamExportConfig.servers.at(0))));
}
});
Lua->writeFunction("dnstapNODFrameStreamServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<frameStreamOptions_t> vars) {
if (!lci.nodFrameStreamExportConfig.enabled) {
lci.nodFrameStreamExportConfig.enabled = true;

try {
if (servers.type() == typeid(std::string)) {
auto server = boost::get<const std::string>(servers);
if (!boost::starts_with(server, "/")) {
ComboAddress parsecheck(server);
}
lci.nodFrameStreamExportConfig.servers.emplace_back(server);
}
else {
auto serversMap = boost::get<const std::unordered_map<int, std::string>>(servers);
for (const auto& serverPair : serversMap) {
lci.nodFrameStreamExportConfig.servers.emplace_back(serverPair.second);
}
}

parseFrameStreamOptions(vars, lci.nodFrameStreamExportConfig);
}
catch (std::exception& e) {
SLOG(g_log << Logger::Error << "Error reading config for dnstap NOD framestream logger: " << e.what() << endl,
lci.d_slog->error(Logr::Error, "Exception reading config for dnstap NOD framestream logger", "exception", Logging::Loggable("std::exception")));
}
catch (PDNSException& e) {
SLOG(g_log << Logger::Error << "Error reading config for dnstap NOD framestream logger: " << e.reason << endl,
lci.d_slog->error(Logr::Error, "Exception reading config for dnstap NOD framestream logger", "exception", Logging::Loggable("PDNSException")));
}
}
else {
SLOG(g_log << Logger::Error << "Only one dnstapNODFrameStreamServer() directive can be configured, we already have " << lci.nodFrameStreamExportConfig.servers.at(0) << endl,
lci.d_slog->info(Logr::Error, "Only one dnstapNODFrameStreamServer() directive can be configured", "existing", Logging::Loggable(lci.nodFrameStreamExportConfig.servers.at(0))));
}
});
#endif /* HAVE_FSTRM */

Lua->writeFunction("addAllowedAdditionalQType", [&lci](int qtype, std::unordered_map<int, int> targetqtypes, boost::optional<std::map<std::string, int>> options) {
Expand Down
3 changes: 3 additions & 0 deletions pdns/rec-lua-conf.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct FrameStreamExportConfig
bool enabled{false};
bool logQueries{true};
bool logResponses{true};
bool logNODs{true};
bool logUDRs{false};
unsigned bufferHint{0};
unsigned flushTimeout{0};
unsigned inputQueueSize{0};
Expand Down Expand Up @@ -106,6 +108,7 @@ public:
ProtobufExportConfig protobufExportConfig;
ProtobufExportConfig outgoingProtobufExportConfig;
FrameStreamExportConfig frameStreamExportConfig;
FrameStreamExportConfig nodFrameStreamExportConfig;
std::shared_ptr<Logr::Logger> d_slog;
/* we need to increment this every time the configuration
is reloaded, so we know if we need to reload the protobuf
Expand Down
29 changes: 28 additions & 1 deletion pdns/recursordist/docs/lua-config/protobuf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ The recursor must have been built with configure ``--enable-dnstap`` to make thi

* ``logQueries=true``: bool - log outgoing queries
* ``logResponses=true``: bool - log incoming responses

The following options apply to the settings of the framestream library. Refer to the documentation of that
library for the default values, exact description and allowable values for these options.
For all these options, absence or a zero value has the effect of using the library-provided default value.
Expand All @@ -139,3 +139,30 @@ The recursor must have been built with configure ``--enable-dnstap`` to make thi
* ``queueNotifyThreshold=0``: unsigned
* ``reopenInterval=0``: unsigned

.. function:: dnstapNODFrameStreamServer(servers, [, options])

.. versionadded:: 4.8.0

Send dnstap formatted message for :ref:`Newly Observed Domain` and :ref:`Unique Domain Response`.
``Message.type`` will be set to ``CLIENT_QUERY`` for NOD and ``RESOLVER_RESPONSE`` for UDR. The concerned domain name will be attached in the ``Message.query_zone`` field.
UDR notifiations will get the reply attached to the ``response_message`` field.

:param servers: Either a pathname of a unix domain socket starting with a slash or the IP:port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
:type servers: string or list of strings
:param table options: A table with ``key=value`` pairs with options.

Options:

* ``logNODs=true``: bool - log NODs
* ``logUDRs=false``: bool - log UDRs

The following options apply to the settings of the framestream library. Refer to the documentation of that
library for the default values, exact description and allowable values for these options.
For all these options, absence or a zero value has the effect of using the library-provided default value.

* ``bufferHint=0``: unsigned
* ``flushTimeout=0``: unsigned
* ``inputQueueSize=0``: unsigned
* ``outputQueueSize=0``: unsigned
* ``queueNotifyThreshold=0``: unsigned
* ``reopenInterval=0``: unsigned
4 changes: 4 additions & 0 deletions pdns/recursordist/docs/nod_udr.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _Newly Observed Domain:

Newly Observed Domain Tracking
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -36,6 +38,8 @@ Protobuf Logging

If both NOD and protobuf logging are enabled, then the ``newlyObservedDomain`` field of the protobuf message emitted by the recursor will be set to true. Additionally newly observed domains will be tagged in the protobuf stream using the tag ``pdns-nod`` by default. The setting ``new-domain-pb-tag=<tag>`` can be used to alter the tag.

.. _Unique Domain Response:

Unique Domain Response
~~~~~~~~~~~~~~~~~~~~~~

Expand Down
29 changes: 17 additions & 12 deletions pdns/recursordist/rec-main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ static thread_local uint64_t t_outgoingProtobufServersGeneration;

#ifdef HAVE_FSTRM
thread_local FrameStreamServersInfo t_frameStreamServersInfo;
thread_local FrameStreamServersInfo t_nodFrameStreamServersInfo;
#endif /* HAVE_FSTRM */

string g_programname = "pdns_recursor";
Expand Down Expand Up @@ -609,6 +610,8 @@ static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFra
}
fsl->setLogQueries(config.logQueries);
fsl->setLogResponses(config.logResponses);
fsl->setLogNODs(config.logNODs);
fsl->setLogUDRs(config.logUDRs);
result->emplace_back(fsl);
}
catch (const std::exception& e) {
Expand All @@ -632,13 +635,13 @@ static void asyncFrameStreamLoggersCleanup(std::shared_ptr<std::vector<std::uniq
thread.detach();
}

bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const FrameStreamExportConfig& config, FrameStreamServersInfo& serverInfos)
{
if (!luaconfsLocal->frameStreamExportConfig.enabled) {
if (t_frameStreamServersInfo.servers) {
if (!config.enabled) {
if (serverInfos.servers) {
// dt's take care of cleanup
asyncFrameStreamLoggersCleanup(std::move(t_frameStreamServersInfo.servers));
t_frameStreamServersInfo.config = luaconfsLocal->frameStreamExportConfig;
asyncFrameStreamLoggersCleanup(std::move(serverInfos.servers));
serverInfos.config = config;
}

return false;
Expand All @@ -647,20 +650,21 @@ bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
/* if the server was not running, or if it was running according to a previous
* configuration
*/
if (t_frameStreamServersInfo.generation < luaconfsLocal->generation && t_frameStreamServersInfo.config != luaconfsLocal->frameStreamExportConfig) {
if (t_frameStreamServersInfo.servers) {
if (serverInfos.generation < luaconfsLocal->generation && serverInfos.config != config) {
if (serverInfos.servers) {
// dt's take care of cleanup
asyncFrameStreamLoggersCleanup(std::move(t_frameStreamServersInfo.servers));
asyncFrameStreamLoggersCleanup(std::move(serverInfos.servers));
}

auto dnsTapLog = g_slog->withName("dnstap");
t_frameStreamServersInfo.servers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig, dnsTapLog);
t_frameStreamServersInfo.config = luaconfsLocal->frameStreamExportConfig;
t_frameStreamServersInfo.generation = luaconfsLocal->generation;
serverInfos.servers = startFrameStreamServers(config, dnsTapLog);
serverInfos.config = config;
serverInfos.generation = luaconfsLocal->generation;
}

return true;
}

#endif /* HAVE_FSTRM */

static void makeControlChannelSocket(int processNum = -1)
Expand Down Expand Up @@ -2418,7 +2422,8 @@ static void recursorThread()
checkProtobufExport(luaconfsLocal);
checkOutgoingProtobufExport(luaconfsLocal);
#ifdef HAVE_FSTRM
checkFrameStreamExport(luaconfsLocal);
checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
checkFrameStreamExport(luaconfsLocal, luaconfsLocal->nodFrameStreamExportConfig, t_nodFrameStreamServersInfo);
#endif

t_fdm = unique_ptr<FDMultiplexer>(getMultiplexer(log));
Expand Down
3 changes: 2 additions & 1 deletion pdns/recursordist/rec-main.hh
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ struct FrameStreamServersInfo
};

extern thread_local FrameStreamServersInfo t_frameStreamServersInfo;
extern thread_local FrameStreamServersInfo t_nodFrameStreamServersInfo;
#endif /* HAVE_FSTRM */

#ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
Expand Down Expand Up @@ -510,7 +511,7 @@ void parseACLs();
PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const FrameStreamExportConfig& config, FrameStreamServersInfo& serverInfos);
void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options);
void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const ComboAddress& mappedSource, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta);
Expand Down
2 changes: 1 addition & 1 deletion pdns/recursordist/rec-tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;

#ifdef HAVE_FSTRM
checkFrameStreamExport(luaconfsLocal);
checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
#endif

if (needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
Expand Down
6 changes: 6 additions & 0 deletions pdns/remote_logger.hh
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ public:
[[nodiscard]] virtual std::string name() const = 0;
bool logQueries(void) const { return d_logQueries; }
bool logResponses(void) const { return d_logResponses; }
bool logNODs(void) const { return d_logNODs; }
bool logUDRs(void) const { return d_logUDRs; }
void setLogQueries(bool flag) { d_logQueries = flag; }
void setLogResponses(bool flag) { d_logResponses = flag; }
void setLogNODs(bool flag) { d_logNODs = flag; }
void setLogUDRs(bool flag) { d_logUDRs = flag; }

struct Stats
{
Expand All @@ -96,6 +100,8 @@ public:
private:
bool d_logQueries{true};
bool d_logResponses{true};
bool d_logNODs{true};
bool d_logUDRs{false};
};

/* Thread safe. Will connect asynchronously on request.
Expand Down
Loading

0 comments on commit 9489e2b

Please sign in to comment.