Skip to content

Commit f6d9040

Browse files
authored
Implement publishing for TimeseriesDataTimestamp (#130)
* Add publish API for TimeseriesDataTimestamp * Add missing buffer publish api for TimeseriesDataRaw
1 parent 0f342e0 commit f6d9040

File tree

10 files changed

+402
-34
lines changed

10 files changed

+402
-34
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace QuixStreams.Streaming.UnitTests.Models
99
{
10-
public class StreamParametersConsumerShould
10+
public class StreamTimeseriesConsumerShould
1111
{
1212

1313
[Fact]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace QuixStreams.Streaming.UnitTests.Models
1212
{
13-
public class StreamParametersProducerShould
13+
public class StreamTimeseriesProducerShould
1414
{
1515

1616
[Fact]

src/CsharpClient/QuixStreams.Streaming.UnitTests/Models/TimeseriesDataTimestampShould.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,35 @@ public void AddTags_WithTags_ShouldNotThrowException()
4949
tsdts.Tags["test4"].Should().Be("val4");
5050

5151
}
52+
53+
[Fact]
54+
public void ConvertToTimeseriesDataRaw_ReturnsWithSingleTimestamp()
55+
{
56+
// Arrange
57+
var timeseriesData = new TimeseriesData();
58+
timeseriesData.AddTimestampNanoseconds(100)
59+
.AddValue("string", "1")
60+
.AddTag("1", "tag1");
61+
62+
var timestampToConvert = timeseriesData.AddTimestampNanoseconds(200)
63+
.AddValue("string", "2")
64+
.AddValue("double", 2)
65+
.AddValue("byte", new byte[]{0x02})
66+
.AddTag("1", "tag1")
67+
.AddTag("2", "tag2");
68+
69+
70+
71+
// Act
72+
var raw = timestampToConvert.ConvertToTimeseriesDataRaw();
73+
74+
// Assert
75+
raw.Timestamps.Length.Should().Be(1);
76+
raw.Timestamps[0].Should().Be(200);
77+
raw.StringValues.Count.Should().Be(1);
78+
raw.NumericValues.Count.Should().Be(1);
79+
raw.BinaryValues.Count.Should().Be(1);
80+
raw.TagValues.Count.Should().Be(2);
81+
}
5282
}
5383
}

src/CsharpClient/QuixStreams.Streaming/Models/StreamProducer/StreamTimeseriesProducer.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void Publish(TimeseriesData data)
7373

7474
this.streamProducer.Publish(data.ConvertToTimeseriesDataRaw());
7575
}
76-
76+
7777
/// <summary>
7878
/// Publish data in TimeseriesDataRaw format without any buffering
7979
/// </summary>
@@ -100,7 +100,7 @@ public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw data)
100100
updatedTimestamps[i] = data.Timestamps[i] + epochDiff;
101101
}
102102

103-
QuixStreams.Telemetry.Models.TimeseriesDataRaw new_data = new QuixStreams.Telemetry.Models.TimeseriesDataRaw(
103+
QuixStreams.Telemetry.Models.TimeseriesDataRaw newData = new QuixStreams.Telemetry.Models.TimeseriesDataRaw(
104104
data.Epoch,
105105
updatedTimestamps,
106106
data.NumericValues,
@@ -109,10 +109,29 @@ public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw data)
109109
data.TagValues
110110
);
111111

112-
this.streamProducer.Publish(new_data);
112+
this.streamProducer.Publish(newData);
113113
}
114114

115+
/// <summary>
116+
/// Publish single timestamp to stream without any buffering
117+
/// </summary>
118+
/// <param name="timestamp">Timeseries timestamp to publish</param>
119+
public void Publish(TimeseriesDataTimestamp timestamp)
120+
{
121+
if (isDisposed)
122+
{
123+
throw new ObjectDisposedException(nameof(StreamTimeseriesProducer));
124+
}
125+
126+
if (!timestamp.EpochIncluded)
127+
{
128+
timestamp.TimestampNanoseconds += this.streamProducer.Epoch.ToUnixNanoseconds();
129+
timestamp.EpochIncluded = true;
130+
}
115131

132+
this.streamProducer.Publish(timestamp.ConvertToTimeseriesDataRaw());
133+
}
134+
116135
/// <summary>
117136
/// Default Location of the parameters. Parameter definitions added with <see cref="AddDefinition"/> will be inserted at this location.
118137
/// See <see cref="AddLocation"/> for adding definitions at a different location without changing default.

src/CsharpClient/QuixStreams.Streaming/Models/StreamProducer/TimeseriesBufferProducer.cs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private TimeseriesDataBuilder AddTimestampNanoseconds(long timestampNanoseconds,
9393

9494

9595
/// <summary>
96-
/// Publish the provided timeseries data to the buffer.
96+
/// Publish timeseries data to the buffer.
9797
/// </summary>
9898
/// <param name="data">Data to publish</param>
9999
public void Publish(TimeseriesData data)
@@ -119,6 +119,78 @@ public void Publish(TimeseriesData data)
119119

120120
this.WriteChunk(data.ConvertToTimeseriesDataRaw(false, false)); // use merge & clean of Buffer is more efficient
121121
}
122+
123+
/// <summary>
124+
/// Publish timeseries data raw to the buffer.
125+
/// </summary>
126+
/// <param name="data">Data to publish</param>
127+
public void Publish(TimeseriesDataRaw data)
128+
{
129+
long epochDifference = this.streamProducer.Epoch.ToUnixNanoseconds();
130+
long[] updatedTimestamps = null;
131+
132+
if (epochDifference != 0)
133+
{
134+
updatedTimestamps = new long[data.Timestamps.Length];
135+
for (int i = 0; i < updatedTimestamps.Length; i++)
136+
{
137+
updatedTimestamps[i] = data.Timestamps[i] + epochDifference;
138+
}
139+
}
140+
141+
Dictionary<string, string[]> updatedTagValues = null;
142+
143+
if (this.DefaultTags.Count > 0)
144+
{
145+
updatedTagValues = new Dictionary<string, string[]>(data.TagValues);
146+
foreach (var tag in this.DefaultTags)
147+
{
148+
if (!updatedTagValues.TryGetValue(tag.Key, out var tagValues))
149+
{
150+
tagValues = new string[data.Timestamps.Length];
151+
updatedTagValues[tag.Key] = tagValues;
152+
}
153+
154+
for (int i = 0; i < data.Timestamps.Length; i++)
155+
{
156+
tagValues[i] = tag.Value;
157+
}
158+
}
159+
}
160+
161+
var newData = new QuixStreams.Telemetry.Models.TimeseriesDataRaw(
162+
data.Epoch,
163+
updatedTimestamps ?? data.Timestamps,
164+
data.NumericValues,
165+
data.StringValues,
166+
data.BinaryValues,
167+
updatedTagValues ?? data.TagValues
168+
);
169+
170+
this.WriteChunk(newData);
171+
}
172+
173+
/// <summary>
174+
/// Publish single timestamp to the buffer.
175+
/// </summary>
176+
/// <param name="timestamp">Timeseries timestamp to publish</param>
177+
public void Publish(TimeseriesDataTimestamp timestamp)
178+
{
179+
if (!timestamp.EpochIncluded)
180+
{
181+
timestamp.TimestampNanoseconds += this.Epoch.ToUnixNanoseconds();
182+
timestamp.EpochIncluded = true;
183+
}
184+
185+
foreach (var kv in this.DefaultTags)
186+
{
187+
if (!timestamp.Tags.ContainsKey(kv.Key))
188+
{
189+
timestamp.AddTag(kv.Key, kv.Value);
190+
}
191+
}
192+
this.WriteChunk(timestamp.ConvertToTimeseriesDataRaw());
193+
}
122194

123195

124196
/// <summary>

src/CsharpClient/QuixStreams.Streaming/Models/TimeseriesDataTimestamp.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using QuixStreams.Streaming.Utils;
4+
using QuixStreams.Telemetry.Models;
45
using QuixStreams.Telemetry.Models.Utility;
56

67
namespace QuixStreams.Streaming.Models
@@ -222,5 +223,10 @@ public TimeseriesDataTimestamp RemoveTag(string tagId)
222223

223224
return this;
224225
}
226+
227+
internal TimeseriesDataRaw ConvertToTimeseriesDataRaw()
228+
{
229+
return new TimeseriesData(new List<TimeseriesDataTimestamp>{this}).rawData;
230+
}
225231
}
226232
}

src/PythonClient/src/quixstreams/models/streamproducer/streamtimeseriesproducer.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from .timeseriesbufferproducer import TimeseriesBufferProducer
77
from ...builders import ParameterDefinitionBuilder
88
from ...helpers.nativedecorator import nativedecorator
9-
from ...models import TimeseriesData, TimeseriesDataRaw
9+
from ...models import TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp
1010
from ...native.Python.QuixStreamsStreaming.Models.StreamProducer.StreamTimeseriesProducer import StreamTimeseriesProducer as stspi
1111

1212

@@ -102,12 +102,12 @@ def buffer(self) -> TimeseriesBufferProducer:
102102
self._buffer = TimeseriesBufferProducer(self._stream_producer, self._interop.get_Buffer())
103103
return self._buffer
104104

105-
def publish(self, packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw]) -> None:
105+
def publish(self, packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw, TimeseriesDataTimestamp]) -> None:
106106
"""
107107
Publish the given packet to the stream without any buffering.
108108
109109
Args:
110-
packet: The packet containing TimeseriesData, TimeseriesDataRaw, or pandas DataFrame.
110+
packet: The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.
111111
112112
Note:
113113
- Pandas DataFrame should contain 'time' label, else the first integer label will be taken as time.
@@ -141,6 +141,9 @@ def publish(self, packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw]
141141
if isinstance(packet, TimeseriesDataRaw):
142142
self._interop.Publish2(packet.get_net_pointer())
143143
return
144+
if isinstance(packet, TimeseriesDataTimestamp):
145+
self._interop.Publish3(packet.get_net_pointer())
146+
return
144147
if isinstance(packet, pd.DataFrame):
145148
data = TimeseriesDataRaw.from_dataframe(packet)
146149
with data:

src/PythonClient/src/quixstreams/models/streamproducer/timeseriesbufferproducer.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from ..netdict import NetDict
99
from ..timeseriesbuffer import TimeseriesBuffer
10-
from ... import TimeseriesData
10+
from ... import TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp
1111
from ...builders import TimeseriesDataBuilder
1212
from ...helpers.dotnet.datetimeconverter import DateTimeConverter as dtc
1313
from ...helpers.nativedecorator import nativedecorator
@@ -117,12 +117,12 @@ def flush(self):
117117
"""
118118
self._interop.Flush()
119119

120-
def publish(self, packet: Union[TimeseriesData, pd.DataFrame]) -> None:
120+
def publish(self, packet: Union[TimeseriesData, pd.DataFrame, TimeseriesDataRaw, TimeseriesDataTimestamp]) -> None:
121121
"""
122122
Publish the provided timeseries packet to the buffer.
123123
124124
Args:
125-
packet: The packet containing TimeseriesData or panda DataFrame
125+
packet: The packet containing TimeseriesData, TimeseriesDataRaw, TimeseriesDataTimestamp, or pandas DataFrame.
126126
- packet type panda.DataFrame:
127127
* Note 1: panda data frame should contain 'time' label, else the first integer label will be taken as time.
128128
* Note 2: Tags should be prefixed by TAG__ or they will be treated as timeseries parameters
@@ -155,6 +155,12 @@ def publish(self, packet: Union[TimeseriesData, pd.DataFrame]) -> None:
155155
if isinstance(packet, TimeseriesData):
156156
self._interop.Publish(packet.get_net_pointer())
157157
return
158+
if isinstance(packet, TimeseriesDataRaw):
159+
self._interop.Publish2(packet.get_net_pointer())
160+
return
161+
if isinstance(packet, TimeseriesDataTimestamp):
162+
self._interop.Publish3(packet.get_net_pointer())
163+
return
158164
if isinstance(packet, pd.DataFrame):
159165
data = TimeseriesData.from_panda_dataframe(packet)
160166
with data:

0 commit comments

Comments
 (0)