-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathw_clock.py
250 lines (195 loc) · 6.44 KB
/
w_clock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#====================================================================
# w_clock.py 7-Segment LED Clock with mqtt control
#
# scase 11/25/22 From node_timer.py programs
# 03/14/25 Was node_clock2.py in sandbox
# 03/29/25 Added NTP from uPython library
# and removed the manual timesetting feature
#
#====================================================================
from umqtt.simple import MQTTClient
from machine import Pin, RTC, SoftSPI, Timer
import ubinascii
import time
import json
import network
import machine
import max7219
import ntptime
#----------------------------------------------------------------------#
# Pin definitions on GPS Clock board
#
# Pin 2 - Blue led on ESP32
# Pin 4 - On board yellow led
# Pin 23 - CS for SPI on MAX7219 LED board
# Pin 25 - TP1 Output pins
# Pin 26 - TP3
# Pin 27 - TP5 Using for timeset switch
ledB = Pin(2, Pin.OUT)
ledB.value(0)
ledY = Pin(4, Pin.OUT)
timeset = Pin(27, Pin.IN, Pin.PULL_UP)
ledY.value(1)
hours = 0
mins = 0
seconds = 0
times_up = 0
onthehour = True
#----------------------------------------------------------------------#
# Read json configuration file
#
with open('config.json') as fptr:
config = json.load(fptr)
SERVER = config["server"]
SSID = config["ssid"]
PWD = config["password"]
GMTOFFSET = config["gmtoffset"]
pflag = config["printflag"]
print("Configuration : --------------------------------------- ")
print(config)
gmt_correction = GMTOFFSET + 1
# Default MQTT server to connect to
CLIENT_ID = ubinascii.hexlify(machine.unique_id())
rtc = RTC()
# The real time clock keeps time used by the display
# The RTC is initialized by the server over MQTT messages, with the
# server time syncronized to NTP.
# The RTC time is re-syncronized by a one-hour timer requesting an update
#-------------------------------------------------------------------------#
def do_message(topic, msg):
global hours
global mins
global seconds
global pflag
if pflag:
print("Topic %s Message %s " % (topic, msg) )
if b'ntpSEC' in topic :
mess = str(msg, 'UTF-8')
seconds = int(float(mess))
elif b'ntpMIN' in topic :
mess = str(msg, 'UTF-8')
mins = int(float(mess))
elif b'ntpHOUR' in topic :
mess = str(msg, 'UTF-8')
hours = int(float(mess))
else :
print("Invalid MQTT command decode ")
rtc.datetime((2022, 11, 11, 1, hours, mins, seconds,0))
#--------------------------------------------------------------------------#
def sub_topic(TOPIC, pflag) :
global server
c.subscribe(TOPIC)
if pflag :
print("Connected to %s, subscribed to %s topic" % (server, TOPIC))
#--------------------------------------------------------------------------#
def init_display() :
# Clear the display and set a test pattern after boot
# display.clear()
for i in list(range(1,9)):
display.register(i, 0XF) # this register instruction 0XF clears the
# digit at location i
#time.sleep(1)
for i in list((3, 6)): # write the dashes on the dispay __-__-__
display.register(i, 0xA) # 0xA is the code for "-"
#-------------------------------------------------------------------------#
# These are the ISRs for the 1 second and hour timers
def flip_led(tim1):
global times_up # *** TODO - not sure if we need this
if (not times_up) :
write_time()
else :
init_display()
def hour_timer(tim2):
global onthehour
# set the onthehour flag so that RTC can be updated
onthehour = True
#-------------------------------------------------------------------------#
# take time units and place on clock digits
def write_time():
global hours
global seconds
global mins
global times_up
t = rtc.datetime() # take the current time from the RTC
seconds = t[6] # 5th tuple is seconds
mins = t[5]
hours = t[4]
if (seconds == 59) :
seconds = 0
mins += 1
else :
seconds += 1
if (mins == 59) :
mins = 0
hours += 1
if (hours > 23):
hours = 0
sec_tens = seconds // 10
sec_units = seconds % 10
min_tens = mins // 10
min_units = mins % 10
hour_tens = hours // 10
hour_units = hours % 10
display.register(8, hour_tens)
display.register(7, hour_units)
display.register(5, min_tens)
display.register(4, min_units)
display.register(2, sec_tens)
display.register(1, sec_units)
#-------------------------------------------------------------------------#
# get an offical update on the time from the NTP pool
# built-in micropython function
def set_NTP():
global GMTOFFSET
gmt_correction = GMTOFFSET
# get the time from the ntp pool
ntptime.settime()
rtime = time.localtime()
if rtime[7] > 306 : # fall back
gmt_correction = GMTOFFSET + 1
rtc.datetime((rtime[0],rtime[1],rtime[2],rtime[6],rtime[3] - gmt_correction ,rtime[4],rtime[5], 0))
# Initialization ------------------------------------------------------
#
time.sleep(0.5)
n=network.WLAN(network.STA_IF)
n.active(True)
n.connect(SSID,PWD)
while not n.isconnected() :
time.sleep(1)
print("WLAN:", n.ifconfig())
server=SERVER
c = MQTTClient(CLIENT_ID, server)
# Subscribed messages will be delivered to a callback
c.set_callback(do_message)
c.connect()
sub_topic("ntpHOUR", 0)
sub_topic("ntpMIN", 0)
sub_topic("ntpSEC", 0)
# setup pin that is the control stobe for the display
cs = Pin(23, Pin.OUT)
cs.off()
# init the display object and the spi interface
spi = SoftSPI(baudrate=10000000, polarity=1, phase=0, sck=Pin(21), mosi=Pin(22),miso=Pin(13))
display = max7219.Max7219(spi, cs)
init_display()
tim1 = Timer(1)
tim2 = Timer(2)
# standard time sync will come from internal ntp pool servers, not from mqtt servers time
# set the time and rtc from ntp server data
set_NTP()
tim1.init(period=1000, mode=Timer.PERIODIC, callback=flip_led)
tim2.init(period=3600000, mode=Timer.PERIODIC, callback=hour_timer)
# Main loop -----------------------------------------------------------
#
while 1:
print("*** in the inner loop ***")
try:
while 1:
if onthehour :
# flags are used because uPython is multi-threaded and putting
# a MQTT request inside a callback is not advised
set_NTP()
onthehour = False
c.check_msg()
finally:
c.disconnect()