11from typing import (
2+ Callable ,
23 Optional ,
34 Sequence ,
45 Tuple ,
56 Union
67)
78from datetime import date , timedelta
89from epiweeks import Week , Year
9-
10+ import logging
1011
1112def time_value_to_date (value : int ) -> date :
1213 year , month , day = value // 10000 , (value % 10000 ) // 100 , value % 100
@@ -26,7 +27,7 @@ def week_value_to_week(value: int) -> Week:
2627
2728def guess_time_value_is_day (value : int ) -> bool :
2829 # YYYYMMDD type and not YYYYMM
29- return len (str (value )) > 6
30+ return len (str (value )) == 8
3031
3132def guess_time_value_is_week (value : int ) -> bool :
3233 # YYYYWW type and not YYYYMMDD
@@ -77,7 +78,7 @@ def weeks_in_range(week_range: Tuple[int, int]) -> int:
7778 acc += year .totalweeks ()
7879 return acc + 1 # same week should lead to 1 week that will be queried
7980
80- def dates_to_ranges (values : Optional [Sequence [Union [Tuple [int , int ], int ]]]) -> Optional [Sequence [Union [Tuple [int , int ], int ]]]:
81+ def time_values_to_ranges (values : Optional [Sequence [Union [Tuple [int , int ], int ]]]) -> Optional [Sequence [Union [Tuple [int , int ], int ]]]:
8182 """
8283 Converts a mixed list of dates and date ranges to an optimized list where dates are merged into ranges where possible.
8384 e.g. [20200101, 20200102, (20200101, 20200104), 20200106] -> [(20200101, 20200104), 20200106]
@@ -87,84 +88,55 @@ def dates_to_ranges(values: Optional[Sequence[Union[Tuple[int, int], int]]]) ->
8788 return values
8889
8990 # determine whether the list is of days (YYYYMMDD) or weeks (YYYYWW) based on first element
90- try :
91- if (isinstance (values [0 ], tuple ) and guess_time_value_is_day (values [0 ][0 ]))\
92- or (isinstance (values [0 ], int ) and guess_time_value_is_day (values [0 ])):
93- return days_to_ranges (values )
94- elif (isinstance (values [0 ], tuple ) and guess_time_value_is_week (values [0 ][0 ]))\
95- or (isinstance (values [0 ], int ) and guess_time_value_is_week (values [0 ])):
96- return weeks_to_ranges (values )
97- else :
98- return values
99- except :
91+ first_element = values [0 ][0 ] if isinstance (values [0 ], tuple ) else values [0 ]
92+ if guess_time_value_is_day (first_element ):
93+ return days_to_ranges (values )
94+ elif guess_time_value_is_week (first_element ):
95+ return weeks_to_ranges (values )
96+ else :
10097 return values
10198
10299def days_to_ranges (values : Sequence [Union [Tuple [int , int ], int ]]) -> Sequence [Union [Tuple [int , int ], int ]]:
103- intervals = []
104-
105- # populate list of intervals based on original values
106- for v in values :
107- if isinstance (v , int ):
108- # 20200101 -> [20200101, 20200101]
109- intervals .append ([time_value_to_date (v ), time_value_to_date (v )])
110- else : # tuple
111- # (20200101, 20200102) -> [20200101, 20200102]
112- intervals .append ([time_value_to_date (v [0 ]), time_value_to_date (v [1 ])])
113-
114- intervals .sort (key = lambda x : x [0 ])
115-
116- # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
117- merged = []
118- for interval in intervals :
119- # no overlap, append the interval
120- # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
121- if not merged or merged [- 1 ][1 ] < interval [0 ] - timedelta (days = 1 ):
122- merged .append (interval )
123- # overlap, merge the current and previous intervals
124- else :
125- merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
126-
127- # convert intervals from dates back to integers
128- ranges = []
129- for m in merged :
130- if m [0 ] == m [1 ]:
131- ranges .append (date_to_time_value (m [0 ]))
132- else :
133- ranges .append ((date_to_time_value (m [0 ]), date_to_time_value (m [1 ])))
134-
135- return ranges
100+ return _to_ranges (values , time_value_to_date , date_to_time_value , timedelta (days = 1 ))
136101
137102def weeks_to_ranges (values : Sequence [Union [Tuple [int , int ], int ]]) -> Sequence [Union [Tuple [int , int ], int ]]:
138- intervals = []
139-
140- # populate list of intervals based on original values
141- for v in values :
142- if isinstance (v , int ):
143- # 202001 -> [202001, 202001]
144- intervals .append ([week_value_to_week (v ), week_value_to_week (v )])
145- else : # tuple
146- # (202001, 202002) -> [202001, 202002]
147- intervals .append ([week_value_to_week (v [0 ]), week_value_to_week (v [1 ])])
148-
149- intervals .sort (key = lambda x : x [0 ])
150-
151- # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
152- merged = []
153- for interval in intervals :
154- # no overlap, append the interval
155- # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
156- if not merged or merged [- 1 ][1 ] < interval [0 ] - 1 :
157- merged .append (interval )
158- # overlap, merge the current and previous intervals
159- else :
160- merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
161-
162- # convert intervals from weeks back to integers
163- ranges = []
164- for m in merged :
165- if m [0 ] == m [1 ]:
166- ranges .append (week_to_time_value (m [0 ]))
167- else :
168- ranges .append ((week_to_time_value (m [0 ]), week_to_time_value (m [1 ])))
169-
170- return ranges
103+ return _to_ranges (values , week_value_to_week , week_to_time_value , 1 )
104+
105+ def _to_ranges (values : Sequence [Union [Tuple [int , int ], int ]], value_to_date : Callable , date_to_value : Callable , time_unit : Union [int , timedelta ]) -> Sequence [Union [Tuple [int , int ], int ]]:
106+ try :
107+ intervals = []
108+
109+ # populate list of intervals based on original date/week values
110+ for v in values :
111+ if isinstance (v , int ):
112+ # 20200101 -> [20200101, 20200101]
113+ intervals .append ([value_to_date (v ), value_to_date (v )])
114+ else : # tuple
115+ # (20200101, 20200102) -> [20200101, 20200102]
116+ intervals .append ([value_to_date (v [0 ]), value_to_date (v [1 ])])
117+
118+ intervals .sort ()
119+
120+ # merge overlapping intervals https://leetcode.com/problems/merge-intervals/
121+ merged = []
122+ for interval in intervals :
123+ # no overlap, append the interval
124+ # caveat: we subtract 1 from interval[0] so that contiguous intervals are considered overlapping. i.e. [1, 1], [2, 2] -> [1, 2]
125+ if not merged or merged [- 1 ][1 ] < interval [0 ] - time_unit :
126+ merged .append (interval )
127+ # overlap, merge the current and previous intervals
128+ else :
129+ merged [- 1 ][1 ] = max (merged [- 1 ][1 ], interval [1 ])
130+
131+ # convert intervals from dates/weeks back to integers
132+ ranges = []
133+ for m in merged :
134+ if m [0 ] == m [1 ]:
135+ ranges .append (date_to_value (m [0 ]))
136+ else :
137+ ranges .append ((date_to_value (m [0 ]), date_to_value (m [1 ])))
138+
139+ return ranges
140+ except Exception as e :
141+ logging .info ('bad input to date ranges' , input = values , exception = e )
142+ return values
0 commit comments