Skip to content

Commit be69dee

Browse files
authored
Merge pull request #1009: Cassandra writetime stamp
2 parents c0e8384 + db24450 commit be69dee

File tree

6 files changed

+50
-19
lines changed

6 files changed

+50
-19
lines changed

direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/RetrieveService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ public void listAttributes(
160160
response.addValue(
161161
ListResponse.AttrValue.newBuilder()
162162
.setAttribute(kv.getAttribute())
163-
.setValue(ByteString.copyFrom(kv.getValue()))));
163+
.setValue(ByteString.copyFrom(kv.getValue()))
164+
.setStamp(kv.getStamp())));
164165
noticeListResult(request, entity, wildcard, kvs);
165166
replyLogged(responseObserver, request, response.build());
166167
} catch (Status s) {
@@ -297,6 +298,7 @@ public void get(Rpc.GetRequest request, StreamObserver<Rpc.GetResponse> response
297298
Rpc.GetResponse.newBuilder()
298299
.setStatus(200)
299300
.setValue(ByteString.copyFrom(kv.getValue()))
301+
.setStamp(kv.getStamp())
300302
.build());
301303
}
302304
} catch (Status s) {

direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/RetrieveServiceTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ public void testGetValidExtendedScheme() throws InvalidProtocolBufferException {
10911091
String key = "my-fancy-entity-key";
10921092
ExtendedMessage payload = ExtendedMessage.newBuilder().setFirst(1).setSecond(2).build();
10931093

1094+
long now = System.currentTimeMillis();
10941095
Optionals.get(server.direct.getWriter(attribute))
10951096
.write(
10961097
StreamElement.upsert(
@@ -1099,7 +1100,7 @@ public void testGetValidExtendedScheme() throws InvalidProtocolBufferException {
10991100
UUID.randomUUID().toString(),
11001101
key,
11011102
attribute.getName(),
1102-
System.currentTimeMillis(),
1103+
now,
11031104
payload.toByteArray()),
11041105
CommitCallback.noop());
11051106
Rpc.GetRequest request =
@@ -1140,6 +1141,7 @@ public void onCompleted() {
11401141
200,
11411142
response.getStatus());
11421143
assertEquals(payload, ExtendedMessage.parseFrom(response.getValue().toByteArray()));
1144+
assertEquals(now, response.getStamp());
11431145
}
11441146

11451147
@Test
@@ -1192,14 +1194,15 @@ public void testListWithLongPrefix() {
11921194
String key = "my-fancy-entity-key";
11931195

11941196
OnlineAttributeWriter writer = Optionals.get(server.direct.getWriter(attribute));
1197+
long now = System.currentTimeMillis();
11951198
writer.write(
11961199
StreamElement.upsert(
11971200
entity,
11981201
attribute,
11991202
UUID.randomUUID().toString(),
12001203
key,
12011204
attribute.toAttributePrefix() + "non-prefix",
1202-
System.currentTimeMillis(),
1205+
now,
12031206
new byte[] {}),
12041207
CommitCallback.noop());
12051208
writer.write(
@@ -1209,7 +1212,7 @@ public void testListWithLongPrefix() {
12091212
UUID.randomUUID().toString(),
12101213
key,
12111214
attribute.toAttributePrefix() + "prefix",
1212-
System.currentTimeMillis(),
1215+
now,
12131216
new byte[] {}),
12141217
CommitCallback.noop());
12151218

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public <T> Optional<KeyValue<T>> get(
5959
for (Row row : result) {
6060
ByteBuffer val = row.get(0, ByteBuffer.class);
6161
if (val != null) {
62+
long rowStamp =
63+
row.size() > 1 ? row.get(1, Long.class) / 1000L : System.currentTimeMillis();
6264
byte[] rowValue = val.array();
6365
try {
6466
return Optional.ofNullable(
@@ -69,7 +71,7 @@ public <T> Optional<KeyValue<T>> get(
6971
desc,
7072
key,
7173
attribute,
72-
System.currentTimeMillis(),
74+
rowStamp,
7375
new Offsets.Raw(attribute),
7476
rowValue));
7577
} catch (Exception ex) {
@@ -122,6 +124,8 @@ public <T> void scanWildcard(
122124
Object attribute = row.getObject(0);
123125
ByteBuffer val = row.get(1, ByteBuffer.class);
124126
if (val != null) {
127+
long rowStamp =
128+
row.size() > 2 ? row.get(2, Long.class) / 1000L : System.currentTimeMillis();
125129
byte[] rowValue = val.array();
126130
// by convention
127131
String name = wildcard.toAttributePrefix() + accessor.asString(attribute);
@@ -134,7 +138,7 @@ public <T> void scanWildcard(
134138
wildcard,
135139
key,
136140
name,
137-
System.currentTimeMillis(),
141+
rowStamp,
138142
new Raw(name),
139143
rowValue);
140144
if (keyValue != null) {

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -320,26 +320,34 @@ protected String createDeleteWildcardStatement(StreamElement what) {
320320

321321
@Override
322322
protected String createGetStatement(String attribute, AttributeDescriptor<?> desc) {
323-
324323
if (desc.isWildcard()) {
325324
String colName = toColName(desc);
325+
String payloadCol = toPayloadCol(desc);
326326
return String.format(
327-
"SELECT %s FROM %s WHERE %s=? AND %s=?",
328-
toPayloadCol(desc), getTableName(), primaryField, toUnderScore(colName));
327+
"SELECT %s, WRITETIME(%s) FROM %s WHERE %s=? AND %s=?",
328+
payloadCol, payloadCol, getTableName(), primaryField, toUnderScore(colName));
329329
}
330330

331+
String payloadCol = toUnderScore(attribute);
331332
return String.format(
332-
"SELECT %s FROM %s WHERE %s=?", toUnderScore(attribute), getTableName(), primaryField);
333+
"SELECT %s, WRITETIME(%s) FROM %s WHERE %s=?",
334+
payloadCol, payloadCol, getTableName(), primaryField);
333335
}
334336

335337
@Override
336338
protected String createListStatement(AttributeDescriptor<?> attr) {
337-
338339
String colName = toColName(attr);
339340
String dataCol = toUnderScore(colName);
341+
String payloadCol = toPayloadCol(attr);
340342
return String.format(
341-
"SELECT %s, %s FROM %s WHERE %s=? AND %s%s? LIMIT ?",
342-
dataCol, toPayloadCol(attr), getTableName(), primaryField, dataCol, reversed ? "<" : ">");
343+
"SELECT %s, %s, WRITETIME(%s) FROM %s WHERE %s=? AND %s%s? LIMIT ?",
344+
dataCol,
345+
payloadCol,
346+
payloadCol,
347+
getTableName(),
348+
primaryField,
349+
dataCol,
350+
reversed ? "<" : ">");
343351
}
344352

345353
private byte[] serializeValue(StreamElement ingest) {

direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,9 @@ public void testGetAttribute() {
343343
verify(statement).bind(eq("key"));
344344
assertNotNull("Bound statement cannot be null", boundStatement);
345345
assertEquals(1, preparedStatement.size());
346-
assertEquals("SELECT my_attribute FROM my_table WHERE hgw=?", preparedStatement.get(0));
346+
assertEquals(
347+
"SELECT my_attribute, WRITETIME(my_attribute) FROM my_table WHERE hgw=?",
348+
preparedStatement.get(0));
347349
}
348350

349351
@Test
@@ -358,7 +360,9 @@ public void testGetAttributeWildcard() {
358360
verify(statement).bind(eq("key"), eq("1"));
359361
assertNotNull("Bound statement cannot be null", boundStatement);
360362
assertEquals(1, preparedStatement.size());
361-
assertEquals("SELECT my_col FROM my_table WHERE hgw=? AND device=?", preparedStatement.get(0));
363+
assertEquals(
364+
"SELECT my_col, WRITETIME(my_col) FROM my_table WHERE hgw=? AND device=?",
365+
preparedStatement.get(0));
362366
}
363367

364368
@Test
@@ -373,7 +377,9 @@ public void testGetAttributeWildcardWithNonCharacterSuffix() {
373377
verify(statement).bind(eq("key"), eq("1:2"));
374378
assertNotNull("Bound statement cannot be null", boundStatement);
375379
assertEquals(1, preparedStatement.size());
376-
assertEquals("SELECT my_col FROM my_table WHERE hgw=? AND device=?", preparedStatement.get(0));
380+
assertEquals(
381+
"SELECT my_col, WRITETIME(my_col) FROM my_table WHERE hgw=? AND device=?",
382+
preparedStatement.get(0));
377383
}
378384

379385
@Test
@@ -388,7 +394,7 @@ public void testListWildcardWithoutStart() {
388394
assertNotNull("Bound statement cannot be null", boundStatement);
389395
assertEquals(1, preparedStatement.size());
390396
assertEquals(
391-
"SELECT device, my_col FROM my_table WHERE hgw=? AND device>? LIMIT ?",
397+
"SELECT device, my_col, WRITETIME(my_col) FROM my_table WHERE hgw=? AND device>? LIMIT ?",
392398
preparedStatement.get(0));
393399
}
394400

@@ -404,7 +410,7 @@ public void testListWildcardWithStart() {
404410
assertNotNull("Bound statement cannot be null", boundStatement);
405411
assertEquals(1, preparedStatement.size());
406412
assertEquals(
407-
"SELECT device, my_col FROM my_table WHERE hgw=? AND device>? LIMIT ?",
413+
"SELECT device, my_col, WRITETIME(my_col) FROM my_table WHERE hgw=? AND device>? LIMIT ?",
408414
preparedStatement.get(0));
409415
}
410416

@@ -437,7 +443,7 @@ public void testListWildcardWithExplicitSecondaryField() {
437443
assertNotNull("Bound statement cannot be null", boundStatement);
438444
assertEquals(1, preparedStatement.size());
439445
assertEquals(
440-
"SELECT stamp, my_col FROM my_table WHERE hgw=? AND stamp>? LIMIT ?",
446+
"SELECT stamp, my_col, WRITETIME(my_col) FROM my_table WHERE hgw=? AND stamp>? LIMIT ?",
441447
preparedStatement.get(0));
442448
}
443449

rpc/src/main/proto/rpc.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ message GetResponse {
197197
**/
198198
bytes value = 3;
199199

200+
/**
201+
* Timestamp in ms.
202+
**/
203+
uint64 stamp = 4;
204+
200205
}
201206

202207
/**
@@ -241,6 +246,9 @@ message ListResponse {
241246
/** The value of the attribute. */
242247
bytes value = 2;
243248

249+
/** Timestamp */
250+
uint64 stamp = 3;
251+
244252
}
245253

246254
/**

0 commit comments

Comments
 (0)