2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -117,7 +117,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
NYql::NDq::RegisterDQSolomonWriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
bool enableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters();
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSourceTracker"), {}, enableStreamingQueriesCounters);
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker"), enableStreamingQueriesCounters);
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker"), enableStreamingQueriesCounters, NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesPqSinkDeduplication());
}

return factory;
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
@@ -254,4 +254,5 @@ message TFeatureFlags {
optional bool EnableSysViewPermissionsExport = 227 [default = false];
optional bool EnableCmsSmartAvailabilityMode = 228 [default = false];
optional bool EnableConditionalEraseResponseBatching = 229 [default = false];
optional bool EnableStreamingQueriesPqSinkDeduplication = 230 [default = false];
}
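Note: this flag is a cluster-level switch read from AppData; the write actor turns deduplication on only when both the flag and the per-sink EnableDeduplication field (added to dq_io.proto below) are set. A minimal sketch of that combination, assuming the repository's usual feature-flag accessors and include paths; the helper name is illustrative only:

```cpp
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>  // assumed generated-header path

// Sketch only: mirrors the check done in the TDqPqWriteActor constructor.
// Deduplication is used only when the cluster-wide feature flag and the
// query-level sink setting (filled from PRAGMA pq.EnableDeduplication) agree.
bool IsDeduplicationEnabled(const NPq::NProto::TDqPqTopicSink& sinkParams) {
    const bool featureFlag = NKikimr::AppData()->FeatureFlags
        .GetEnableStreamingQueriesPqSinkDeduplication();
    return featureFlag && sinkParams.GetEnableDeduplication();
}
```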
@@ -24,7 +24,6 @@
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>

@@ -5,7 +5,6 @@
#include "probes.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/event_local.h>
53 changes: 38 additions & 15 deletions ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -2,7 +2,6 @@
#include "probes.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/event_local.h>
@@ -142,13 +141,15 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
};

struct TAckInfo {
TAckInfo(i64 messageSize, const TInstant& startTime)
TAckInfo(i64 messageSize, const TInstant& startTime, ui64 seqNo)
: MessageSize(messageSize)
, StartTime(startTime)
, SeqNo(seqNo)
{}

i64 MessageSize = 0;
TInstant StartTime;
ui64 SeqNo = 0;
};

public:
@@ -164,7 +165,8 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 freeSpace,
const IPqGateway::TPtr& pqGateway,
bool enableStreamingQueriesCounters)
bool enableStreamingQueriesCounters,
bool enableStreamingQueriesPqSinkDeduplicationFeatureFlag)
: TActor<TDqPqWriteActor>(&TDqPqWriteActor::StateFunc)
, OutputIndex(outputIndex)
, TxId(txId)
@@ -177,6 +179,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
, FreeSpace(freeSpace)
, PqGateway(pqGateway)
, TaskId(taskId)
, EnableDeduplication(enableStreamingQueriesPqSinkDeduplicationFeatureFlag && SinkParams.GetEnableDeduplication())
{
EgressStats.Level = statsLevel;
}
@@ -327,12 +330,20 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
}

NYdb::NTopic::TWriteSessionSettings GetWriteSessionSettings() {
return NYdb::NTopic::TWriteSessionSettings(SinkParams.GetTopicPath(), GetSourceId(), GetSourceId())
auto settings = NYdb::NTopic::TWriteSessionSettings()
.Path(SinkParams.GetTopicPath())
.TraceId(LogPrefix)
.MaxMemoryUsage(FreeSpace)
.Codec(SinkParams.GetClusterType() == NPq::NProto::DataStreams
? NYdb::NTopic::ECodec::RAW
: NYdb::NTopic::ECodec::GZIP);

settings.DeduplicationEnabled(EnableDeduplication);
if (EnableDeduplication) {
settings.ProducerId(GetSourceId());
settings.MessageGroupId(GetSourceId());
}
return settings;
}

IFederatedTopicClient& GetFederatedTopicClient() {
@@ -420,9 +431,14 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
}

void WriteNextMessage(NYdb::NTopic::TContinuationToken&& token) {
WriteSession->Write(std::move(token), Buffer.front(), NextSeqNo++);
std::optional<uint64_t> seqNo;
if (EnableDeduplication) {
seqNo = NextSeqNo;
}
WriteSession->Write(std::move(token), Buffer.front(), seqNo);
auto itemSize = GetItemSize(Buffer.front());
WaitingAcks.emplace(itemSize, TInstant::Now());
WaitingAcks.emplace(itemSize, TInstant::Now(), NextSeqNo);
NextSeqNo++;
EgressStats.Bytes += itemSize;
Metrics.EgressDataRate->Add(itemSize);
Buffer.pop();
@@ -451,7 +467,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu

for (auto it = ev.Acks.begin(); it != ev.Acks.end(); ++it) {
//Y_ABORT_UNLESS(it == ev.Acks.begin() || it->SeqNo == std::prev(it)->SeqNo + 1);
LOG_T(Self.LogPrefix << "Ack seq no " << it->SeqNo);
LOG_T(Self.LogPrefix << "Ack seq no (from TAcksEvent) " << it->SeqNo);
if (it->State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_DISCARDED) {
TIssues issues;
issues.AddIssue(TStringBuilder() << "Message with seqNo " << it->SeqNo << " was discarded");
@@ -466,12 +482,14 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
Self.Metrics.LastAckLatency->Set((TInstant::Now() - ackInfo.StartTime).MilliSeconds());
Self.Metrics.InFlyData->Dec();
Self.FreeSpace += ackInfo.MessageSize;
ui64 seqNo = ackInfo.SeqNo; // use the seqNo stored on our side, because without deduplication we do not specify a SeqNo on Write().
LOG_T(Self.LogPrefix << "Ack seq no (from WaitingAcks) " << seqNo);
Self.WaitingAcks.pop();

if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) {
Self.ConfirmedSeqNo = it->SeqNo;
if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == seqNo) {
Self.ConfirmedSeqNo = seqNo;
const auto& checkpoint = std::get<1>(Self.DeferredCheckpoints.front());
LOG_D(Self.LogPrefix << MakeStringForLog(checkpoint) << "Send a deferred checkpoint, seqNo: " << it->SeqNo);
LOG_D(Self.LogPrefix << MakeStringForLog(checkpoint) << "Send a deferred checkpoint, seqNo: " << seqNo);
Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(checkpoint), Self.OutputIndex, checkpoint);
Self.DeferredCheckpoints.pop();
Self.Metrics.InFlyCheckpoints->Dec();
@@ -535,6 +553,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
IPqGateway::TPtr PqGateway;
ui64 TaskId;
bool Inited = false;
bool EnableDeduplication = false;
};

std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
@@ -550,7 +569,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
const ::NMonitoring::TDynamicCounterPtr& counters,
IPqGateway::TPtr pqGateway,
bool enableStreamingQueriesCounters,
i64 freeSpace)
i64 freeSpace,
bool enableStreamingQueriesPqSinkDeduplicationFeatureFlag)
{
const TString& tokenName = settings.GetToken().GetName();
const TString token = secureParams.Value(tokenName, TString());
@@ -568,13 +588,14 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
counters,
freeSpace,
pqGateway,
enableStreamingQueriesCounters);
enableStreamingQueriesCounters,
enableStreamingQueriesPqSinkDeduplicationFeatureFlag);
return {actor, actor};
}

void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters) {
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters, bool enableStreamingQueriesPqSinkDeduplicationFeatureFlag) {
factory.RegisterSink<NPq::NProto::TDqPqTopicSink>("PqSink",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, enableStreamingQueriesCounters](
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, enableStreamingQueriesCounters, enableStreamingQueriesPqSinkDeduplicationFeatureFlag](
NPq::NProto::TDqPqTopicSink&& settings,
IDqAsyncIoFactory::TSinkArguments&& args)
{
@@ -596,7 +617,9 @@ void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver dri
args.Callback,
counters ? counters : args.TaskCounters,
pqGateway,
enableStreamingQueriesCounters
enableStreamingQueriesCounters,
DqPqDefaultFreeSpace,
enableStreamingQueriesPqSinkDeduplicationFeatureFlag
);
});
}
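Taken together, the deduplication switch changes the write path in two places: the write-session settings and the per-message SeqNo. A condensed sketch of just those two pieces, using the SDK calls visible in the diff; MakeWriteSettings and WriteOne are illustrative helpers rather than actor methods, and the include path is an assumption that depends on the SDK layout:

```cpp
#include <optional>

#include <util/generic/string.h>
#include <ydb-cpp-sdk/client/topic/client.h>  // assumed include path for NYdb::NTopic

// Sketch only: ProducerId/MessageGroupId plus explicit SeqNos give the broker
// enough information to drop retried messages; with deduplication off, the
// server assigns SeqNos and the actor keeps its own counter purely to match
// acks against WaitingAcks and deferred checkpoints.
NYdb::NTopic::TWriteSessionSettings MakeWriteSettings(const TString& topicPath,
                                                      const TString& sourceId,
                                                      bool enableDeduplication) {
    auto settings = NYdb::NTopic::TWriteSessionSettings()
        .Path(topicPath)
        .DeduplicationEnabled(enableDeduplication);
    if (enableDeduplication) {
        settings.ProducerId(sourceId);
        settings.MessageGroupId(sourceId);
    }
    return settings;
}

void WriteOne(NYdb::NTopic::IWriteSession& session,
              NYdb::NTopic::TContinuationToken&& token,
              const TString& data,
              ui64& nextSeqNo,
              bool enableDeduplication) {
    std::optional<ui64> seqNo;
    if (enableDeduplication) {
        seqNo = nextSeqNo;  // explicit SeqNo only when deduplication is on
    }
    session.Write(std::move(token), data, seqNo);
    ++nextSeqNo;            // local counter advances either way
}
```

This is also why TAckInfo now carries the SeqNo: when the SDK assigns sequence numbers itself, the SeqNo in the ack event cannot be compared against NextSeqNo, so the actor correlates acks through its own WaitingAcks queue instead.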
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
@@ -31,8 +31,9 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
const ::NMonitoring::TDynamicCounterPtr& counters,
IPqGateway::TPtr pqGateway,
bool enableStreamingQueriesCounters,
i64 freeSpace = DqPqDefaultFreeSpace);
i64 freeSpace = DqPqDefaultFreeSpace,
bool enableStreamingQueriesPqSinkDeduplicationFeatureFlag = true);

void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool enableStreamingQueriesCounters = true);
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool enableStreamingQueriesCounters = true, bool enableDeduplicationFeatureFlag = true);

} // namespace NYql::NDq
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/proto/dq_io.proto
@@ -106,4 +106,5 @@ message TDqPqTopicSink {
bool AddBearerToToken = 6;
EClusterType ClusterType = 7;
bool UseActorSystemThreadsInTopicClient = 8;
bool EnableDeduplication = 9;
}
@@ -407,6 +407,11 @@ class TPqDqIntegration : public TDqIntegrationBase {
sinkDesc.MutableToken()->SetName(TString(maybeToken.Cast().Name().Value()));
}

if (auto maybeEnableDeduplication = State_->Configuration->EnableDeduplication.Get()) {
maybeEnableDeduplication->to_lower();
sinkDesc.SetEnableDeduplication(*maybeEnableDeduplication == "true"sv);
}

protoSettings.PackFrom(sinkDesc);
sinkType = "PqSink";
}
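On the provider side the knob is an ordinary string setting, so in a query it is spelled as a pragma, e.g. PRAGMA pq.EnableDeduplication = "TRUE"; (as in the test below); the value is lower-cased and compared with "true" before being stored in the sink proto. A minimal sketch of that parse, with ParseBoolSetting being an illustrative helper:

```cpp
#include <util/generic/maybe.h>
#include <util/generic/string.h>

// Sketch only: how a string-valued provider setting such as
//   PRAGMA pq.EnableDeduplication = "TRUE";
// becomes the bool carried by TDqPqTopicSink.EnableDeduplication.
bool ParseBoolSetting(TMaybe<TString> value, bool defaultValue = false) {
    if (!value) {
        return defaultValue;   // pragma not set: the proto field keeps its default (false)
    }
    value->to_lower();         // TString::to_lower() lower-cases in place
    return *value == "true";
}
```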
@@ -21,6 +21,7 @@ TPqConfiguration::TPqConfiguration() {
REGISTER_SETTING(*this, PqReadByRtmrCluster_);
REGISTER_SETTING(*this, MaxPartitionReadSkew);
REGISTER_SETTING(*this, ReadSessionBufferBytes);
REGISTER_SETTING(*this, EnableDeduplication);
}

TPqSettings::TConstPtr TPqConfiguration::Snapshot() const {
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_settings.h
@@ -21,6 +21,7 @@ struct TPqSettings {
NCommon::TConfSetting<TString, Static> PqReadByRtmrCluster_;
NCommon::TConfSetting<TDuration, Static> MaxPartitionReadSkew;
NCommon::TConfSetting<ui64, Static> ReadSessionBufferBytes;
NCommon::TConfSetting<TString, Static> EnableDeduplication;
};

struct TPqClusterConfigurationSettings {
3 changes: 2 additions & 1 deletion ydb/tests/fq/pq_async_io/ut_helpers.cpp
@@ -102,7 +102,8 @@ void TPqIoTestFixture::InitAsyncOutput(
MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(std::move(pqServices)),
true,
freeSpace);
freeSpace,
true);

actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor);
});
4 changes: 2 additions & 2 deletions ydb/tests/fq/streaming/common.py
@@ -100,9 +100,9 @@ def get_checkpoint_coordinator_metric(self, kikimr: Kikimr, path: str, metric_na
def get_completed_checkpoints(self, kikimr: Kikimr, path: str) -> int:
return self.get_checkpoint_coordinator_metric(kikimr, path, "CompletedCheckpoints")

def wait_completed_checkpoints(self, kikimr: Kikimr, path: str, timeout: int = plain_or_under_sanitizer_wrapper(120, 150)) -> None:
def wait_completed_checkpoints(self, kikimr: Kikimr, path: str, timeout: int = plain_or_under_sanitizer_wrapper(120, 150), checkpoints_count=2) -> None:
current = self.get_completed_checkpoints(kikimr, path)
checkpoints_count = current + 2
checkpoints_count = current + checkpoints_count
deadline = time.time() + timeout
while True:
completed = self.get_completed_checkpoints(kikimr, path)
4 changes: 3 additions & 1 deletion ydb/tests/fq/streaming/conftest.py
@@ -21,6 +21,7 @@ def get_ydb_config():
"enable_streaming_queries": True,
"enable_streaming_queries_counters": True,
"enable_topics_sql_io_operations": True,
"enable_streaming_queries_pq_sink_deduplication": True,
},
query_service_config={
"available_external_data_sources": ["ObjectStorage", "Ydb", "YdbTopics"],
@@ -38,7 +39,8 @@

return config

os.environ["YDB_TEST_DEFAULT_CHECKPOINTING_PERIOD_MS"] = "200"
checkpointing_period_ms = getattr(request, "param", {}).get("checkpointing_period_ms", "200")
os.environ["YDB_TEST_DEFAULT_CHECKPOINTING_PERIOD_MS"] = checkpointing_period_ms
os.environ["YDB_TEST_LEASE_DURATION_SEC"] = "5"

kikimr = Kikimr(get_ydb_config())
59 changes: 59 additions & 0 deletions ydb/tests/fq/streaming/test_streaming.py
@@ -486,3 +486,62 @@ def test_raw_format(self, kikimr, entity_name, local_topics):
assert self.read_stream(len(expected_data), topic_path=self.output_topic, endpoint=endpoint) == expected_data

kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`")

@pytest.mark.parametrize("kikimr", [{"checkpointing_period_ms": "20000"}], indirect=["kikimr"])
@pytest.mark.parametrize("local_topics", [False, True])
def test_deduplication(self, kikimr, entity_name, local_topics):

sql = R'''
CREATE STREAMING QUERY `{query_name}` AS
DO BEGIN
PRAGMA pq.EnableDeduplication = "{enable}";
INSERT INTO {out} SELECT Data FROM {inp};
END DO;'''

# Disable deduplication

inp, out, endpoint = self.get_io_names(kikimr, "test_deduplication_disabled", local_topics, entity_name, partitions_count=10)
name = "test_deduplication"
path = f"/Root/{name}"
kikimr.ydb_client.query(sql.format(query_name=name, inp=inp, out=out, enable="FALSE"))
self.wait_completed_checkpoints(kikimr, path, checkpoints_count=1)

data1 = 'value1'
count1 = 1
self.write_stream([data1], topic_path=None, partition_key=''.join(random.choices(string.ascii_uppercase, k=8)), endpoint=endpoint)
assert self.read_stream(count1, topic_path=self.output_topic, endpoint=endpoint) == [data1 for i in range(count1)]

kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")

data2 = 'value2'
count2 = 10
for i in range(count2):
self.write_stream([data2], topic_path=None, partition_key=''.join(random.choices(string.ascii_uppercase, k=8)), endpoint=endpoint)

kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = TRUE);")

readed_data = self.read_stream(count1 + count2, topic_path=self.output_topic, endpoint=endpoint)
expected = [data2 for i in range(count2)] + [data1 for i in range(count1)]
assert sorted(readed_data) == sorted(expected)

kikimr.ydb_client.query(f"DROP STREAMING QUERY `{name}`;")

# Enable deduplication

inp, out, endpoint = self.get_io_names(kikimr, "test_deduplication_enabled", local_topics, entity_name, partitions_count=10)
kikimr.ydb_client.query(sql.format(query_name=name, inp=inp, out=out, enable="TRUE"))
self.wait_completed_checkpoints(kikimr, path, checkpoints_count=1)

self.write_stream([data1], topic_path=None, partition_key=''.join(random.choices(string.ascii_uppercase, k=8)), endpoint=endpoint)
assert self.read_stream(count1, topic_path=self.output_topic, endpoint=endpoint) == [data1 for i in range(count1)]

kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")

for i in range(count2):
self.write_stream([data2], topic_path=None, partition_key=''.join(random.choices(string.ascii_uppercase, k=8)), endpoint=endpoint)
kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = TRUE);")

readed_data = self.read_stream(20, topic_path=self.output_topic, endpoint=endpoint)
assert len(readed_data) == 10

kikimr.ydb_client.query(f"DROP STREAMING QUERY `{name}`;")
3 changes: 3 additions & 0 deletions ydb/tests/fq/yds/test_recovery.py
@@ -239,6 +239,9 @@ def test_recovery(self, client, kikimr):
d[n] = 1

self.dump_workers(2, 4)
kikimr.compute_plane.wait_completed_checkpoints(
query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
)

node_to_restart = None
for node_index in kikimr.compute_plane.kikimr_cluster.nodes: