Skip to content

Commit

Permalink
Fix source record timestamp -- seconds to milliseconds (#26)
Browse files Browse the repository at this point in the history
* Update redis source timestamp from seconds to milliseconds (#1)

* updated source record converter timestamp

* removed logging

* updated version and changelog

* fixed changelog message improvement

Co-authored-by: Jared Petersen <[email protected]>
  • Loading branch information
padariya-shraddha and jaredpetersen authored Mar 16, 2022
1 parent 69baf71 commit 5341700
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.3] - 2022-03-15
### Fixed
- Fixed a bug with record timestamps for the source connector -- switched from epoch seconds (incorrect) to epoch milliseconds

## [1.2.2] - 2021-07-22
### Changed
- Use capitalization in log messages
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
<version>1.2.2</version>
<version>1.2.3</version>
<packaging>jar</packaging>

<name>Kafka Redis Connector (Sink and Source)</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public SourceRecord convert(RedisMessage redisMessage) {
key,
VALUE_SCHEMA,
value,
Instant.now().getEpochSecond()
Instant.now().toEpochMilli()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ void convertTransformsRedisMessageToSourceRecord() {
assertEquals(redisMessage.getPattern(), ((Struct) sourceRecord.key()).getString("pattern"));
assertEquals(Schema.Type.STRUCT, sourceRecord.valueSchema().type());
assertEquals(redisMessage.getMessage(), ((Struct) sourceRecord.value()).getString("message"));
assertTrue(sourceRecord.timestamp() <= Instant.now().getEpochSecond());
assertTrue(sourceRecord.timestamp() <= Instant.now().toEpochMilli());
}
}

0 comments on commit 5341700

Please sign in to comment.