@@ -7,39 +7,14 @@ class EventRepository
77
88 def initialize ( model_factory : WithDefaultModels . new , serializer :)
99 @serializer = serializer
10- @event_klass , @stream_klass = model_factory . call
11- if serializer == NULL && json_data_type?
12- warn <<~MSG
13- The data or metadata column is of a JSON/B type and expects a JSON string.
14-
15- Yet the repository serializer is configured as #{ serializer } and it would not
16- produce the expected JSON string.
17-
18- In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
19- that made it work so far. This behaviour is unfortunately also a source of undesired
20- double serialization — first in the EventRepository, second in the ActiveRecord.
21-
22- In the past we've advised workarounds that introduced configuration incosistency
23- with other data types and serialization formats, i.e. explicitly passing NULL serializer
24- just for the JSON/B data types.
25-
26- As of now this special ActiveRecord behaviour is disabled. You should be using JSON
27- serializer back again:
28-
29- RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
30- MSG
31- else
32- @event_klass . include ( SkipJsonSerialization )
33- end
34- @repo_reader = EventRepositoryReader . new ( @event_klass , @stream_klass , serializer )
35- @index_violation_detector = IndexViolationDetector . new ( @event_klass . table_name , @stream_klass . table_name )
10+ @model_factory = model_factory
3611 end
3712
3813 def rescue_from_double_json_serialization!
39- if @serializer == JSON && json_data_type?
40- @ repo_reader. instance_eval { alias __record__ record }
14+ if @serializer == JSON && json_data_type? ( event_klass )
15+ repo_reader . instance_eval { alias __record__ record }
4116
42- @ repo_reader. define_singleton_method :unwrap do |column_name , payload |
17+ repo_reader . define_singleton_method :unwrap do |column_name , payload |
4318 if String === payload && payload . start_with? ( "\{ " )
4419 warn "Double serialization of #{ column_name } column detected"
4520 @serializer . load ( payload )
@@ -48,7 +23,7 @@ def rescue_from_double_json_serialization!
4823 end
4924 end
5025
51- @ repo_reader. define_singleton_method :record do |record |
26+ repo_reader . define_singleton_method :record do |record |
5227 r = __record__ ( record )
5328
5429 Record . new (
@@ -76,31 +51,31 @@ def link_to_stream(event_ids, stream, expected_version)
7651 end
7752
7853 def delete_stream ( stream )
79- @ stream_klass. where ( stream : stream . name ) . delete_all
54+ stream_klass . where ( stream : stream . name ) . delete_all
8055 end
8156
8257 def has_event? ( event_id )
83- @ repo_reader. has_event? ( event_id )
58+ repo_reader . has_event? ( event_id )
8459 end
8560
8661 def last_stream_event ( stream )
87- @ repo_reader. last_stream_event ( stream )
62+ repo_reader . last_stream_event ( stream )
8863 end
8964
9065 def read ( specification )
91- @ repo_reader. read ( specification )
66+ repo_reader . read ( specification )
9267 end
9368
9469 def count ( specification )
95- @ repo_reader. count ( specification )
70+ repo_reader . count ( specification )
9671 end
9772
9873 def update_messages ( records )
9974 hashes = records . map { |record | upsert_hash ( record , record . serialize ( @serializer ) ) }
10075 for_update = records . map ( &:event_id )
10176 start_transaction do
10277 existing =
103- @ event_klass
78+ event_klass
10479 . where ( event_id : for_update )
10580 . pluck ( :event_id , :id , :created_at )
10681 . reduce ( { } ) { |acc , ( event_id , id , created_at ) | acc . merge ( event_id => [ id , created_at ] ) }
@@ -109,31 +84,31 @@ def update_messages(records)
10984 h [ :id ] = existing . fetch ( h . fetch ( :event_id ) ) . at ( 0 )
11085 h [ :created_at ] = existing . fetch ( h . fetch ( :event_id ) ) . at ( 1 )
11186 end
112- @ event_klass. upsert_all ( hashes )
87+ event_klass . upsert_all ( hashes )
11388 end
11489 end
11590
11691 def streams_of ( event_id )
117- @ repo_reader. streams_of ( event_id )
92+ repo_reader . streams_of ( event_id )
11893 end
11994
12095 def position_in_stream ( event_id , stream )
121- @ repo_reader. position_in_stream ( event_id , stream )
96+ repo_reader . position_in_stream ( event_id , stream )
12297 end
12398
12499 def global_position ( event_id )
125- @ repo_reader. global_position ( event_id )
100+ repo_reader . global_position ( event_id )
126101 end
127102
128103 def event_in_stream? ( event_id , stream )
129- @ repo_reader. event_in_stream? ( event_id , stream )
104+ repo_reader . event_in_stream? ( event_id , stream )
130105 end
131106
132107 private
133108
134109 def add_to_stream ( event_ids , stream , expected_version )
135110 last_stream_version = -> ( stream_ ) do
136- @ stream_klass. where ( stream : stream_ . name ) . order ( "position DESC" ) . first . try ( :position )
111+ stream_klass . where ( stream : stream_ . name ) . order ( "position DESC" ) . first . try ( :position )
137112 end
138113 resolved_version = expected_version . resolve_for ( stream , last_stream_version )
139114
@@ -148,7 +123,7 @@ def add_to_stream(event_ids, stream, expected_version)
148123 created_at : Time . now . utc ,
149124 }
150125 end
151- @ stream_klass. insert_all! ( in_stream ) unless stream . global?
126+ stream_klass . insert_all! ( in_stream ) unless stream . global?
152127 end
153128 self
154129 rescue ::ActiveRecord ::RecordNotUnique => e
@@ -165,7 +140,7 @@ def compute_position(resolved_version, index)
165140 end
166141
167142 def detect_index_violated ( message )
168- @ index_violation_detector. detect ( message )
143+ index_violation_detector . detect ( message )
169144 end
170145
171146 def insert_hash ( record , serialized_record )
@@ -194,11 +169,11 @@ def optimize_timestamp(valid_at, created_at)
194169 end
195170
196171 def start_transaction ( &block )
197- @ event_klass. transaction ( requires_new : true , &block )
172+ event_klass . transaction ( requires_new : true , &block )
198173 end
199174
200175 def link_to_stream_ ( event_ids , stream , expected_version )
201- ( event_ids - @ event_klass. where ( event_id : event_ids ) . pluck ( :event_id ) ) . each { |id | raise EventNotFound . new ( id ) }
176+ ( event_ids - event_klass . where ( event_id : event_ids ) . pluck ( :event_id ) ) . each { |id | raise EventNotFound . new ( id ) }
202177 add_to_stream ( event_ids , stream , expected_version )
203178 end
204179
@@ -209,11 +184,50 @@ def append_to_stream_(records, stream, expected_version)
209184 hashes << insert_hash ( record , record . serialize ( @serializer ) )
210185 event_ids << record . event_id
211186 end
212- add_to_stream ( event_ids , stream , expected_version ) { @event_klass . insert_all! ( hashes ) }
187+ add_to_stream ( event_ids , stream , expected_version ) { event_klass . insert_all! ( hashes ) }
188+ end
189+
190+ def model_klasses
191+ @model_klasses ||= @model_factory . call . tap do |event_klass , stream_klass |
192+ if @serializer == NULL && json_data_type? ( event_klass )
193+ warn <<~MSG
194+ The data or metadata column is of a JSON/B type and expects a JSON string.
195+
196+ Yet the repository serializer is configured as #{ @serializer } and it would not
197+ produce the expected JSON string.
198+
199+ In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
200+ that made it work so far. This behaviour is unfortunately also a source of undesired
201+ double serialization — first in the EventRepository, second in the ActiveRecord.
202+
203+ In the past we've advised workarounds that introduced configuration incosistency
204+ with other data types and serialization formats, i.e. explicitly passing NULL serializer
205+ just for the JSON/B data types.
206+
207+ As of now this special ActiveRecord behaviour is disabled. You should be using JSON
208+ serializer back again:
209+
210+ RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
211+ MSG
212+ else
213+ event_klass . include ( SkipJsonSerialization )
214+ end
215+ end
216+ end
217+
218+ def event_klass = model_klasses . first
219+ def stream_klass = model_klasses . last
220+
221+ def repo_reader
222+ @repo_reader ||= EventRepositoryReader . new ( event_klass , stream_klass , @serializer )
223+ end
224+
225+ def index_violation_detector
226+ @index_violation_detector ||= IndexViolationDetector . new ( event_klass . table_name , stream_klass . table_name )
213227 end
214228
215- def json_data_type?
216- %i[ data metadata ] . any? { |attr | @event_klass . column_for_attribute ( attr ) . type . start_with? ( "json" ) }
229+ def json_data_type? ( klass )
230+ %i[ data metadata ] . any? { |attr | klass . column_for_attribute ( attr ) . type . start_with? ( "json" ) }
217231 end
218232 end
219233 end
0 commit comments