@@ -36,8 +36,29 @@ import (
3636func init () {
3737 register.DoFn2x0 [[]byte , func (string , int )](& inputFn [string , int ]{})
3838 register.DoFn6x0 [beam.Window , state.Provider , timers.Provider , string , int , func (kv [string , int ])](& eventTimeFn {})
39+ register.DoFn5x0 [beam.Window , timers.Provider , string , int , func (int )](& eventTimeFnWithOutputTimestamp {})
40+ register.DoFn3x0 [beam.EventTime , int , func (int )](& checkTimestampFn {})
3941 register .Emitter2 [string , int ]()
40- register .Emitter1 [kv [string , int ]]()
42+ register .Emitter1 [int ]()
43+ }
44+
45+ // checkTimestampFn validates that elements arrived at the expected timestamp.
46+ type checkTimestampFn struct {
47+ Timestamp int64 // millisecond epoch
48+ ExpectMaxTimestamp bool
49+ }
50+
51+ func (fn * checkTimestampFn ) ProcessElement (ts beam.EventTime , val int , emit func (int )) {
52+ if fn .ExpectMaxTimestamp {
53+ if mtime .Time (ts ) != mtime .MaxTimestamp {
54+ panic (fmt .Errorf ("timestamp mismatch: got %v, want %v (MaxTimestamp)" , ts , mtime .MaxTimestamp ))
55+ }
56+ } else {
57+ if got := int64 (ts ); got != int64 (mtime .FromMilliseconds (fn .Timestamp )) {
58+ panic (fmt .Errorf ("timestamp mismatch: got %v, want %v (as mtime)" , got , fn .Timestamp ))
59+ }
60+ }
61+ emit (val )
4162}
4263
4364type kv [K , V any ] struct {
@@ -154,6 +175,97 @@ func TimersEventTimeUnbounded(s beam.Scope) {
154175 })(s )
155176}
156177
178+ type eventTimeFnWithOutputTimestamp struct {
179+ Callback timers.EventTime
180+
181+ Offset int
182+ TimerOutput int
183+ OutputTimestamp int64 // millisecond epoch
184+ NoOutputTimestamp bool
185+ }
186+
187+ func (fn * eventTimeFnWithOutputTimestamp ) ProcessElement (w beam.Window , tp timers.Provider , key string , value int , emit func (int )) {
188+ if fn .NoOutputTimestamp {
189+ fn .Callback .Set (tp , w .MaxTimestamp ().ToTime (), timers .WithNoOutputTimestamp ())
190+ } else {
191+ fn .Callback .Set (tp , w .MaxTimestamp ().ToTime (), timers .WithOutputTimestamp (time .UnixMilli (fn .OutputTimestamp )))
192+ }
193+ }
194+
195+ func (fn * eventTimeFnWithOutputTimestamp ) OnTimer (ctx context.Context , ts beam.EventTime , tp timers.Provider , key string , timer timers.Context , emit func (int )) {
196+ if fn .Callback .Family != timer .Family || timer .Tag != "" {
197+ panic ("unexpected timer, family: " + timer .Family + " tag:" + timer .Tag + " want: " + fn .Callback .Family + ", for key:" + key )
198+ }
199+ emit (fn .TimerOutput )
200+ }
201+
202+ // timersEventTimePipelineBuilderWithOutputTimestamp validates EventTime timers with explicit output timestamp.
203+ func timersEventTimePipelineBuilderWithOutputTimestamp (makeImp func (s beam.Scope ) beam.PCollection ) func (s beam.Scope ) {
204+ return func (s beam.Scope ) {
205+ var inputs []kv [string , int ]
206+
207+ offset := 5000
208+ timerOutput := 4093
209+ outputTimestamp := int64 (1234567890000 )
210+
211+ inputs = append (inputs , kvfn ("key" , 0 ))
212+ imp := makeImp (s )
213+
214+ keyed := beam .ParDo (s , & inputFn [string , int ]{
215+ Inputs : inputs ,
216+ }, imp )
217+ times := beam .ParDo (s , & eventTimeFnWithOutputTimestamp {
218+ Offset : offset ,
219+ TimerOutput : timerOutput ,
220+ OutputTimestamp : outputTimestamp ,
221+ Callback : timers .InEventTime ("Callback" ),
222+ }, keyed )
223+
224+ // Check that the output element has the expected timestamp.
225+ validatedTimestamps := beam .ParDo (s , & checkTimestampFn {Timestamp : outputTimestamp }, times )
226+ wantOutputs := []int {timerOutput }
227+ passert .EqualsList (s , validatedTimestamps , wantOutputs )
228+ }
229+ }
230+
231+ // timersEventTimePipelineBuilderWithNoOutputTimestamp validates EventTime timers with no output timestamp.
232+ func timersEventTimePipelineBuilderWithNoOutputTimestamp (makeImp func (s beam.Scope ) beam.PCollection ) func (s beam.Scope ) {
233+ return func (s beam.Scope ) {
234+ var inputs []kv [string , int ]
235+
236+ offset := 5000
237+ timerOutput := 4093
238+ inputs = append (inputs , kvfn ("key" , 0 ))
239+
240+ imp := makeImp (s )
241+
242+ keyed := beam .ParDo (s , & inputFn [string , int ]{
243+ Inputs : inputs ,
244+ }, imp )
245+ times := beam .ParDo (s , & eventTimeFnWithOutputTimestamp {
246+ Offset : offset ,
247+ TimerOutput : timerOutput ,
248+ NoOutputTimestamp : true ,
249+ Callback : timers .InEventTime ("Callback" ),
250+ }, keyed )
251+
252+ // Check that the output element has MaxTimestamp.
253+ validatedTimestamps := beam .ParDo (s , & checkTimestampFn {ExpectMaxTimestamp : true }, times )
254+ wantOutputs := []int {timerOutput }
255+ passert .EqualsList (s , validatedTimestamps , wantOutputs )
256+ }
257+ }
258+
259+ // TimersEventTime_WithOutputTimestamp validates event time timers with explicit output timestamp.
260+ func TimersEventTime_WithOutputTimestamp (s beam.Scope ) {
261+ timersEventTimePipelineBuilderWithOutputTimestamp (beam .Impulse )(s )
262+ }
263+
264+ // TimersEventTime_WithNoOutputTimestamp validates event time timers with no output timestamp.
265+ func TimersEventTime_WithNoOutputTimestamp (s beam.Scope ) {
266+ timersEventTimePipelineBuilderWithNoOutputTimestamp (beam .Impulse )(s )
267+ }
268+
157269// Below here are tests for ProcessingTime timers.
158270
159271func init () {
0 commit comments