-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdsupdt.py
More file actions
2456 lines (2175 loc) · 105 KB
/
dsupdt.py
File metadata and controls
2456 lines (2175 loc) · 105 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
#
##################################################################################
#
# Title: dsupdt
# Author: Zaihua Ji, zji@ucar.edu
# Date: 10/10/2020
# 2025-02-05 transferred to package rda_python_dsupdt from
# https://github.com/NCAR/rda-utility-programs.git
# Purpose: python utility program to download remote files,
# process downloaded files and create local file, and
# archive local files onto RDA Server
# save information of web online data files or Saved files into RDADB
#
# Github: https://github.com/NCAR/rda-python-dsupdt.git
#
##################################################################################
#
import sys
import os
import re
from os import path as op
from rda_python_common import PgLOG
from rda_python_common import PgSIG
from rda_python_common import PgLock
from rda_python_common import PgCMD
from rda_python_common import PgFile
from rda_python_common import PgUtil
from rda_python_common import PgOPT
from rda_python_common import PgDBI
from rda_python_common import PgSplit
from . import PgUpdt
# temporal info cached per local file index (filled in count_caching(),
# read back in file_update())
TEMPINFO = {}
# pieces of the summary email; set in dataset_update(), consumed in main()
TOPMSG = None
SUBJECT = None
ACTSTR = None
# number of items (indices, remote files or end dates/hours) the current
# action works on; set in start_action()
ALLCNT = 0
# NOTE(review): presumably default file types keyed by type option -- not
# referenced in the visible part of this file, confirm against its users
DEFTYPES = {'WT': 'D', 'ST': 'P', 'QT': 'B'}
#
# main function to run dsupdt
#
def main():
    """Entry point of dsupdt.

    Parses the command line, verifies that enough options were given for the
    requested action, runs the action and then, when a SUBJECT was built by
    the action, sends/logs the summary email.  Finally the dscheck status is
    recorded (if running under a dscheck record), the end time is logged and
    the program exits with status 0.
    """
    global SUBJECT

    PgOPT.parsing_input('dsupdt')
    PgUpdt.check_enough_options(PgOPT.PGOPT['CACT'], PgOPT.PGOPT['ACTS'])
    start_action()

    # a summary is wanted unless -NE is given; with -EE only when errors occurred
    if not SUBJECT or 'NE' in PgOPT.params:
        send_summary = False
    else:
        send_summary = bool(PgLOG.PGLOG['ERRCNT']) or 'EE' not in PgOPT.params

    if send_summary:
        SUBJECT = SUBJECT + " on " + PgLOG.PGLOG['HOSTNAME']
        PgLOG.set_email("{}: {}".format(SUBJECT, TOPMSG), PgLOG.EMLTOP)
        if ACTSTR:
            SUBJECT = "{} for {}".format(ACTSTR, SUBJECT)
        if PgSIG.PGSIG['PPID'] > 1:
            SUBJECT += " in CPID {}".format(PgSIG.PGSIG['PID'])
        if PgLOG.PGLOG['ERRCNT'] > 0:
            SUBJECT += " With Error"
        # choose where the email is attached: dscheck record, update control
        # record, or a plain log message that is emailed
        if PgLOG.PGLOG['DSCHECK']:
            condition = "cindex = {}".format(PgLOG.PGLOG['DSCHECK']['cindex'])
            PgDBI.build_customized_email("dscheck", "einfo", condition,
                                         SUBJECT, PgOPT.PGOPT['wrnlog'])
        elif PgOPT.PGOPT['UCNTL']:
            condition = "cindex = {}".format(PgOPT.PGOPT['UCNTL']['cindex'])
            PgDBI.build_customized_email("dcupdt", "einfo", condition,
                                         SUBJECT, PgOPT.PGOPT['wrnlog'])
        else:
            PgLOG.pglog(SUBJECT, PgOPT.PGOPT['wrnlog']|PgLOG.SNDEML)

    if PgLOG.PGLOG['DSCHECK']:
        errmsg = PgLOG.PGLOG['ERRMSG']
        if errmsg:
            PgDBI.record_dscheck_error(errmsg)
        else:
            PgCMD.record_dscheck_status("D")

    # log end time if not getting only action
    if PgOPT.OPTS[PgOPT.PGOPT['CACT']][2]:
        PgLOG.cmdlog()
    PgLOG.pgexit(0)
#
# start action of dsupdt
#
def start_action():
    """Dispatch the action selected on the command line.

    Compares PgOPT.PGOPT['ACTS'] / PgOPT.PGOPT['CACT'] against the action
    codes in PgOPT.OPTS, sets the module level ALLCNT to the number of items
    the action works on and calls the matching worker function.
    """
    global ALLCNT

    acts = PgOPT.PGOPT['ACTS']
    if acts&PgOPT.OPTS['CU'][0]:
        if 'CI' in PgOPT.params:
            if PgUpdt.cache_update_control(PgOPT.params['CI'][0], 1):
                check_dataset_status()
        else:
            ALLCNT = PgOPT.get_option_count(["ED", "EH"])
            check_dataset_status(0)
    elif acts == PgOPT.OPTS['DL'][0]:
        # delete: control records, remote file records or local file records
        if 'CI' in PgOPT.params:
            ALLCNT = len(PgOPT.params['CI'])
            delete_control_info()
        elif 'RF' in PgOPT.params:
            ALLCNT = len(PgOPT.params['RF'])
            delete_remote_info()
        else:
            ALLCNT = len(PgOPT.params['LI'])
            delete_local_info()
    elif PgOPT.OPTS[PgOPT.PGOPT['CACT']][0]&PgOPT.OPTS['GA'][0]:
        get_update_info()
    elif PgOPT.PGOPT['CACT'] == 'PC':
        process_update_controls()
    elif acts == PgOPT.OPTS['SA'][0]:
        # set all: read each section of the input file and set what is found
        if 'IF' not in PgOPT.params:
            PgOPT.action_error("Missing input file via Option -IF")
        if PgOPT.get_input_info(PgOPT.params['IF'], 'DCUPDT'):
            PgUpdt.check_enough_options('SC', PgOPT.OPTS['SC'][0])
            ALLCNT = len(PgOPT.params['CI'])
            set_control_info()
        if PgOPT.get_input_info(PgOPT.params['IF'], 'DLUPDT'):
            PgUpdt.check_enough_options('SL', PgOPT.OPTS['SL'][0])
            ALLCNT = len(PgOPT.params['LI'])
            set_local_info()
        # get_input_info() must run first; RF is looked up with .get() so a
        # DRUPDT section without remote files is skipped instead of raising
        # KeyError
        if PgOPT.get_input_info(PgOPT.params['IF'], 'DRUPDT') and PgOPT.params.get('RF'):
            PgUpdt.check_enough_options('SR', PgOPT.OPTS['SR'][0])
            ALLCNT = len(PgOPT.params['RF']) if 'RF' in PgOPT.params else 0
            set_remote_info()
    elif acts == PgOPT.OPTS['SC'][0]:
        ALLCNT = len(PgOPT.params['CI'])
        set_control_info()
    elif acts == PgOPT.OPTS['SL'][0]:
        ALLCNT = len(PgOPT.params['LI'])
        set_local_info()
    elif acts == PgOPT.OPTS['SR'][0]:
        ALLCNT = len(PgOPT.params['RF'])
        set_remote_info()
    elif acts&PgOPT.OPTS['UF'][0]:
        if 'CI' in PgOPT.params:
            if PgUpdt.cache_update_control(PgOPT.params['CI'][0], 1):
                dataset_update()
        else:
            ALLCNT = PgOPT.get_option_count(["ED", "EH"])
            dataset_update()
    elif acts == PgOPT.OPTS['UL'][0]:
        # unlock: control records and/or local file records
        if 'CI' in PgOPT.params:
            ALLCNT = len(PgOPT.params['CI'])
            unlock_control_info()
        if 'LI' in PgOPT.params:
            ALLCNT = len(PgOPT.params['LI'])
            unlock_update_info()
#
# delete update control records for given dsid and control indices
#
def delete_control_info():
    """Delete the update control records listed in option CI.

    Each control record is locked first; records that cannot be locked are
    skipped.  Local file records (dlupdt) that referred to a deleted control
    get their cindex reset to 0.
    """
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Delete {} update control record{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    removed = detached = 0
    for i in range(ALLCNT):
        locked = PgLock.lock_update_control(PgOPT.params['CI'][i], 2, PgOPT.PGOPT['extlog'])
        if locked <= 0:
            continue
        where = "cindex = {}".format(locked)
        removed += PgDBI.pgdel("dcupdt", where, PgOPT.PGOPT['extlog'])
        detached += PgDBI.pgexec("UPDATE dlupdt SET cindex = 0 WHERE " + where, PgOPT.PGOPT['extlog'])

    PgLOG.pglog("{} of {} update control record{} deleted".format(removed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
    if detached > 0:
        plural = '' if detached <= 1 else 's'
        PgLOG.pglog("{} associated local file record{} modified".format(detached, plural), PgOPT.PGOPT['wrnlog'])
#
# delete local files for given dsid and locfile indices
#
def delete_local_info():
    """Delete the local file records listed in option LI.

    Each local file record is locked first; records that cannot be locked are
    skipped.  Remote file records (drupdt) attached to a local file index are
    deleted together with it.
    """
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Delete {} Locfile record{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    remote_removed = local_removed = 0
    for i in range(ALLCNT):
        lindex = PgOPT.params['LI'][i]
        where = "lindex = {}".format(lindex)
        if PgLock.lock_update(lindex, None, 2, PgOPT.PGOPT['errlog']) <= 0:
            continue
        attached = PgDBI.pgget("drupdt", "", where, PgOPT.PGOPT['extlog'])
        if attached > 0:
            rplural = '' if attached <= 1 else 's'
            PgLOG.pglog("Delete {} associated remote file record{} for Locfile index {} ...".format(attached, rplural, lindex), PgLOG.WARNLG)
            remote_removed += PgDBI.pgdel("drupdt", where, PgOPT.PGOPT['extlog'])
        local_removed += PgDBI.pgdel("dlupdt", where, PgOPT.PGOPT['extlog'])

    PgLOG.pglog("{} of {} Locfile record{} deleted".format(local_removed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
    if remote_removed > 0:
        plural = '' if remote_removed <= 1 else 's'
        PgLOG.pglog("{} associated Remote file record{} deleted too".format(remote_removed, plural), PgOPT.PGOPT['wrnlog'])
#
# delete update remote files for given dsid and remote files/locfile indices
#
def delete_remote_info():
    """Delete the remote file records identified by options RF and LI.

    Record i is matched on local file index LI[i] and remote file name RF[i];
    when option DO is given the download order DO[i] must match as well.
    """
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Delete {} remote file record{} ...".format(ALLCNT, plural), PgLOG.WARNLG)
    PgOPT.validate_multiple_options(ALLCNT, ["LI", "DO"])

    removed = 0
    for i in range(ALLCNT):
        where = "lindex = {} AND remotefile = '{}'".format(PgOPT.params['LI'][i], PgOPT.params['RF'][i])
        if 'DO' in PgOPT.params:
            where += " AND dindex = {}".format(PgOPT.params['DO'][i])
        removed += PgDBI.pgdel("drupdt", where, PgOPT.PGOPT['extlog'])

    PgLOG.pglog("{} of {} remote file record{} deleted".format(removed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
#
# get update control information
#
def get_control_info():
    """Write the update control records of the current dataset to PgOPT.OUTPUT.

    The dataset line is always written; the "[DCUPDT]" section header is only
    written for action GA.  Field selection comes from option FN, ordering
    from option ON (default "C"), and column widths are computed only when
    option FO is given.
    """
    table = "dcupdt"
    fieldmap = PgOPT.TBLHASH[table]
    PgLOG.pglog("Get update control info of {} from RDADB ...".format(PgOPT.params['DS']), PgLOG.WARNLG)

    requested = PgOPT.params['FN'] if 'FN' in PgOPT.params else None
    fieldkeys = PgDBI.fieldname_string(requested, PgOPT.PGOPT[table], PgOPT.PGOPT['dcall'])
    orderkeys = PgOPT.params['ON'] if 'ON' in PgOPT.params else "C"
    where = PgUpdt.file_condition(table) + PgOPT.get_order_string(orderkeys, table)
    records = PgDBI.pgmget(table, "*", where, PgOPT.PGOPT['extlog'])

    widths = None
    if records and 'FO' in PgOPT.params:
        widths = PgUtil.all_column_widths(records, fieldkeys, fieldmap)

    PgOPT.OUTPUT.write("{}{}{}\n".format(PgOPT.OPTS['DS'][1], PgOPT.params['ES'], PgOPT.params['DS']))
    if PgOPT.PGOPT['CACT'] == "GA":
        PgOPT.OUTPUT.write("[{}]\n".format(table.upper()))
    PgOPT.OUTPUT.write(PgOPT.get_string_titles(fieldkeys, fieldmap, widths) + "\n")

    if not records:
        PgLOG.pglog("no update control record retrieved", PgOPT.PGOPT['wrnlog'])
        return
    printed = PgOPT.print_column_format(records, fieldkeys, fieldmap, widths)
    plural = '' if printed <= 1 else 's'
    PgLOG.pglog("{} update control record{} retrieved".format(printed, plural), PgOPT.PGOPT['wrnlog'])
#
# get local file update information
#
def get_local_info():
    """Write the local file update records of the current dataset to PgOPT.OUTPUT.

    For action GL the dataset line is written first; for any other action the
    "[DLUPDT]" section header is written instead.  Field selection comes from
    option FN, ordering from option ON (default "XL"), and column widths are
    computed only when option FO is given.
    """
    table = "dlupdt"
    fieldmap = PgOPT.TBLHASH[table]
    PgLOG.pglog("Get local file update info of {} from RDADB ...".format(PgOPT.params['DS']), PgLOG.WARNLG)

    requested = PgOPT.params['FN'] if 'FN' in PgOPT.params else None
    fieldkeys = PgDBI.fieldname_string(requested, PgOPT.PGOPT[table], PgOPT.PGOPT['dlall'])
    orderkeys = PgOPT.params['ON'] if 'ON' in PgOPT.params else "XL"
    where = PgUpdt.file_condition(table) + PgOPT.get_order_string(orderkeys, table)
    records = PgDBI.pgmget(table, "*", where, PgOPT.PGOPT['extlog'])

    widths = None
    if records and 'FO' in PgOPT.params:
        widths = PgUtil.all_column_widths(records, fieldkeys, fieldmap)

    if PgOPT.PGOPT['CACT'] == "GL":
        PgOPT.OUTPUT.write("{}{}{}\n".format(PgOPT.OPTS['DS'][1], PgOPT.params['ES'], PgOPT.params['DS']))
    else:
        PgOPT.OUTPUT.write("[{}]\n".format(table.upper()))
    PgOPT.OUTPUT.write(PgOPT.get_string_titles(fieldkeys, fieldmap, widths) + "\n")

    if not records:
        PgLOG.pglog("no locfile record retrieved", PgOPT.PGOPT['wrnlog'])
        return
    printed = PgOPT.print_column_format(records, fieldkeys, fieldmap, widths)
    plural = '' if printed <= 1 else 's'
    PgLOG.pglog("{} locfile record{} retrieved".format(printed, plural), PgOPT.PGOPT['wrnlog'])
#
# get remote file update information
#
def get_remote_info():
    """Write the remote file update records of the current dataset to PgOPT.OUTPUT.

    For action GR the dataset line is written first; for any other action the
    "[DRUPDT]" section header is written instead.  Field selection comes from
    option FN, ordering from option ON (default "LDF"), and column widths are
    computed only when option FO is given.
    """
    table = "drupdt"
    fieldmap = PgOPT.TBLHASH[table]
    PgLOG.pglog("Get remote file update info of {} from RDADB ...".format(PgOPT.params['DS']), PgLOG.WARNLG)

    requested = PgOPT.params['FN'] if 'FN' in PgOPT.params else None
    fieldkeys = PgDBI.fieldname_string(requested, PgOPT.PGOPT[table], PgOPT.PGOPT['drall'])
    orderkeys = PgOPT.params['ON'] if 'ON' in PgOPT.params else "LDF"
    where = PgUpdt.file_condition(table) + PgOPT.get_order_string(orderkeys, table)
    records = PgDBI.pgmget(table, "*", where, PgOPT.PGOPT['extlog'])

    widths = None
    if records and 'FO' in PgOPT.params:
        widths = PgUtil.all_column_widths(records, fieldkeys, fieldmap)

    if PgOPT.PGOPT['CACT'] == "GR":
        PgOPT.OUTPUT.write("{}{}{}\n".format(PgOPT.OPTS['DS'][1], PgOPT.params['ES'], PgOPT.params['DS']))
    else:
        PgOPT.OUTPUT.write("[{}]\n".format(table.upper()))
    PgOPT.OUTPUT.write(PgOPT.get_string_titles(fieldkeys, fieldmap, widths) + "\n")

    if not records:
        PgLOG.pglog("no remote file record retrieved", PgOPT.PGOPT['wrnlog'])
        return
    printed = PgOPT.print_column_format(records, fieldkeys, fieldmap, widths)
    plural = '' if printed <= 1 else 's'
    PgLOG.pglog("{} remote file record{} retrieved".format(printed, plural), PgOPT.PGOPT['wrnlog'])
#
# add or modify update control information
#
def set_control_info():
    """Add or modify the update control records listed in option CI.

    A positive control index selects an existing record, which is locked and
    updated (the update also clears pid and lockhost); any other index adds a
    new record for the current dataset.  A parent control index must exist in
    dcupdt and an action name must match PgOPT.PGOPT['UPDTACTS'].

    Returns the result of PgLOG.pglog() when there is nothing to set,
    otherwise None.
    """
    table = 'dcupdt'
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Set {} update control record{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    added = changed = 0
    keys = PgOPT.get_field_keys(table, None, 'C')
    if not keys:
        return PgLOG.pglog("Nothing to set for update control!", PgOPT.PGOPT['errlog'])
    PgOPT.validate_multiple_values(table, ALLCNT, keys)
    columns = PgOPT.get_string_fields(keys, table)

    for i in range(ALLCNT):
        cindex = PgOPT.params['CI'][i]
        existing = None
        if cindex > 0:
            if PgLock.lock_update_control(cindex, 2, PgOPT.PGOPT['errlog']) <= 0:
                continue
            where = "cindex = {}".format(cindex)
            existing = PgDBI.pgget(table, columns, where, PgOPT.PGOPT['errlog'])
            if not existing:
                PgOPT.action_error("Error get update control record for " + where)

        record = PgOPT.build_record(keys, existing, table, i)
        if not record:
            # nothing changed: release the lock taken above
            if cindex:
                PgLock.lock_update_control(cindex, 0, PgOPT.PGOPT['errlog'])
            continue

        if 'pindex' in record and record['pindex'] and not PgDBI.pgget("dcupdt", "", "cindex = {}".format(record['pindex'])):
            PgOPT.action_error("Parent control Index {} is not in RDADB".format(record['pindex']))
        if 'action' in record and not re.match(r'^({})$'.format(PgOPT.PGOPT['UPDTACTS']), record['action']):
            PgOPT.action_error("Action Name '{}' must be one of dsupdt Actions ({})".format(record['action'], PgOPT.PGOPT['UPDTACTS']))

        if existing:
            # clearing pid/lockhost with the update releases the lock
            record['pid'] = 0
            record['lockhost'] = ''
            changed += PgDBI.pgupdt(table, record, where, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)
        else:
            record['dsid'] = PgOPT.params['DS']
            if 'specialist' not in record:
                record['specialist'] = PgOPT.params['LN']
            added += PgDBI.pgadd(table, record, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)

    PgLOG.pglog("{}/{} of {} control record{} added/modified".format(added, changed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
#
# add or modify local file update information
#
def set_local_info():
    """Add or modify the local file update records listed in option LI.

    A positive local file index selects an existing record, which is locked
    and updated (the update also resets pid and hostname); any other index
    adds a new record for the current dataset.  With option RO the execution
    order XO[i] is regenerated for every record.  A control index must exist
    in dcupdt and an action name must match PgOPT.PGOPT['ARCHACTS'].

    Returns the result of PgLOG.pglog() when there is nothing to set,
    otherwise None.
    """
    table = 'dlupdt'
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Set {} local file record{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    added = changed = 0
    keys = PgOPT.get_field_keys(table, None, 'L')
    # with -RO but no -XO the execution order field has to be set as well
    if 'RO' in PgOPT.params and 'XO' not in PgOPT.params:
        keys += 'X'
    if not keys:
        return PgLOG.pglog("Nothing to set for update local file!", PgOPT.PGOPT['errlog'])
    PgOPT.validate_multiple_values(table, ALLCNT, keys)
    columns = PgOPT.get_string_fields(keys, table)

    for i in range(ALLCNT):
        lindex = PgOPT.params['LI'][i]
        existing = None
        if lindex > 0:
            if PgLock.lock_update(lindex, None, 2, PgOPT.PGOPT['errlog']) <= 0:
                continue
            where = "lindex = {}".format(lindex)
            existing = PgDBI.pgget(table, columns, where, PgOPT.PGOPT['errlog'])
            if not existing:
                PgOPT.action_error("Error get Local file record for " + where)

        if 'RO' in PgOPT.params:
            PgOPT.params['XO'][i] = PgUpdt.get_next_exec_order(PgOPT.params['DS'], 0)
        record = PgOPT.build_record(keys, existing, table, i)
        if not record:
            # nothing changed: release the lock taken above
            if lindex:
                PgLock.lock_update(lindex, None, 0, PgOPT.PGOPT['errlog'])
            continue

        if 'cindex' in record and record['cindex'] and not PgDBI.pgget("dcupdt", "", "cindex = {}".format(record['cindex'])):
            PgOPT.action_error("Update control Index {} is not in RDADB".format(record['cindex']))
        if 'action' in record and not re.match(r'^({})$'.format(PgOPT.PGOPT['ARCHACTS']), record['action']):
            PgOPT.action_error("Action Name '{}' must be one of dsarch Actions ({})".format(record['action'], PgOPT.PGOPT['ARCHACTS']))

        if existing:
            # NOTE(review): 'VI' looks like an option code while the other keys
            # tested on record are field names -- confirm this branch can fire
            if 'VI' in record and not record['VI'] and existing['missdate']:
                record['missdate'] = record['misshour'] = None
            record['pid'] = 0
            # NOTE(review): set_control_info() clears lockhost with '' -- confirm 0 is intended here
            record['hostname'] = 0
            changed += PgDBI.pgupdt(table, record, where, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)
        else:
            record['dsid'] = PgOPT.params['DS']
            if 'specialist' not in record:
                record['specialist'] = PgOPT.params['LN']
            if 'execorder' not in record:
                record['execorder'] = PgUpdt.get_next_exec_order(PgOPT.params['DS'], 1)
            added += PgDBI.pgadd(table, record, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)

    PgLOG.pglog("{}/{} of {} Locfile record{} added/modified".format(added, changed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
#
# add or modify remote file update information
#
def set_remote_info():
    """Add or modify the remote file update records listed in option RF.

    Record i is looked up by local file index LI[i], remote file name RF[i]
    and download order DO[i] (0 when option DO is not given); it is updated
    when found and added otherwise.  A local file index carried in the new
    values must exist in dlupdt.

    Returns the result of PgLOG.pglog() when there is nothing to set,
    otherwise None.
    """
    table = 'drupdt'
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Set {} update remote file{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    added = changed = 0
    keys = PgOPT.get_field_keys(table)
    if not keys:
        return PgLOG.pglog("Nothing to set for update remote file!", PgOPT.PGOPT['errlog'])
    PgOPT.validate_multiple_values(table, ALLCNT, keys)
    columns = PgOPT.get_string_fields(keys, table)

    for i in range(ALLCNT):
        lindex = PgOPT.params['LI'][i]
        dindex = PgOPT.params['DO'][i] if 'DO' in PgOPT.params else 0
        where = "lindex = {} AND remotefile = '{}' AND dindex = {}".format(lindex, PgOPT.params['RF'][i], dindex)
        existing = PgDBI.pgget("drupdt", columns, where, PgOPT.PGOPT['errlog'])
        record = PgOPT.build_record(keys, existing, table, i)
        if not record:
            continue

        if 'lindex' in record and record['lindex'] and not PgDBI.pgget("dlupdt", "", "lindex = {}".format(record['lindex'])):
            PgOPT.action_error("Local file Index {} is not in RDADB".format(record['lindex']))

        if existing:
            changed += PgDBI.pgupdt("drupdt", record, where, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)
        else:
            record['lindex'] = lindex
            record['dsid'] = PgOPT.params['DS']
            added += PgDBI.pgadd("drupdt", record, PgOPT.PGOPT['errlog']|PgLOG.DODFLT)

    PgLOG.pglog("{}/{} of {} remote file record{} added/modified".format(added, changed, ALLCNT, plural), PgOPT.PGOPT['wrnlog'])
#
# unlock update records for given locfile indices
#
def unlock_update_info():
    """Unlock the local file update records listed in option LI.

    A normal unlock (lock_update with -1) is tried first; when it fails and
    PgFile.check_host_down() reports the locking host as down, a forced
    unlock (lock_update with -2) is tried.  Every outcome is logged per index.
    """
    plural = '' if ALLCNT <= 1 else 's'
    PgLOG.pglog("Unlock {} update locfile{} ...".format(ALLCNT, plural), PgLOG.WARNLG)

    unlocked = 0
    for lindex in PgOPT.params['LI']:
        where = "lindex = {}".format(lindex)
        lockrec = PgDBI.pgget("dlupdt", "pid, hostname", where, PgOPT.PGOPT['extlog'])
        if not lockrec:
            PgLOG.pglog("{}: Local File Not exists".format(lindex), PgOPT.PGOPT['errlog'])
            continue
        if not lockrec['pid']:
            PgLOG.pglog("{}: Local File Not locked".format(lindex), PgOPT.PGOPT['wrnlog'])
            continue

        pid = lockrec['pid']
        host = lockrec['hostname']
        if PgLock.lock_update(lindex, None, -1, PgOPT.PGOPT['errlog']) > 0:
            unlocked += 1
            PgLOG.pglog("{}: Local File Unlocked {}/{}".format(lindex, pid, host), PgOPT.PGOPT['wrnlog'])
        elif (PgFile.check_host_down(None, host) and
              PgLock.lock_update(lindex, None, -2, PgOPT.PGOPT['errlog']) > 0):
            unlocked += 1
            PgLOG.pglog("{}: Local File Force unlocked {}/{}".format(lindex, pid, host), PgOPT.PGOPT['wrnlog'])
        else:
            PgLOG.pglog("{}: Local File Unable to unlock {}/{}".format(lindex, pid, host), PgOPT.PGOPT['wrnlog'])

    PgLOG.pglog("{} of {} local file record{} unlocked from RDADB".format(unlocked, ALLCNT, plural), PgLOG.LOGWRN)
#
# unlock update control records for given control indices
#
def unlock_control_info():
    """Unlock the update control records listed in option CI.

    A normal unlock (lock_update_control with -1) is tried first; when it
    fails and PgFile.check_host_down() reports the locking host as down, a
    forced unlock (lock_update_control with -2) is tried.  Every outcome is
    logged per control index.
    """
    s = 's' if ALLCNT > 1 else ''
    PgLOG.pglog("Unlock {} update control{} ...".format(ALLCNT, s), PgLOG.WARNLG)
    modcnt = 0
    for cidx in PgOPT.params['CI']:
        pgrec = PgDBI.pgget("dcupdt", "pid, lockhost", "cindex = {}".format(cidx), PgOPT.PGOPT['extlog'])
        if not pgrec:
            PgLOG.pglog("{}: Update Control Not exists".format(cidx), PgOPT.PGOPT['errlog'])
        elif not pgrec['pid']:
            PgLOG.pglog("{}: Update Control Not locked".format(cidx), PgOPT.PGOPT['wrnlog'])
        elif PgLock.lock_update_control(cidx, -1, PgOPT.PGOPT['extlog']) > 0:
            modcnt += 1
            PgLOG.pglog("{}: Update Control Unlocked {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog'])
        elif (PgFile.check_host_down(None, pgrec['lockhost']) and
              PgLock.lock_update_control(cidx, -2, PgOPT.PGOPT['extlog']) > 0):
            modcnt += 1
            PgLOG.pglog("{}: Update Control Force unlocked {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog'])
        else:
            # message used to read "Undate Control"; corrected to match the sibling messages
            PgLOG.pglog("{}: Update Control Unable to unlock {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog'])
    PgLOG.pglog("{} of {} update control record{} unlocked from RDADB".format(modcnt, ALLCNT, s), PgLOG.LOGWRN)
#
# get update info of local and remote files owned by login name
#
def get_update_info():
    """Retrieve update control/local file/remote file info for one or more datasets.

    With option DS only that dataset is processed.  Otherwise the datasets
    are collected from dlupdt with the condition built by
    PgUpdt.file_condition(); when that condition is empty, option SN is
    defaulted to the login name (LN) and the condition is rebuilt.

    Returns the result of PgLOG.pglog() when no dataset is found, otherwise
    None.
    """
    if 'DS' in PgOPT.params:
        found = {'dsid': [PgOPT.params['DS']]}
        total = 1
    else:
        table = "dlupdt"
        where = PgUpdt.file_condition(table, None, None, 1)
        if not where:
            PgOPT.set_default_value("SN", PgOPT.params['LN'])
            where = PgUpdt.file_condition(table, None, None, 1)
        found = PgDBI.pgmget(table, "DISTINCT dsid", where, PgOPT.PGOPT['extlog'])
        total = len(found['dsid']) if found else 0
        if total == 0:
            return PgLOG.pglog("NO dataset identified for giving condition", PgOPT.PGOPT['wrnlog'])
        if total > 1:
            PgLOG.pglog("Get Update Info for {} datasets".format(total), PgOPT.PGOPT['wrnlog'])
        PgOPT.PGOPT['AUTODS'] = total

    for dsid in found['dsid']:
        PgOPT.params['DS'] = dsid
        if PgOPT.PGOPT['ACTS'] == PgOPT.OPTS['GC'][0]:
            get_control_info()
        elif PgOPT.PGOPT['ACTS'] == PgOPT.OPTS['GL'][0]:
            get_local_info()
        elif PgOPT.PGOPT['ACTS'] == PgOPT.OPTS['GR'][0]:
            get_remote_info()
        else:
            # combined action: use default order string and all fields
            PgOPT.params.pop('ON', None)
            if 'FN' not in PgOPT.params:
                PgOPT.params['FN'] = 'ALL'
            if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['GC'][0]:
                get_control_info()
            if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['GL'][0]:
                get_local_info()
            if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['GR'][0]:
                get_remote_info()

    if total > 1:
        PgLOG.pglog("Update Info of {} datasets retrieved".format(total), PgOPT.PGOPT['wrnlog'])
#
# gather due datasets for data update
#
def dataset_update():
    """Gather the datasets due for update and process each of their local files.

    The datasets come from option DS or, when it is not given, from the dlupdt
    records owned by the login name (LN).  When running under a dscheck record
    the total file count is computed first via file_update() in caching mode.
    Then file_update() is called for every selected local file record, the
    per-dataset counts kept in PgOPT.PGOPT are folded into totals, and the
    module level SUBJECT, TOPMSG and ACTSTR are set for the summary email
    sent by main().

    Returns the result of PgLOG.pglog() when no dataset is due, otherwise
    None.
    """
    global SUBJECT, TOPMSG, ACTSTR

    actcnd = "specialist = '{}'".format(PgOPT.params['LN'])
    if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0]:
        actcnd += " AND action IN ('AW', 'AS', 'AQ')"
    (PgOPT.PGOPT['CURDATE'], PgOPT.PGOPT['CURHOUR']) = PgUtil.curdatehour()
    if 'CD' not in PgOPT.params:
        PgOPT.params['CD'] = PgOPT.PGOPT['CURDATE']   # default to current date
    if 'CH' not in PgOPT.params:
        PgOPT.params['CH'] = PgOPT.PGOPT['CURHOUR']   # default to current hour
    # .get() instead of indexing: option MU may not be given at all
    if ALLCNT > 1 and PgOPT.params.get('MU'):
        del PgOPT.params['MU']
    _reset_mode_options()

    if 'DS' in PgOPT.params:
        dsids = [PgOPT.params['DS']]
        dscnt = 1
    else:
        if 'CI' not in PgOPT.params:
            actcnd += " AND cindex = 0"
        loccnd = PgUpdt.file_condition('dlupdt', "LQFIXA", None, 1)
        dscnd = actcnd
        if loccnd:
            dscnd += " AND " + loccnd
        pgrecs = PgDBI.pgmget("dlupdt", "DISTINCT dsid", dscnd, PgOPT.PGOPT['extlog'])
        dsids = pgrecs['dsid'] if pgrecs else []
        dscnt = len(dsids)
        if not dscnt:
            return PgLOG.pglog("NO dataset is due for update on {} for {}".format(PgOPT.params['CD'], PgOPT.params['LN']), PgOPT.PGOPT['wrnlog'])
        PgOPT.PGOPT['AUTODS'] = dscnt
    actcnd += " ORDER BY execorder, lindex"

    if PgLOG.PGLOG['DSCHECK']:
        # pre-count the files to be processed for the dscheck record
        fcnt = 0
        for i in range(dscnt):
            PgOPT.params['DS'] = dsids[i]
            locrecs, loccnt = _get_local_records(actcnd)
            for j in range(loccnt):
                locrec = _one_local_record(locrecs, j, loccnt)
                # file_update() returns None in caching mode for a record it
                # rejects; count that as 0 instead of raising TypeError
                fcnt += file_update(locrec, PgLOG.LOGWRN, 1) or 0
        PgCMD.set_dscheck_fcount(fcnt, PgLOG.LOGERR)

    # check and update data for each dataset
    logact = PgOPT.PGOPT['emllog']
    acnt = ucnt = 0
    for i in range(dscnt):
        PgOPT.params['DS'] = dsids[i]
        locrecs, loccnt = _get_local_records(actcnd)
        if loccnt == 0:
            s = "-UC{}".format(PgOPT.params['CI'][0]) if ('CI' in PgOPT.params and len(PgOPT.params['CI']) == 1) else ""
            PgLOG.pglog("{}{}: no config record of local file found to update for '{}'".format(PgOPT.params['DS'], s, PgOPT.params['LN']), PgOPT.PGOPT['wrnlog'])
            continue
        s = 's' if loccnt > 1 else ''
        PgLOG.pglog("{}: {} for {} update record{}".format(PgOPT.params['DS'], PgOPT.PGOPT['CACT'], loccnt, s), logact)
        logact = PgOPT.PGOPT['emlsep']

        for j in range(loccnt):
            locrec = _one_local_record(locrecs, j, loccnt)
            if locrec['cindex']:
                if 'CI' not in PgOPT.params:
                    # adopt the control record of the first controlled local file
                    PgOPT.params['CI'] = [locrec['cindex']]
                    PgUpdt.cache_update_control(locrec['cindex'], 0)
                    _reset_mode_options()
                elif locrec['cindex'] != PgOPT.params['CI'][0]:
                    PgLOG.pglog("{}-{}: Skipped due to control index {} mismatches {}".format(PgOPT.params['DS'], locrec['lindex'], locrec['cindex'], PgOPT.params['CI'][0]), PgOPT.PGOPT['emlerr'])
                    continue
            PgOPT.PGOPT['rstat'] = 1   # reset remote download status for each local file
            if PgSIG.PGSIG['MPROC'] > 1:
                acnt += 1
            fcnt = file_update(locrec, logact)
            if PgSIG.PGSIG['PPID'] > 1:
                if PgOPT.PGOPT['AUTODS'] > 1:
                    PgOPT.PGOPT['AUTODS'] = dscnt = 1
                acnt = ucnt = 0   # reinitialize counts for child process
                break             # stop loop in child
            if PgSIG.PGSIG['MPROC'] > 1:
                if fcnt == 0:
                    break         # quit
                if fcnt > 0:
                    ucnt += 1     # record update count
                continue          # non-daemon parent
            if 'QE' in PgOPT.params and fcnt <= 0:
                break

        if PgOPT.PGOPT['vcnt'] > 0:
            renew_internal_version(PgOPT.params['DS'], PgOPT.PGOPT['vcnt'])
            PgOPT.PGOPT['vcnt'] = 0
        if PgSIG.PGSIG['MPROC'] > 1:
            # NOTE(review): j never exceeds loccnt - 1 after a Python for loop,
            # so this test is always false and the dataset loop always stops
            # here when MPROC > 1 -- confirm whether "inner loop ran to
            # completion" was intended
            if not PgSIG.PGSIG['QUIT'] and j == loccnt:
                continue
            break

        if PgOPT.PGOPT['rcnt']:
            if PgOPT.PGOPT['CACT'] == "DR":
                acnt += PgOPT.PGOPT['rcnt']
                ucnt += PgOPT.PGOPT['dcnt']
            s = 's' if PgOPT.PGOPT['rcnt'] > 1 else ''
            if loccnt > 1:
                PgLOG.pglog("{}: {} of {} rfile{} gotten!".format(PgOPT.params['DS'], PgOPT.PGOPT['dcnt'], PgOPT.PGOPT['rcnt'], s), PgOPT.PGOPT['emllog'])
            PgOPT.PGOPT['rcnt'] = PgOPT.PGOPT['dcnt'] = 0
        if PgOPT.PGOPT['lcnt']:
            if PgOPT.PGOPT['CACT'] == "BL" or PgOPT.PGOPT['CACT'] == "PB":
                acnt += PgOPT.PGOPT['lcnt']
                ucnt += PgOPT.PGOPT['bcnt']
            s = 's' if PgOPT.PGOPT['lcnt'] > 1 else ''
            if loccnt > 1 and PgOPT.PGOPT['bcnt'] > 0:
                PgLOG.pglog("{}: {} of {} lfile{} built!".format(PgOPT.params['DS'], PgOPT.PGOPT['bcnt'], PgOPT.PGOPT['lcnt'], s), PgOPT.PGOPT['emllog'])
            PgOPT.PGOPT['lcnt'] = PgOPT.PGOPT['bcnt'] = 0
        if PgOPT.PGOPT['acnt']:
            acnt += PgOPT.PGOPT['acnt']
            ucnt += PgOPT.PGOPT['ucnt']
            s = 's' if PgOPT.PGOPT['acnt'] > 1 else ''
            PgLOG.pglog("{}: {} of {} local file{} archived!".format(PgOPT.params['DS'], PgOPT.PGOPT['ucnt'], PgOPT.PGOPT['acnt'], s),
                        (PgOPT.PGOPT['emlsum'] if dscnt > 1 else PgOPT.PGOPT['emllog']))
            PgOPT.PGOPT['acnt'] = PgOPT.PGOPT['ucnt'] = 0
        if PgSIG.PGSIG['PPID'] > 1:
            break   # stop loop child

    if acnt > 0:
        TOPMSG = ""
        if PgSIG.PGSIG['MPROC'] > 1:
            s = 's' if acnt > 1 else ''
            # "CPID{}" (was "CPIDs{}") so the plural is not doubled
            ACTSTR = "{} of {} CPID{} for 'dsupdt {}' started".format(ucnt, acnt, s, PgOPT.PGOPT['CACT'])
        else:
            s = 's' if ucnt > 1 else ''
            if PgOPT.PGOPT['CACT'] == "DR":
                atype = "remote file{} gotten".format(s)
            elif PgOPT.PGOPT['CACT'] == "BL" or PgOPT.PGOPT['CACT'] == "PB":
                atype = "local file{} built".format(s)
            else:
                atype = "local file{} archived".format(s)
            if PgOPT.PGOPT['rdcnt'] > 0:
                s = 's' if PgOPT.PGOPT['rdcnt'] > 1 else ''
                TOPMSG = "{} remote server file{} downloaded and ".format(PgOPT.PGOPT['rdcnt'], s)
            # per-destination counts, joined in this fixed order
            labels = (('udcnt', "Web Online"), ('uncnt', "Glade Only"), ('uwcnt', "Web"),
                      ('uscnt', "Saved"), ('qbcnt', "Quasar Backup"), ('qdcnt', "Quasar Drdata"))
            detail = " & ".join("{} {}".format(PgOPT.PGOPT[key], label)
                                for key, label in labels if PgOPT.PGOPT[key] > 0)
            ACTSTR = "{} {}".format(ucnt, atype)
            TOPMSG += ACTSTR
            if detail:
                TOPMSG += " ({})".format(detail)
            if dscnt > 1:
                PgLOG.pglog("{} datasets: {}".format(dscnt, TOPMSG), PgOPT.PGOPT['emlsum'])
        SUBJECT = "DSUPDT of "
        if PgOPT.PGOPT['AUTODS'] < 2:
            SUBJECT += PgOPT.params['DS'].upper()
        else:
            SUBJECT += "{} Datasets".format(PgOPT.PGOPT['AUTODS'])

    if PgOPT.PGOPT['UCNTL']:
        PgUpdt.reset_control_time()
        if SUBJECT:
            SUBJECT += "-C{}".format(PgOPT.PGOPT['UCNTL']['cindex'])


def _reset_mode_options():
    """Drop CN when RD is given; then drop MO if CN, RD or RA is set, else default MO to -1 for action UF."""
    if 'CN' in PgOPT.params and 'RD' in PgOPT.params:
        del PgOPT.params['CN']
    if 'CN' in PgOPT.params or 'RD' in PgOPT.params or 'RA' in PgOPT.params:
        if 'MO' in PgOPT.params:
            del PgOPT.params['MO']
    elif 'MO' not in PgOPT.params and PgOPT.PGOPT['CACT'] == "UF":
        PgOPT.params['MO'] = -1


def _get_local_records(actcnd):
    """Query the dlupdt records of the current dataset; return (records, record count)."""
    loccnd = PgUpdt.file_condition('dlupdt', "LQFIXA")
    locrecs = PgDBI.pgmget("dlupdt", "*", "{} AND {}".format(loccnd, actcnd), PgOPT.PGOPT['extlog'])
    return locrecs, (len(locrecs['locfile']) if locrecs else 0)


def _one_local_record(locrecs, idx, loccnt):
    """Return record idx; with options LI and a single LF, a lone record takes its file name from LF."""
    locrec = PgUtil.onerecord(locrecs, idx)
    if (loccnt == 1 and 'LI' in PgOPT.params and 'LF' in PgOPT.params and
            len(PgOPT.params['LF']) == 1 and PgOPT.params['LF'][0] != locrec['locfile']):
        locrec['locfile'] = PgOPT.params['LF'][0]
    return locrec
# renew internal version number for given dataset
def renew_internal_version(dsid, vcnt):
    """Run "dsarch <dsid> SV -NV" to renew the internal version number of a dataset.

    dsid -- dataset id passed to dsarch
    vcnt -- number of rearchived data files, used in the description text

    When the command succeeds, the active dsvrsn record is read and the new
    internal version (or just "renewed" when no record is found) is logged.
    """
    plural = '' if vcnt <= 1 else 's'
    command = "dsarch {} SV -NV -DE '{} Data file{} rearchived'".format(dsid, vcnt, plural)
    if not PgLOG.pgsystem(command, PgOPT.PGOPT['emerol'], 5):   # 1 + 4
        return

    version = PgDBI.pgget('dsvrsn', '*', "dsid = '{}' and status = 'A'".format(dsid), PgOPT.PGOPT['emerol'])
    if version:
        outcome = "set to {} for DOI {}".format(version['iversion'], version['doi'])
    else:
        outcome = 'renewed'
    PgLOG.pglog("{}: {} Data file{} rearchived, Internal version number {}".format(dsid, vcnt, plural, outcome), PgOPT.PGOPT['emlsum'])
#
# cache the total count of files to be archived
#
def count_caching(locrec, locinfo):
    """Return the number of files expected for one local file record.

    locrec  -- local file record; 'locfile' and 'lindex' are read
    locinfo -- identification string passed on to get_tempinfo()

    The count is (number of end dates) * (number of serial file names).
    The number of end dates is ALLCNT when it is greater than 1; otherwise
    the temporal info is fetched, cached in TEMPINFO under the local file
    index, and the length of its 'ED' list is used (1 when no temporal info
    is returned).
    """
    expanded = PgUpdt.expand_serial_pattern(locrec['locfile'])
    serial_count = len(expanded) if expanded else 1

    if ALLCNT > 1:
        return ALLCNT * serial_count

    # cache the result, even when empty, for the later file_update() call
    tempinfo = get_tempinfo(locrec, locinfo, 0)
    TEMPINFO[locrec['lindex']] = tempinfo
    date_count = len(tempinfo['ED']) if tempinfo else 1
    return date_count * serial_count
#
# gather/archive due data file for update of each local file
#
# Process one local file update record: find its remote file records and
# update periods, then download/build/archive each expanded local file name.
#
# locrec  - update record (dict-like); keys read in this function include
#           'locfile', 'lindex', 'dsid', 'specialist', 'workdir', 'action',
#           'processremote', 'buildcmd' and 'cleancmd'
# logact  - logging action passed to PgLOG.pglog() for the per-file message
# caching - if true, nothing is downloaded or archived; the function returns
#           count_caching(), or None when the record is skipped
#
# Returns retcnt at the end of the function; retcnt is only incremented where
# an already archived file is skipped under PgOPT.params['MO']. Earlier exits
# return None, 0, 1, the value of PgLOG.pglog(), or the value of
# PgSIG.start_child().
#
# NOTE(review): the indentation of this listing is lost, so the nesting of the
# statements below cannot be confirmed from here; the comments describe the
# individual statements only.
def file_update(locrec, logact, caching = 0):
lfile = locrec['locfile']
endonly = retcnt = 0
lindex = locrec['lindex']
# query condition on lindex, reused for the 'drupdt' lookups below
loccnd = "lindex = {}".format(lindex)
# label identifying this record in log messages
locinfo = "{}-L{}".format(locrec['dsid'], lindex)
# a record without a local file name cannot be processed
if not lfile:
if caching:
return None
else:
return PgLOG.pglog(locinfo + ": local file name NOT specified", PgOPT.PGOPT['emlerr'])
locinfo += "-" + lfile
# skip records whose 'specialist' differs from PgOPT.params['LN']
if locrec['specialist'] != PgOPT.params['LN']:
if caching:
return None
else:
return PgLOG.pglog("{}: owner '{}', NOT '{}'".format(locinfo, locrec['specialist'], PgOPT.params['LN']), PgOPT.PGOPT['emlerr'])
# caching mode: only count the files, do not process them
if caching: return count_caching(locrec, locinfo)
# reuse the temporal info stored in TEMPINFO for this lindex if there is one
tempinfo = TEMPINFO[lindex] if lindex in TEMPINFO else get_tempinfo(locrec, locinfo, 0)
if not tempinfo: return 0 # simply return if miss temporal info for update
# get the remote file records of this local file, optionally restricted by rcnd
rmtcnd = loccnd
rcnd = PgUpdt.file_condition('drupdt', ('D' if 'DO' in PgOPT.params else "RS"), None, 1)
if rcnd: rmtcnd += " AND " + rcnd
rmtrecs = PgDBI.pgmget("drupdt", "*", rmtcnd + " ORDER BY dindex, remotefile", PgOPT.PGOPT['extlog'])
rcnt = len(rmtrecs['remotefile']) if rmtrecs else 0
if rcnt == 0:
# records exist for this lindex but none matched the extra condition rcnd
if rcnd and PgDBI.pgget("drupdt", "", loccnd):
return PgLOG.pglog("{}: NO remote file record matched for {}".format(locinfo, rcnd), PgOPT.PGOPT['emlerr'])
# create an empty remote file record
rcnt = 1
rmtrecs = {'lindex' : [lindex], 'dindex' : [0]}
rflds = ['remotefile', 'serverfile', 'download', 'begintime', 'endtime', 'tinterval']
for rfld in rflds: rmtrecs[rfld] = [None]
# with one remote record, a single 'RF'/'SF' value in PgOPT.params replaces
# the record's remotefile/serverfile unless they are already equal
if rcnt == 1:
if 'RF' in PgOPT.params and len(PgOPT.params['RF']) == 1 and not (rmtrecs['remotefile'][0] and PgOPT.params['RF'][0] == rmtrecs['remotefile'][0]):
rmtrecs['remotefile'][0] = PgOPT.params['RF'][0]
if 'SF' in PgOPT.params and len(PgOPT.params['SF']) == 1 and not (rmtrecs['serverfile'][0] and PgOPT.params['SF'][0] == rmtrecs['serverfile'][0]):
rmtrecs['serverfile'][0] = PgOPT.params['SF'][0]
# number of update periods to loop over
ecnt = ALLCNT if ALLCNT > 1 else len(tempinfo['ED']) # should be at least one
# PGSIG['MPROC'] > 1: hand this record over to a child process named after lindex
if PgSIG.PGSIG['MPROC'] > 1:
pname = "updt{}".format(lindex)
pid = PgSIG.start_child(pname, PgOPT.PGOPT['wrnlog'], 1) # try to start a child process
if pid <= 0: return pid # failed to start a child process
if PgSIG.PGSIG['PPID'] > 1:
PgLOG.set_email() # empty email in child process
PgOPT.PGOPT['acnt'] = PgOPT.PGOPT['ucnt'] = 0
else:
# log the child process id together with the first period's file name
edate = tempinfo['ED'][0]
ehour = tempinfo['EH'][0]
lfile = PgUpdt.replace_pattern(locrec['locfile'], edate, ehour, tempinfo['FQ'])
locinfo = "{}-L{}-{}".format(locrec['dsid'], lindex, lfile)
if ecnt > 1: locinfo += ", {} Update Periods".format(ecnt)
PgLOG.pglog("CPID {} for 'dsupdt {}' of {}".format(PgSIG.pname2cpid(pname), PgOPT.PGOPT['CACT'], locinfo), PgOPT.PGOPT['emllog'])
return 1 # no further action in non-daemon program
# presumably acquires the update lock for this lindex (third argument 1 here,
# 0 in the call near the end of the function) - confirm against PgLock
if PgLock.lock_update(lindex, locinfo, 1, PgOPT.PGOPT['emllog']) <= 0: return 0
PgOPT.PGOPT['lindex'] = lindex
# values given in PgOPT.params ('PR', 'BC') take precedence over the record's commands
tempinfo['prcmd'] = PgOPT.params['PR'][0] if 'PR' in PgOPT.params else locrec['processremote']
tempinfo['blcmd'] = PgOPT.params['BC'][0] if 'BC' in PgOPT.params else locrec['buildcmd']
# postcnt: -1 = no post command; otherwise counts files archived since the
# post command was last executed
postcnt = -1
if PgOPT.PGOPT['UCNTL'] and PgOPT.PGOPT['CACT'] == PgOPT.PGOPT['UCNTL']['action']:
tempinfo['postcmd'] = PgOPT.params['XC'][0] if 'XC' in PgOPT.params else PgOPT.PGOPT['UCNTL']['execcmd']
if tempinfo['postcmd']: postcnt = 0
setmiss = 1 if tempinfo['VD'] else 0
ufile = uinfo = None
rscnt = ucnt = lcnt = 0
# loop over the update periods
for i in range(ecnt):
# with ALLCNT > 1 the temporal info is fetched again for each period after the first
if ALLCNT > 1 and i > 0:
tempinfo = get_tempinfo(locrec, locinfo, i)
if not tempinfo: break
edate = tempinfo['ED'][0]
ehour = tempinfo['EH'][0]
else:
edate = tempinfo['ED'][i]
ehour = tempinfo['EH'][i]
# with 'RE' given, compare this period's end date/hour with the previous one
# kept in tempinfo and skip the period when the difference is <= 0
if 'RE' in PgOPT.params and i and PgUtil.diffdatehour(edate, ehour, tempinfo['edate'], tempinfo['ehour']) <= 0:
continue
# refresh metadata at every 20th period index once something has been archived
if ucnt and tempinfo['RS'] == 1 and i%20 == 0: refresh_metadata(locrec['dsid'])
tempinfo['edate'] = edate
# build the end date(/hour) text used in log messages
if ehour != None:
tempinfo['einfo'] = "end data date:hour {}:{:02}".format(edate, ehour)
tempinfo['ehour'] = ehour
else:
tempinfo['einfo'] = "end data date {}".format(edate)
tempinfo['ehour'] = None
if 'GZ' in PgOPT.params: tempinfo['einfo'] += "(UTC)"
# local file names for this period
locfiles = PgUpdt.get_local_names(locrec['locfile'], tempinfo)
lcnt = len(locfiles) if locfiles else 0
if not lcnt: break
# per-period counters: rmtcnt remote files found, acnt/ccnt files with cnt > 0
# (ccnt additionally requires rstat > 0), ut flag to reset the update time
rmtcnt = acnt = ccnt = ut = 0
rfiles = rfile = None
if tempinfo['RS'] == 0 and lcnt > 2: tempinfo['RS'] = 1
# loop over the local files of this period
for l in range(lcnt):
# report progress to dscheck in batches of 20 files
if PgLOG.PGLOG['DSCHECK'] and ((l+1)%20) == 0:
PgCMD.add_dscheck_dcount(20, 0, PgOPT.PGOPT['extlog'])
lfile = locfiles[l]
locinfo = "{}-L{}-{}".format(locrec['dsid'], lindex, lfile)
tempinfo['gotnew'] = tempinfo['archived'] = 0
tempinfo['ainfo'] = None
tempinfo['ainfo'] = file_archive_info(lfile, locrec, tempinfo)
if not tempinfo['ainfo']: continue
# archived count equals the expected count: the file is fully archived already
if tempinfo['ainfo']['archived'] == tempinfo['ainfo']['archcnt']:
ufile = "{} at {} {}".format(lfile, tempinfo['ainfo']['adate'], tempinfo['ainfo']['atime'])
tempinfo['archived'] = 1
if 'MO' in PgOPT.params:
if PgOPT.params['MO'] < 0:
PgLOG.pglog("{}: {} already for {}".format(locinfo, PgOPT.PGOPT['CACT'], tempinfo['einfo']), PgOPT.PGOPT['emlsum'])
if i == 0: PgLOG.pglog("Add Mode option -RA if you want to re-archive", PgOPT.PGOPT['wrnlog'])
if 'UT' in PgOPT.params or 'ED' not in PgOPT.params: ut = 1
retcnt += 1
continue
else:
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0]: uinfo = locinfo
PgLOG.pglog("{}: {} for {}".format(locinfo, PgOPT.PGOPT['CACT'], tempinfo['einfo']), logact)
if not change_workdir(locrec['workdir'], locinfo, tempinfo['edate'], tempinfo['ehour'], tempinfo['FQ']):
break
# count the files processed for the archive (AF) and build (BL) action bits
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0]: PgOPT.PGOPT['acnt'] += 1
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['BL'][0]: PgOPT.PGOPT['lcnt'] += 1
opt = 1 if tempinfo['AQ'] else 65 # 1+64(remove small file)
linfo = PgFile.check_local_file(lfile, opt, PgOPT.PGOPT['emerol'])
# cnt: -1 = no remote files looked up yet; it is later set to rmtcnt or to
# the results of build_local_file()/archive_data_file()
cnt = -1
# remote file names were already gathered for an earlier local file of this period
if rmtcnt > 0:
cnt = rmtcnt
rfile = rfiles[l]
else:
dr = 1 if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['PB'][0] else 0
if linfo and PgOPT.PGOPT['CACT'] == "BL" and not tempinfo['prcmd']: dr = 0 # skip download for BL only
if dr:
dfiles = None
for j in range(rcnt): # process each remote record
pgrec = PgUtil.onerecord(rmtrecs, j)
# same remote file name as the previous record and files were already found
if dfiles and pgrec['remotefile'] == rfile and not PgOPT.PGOPT['mcnt']:
continue # skip
rfile = pgrec['remotefile']
# act 0 (names only) when the record's action is 'AQ'; otherwise the DR action bit
act = 0 if locrec['action'] == 'AQ' else PgOPT.PGOPT['ACTS']&PgOPT.OPTS['DR'][0]
dfiles = download_remote_files(pgrec, lfile, linfo, locrec, locinfo, tempinfo, act)
if PgOPT.PGOPT['rstat'] < 0:
# NOTE(review): rebinding i does not end 'for i in range(ecnt)' in Python; the
# period loop only stops through the rstat check near its end. Later uses of i
# in this iteration would see ecnt - confirm this is intended.
i = ecnt
break
if dfiles: rfiles = PgUtil.joinarray(rfiles, dfiles)
rmtcnt = len(rfiles) if rfiles else 0
if rmtcnt > 0:
# several local files require exactly one remote file each
if lcnt > 1 and rmtcnt != lcnt:
# NOTE(review): this reads locrec['locinfo'] while every other message in this
# function uses the local variable locinfo - confirm the record has that key.
PgLOG.pglog("{}: {} files found for {} local files".format(locrec['locinfo'], rmtcnt, lcnt), PgOPT.PGOPT['emlerr'])
# NOTE(review): as above, assigning i does not terminate the period loop
i = ecnt
break
cnt = rmtcnt
rfile = rfiles[l] if lcnt > 1 else rfiles[rmtcnt-1] # record the break remote file name
else:
rfile = None
# a local file exists: turn a zero download status into 1
if linfo and PgOPT.PGOPT['rstat'] == 0: PgOPT.PGOPT['rstat'] = 1
if cnt != 0 and PgOPT.PGOPT['rstat'] > 0:
# build (BL) and/or archive (AF) action requested
if PgOPT.PGOPT['ACTS']&(PgOPT.OPTS['BL'][0]|PgOPT.OPTS['AF'][0]):
# no remote files were looked up, but the local file exists
if cnt < 0 and linfo:
if tempinfo['archived'] and PgOPT.PGOPT['CACT'] == "UF" and not tempinfo['gotnew']:
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0] and 'RA' not in PgOPT.params:
PgLOG.pglog(lfile + ": local file archived already", PgOPT.PGOPT['emllog'])
cnt = 0
else:
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['BL'][0]:
PgLOG.pglog(lfile + ": local file exists already", PgOPT.PGOPT['emllog'])
cnt = 1
# the local file name equals the remote file name
elif rmtcnt == lcnt and lfile == rfile:
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['BL'][0]:
PgLOG.pglog(lfile + ": local file same as remote file", PgOPT.PGOPT['emllog'])
# the BL action bit is not set for this run
elif not (PgOPT.PGOPT['ACTS']&PgOPT.OPTS['BL'][0]):
PgLOG.pglog(lfile + ": local file not built yet", PgOPT.PGOPT['emlerr'])
cnt = 0
else:
cnt = build_local_file(rfiles, lfile, linfo, locrec, tempinfo, lcnt, l)
# build_local_file() may leave a replacement file name in tempinfo['lfile']
if cnt and 'lfile' in tempinfo:
lfile = tempinfo['lfile']
del tempinfo['lfile']
# archive the local file when the AF action bit is set
if cnt != 0 and (PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0]):
file_status_info(lfile, rfile, tempinfo)
cnt = archive_data_file(lfile, locrec, tempinfo, i)
if cnt > 0:
ucnt += 1
if tempinfo['RS'] == 1: rscnt += 1
if postcnt > -1: postcnt += 1
elif cnt > 0:
cnt = 0
if cnt > 0 and PgOPT.PGOPT['rstat'] > 0:
ccnt += 1
elif 'UT' in PgOPT.params or tempinfo['archived']:
ut = 1
if cnt > 0: acnt += 1
# report the remaining files (fewer than a batch of 20) to dscheck
if PgLOG.PGLOG['DSCHECK']:
PgCMD.add_dscheck_dcount(lcnt%20, 0, PgOPT.PGOPT['extlog'])
# clean up only if every local file of the period has been counted in ccnt
if ccnt == lcnt and (PgOPT.PGOPT['ACTS']&PgOPT.OPTS['CF'][0]) and locrec['cleancmd']:
if tempinfo['CVD'] and PgUtil.diffdate(edate, tempinfo['CVD']) > 0:
clean_older_files(locrec['cleancmd'], locrec['workdir'], locinfo, tempinfo['CVD'], locrec['locfile'], rmtrecs, rcnt, tempinfo)
else:
# the clean command contains ' -RF' but no remote file names were gathered
if not rfiles and rcnt and locrec['cleancmd'].find(' -RF') > -1:
rfiles = get_all_remote_files(rmtrecs, rcnt, tempinfo, edate)
clean_files(locrec['cleancmd'], edate, ehour, locfiles, rfiles, tempinfo['FQ'])
# reset the update time for the AF action or when running under a matching update control
if PgOPT.PGOPT['ACTS']&PgOPT.OPTS['AF'][0] or PgOPT.PGOPT['UCNTL'] and PgOPT.PGOPT['CACT'] == PgOPT.PGOPT['UCNTL']['action']:
rmonly = 1 if PgOPT.PGOPT['rstat'] > 0 else 0
if ccnt == lcnt:
PgUpdt.reset_update_time(locinfo, locrec, tempinfo, ccnt, endonly)
elif ut:
PgUpdt.reset_update_time(locinfo, locrec, tempinfo, acnt, endonly)
else:
if PgOPT.PGOPT['rstat'] == 0:
if tempinfo['VD'] and PgUtil.diffdatehour(edate, ehour, tempinfo['VD'], tempinfo['VH']) < 0:
PgUpdt.reset_update_time(locinfo, locrec, tempinfo, 0, endonly) # skip update
PgOPT.PGOPT['rstat'] = 1 # reset remote download status
elif 'IE' in PgOPT.params:
if tempinfo['VD'] and PgUtil.diffdatehour(edate, ehour, tempinfo['VD'], tempinfo['VH']) >= 0:
endonly = 1
PgUpdt.reset_update_time(locinfo, locrec, tempinfo, 0, endonly) # skip update
PgOPT.PGOPT['rstat'] = 1 # reset remote download status
# setmiss keeps the value returned by set_miss_time(), so a false result stops further calls
if setmiss: setmiss = PgUpdt.set_miss_time(lfile, locrec, tempinfo, rmonly)
# run the post command once files have been archived, then restart the count
if postcnt > 0:
postcmd = PgUpdt.executable_command(PgUpdt.replace_pattern(tempinfo['postcmd'], edate, ehour, tempinfo['FQ']),
lfile, PgOPT.params['DS'], edate, ehour)
PgLOG.pgsystem(postcmd, PgOPT.PGOPT['emllog'], 5)
postcnt = 0
# refresh metadata once PGOPT['RSMAX'] archived files have accumulated
if rscnt >= PgOPT.PGOPT['RSMAX']:
refresh_metadata(locrec['dsid'])
rscnt = 0
if PgOPT.PGOPT['rstat'] < -1 or PgOPT.PGOPT['rstat'] < 0 and 'QE' in PgOPT.params: break # unrecoverable errors
# refresh metadata for archived files not yet covered by a refresh
if rscnt > 0: refresh_metadata(locrec['dsid'])
# nothing archived in this call: report the last file found fully archived
if ufile and uinfo and ucnt == 0:
PgLOG.pglog("{}: Last successful update - {}".format(uinfo, ufile), PgOPT.PGOPT['emlsum'])
# presumably releases the update lock taken above (third argument 0) - confirm
PgLock.lock_update(lindex, locinfo, 0, PgOPT.PGOPT['errlog'])
PgOPT.PGOPT['lindex'] = 0
return retcnt
#
# refresh the gathered metadata with speed up option -R and -S
#
def refresh_metadata(dsid):
    """Run the metadata refresh command for dataset *dsid*.

    Does nothing when PgOPT.PGOPT['wtidx'] is empty.  Otherwise the command
    "<scm> -d <dsid> -rw ..." is executed once with 'all' if index 0 is among
    the collected indices, or once per collected index, and the collection
    is reset to an empty dict afterwards.
    """
    pending = PgOPT.PGOPT['wtidx']
    if not pending:
        return

    base_cmd = "{} -d {} -r".format(PgOPT.PGOPT['scm'], dsid)
    if 0 in pending:
        PgLOG.pgsystem(base_cmd + 'w all', PgOPT.PGOPT['emllog'], 5)
    else:
        for tidx in pending:
            PgLOG.pgsystem("{}w {}".format(base_cmd, tidx), PgOPT.PGOPT['emllog'], 5)
    PgOPT.PGOPT['wtidx'] = {}
#
# retrieve remote files
# act: > 0 - create filenames and get data files physically; 0 - create filenames only
#
def download_remote_files(rmtrec, lfile, linfo, locrec, locinfo, tempinfo, act = 0):
emlsum = PgOPT.PGOPT['emlsum'] if PgOPT.PGOPT['CACT'] == "DR" else PgOPT.PGOPT['emllog']
rfile = rmtrec['remotefile']
rmtinfo = locinfo
dfiles = []
if not rfile:
rfile = lfile
rcnt = 1
if rfile != locrec['locfile']: rmtinfo += "-" + rfile
if act:
tempinfo['DC'] = (PgOPT.params['DC'][0] if 'DC' in PgOPT.params and PgOPT.params['DC'][0] else
(rmtrec['download'] if rmtrec['download'] else locrec['download']))
rfiles = PgUpdt.get_remote_names(rfile, rmtrec, rmtinfo, tempinfo)
rcnt = len(rfiles) if rfiles else 0