diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..7356d10 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include src/rda_python_dscheck/dscheck.usg diff --git a/README.md b/README.md index eebfe81..f62c7a5 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -RDA python code template to create new and modify existing RDA python packages. +RDA python package to add and process batch jobs. + diff --git a/pyproject.toml b/pyproject.toml index b397f7f..5db932a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,12 +6,12 @@ requires = [ build-backend = "setuptools.build_meta" [project] -name = "rda_python_template" -version = "1.0.0" +name = "rda_python_dscheck" +version = "1.0.1" authors = [ { name="Zaihua Ji", email="zji@ucar.edu" }, ] -description = "RDA Python code template to create new and modify existing other RDA python packages" +description = "RDA python package to add and process batch jobs" readme = "README.md" requires-python = ">=3.7" classifiers = [ @@ -28,10 +28,10 @@ include-package-data = true where = ["src"] [tool.setuptools.package-data] -"rda_python_tmplate" = ["hello_workd.usg"] +"rda_python_tmplate" = ["dscheck.usg"] [project.urls] -"Homepage" = "https://github.com/NCAR/rda-python-template" +"Homepage" = "https://github.com/NCAR/rda-python-dscheck" [project.scripts] -hello_world = "rda_python_template.hello_world:main" +dscheck = "rda_python_dscheck.dscheck:main" diff --git a/src/rda_python_dscheck/PgCheck.py b/src/rda_python_dscheck/PgCheck.py new file mode 100644 index 0000000..0595837 --- /dev/null +++ b/src/rda_python_dscheck/PgCheck.py @@ -0,0 +1,1628 @@ +############################################################################### +# +# Title : PgCheck.py +# Author : Zaihua Ji, zji@ucar.edu +# Date : 08/26/2020 +# 2025-02-10 transferred to package rda_python_dscheck from +# https://github.com/NCAR/rda-shared-libraries.git +# Purpose : python library module for for holding some global variables and +# functions for dscheck utility +# +# Github : https://github.com/NCAR/rda-python-dscheck.git +# +############################################################################### +# +import os +import re +import time +from rda_python_common import PgLOG +from rda_python_common import PgCMD +from rda_python_common import PgSIG +from rda_python_common import PgUtil +from rda_python_common import PgLock +from rda_python_common import PgFile +from rda_python_common import PgOPT +from rda_python_common import PgDBI + +# global variables +LOOP = 0 +PLIMITS = {} +DWHOSTS = {} # hosts are down +RUNPIDS = {} +SHELLS = {} # shell names used by specialists + +# +# define initially the needed option values +# +PgOPT.OPTS = { # (!= 0) - setting actions + 'PC' : [0x0004, 'ProcessCheck', 1], + 'AC' : [0x0008, 'AddCheck', 1], + 'GD' : [0x0010, 'GetDaemon', 0], + 'SD' : [0x0020, 'SetDaemon', 1], + 'GC' : [0x0040, 'GetCheck', 0], + 'DL' : [0x0080, 'Delete', 1], + 'UL' : [0x0100, 'UnLockCheck', 1], + 'EC' : [0x0200, 'EmailCheck', 0], + 'IC' : [0x0400, 'InterruptCheck', 1], + 'CH' : [0x1000, 'CheckHost', 0], + 'SO' : [0x1000, 'SetOptions', 1], + + 'AW' : [0, 'AnyWhere', 0], + 'BG' : [0, 'BackGround', 0], + 'CP' : [0, 'CheckPending', 0], + 'CS' : [0, 'CheckStatus', 0], + 'FI' : [0, 'ForceInterrrupt', 0], + 'FO' : [0, 'FormatOutput', 0], + 'LO' : [0, 'LogOn', 0], + 'MD' : [0, 'PgDataset', 3], + 'NC' : [0, 'NoCommand', 0], + 'ND' : [0, 'NewDaemon', 0], + 'NT' : [0, 'NoTrim', 0], + 'WR' : [0, 'WithdsRqst', 0], + 'WU' : [0, 'WithdsUpdt', 0], + + 'DM' : [1, 'DaemonMode', 1], # for action PC, start|quit|logon|logoff + 'DV' : [1, 'Divider', 1], # default to <:> + 'ES' : [1, 'EqualSign', 1], # default to <=> + 'FN' : [1, 'FieldNames', 0], + 'LH' : [1, 'LocalHost', 0, ''], + 'MT' : [1, 'MaxrunTime', 0], + 'OF' : [1, 'OutputFile', 0], + 'ON' : [1, 'OrderNames', 0], + 'AO' : [1, 'ActOption', 1], # default to + 'WI' : [1, 'WaitInterval', 1], + + 'AN' : [2, 'ActionName', 0], + 'AV' : [2, 'ArgumentVector', 0], + 'AX' : [2, 'ArgumenteXtra', 0], + 'CC' : [2, 'CarbonCopy', 0], + 'CD' : [2, 'CheckDate', 256], + 'CI' : [2, 'CheckIndex', 16], + 'CM' : [2, 'Command', 1], + 'CT' : [2, 'CheckTime', 32], + 'DB' : [2, 'Debug', 0], + 'DC' : [2, 'DoneCount', 17], + 'DF' : [2, 'DownFlags', 1], + 'DI' : [2, 'DaemonIndex', 16], + 'DS' : [2, 'Dataset', 1], + 'ER' : [2, 'ERrormessage', 0], + 'EV' : [2, 'Environments', 1], + 'FC' : [2, 'FileCount', 17], + 'HN' : [2, 'HostName', 1], + 'IF' : [2, 'InputFile', 0], + 'MC' : [2, 'MaxCount', 17], + 'MH' : [2, 'MatchHost', 1], + 'MO' : [2, 'Modules', 1], + 'PI' : [2, 'ParentIndex', 17], + 'PL' : [2, 'ProcessLimit', 17], + 'PO' : [2, 'Priority', 17], + 'PQ' : [2, 'PBSQueue', 0], + 'QS' : [2, 'QSubOptions', 0], + 'SN' : [2, 'Specialist', 1], + 'ST' : [2, 'Status', 0], + 'SZ' : [2, 'DataSize', 16], + 'TC' : [2, 'TryCount', 17], + 'WD' : [2, 'WorkDir', 0], +} + +PgOPT.ALIAS = { + 'AN' : ['Action'], + 'BG' : ['b'], + 'CF' : ['Confirmation', 'ConfirmAction'], + 'CM' : ['CommandName'], + 'DL' : ['RM', 'Remove'], + 'DS' : ['Dsid', 'DatasetID'], + 'DV' : ['Delimiter', 'Separater'], + 'EV' : ['Envs'], + 'GZ' : ['GMT', 'GreenwichZone', 'UTC'], + 'MC' : ['MaximumCount', 'MaxTryCount'], + 'MH' : ['MatchHostname'], + 'NC' : ['NoRemoteCommand'], + 'MO' : ['Mods'], + 'PI' : ['ParentCheckIndex'], + 'QS' : ['PBSOptions'], + 'SO' : ['SetBatchOptions'], + 'SZ' : ['Size', "ProcSize"], + 'UL' : ['UnLock'], + 'WD' : ["WorkDirectory"], + 'WR' : ["WithRequest"], + 'WU' : ["WithUpdate"], +} + +PgOPT.TBLHASH['dscheck'] = { +#SHORTNM KEYS(PgOPT.OPTS) DBFIELD + 'C' : ['CI', "cindex", 0], + 'O' : ['CM', "command", 1], + 'V' : ['AV', "argv", 1], + 'T' : ['DS', "dsid", 1], + 'A' : ['AN', "action", 1], + 'U' : ['ST', "status", 1], + 'P' : ['PQ', "pbsqueue", 1], + 'R' : ['PI', "pindex", 0], + 'B' : ['DF', "dflags", 0], + 'F' : ['FC', "fcount", 0], + 'J' : ['DC', "dcount", 0], + 'K' : ['TC', "tcount", 0], + 'L' : ['MC', "mcount", 0], + 'Z' : ['SZ', "size", 0], + 'D' : ['CD', "date", 1], + 'Y' : ['CT', "time", 1], + 'H' : ['HN', "hostname", 1], + 'N' : ['SN', "specialist", 1], + 'W' : ['WD', "workdir", 1], + 'M' : ['MO', "modules", 1], + 'I' : ['EV', "environments", 1], + 'Q' : ['QS', "qoptions", 1], + 'X' : ['AX', "argextra", -1], + 'E' : ['ER', "errmsg", -1], +} + +PgOPT.TBLHASH['dsdaemon'] = { +#SHORTNM KEYS(PgOPT.OPTS) DBFIELD + 'I' : ['DI', "dindex", 0], + 'C' : ['CM', "command", 1], + 'H' : ['HN', "hostname", 1], + 'M' : ['MH', "matchhost", 1], + 'S' : ['SN', "specialist", 1], + 'P' : ['PL', "proclimit", 0], + 'O' : ['PO', "priority", 0], +} + +CHKHOST = { + 'curhost' : PgLOG.get_host(1), + 'chkhost' : None, + 'hostcond' : None, + 'isbatch' : 0 +} + +PgOPT.PGOPT['dscheck'] = "COVTUPFJDNW" # default +PgOPT.PGOPT['chkall'] = "COVTAUPRBFJKLZDYHNWMIQXE" # default to all +PgOPT.PGOPT['dsdaemon'] = "ICHQSPO" # default to all +PgOPT.PGOPT['waitlimit'] = 280 # limit of C and P request checks at a time +PgOPT.PGOPT['totallimit'] = 380 # maximum number of checks can be started on PBS + +PBSQUEUES = {'rda' : None, 'htc' : 'casper@casper-pbs'} +PBSTIMES = {'default' : 21600, 'rda' : PgLOG.PGLOG['PBSTIME'], 'htc' : 86400} +#DOPTHOSTS = {'rda-work' : None, 'PBS' : ['!subconv -Q']} +DOPTHOSTS = {'rda-work' : None, 'PBS' : None, 'cron' : None} +DSLMTS = {} +EMLMTS = {} + +# +# get the maximum running time for batch processes +# +def max_batch_time(qname): + + if CHKHOST['curhost'] == PgLOG.PGLOG['PBSNAME']: + if not (qname and qname in PBSTIMES): qname = 'default' + return PBSTIMES[qname] + else: + return 0 + +# +# check if enough information entered on command line and/or input file +# for given action(s) +# +def check_dscheck_options(cact, aname): + + errmsg = [ + "Option -DM(-DaemonMode) works with Action -PC(-ProcessCheck) only", + "Do not specify Check Index for Daemon Mode", + "Miss check index per Info option -CI(-CheckIndex)", + "Need Machine Hostname per -HN for new daemon control", + "Need Application command name per -CM for new daemon control", + "Must be {} to process Checks in daemon mode".format(PgLOG.PGLOG['RDAUSER']), + "Miss Command information per Info option -CM(-Command)", + ] + erridx = -1 + PgOPT.set_uid(aname) + + if 'CI' in PgOPT.params: validate_checks() + if 'DS' in PgOPT.params: validate_datasets() + + if 'DM' in PgOPT.params: + if cact != "PC": + erridx = 0 + elif PgLOG.PGLOG['CURUID'] != PgLOG.PGLOG['RDAUSER']: + erridx = 5 + elif 'CI' in PgOPT.params: + erridx = 1 + elif cact == "DL": + if not ('CI' in PgOPT.params or 'DI' in PgOPT.params): erridx = 2 + elif cact == 'SD': + validate_daemons() + if 'SD' in PgOPT.params: + if 'HN' not in PgOPT.params: + erridx = 3 + elif 'CM' not in PgOPT.params: + erridx = 4 + elif cact == "AC": + if 'CM' not in PgOPT.params: + erridx = 6 + elif 'CI' not in PgOPT.params and (cact == "IC" or cact == "UL" and 'LL' not in PgOPT.params): + erridx = 2 + + if erridx >= 0: PgOPT.action_error(errmsg[erridx], cact) + + if cact == "PC" or cact == 'UL': + if PgLOG.PGLOG['CURUID'] != PgOPT.params['LN']: + PgOPT.action_error("{}: cannot process Checks as {}".format(PgLOG.PGLOG['CURUID'], PgOPT.params['LN']), cact) + if 'LH' in PgOPT.params: + chkhost = PgLOG.get_short_host(PgOPT.params['LH']) + if not chkhost: chkhost = PgLOG.get_host(1) + CHKHOST['chkhost'] = CHKHOST['curhost'] = chkhost + if PgLOG.valid_batch_host(chkhost): + PgLOG.reset_batch_host(chkhost) + CHKHOST['isbatch'] = 1 + CHKHOST['hostcond'] = "IN ('{}', '{}')".format(chkhost, PgLOG.PGLOG['HOSTNAME']) + else: + if PgUtil.pgcmp(chkhost, PgLOG.PGLOG['HOSTNAME'], 1): + PgOPT.action_error("{}: Cannot handle checks on {}".format(PgLOG.PGLOG['HOSTNAME'], chkhost), cact) + CHKHOST['hostcond'] = "= '{}'".format(chkhost) + + if 'DM' in PgOPT.params: + if PgLOG.PGLOG['CHKHOSTS'] and PgLOG.PGLOG['CHKHOSTS'].find(PgLOG.PGLOG['HOSTNAME']) < 0: + PgOPT.action_error("Daemon mode can only be started on '{}'".format(PgLOG.PGLOG['CHKHOSTS']), cact) + if re.match(r'^(start|begin)$', PgOPT.params['DM'], re.I): + if not ('NC' in PgOPT.params or 'LH' in PgOPT.params): PgOPT.params['NC'] = 1 + wtime = PgOPT.params['WI'] if 'WI' in PgOPT.params else 0 + mtime = PgOPT.params['MT'] if 'MT' in PgOPT.params else 0 + logon = PgOPT.params['LO'] if 'LO' in PgOPT.params else 0 + PgSIG.start_daemon(aname, PgLOG.PGLOG['CURUID'], 1, wtime, logon, 0, mtime) + else: + PgSIG.signal_daemon(PgOPT.params['DM'], aname, PgOPT.params['LN']) + else: + if cact == "PC": + PgSIG.validate_single_process(aname, PgOPT.params['LN'], PgLOG.argv_to_string()) + elif cact == "SO": + plimit = PgOPT.params['PL'][0] if 'PL' in PgOPT.params and PgOPT.params['PL'][0] > 0 else 1 + PgSIG.validate_multiple_process(aname, plimit, PgOPT.params['LN'], PgLOG.argv_to_string()) + wtime = PgOPT.params['WI'] if 'WI' in PgOPT.params else 30 + logon = PgOPT.params['LO'] if 'LO' in PgOPT.params else 1 + PgSIG.start_none_daemon(aname, cact, PgOPT.params['LN'], 1, wtime, logon) + if not ('CI' in PgOPT.params or 'DS' in PgOPT.params or PgOPT.params['LN'] == PgLOG.PGLOG['RDAUSER']): + PgOPT.set_default_value("SN", PgOPT.params['LN']) + + # minimal wait interval in seconds for next check + PgOPT.PGOPT['minlimit'] = PgOPT.params['WI'] = PgSIG.PGSIG['WTIME'] + +# +# process counts of hosts in dsdaemon control records for given command and specialist +# +def get_process_limits(cmd, specialist, logact = 0): + + ckey = "{}-{}".format(cmd, specialist) + if ckey in PLIMITS: return PLIMITS[ckey] + + cnd = "command = '{}' AND specialist = '{}'".format(cmd, specialist) + if CHKHOST['chkhost']: + ecnd = " AND hostname = '{}'".format(CHKHOST['chkhost']) + hstr = " for " + CHKHOST['chkhost'] + else: + ecnd = " ORDER by priority, hostname" + hstr = "" + + pgrecs = PgDBI.pgmget("dsdaemon", "hostname, bqueues, matchhost, proclimit, priority", cnd + ecnd, logact) + if not pgrecs and PgDBI.pgget("dsdaemon", "", cnd, logact) == 0: + pgrecs = PgDBI.pgmget("dsdaemon", "hostname, matchhost, proclimit, priority", + "command = 'ALL' AND specialist = '{}'{}".format(specialist, ecnd), logact) + + cnt = (len(pgrecs['hostname']) if pgrecs else 0) + if cnt == 0: + PLIMITS[ckey] = 0 + return 0 + + j = 0 + PLIMITS[ckey] = {'host' : [], 'priority' : [], 'acnt' : [], 'match' : [], 'pcnd' : []} + for i in range(cnt): + if pgrecs['proclimit'][i] <= 0: continue + host = pgrecs['hostname'][i] + PLIMITS[ckey]['host'].append(host) + PLIMITS[ckey]['priority'].append(pgrecs['priority'][i]) + PLIMITS[ckey]['acnt'].append(pgrecs['proclimit'][i]) + PLIMITS[ckey]['match'].append(pgrecs['matchhost'][i]) + PLIMITS[ckey]['pcnd'].append("{} AND pid > 0 AND lockhost = '{}'".format(cnd, host)) + + if not PLIMITS[ckey]['host']: PLIMITS[ckey] = 0 + return PLIMITS[ckey] + +# +# find a available host name to process a dscheck record +# +def get_process_host(limits, hosts, cmd, act, logact = 0): + + cnt = len(limits['host']) + for i in range(cnt): + host = limits['host'][i] + if host in DWHOSTS: continue # the host is down + if limits['acnt'][i] > PgDBI.pgget("dscheck", "", limits['pcnd'][i], logact): + if cmd == 'dsrqst' and act == 'PR': + mflag = 'G' + else: + mflag = limits['match'][i] + if PgLOG.check_process_host(hosts, host, mflag): return i + + return -1 + +# +# reset the cached process limits +# +def reset_process_limits(): + + global LOOP, DWHOSTS, PLIMITS + + if LOOP%3 == 0: + PLIMITS = {} # clean the cache for available processes on hosts + + if LOOP%10 == 0: + DWHOSTS = {} + PgLOG.set_pbs_host(None, 1) + + LOOP += 1 + +# +# start dschecks +# +def start_dschecks(cnd, logact = 0): + + rcnt = 0 + check_dscheck_locks(cnd, logact) + if not CHKHOST['chkhost']: email_dschecks(cnd, logact) + purge_dschecks(cnd, logact) + + if 'NC' in PgOPT.params or not CHKHOST['chkhost']: return 0 + if CHKHOST['isbatch'] and 'CP' in PgOPT.params: check_dscheck_pends(cnd, logact) +# set_dscheck_options(CHKHOST['chkhost'], cnd, logact) + reset_process_limits() + if CHKHOST['isbatch']: rcnt = PgDBI.pgget("dscheck", "", "lockhost = '{}' AND pid > 0".format(PgLOG.PGLOG['PBSNAME']), logact) + + cnd += "pid = 0 AND status <> 'D' AND einfo IS NULL AND (qoptions IS NULL OR LEFT(qoptions, 1) != '!') ORDER by hostname DESC, cindex" + pgrecs = PgDBI.pgmget("dscheck", "*", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + pcnt = 0 + for i in range(cnt): + if (pcnt + rcnt) > PgOPT.PGOPT['totallimit']: break + pgrec = PgUtil.onerecord(pgrecs, i) + if(pgrec['fcount'] and pgrec['dcount'] >= pgrec['fcount'] or + pgrec['tcount'] and pgrec['tcount'] >= pgrec['mcount'] or + pgrec['pindex'] and PgDBI.pgget("dscheck", "", "cindex = {} AND status <> 'D'".format(pgrec['pindex']), logact)): + continue + if pgrec['dflags'] and PgFile.check_storage_dflags(pgrec['dflags'], pgrec, logact): continue + ret = start_one_dscheck(pgrec, logact) + if ret > 0: pcnt += ret + + if cnt > 1: PgLOG.pglog("{} of {} DSCHECK records started on {}".format(pcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + return pcnt + +# +# check long locked dschecks and unlock them if the processes are dead +# +def check_dscheck_locks(cnd, logact = 0): + + global RUNPIDS + ltime = int(time.time()) + lochost = PgLOG.PGLOG['HOSTNAME'] + cnd += "pid > 0 AND " + dtime = ltime - PgSIG.PGSIG['DTIME'] + ctime = ltime - PgSIG.PGSIG['CTIME'] + rtime = ltime - PgSIG.PGSIG['RTIME'] + if CHKHOST['chkhost']: + cnd += "lockhost {} AND (stttime = 0 OR chktime < {})".format(CHKHOST['hostcond'], dtime) + else: + cnd += "chktime > 0 AND (chktime < {} OR chktime < {} AND lockhost = '{}' OR chktime < {} AND lockhost = 'rda_config')".format(ctime, dtime, lochost, rtime) + + pgrecs = PgDBI.pgmget("dscheck", "*", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + lcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + lmsg = "{}({}) at {} on {}".format(pgrec['lockhost'], pgrec['pid'], PgLOG.current_datetime(), PgLOG.PGLOG['HOSTNAME']) + cidx = pgrec['cindex'] + if CHKHOST['chkhost'] or pgrec['lockhost'] == lochost: + spid = "{}{}".format(pgrec['lockhost'], pgrec['pid']) + if spid not in RUNPIDS and PgLock.lock_dscheck(cidx, 0) > 0: + PgLOG.pglog("CHK{}: unlocked {}".format(cidx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + else: + update_dscheck_time(pgrec, ltime, logact) + elif not pgrec['lockhost'] or pgrec['lockhost'] == 'rda_config': + record = {'pid' : 0, 'lockhost' : ''} + if PgDBI.pgupdt("dscheck", record, "cindex = {} AND pid = {}".format(cidx, pgrec['pid']), logact): + PgLOG.pglog("CHK{}: unlocked {}".format(cidx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + elif (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + PgLOG.pglog("Chk{}: time NOT updated for {} of {}".format(cidx, dscheck_runtime(pgrec['chktime'], ltime), lmsg), logact) + + if cnt > 0: + s = 's' if cnt > 1 else '' + PgLOG.pglog("{} of {} DSCHECK record{} unlocked on {}".format(lcnt, cnt, s, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + RUNPIDS = {} + +# +# check long pending dschecks and kill them +# +def check_dscheck_pends(cnd, logact = 0): + + ltime = int(time.time()) - PgSIG.PGSIG['RTIME'] + cnd += "pid > 0 AND " + cnd += "lockhost {} AND status = 'P' AND subtime > 0 AND subtime < {}".format(CHKHOST['hostcond'], ltime) + pgrecs = PgDBI.pgmget("dscheck", "pid", cnd, logact) + cnt = (len(pgrecs['pid']) if pgrecs else 0) + + pcnt = 0 + for i in range(cnt): + pid = pgrecs['pid'][i] + info = PgSIG.get_pbs_info(pid, 0, logact) + if info and info['State'] == 'Q': + PgLOG.pgsystem("rdakill -h {} -p {}".format(PgLOG.PGLOG['PBSNAME'], pid), PgLOG.LOGWRN, 5) + pcnt += 1 + + if cnt > 0: + s = 's' if cnt > 1 else '' + PgLOG.pglog("{} of {} Pending DSCHECK record{} stopped on {}".format(pcnt, cnt, s, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# update dscheck time in case in pending status or +# the command does not updateupdates not on time by itself +# +def update_dscheck_time(pgrec, ltime, logact = 0): + + record = {'chktime' : ltime} + if(CHKHOST['chkhost'] and CHKHOST['chkhost'] == PgLOG.PGLOG['PBSNAME'] + and pgrec['lockhost'] == PgLOG.PGLOG['PBSNAME']): + info = PgSIG.get_pbs_info(pgrec['pid'], 0, logact) + if info: + stat = info['State'] + if stat == 'Q': stat = 'P' + if stat != pgrec['status']: record['status'] = stat + else: + if pgrec['lockhost'] != PgLOG.PGLOG['HOSTNAME']: return # connot update dscheck time + if PgSIG.check_host_pid(pgrec['lockhost'], pgrec['pid']): + if pgrec['status'] != "R": record['status'] = "R" + else: + if pgrec['status'] == "R": record['status'] = "F" + + if pgrec['stttime']: + if pgrec['command'] == "dsrqst" and pgrec['oindex']: + (record['fcount'], record['dcount'], record['size']) = PgCMD.get_dsrqst_counts(pgrec, logact) + + elif 'status' in record and record['status'] == 'R': + record['stttime'] = ltime + + cnd = "cindex = {} AND pid = {}".format(pgrec['cindex'], pgrec['pid']) + if PgDBI.pgget("dscheck", "", "{} AND chktime = {}".format(cnd, pgrec['chktime']), logact): + # update only the chktime is not changed yet + PgDBI.pgupdt("dscheck", record, cnd, logact) + +# +# return a running time string for given start and end times of the process +# +def dscheck_runtime(start, end = None): + + stime = '' + + if start: + if not end: end = int(time.time()) + rtime = (end - start) + if rtime >= 60: + stime = PgLOG.seconds_to_string_time(rtime) + + return stime + +# +# check dschecks and purge them if done already +# +def purge_dschecks(cnd, logact = 0): + + cnd += "pid = 0 AND einfo IS NULL AND bid " + cnd += ('> 0' if CHKHOST['curhost'] == PgLOG.PGLOG['PGBATCH'] else '= 0') + pgrecs = PgDBI.pgmget("dscheck", "*", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + ctime = int(time.time()) - PgSIG.PGSIG['CTIME'] + dcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + if(pgrec['status'] == "D" or + pgrec['status'] == "R" and pgrec['chktime'] < ctime or + pgrec['fcount'] and pgrec['dcount'] >= pgrec['fcount'] or + pgrec['tcount'] and pgrec['tcount'] >= pgrec['mcount']): + if PgLock.lock_dscheck(pgrec['cindex'], 1) <= 0: continue + dcnt += PgCMD.delete_dscheck(pgrec, None, logact) + + if dcnt and cnt > 1: PgLOG.pglog("{} of {} DSCHECK records purged on {}".format(dcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# check dschecks and send saved email +# +def email_dschecks(cnd, logact = 0): + + emlact = PgLOG.LOGWRN|PgLOG.FRCLOG + if logact and (logact&PgLOG.EMEROL) == PgLOG.EMEROL: emlact |= PgLOG.EMEROL + cnd += "pid = 0 AND einfo IS NOT NULL" + pgrecs = PgDBI.pgmget("dscheck", "cindex", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + ecnt = 0 + for i in range(cnt): + cidx = pgrecs['cindex'][i] + if PgLock.lock_dscheck(cidx, 1) <= 0: continue + pgrec = PgDBI.pgget("dscheck", "*", "cindex = {}".format(cidx), logact) + einfo = pgrec['einfo'] if pgrec else None + if einfo: + if pgrec['dflags'] and pgrec['tcount'] and pgrec['tcount'] < pgrec['mcount']: + msgary = PgFile.check_storage_dflags(pgrec['dflags'], pgrec, logact) + if msgary: + einfo = "The Check will be resubmitted after the down storage Up again:\n{}\n{}".format("\n".join(msgary), einfo) + + sent = 1 if(PgLOG.send_customized_email("Chk{}".format(cidx), einfo, emlact) and + PgDBI.pgexec("UPDATE dscheck set einfo = NULL WHERE cindex = {}".format(cidx), logact)) else -1 + else: + sent = 0 + + PgLock.lock_dscheck(cidx, 0) + if sent == -1: break + ecnt += sent + + if ecnt and cnt > 1: PgLOG.pglog("{} of {} DSCHECK emails sent on {}".format(ecnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# start a dscheck job for given dscheck record +# +def start_one_dscheck(pgrec, logact = 0): + + cidx = pgrec['cindex'] + specialist = pgrec['specialist'] + host = CHKHOST['chkhost'] + dlimit = get_system_down_limit(host, logact) + if dlimit < 0: + PgLock.lock_dscheck(cidx, 0) + return 0 + + limits = get_process_limits(pgrec['command'], specialist, logact) + if not limits: + if pgrec['hostname'] and (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + host = PgLOG.get_host(1) + if PgLOG.check_process_host(pgrec['hostname'], host, 'I'): + PgLOG.pglog("Chk{}: {} is not configured properly to run on {} for {}".format(cidx, pgrec['command'], host, specialist), logact) + return 0 + + lidx = get_process_host(limits, pgrec['hostname'], pgrec['command'], pgrec['action'], logact) + if lidx < 0 or skip_dscheck_record(pgrec, host, logact): return 0 + cmd = "pgstart_{} ".format(specialist) if PgLOG.PGLOG['CURUID'] == PgLOG.PGLOG['RDAUSER'] else "" + if not PgUtil.pgcmp(host, PgLOG.PGLOG['PBSNAME'], 1): + if reach_dataset_limit(pgrec): return 0 + cmd += get_specialist_shell(specialist) + 'qsub ' + options = get_pbs_options(pgrec, dlimit, logact) + if options: + cmd += options + elif pgrec['status'] == 'E': + return 0 + bstr = " in {} Queue {} ".format(PgLOG.PGLOG['PBSNAME'], pgrec['pbsqueue']) + else: + bstr = "" + cmd += "rdasub -bg " + + if pgrec['workdir']: + if pgrec['workdir'].find('$') > -1: + cmd += "-cwd '{}' ".format(pgrec['workdir']) + else: + cmd += "-cwd {} ".format(pgrec['workdir']) + else: + cmd += "-cwd '$HOME' " + + chkcmd = pgrec['command'] + cmd += "-cmd " + chkcmd + if pgrec['argv']: + argv = pgrec['argv'] + if pgrec['argextra']: argv += pgrec['argextra'] + cmd += ' ' + argv + PgCMD.append_delayed_mode(chkcmd, argv) + chkcmd += ' ' + argv + + PgLOG.pglog("Chk{}: issues '{}' onto {} for {}".format(cidx, chkcmd, host, pgrec['specialist']), PgLOG.LOGWRN) + PgLOG.PGLOG['ERR2STD'] = ['chmod: changing'] + cstr = PgLOG.pgsystem(cmd, logact&(~PgLOG.EXITLG), 278) # 2+4+16+256 + PgLOG.PGLOG['ERR2STD'] = [] + pid = 0 + if cstr: + lines = cstr.split('\n') + for line in lines: + if not line: continue + ms = re.match(r'^Job <(\d+)> is submitted', line) + if ms: + pid = int(ms.group(1)) + break + ms = re.match(r'^(\d+)\.casper-pbs', line) + if ms: + pid = int(ms.group(1)) + break + ms = re.match(r'^Submitted batch job (\d+)', line) + if ms: + pid = int(ms.group(1)) + break + if not pid: + if PgLOG.PGLOG['SYSERR']: + if PgLOG.PGLOG['SYSERR'].find('Job not submitted') > -1: + cstr = "submit job" + elif PgLOG.PGLOG['SYSERR'].find('working directory') > -1: + cstr = "change working directory" + else: + cstr = "execute" + PgLock.lock_dscheck(cidx, 0) + return PgLOG.pglog("Chk{}: {} Failed {} on {}{}{}\n{}".format(cidx, PgCMD.get_command_info(pgrec), + cstr, PgLOG.PGLOG['HOSTNAME'], bstr, PgUtil.curtime(1), PgLOG.PGLOG['SYSERR']), + PgLOG.LOGWRN|PgLOG.FRCLOG) + + PgLOG.pglog("Chk{}: {} started on {}{}{}".format(cidx, PgCMD.get_command_info(pgrec), + PgLOG.PGLOG['HOSTNAME'], bstr, PgUtil.curtime(1)), PgLOG.LOGWRN|PgLOG.FRCLOG) + return fill_dscheck_info(pgrec, pid, host, logact) + +# +# get qsub shell command +# +def get_specialist_shell(specialist): + + if specialist not in SHELLS: + pgrec = PgDBI.pgget("dssgrp", "shell_flag", "logname = '{}'".format(specialist)) + if pgrec and pgrec['shell_flag'] == 'B': + SHELLS[specialist] = 'bash' + else: + SHELLS[specialist] = 'tcsh' + + return SHELLS[specialist] + +# +# get and cache process limit for a given dsid +# +def get_dataset_limit(dsid): + + if dsid in DSLMTS: return DSLMTS[dsid] + + pgrec = PgDBI.pgget('dslimit', 'processlimit', "dsid = '{}'".format(dsid)) + dslmt = 45 + if pgrec: + dslmt = pgrec['processlimit'] + elif 'default' in DSLMTS: + dslmt = DSLMTS['default'] + else: + pgrec = PgDBI.pgget('dslimit', 'processlimit', "dsid = 'all'") + if pgrec: DSLMTS['default'] = dslmt = pgrec['processlimit'] + DSLMTS[dsid] = dslmt + + return DSLMTS[dsid] + +# +# check if reaching running limit for a specified dataset +# +def reach_dataset_limit(pgrec): + + if pgrec['command'] != 'dsrqst': return 0 + dsid = pgrec['dsid'] + if dsid and pgrec['action'] in ['BR', 'SP', 'PP']: + dslmt = get_dataset_limit(dsid) + lmt = PgDBI.pgget('dscheck', '', "dsid = '{}' AND status <> 'C' AND action IN ('BR', 'SP', 'PP')".format(dsid)) + if lmt > dslmt: + PgLock.lock_dscheck(pgrec['cindex'], 0) + return 1 + return 0 + +# +# get and cache request limit for a given given email +# +def get_user_limit(email): + + if email in EMLMTS: return EMLMTS[email] + + emlmts = [20, 10, 36] + flds = 'maxrqstcheck, maxpartcheck' + pgrec = PgDBI.pgget('userlimit', flds, "email = '{}'".format(email)) + if pgrec: + emlmts = [pgrec['maxrqstcheck'], pgrec['maxpartcheck']] + elif 'default' in EMLMTS: + emlmts = EMLMTS['default'] + else: + pgrec = PgDBI.pgget('userlimit', flds, "email = 'all'".format(email)) + if pgrec: + EMLMTS['default'] = emlmts = [pgrec['maxrqstcheck'], pgrec['maxpartcheck']] + EMLMTS[email] = emlmts.copy() + + return EMLMTS[email] + +# +# check if reaching running limit for a specified dataset +# +def reach_dataset_limit(pgrec): + + if pgrec['command'] != 'dsrqst': return 0 + dsid = pgrec['dsid'] + if dsid and pgrec['action'] in ['BR', 'SP', 'PP']: + dslmt = get_dataset_limit(dsid) + lmt = PgDBI.pgget('dscheck', '', "dsid = '{}' AND status <> 'C' AND action IN ('BR', 'SP', 'PP')".format(dsid)) + if lmt > dslmt: + PgLock.lock_dscheck(pgrec['cindex'], 0) + return 1 + return 0 + +# +# check and return the time limit in seconds before a planned system down for given hostname +# +def get_system_down_limit(hostname, logact = 0): + + dlimit = 0 + down = PgDBI.get_system_downs(hostname, logact) + if down['start']: + dlimit = down['start'] - down['curtime'] - 2*PgSIG.PGSIG['CTIME'] + if dlimit < PgOPT.PGOPT['minlimit']: dlimit = -1 + + return dlimit + +# +# check and get the option string for submit a PBS job +# +def get_pbs_options(pgrec, limit = 0, logact = 0): + + opttime = 0 + qoptions = build_dscheck_options(pgrec, 'qoptions', 'PBS') + qname = get_pbsqueue_option(pgrec) + maxtime = max_batch_time(qname) + runtime = PBSTIMES['default'] + + if qoptions: + ms = re.match(r'^(-.+)/(-.+)$', qoptions) + if ms: qoptions = ms.group(2 if pgrec['otype'] == 'P' else 1) + + ms = re.search(r'-l\s+\S*walltime=([\d:-]+)', qoptions) + if ms: + optval = ms.group(1) + vcs = optval.split(':') + vcl = len(vcs) + vds = vcs[0].split('-') + opttime = 3600*int(vds[0]) + if len(vds) > 1: + opttime *= 24 + opttime += 3600*int(vds[1]) + if vcl > 1: + opttime += 60*int(vcs[1]) + if vcl > 2: opttime += int(vcs[2]) + runtime = opttime + qoptions += ' ' + + if limit > 0 and runtime > limit: runtime = limit + if runtime > maxtime: runtime = maxtime + if runtime != opttime and runtime != PBSTIMES['default']: + optval = "walltime={}:{:02}:{:02}".format(int(runtime/3600), int(runtime/60)%60, runtime%60) + if opttime: + if runtime < opttime: qoptions = re.sub(r'walltime=[\d:-]+', optval, qoptions) + elif qoptions.find('-l ') > -1: + qoptions = re.sub(r'-l\s+', "-l {},".format(optval), qoptions) + else: + qoptions += "-l " + optval + + if pgrec['modules']: + options = build_dscheck_options(pgrec, 'modules', 'PBS') + if options: qoptions += "-mod {} ".format(options) + if pgrec['environments']: + options = build_dscheck_options(pgrec, 'environments', 'PBS') + if options: qoptions += "-env {} ".format(options) + + if qname: qoptions += "-q {} ".format(qname) + + return qoptions + +# +# check rda queue for pending jobs to switch PBS queue if needed +# +def get_pbsqueue_option(pgrec): + + cidx = pgrec['cindex'] + for pname in PBSQUEUES: + if PBSQUEUES[pname]: + aname = pname + else: + qname = pname + pcnt = PgDBI.pgget("dscheck", '', "status = 'P' AND pbsqueue = '{}'".format(qname)) + if pcnt > 1: qname = aname + if pgrec['pbsqueue'] != qname: + PgDBI.pgexec("UPDATE dscheck SET pbsqueue = '{}' WHERE cindex = {}".format(qname, cidx)) + pgrec['pbsqueue'] = qname + + return PBSQUEUES[qname] + +# +# build individual option string for given option name +# +def build_dscheck_options(pgcheck, optname, optstr = None): + + options = pgcheck[optname] + if not options or options == 'default': return '' + if not re.match(r'^!', options): return options + cidx = pgcheck['cindex'] + # reget the option field to see if it is processed + pgrec = PgDBI.pgget('dscheck', optname, 'cindex = {}'.format(cidx)) + if not pgrec or options != pgrec[optname]: return options + + record = {} + errmsg = '' + record[optname] = options = PgCMD.get_dynamic_options(options[1:], pgcheck['oindex'], pgcheck['otype']) + if not options and PgLOG.PGLOG['SYSERR']: + record['status'] = pgcheck['status'] = 'E' + record['pid'] = 0 + record['tcount'] = pgcheck['tcount'] + 1 + if not optstr: optstr = optname.capitalize() + errmsg = "Chk{}: Fail to build {} Options, {}".format(cidx, optstr, PgLOG.PGLOG['SYSERR']) + PgDBI.pgupdt("dscheck", record, "cindex = {}".format(cidx)) + if errmsg: + pgrqst = None + if pgcheck['otype'] == 'R': + ridx = pgcheck['oindex'] + pgrqst = PgDBI.pgget('dsrqst', '*', 'rindex = {}'.format(ridx)) + if pgrqst: + record = {} + record['status'] = PgOPT.send_request_email_notice(pgrqst, errmsg, 0, 'E') + record['ecount'] = pgrqst['ecount'] + 1 + PgDBI.pgupdt("dsrqst", record, "rindex = {}".format(ridx), PgOPT.PGOPT['errlog']) + errmsg = '' + elif pgcheck['otype'] == 'P': + pidx = pgcheck['oindex'] + pgpart = PgDBI.pgget('ptrqst', 'rindex', 'pindex = {}'.format(pidx)) + if pgpart: + PgDBI.pgexec("UPDATE ptrqst SET status = 'E' WHERE pindex = {}".format(pidx)) + ridx = pgpart['rindex'] + pgrqst = PgDBI.pgget('dsrqst', '*', 'rindex = {}'.format(ridx)) + if pgrqst and pgrqst['status'] != 'E': + record = {} + record['status'] = PgOPT.send_request_email_notice(pgrqst, errmsg, 0, 'E') + record['ecount'] = pgrqst['ecount'] + 1 + PgDBI.pgupdt("dsrqst", record, "rindex = {}".format(ridx), PgOPT.PGOPT['errlog']) + errmsg = '' + if errmsg: PgLOG.pglog(errmsg, PgOPT.PGOPT['errlog']) + return options + +# +# fill up dscheck record in case the command does not do it itself +# +def fill_dscheck_info(ckrec, pid, host, logact = 0): + + chkcnd = "cindex = {}".format(ckrec['cindex']) + PgDBI.pgexec("UPDATE dscheck SET tcount = tcount+1 WHERE " + chkcnd, logact) + if pid and PgLock.lock_host_dscheck(ckrec['cindex'], pid, host, logact) <= 0: return 1 # under processing + + record = {} + stat = 'R' + if pid: + record['pid'] = pid + if host == PgLOG.PGLOG['PBSNAME']: + info = PgSIG.get_pbs_info(pid, 0, logact, 2) + if info: + stat = info['State'] + if stat == 'Q': stat = 'P' + else: + record['runhost'] = PgLOG.PGLOG['HOSTNAME'] + record['bid'] = 0 + else: + stat = 'F' + record['status'] = stat + + record['stttime'] = record['subtime'] = record['chktime'] = int(time.time()) + pgrec = PgDBI.pgget("dscheck", "status, stttime", chkcnd, logact) + if not pgrec: return 0 + if pgrec['status'] != ckrec['status'] or pgrec['stttime'] > ckrec['stttime']: return 1 + if not pid and PgLock.lock_dscheck(ckrec['cindex'], 0) <= 0: return 1 + + return PgDBI.pgupdt("dscheck", record, chkcnd, logact) + +# +# return 1 to skip running if the dscheck record is not ready; 0 otherwise +# +def skip_dscheck_record(pgrec, host, logact = 0): + + workdir = pgrec['workdir'] + if workdir and workdir.find('$') > -1: workdir = '' + + if PgFile.check_host_down(workdir, host, logact): return 1 + if pgrec['command'] == "dsrqst": + if PgFile.check_host_down(PgLOG.PGLOG['RQSTHOME'], host, logact): return 1 + elif pgrec['command'] == "dsupdt" or pgrec['command'] == "dsarch": + if PgFile.check_host_down(PgLOG.PGLOG['DSDHOME'], host, logact): return 1 + + newrec = PgDBI.pgget("dscheck", "pid, status, stttime, tcount", "cindex = {}".format(pgrec['cindex']), logact) + if(not newrec or newrec['pid'] > 0 or newrec['status'] != pgrec['status'] or + newrec['stttime'] > pgrec['stttime'] or newrec['tcount'] > pgrec['tcount']): return 1 + if PgLock.lock_dscheck(pgrec['cindex'], 1) <= 0: return 1 + + if pgrec['subtime'] or pgrec['stttime']: + newrec = {'stttime' : 0, 'subtime' : 0, 'runhost' : '', 'bid' : 0} + (newrec['ttltime'], newrec['quetime']) = PgCMD.get_dscheck_runtime(pgrec) + if not PgDBI.pgupdt("dscheck", newrec, "cindex = {}".format(pgrec['cindex']), logact): return 1 + + return 0 + +# +# start recording Queued reuqests to checks +# +def start_dsrqsts(cnd, logact = 0): + + check_dsrqst_locks(cnd, logact) + if CHKHOST['chkhost']: return 1 + email_dsrqsts(cnd, logact) + purge_dsrqsts(cnd, logact) + rcnd = cnd + rcnd += ("status = 'Q' AND rqsttype <> 'C' AND (pid = 0 OR pid < ptcount) AND " + + "einfo IS NULL ORDER BY priority, rindex") + pgrecs = PgDBI.pgmget("dsrqst", "*", rcnd, logact) + cnt = (len(pgrecs['rindex']) if pgrecs else 0) + ccnt = PgDBI.pgget("dscheck", '', "status = 'C'", logact) + pcnt = PgDBI.pgget("dscheck", '', "status = 'P'", logact) + if (ccnt+pcnt) > PgOPT.PGOPT['waitlimit']: + if cnt: PgLOG.pglog("{}/{} Checks are Waiting/Pending; Add new dscheck records {} later".format(ccnt, pcnt, PgLOG.PGLOG['HOSTNAME']), + PgLOG.LOGWRN|PgLOG.FRCLOG) + rcnt = PgOPT.PGOPT['waitlimit']-ccnt-pcnt + if cnt == 0: + acnt = 0 + cnts = start_dsrqst_partitions(None, rcnt, logact) + rcnt = cnts[0] + pcnt = cnts[1] + else: + tcnt = cnt + if cnt > rcnt: cnt = rcnt + if cnt > 1: PgLOG.pglog("Try to add dschecks for {} DSRQST records on {}".format(cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + + i = acnt = ccnt = pcnt = rcnt = 0 + while i < tcnt and ccnt < cnt: + pgrec = PgUtil.onerecord(pgrecs, i) + i += 1 + if pgrec['ptcount'] == 0 and validate_dsrqst_partitions(pgrec, logact): + acnt += add_dsrqst_partitions(pgrec, logact) + elif pgrec['ptcount'] < 2: + rcnt += start_one_dsrqst(pgrec, logact) + else: + cnts = start_dsrqst_partitions(pgrec, (cnt-ccnt), logact) + rcnt += cnts[0] + pcnt += cnts[1] + ccnt += (acnt+pcnt+rcnt) + + if rcnt > 1: PgLOG.pglog("build {} requests on {}".format(rcnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + if pcnt > 1: PgLOG.pglog("build {} request partitions on {}".format(pcnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + if acnt > 1: PgLOG.pglog("Add partitions to {} requests on {}".format(acnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + + return rcnt + +# +# validate a given request if ok to do partitions +# +def validate_dsrqst_partitions(pgrec, logact = 0): + + pgctl = PgCMD.get_dsrqst_control(pgrec, logact) + if pgctl and (pgctl['ptlimit'] or pgctl['ptsize']): return True + + record = {'ptcount' : 1} + pgrec['ptcount'] = 1 + if pgrec['ptlimit']: pgrec['ptlimit'] = record['ptlimit'] = 0 + if pgrec['ptsize']: pgrec['ptsize'] = record['ptsize'] = 0 + + PgDBI.pgupdt('dsrqst', record, "rindex = {}".format(pgrec['rindex']), logact) + return False + +# +# call given command to evaluate dynamically the dscheck.qoptions +# +def set_dscheck_options(chost, cnd, logact): + + if chost not in DOPTHOSTS: return + qcnt = 0 + skipcmds = DOPTHOSTS[chost] + pgrecs = PgDBI.pgmget("dscheck", "*", cnd + "pid = 0 AND status = 'C' AND LEFT(qoptions, 1) = '!'", logact) + cnt = len(pgrecs['cindex']) if pgrecs else 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + if skipcmds and pgrec['qoptions'] in skipcmds: continue # skip + if PgLock.lock_dscheck(pgrec['cindex'], 1) <= 0: continue + qoptions = build_dscheck_options(pgrec, 'qoptions', 'PBS') + if not qoptions and pgrec['status'] == 'E': continue # failed evaluating qoptions + record = {'pid' : 0, 'qoptions': qoptions} + qcnt += PgDBI.pgupdt('dscheck', record, "cindex = {}".format(pgrec['cindex']), PgOPT.PGOPT['errlog']) + + if qcnt and cnt > 1: PgLOG.pglog("{} of {} DSCHECK PBS options Dynamically set on {}".format(qcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# add a new dscheck record if a given request record is due +# +def start_one_dsrqst(pgrec, logact = 0): + + if PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst' AND action = 'BR'".format(pgrec['rindex']), logact): return 0 + + pgctl = PgCMD.get_dsrqst_control(pgrec, logact) + if pgctl: + if 'qoptions' in pgctl and pgctl['qoptions']: + ms = re.match(r'^(-.+)/(-.+)$', pgctl['qoptions']) + if ms: pgctl['qoptions'] = ms.group(1) + argv = "{} BR -RI {} -b -d".format(pgrec['dsid'], pgrec['rindex']) + return add_one_dscheck(pgrec['rindex'], 'R', "dsrqst", pgrec['dsid'], "BR", + '', pgrec['specialist'], argv, pgrec['email'], pgctl, logact) + +# +# add a dscheck record for a given request to setup partitions +# +def add_dsrqst_partitions(pgrec, logact = 0): + + if PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst'".format(pgrec['rindex']), logact): return 0 + + pgctl = PgCMD.get_dsrqst_control(pgrec, logact) + if pgctl: + if 'qoptions' in pgctl and pgctl['qoptions']: + ms =re.match(r'^(-.+)/(-.+)$', pgctl['qoptions']) + if ms: pgctl['qoptions'] = ms.group(1) + argv = "{} SP -RI {} -NP -b -d".format(pgrec['dsid'], pgrec['rindex']) + return add_one_dscheck(pgrec['rindex'], 'R', "dsrqst", pgrec['dsid'], 'SP', + '', pgrec['specialist'], argv, pgrec['email'], pgctl, logact) + +# +# add multiple dscheck records of partitions for a given request +# +def start_dsrqst_partitions(pgrqst, ccnt, logact = 0): + + cnts = [0, 0] + if pgrqst: + rindex = pgrqst['rindex'] + cnd = "rindex = {} AND status = ".format(rindex) + if pgrqst['pid'] == 0: + cnt = PgDBI.pgget("ptrqst", "", cnd + "'E'", logact) + if cnt > 0 and (pgrqst['ecount'] + cnt) <= PgOPT.PGOPT['PEMAX']: + # set Error partions back to Q + PgDBI.pgexec("UPDATE ptrqst SET status = 'Q' WHERE {}'E'".format(cnd), PgOPT.PGOPT['extlog']) + else: + rindex = 0 + cnd = "status = " + pgrecs = PgDBI.pgmget("ptrqst", "*", cnd + "'Q' AND pid = 0 ORDER by pindex", logact) + cnt = len(pgrecs['pindex']) if pgrecs else 0 + if cnt > 0: + if cnt > ccnt: cnt = ccnt + pgctl = PgCMD.get_dsrqst_control(pgrqst, logact) if pgrqst else None + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + if pgrec['rindex'] != rindex: + rindex = pgrec['rindex'] + pgrqst = PgDBI.pgget("dsrqst", "*", "rindex = {}".format(rindex), logact) + if pgrqst: pgctl = PgCMD.get_dsrqst_control(pgrqst, logact) + if not pgrqst: # request missing + PgDBI.pgdel('ptrqst', "rindex = {}".format(rindex)) + continue + if pgrec['ptcmp'] == 'Y': + pgptctl = None + else: + pgptctl = PgCMD.get_partition_control(pgrec, pgrqst, pgctl, logact) + if pgptctl: + if 'qoptions' in pgptctl and pgptctl['qoptions']: + ms = re.match(r'^(-.+)/(-.+)$', pgptctl['qoptions']) + if ms: pgptctl['qoptions'] = ms.group(2) + if PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst' AND action = 'PP'".format(pgrec['pindex']), logact): continue + argv = "{} PP -PI {} -RI {} -b -d".format(pgrqst['dsid'], pgrec['pindex'], pgrqst['rindex']) + cnts[1] += add_one_dscheck(pgrec['pindex'], 'P', "dsrqst", pgrqst['dsid'], "PP", + '', pgrqst['specialist'], argv, pgrqst['email'], pgptctl, logact) + + elif pgrqst and pgrqst['pid'] == 0 and pgrqst['ptcount'] == PgDBI.pgget("ptrqst", "", cnd + " 'O'", logact): + cnts[0] = start_one_dsrqst(pgrqst, logact) + + return cnts + +# +# check long procssing reuqests and unlock the processes that are aborted +# +def check_dsrqst_locks(cnd, logact = 0): + + ltime = int(time.time()) + lochost = PgLOG.PGLOG['HOSTNAME'] + cnd += "pid > 0 AND " + dtime = ltime - PgSIG.PGSIG['DTIME'] + ctime = ltime - PgSIG.PGSIG['CTIME'] + rtime = ltime - PgSIG.PGSIG['RTIME'] + if CHKHOST['chkhost']: + cnd += "lockhost {} AND locktime < {}".format(CHKHOST['hostcond'], dtime) + else: + cnd += "locktime > 0 AND (locktime < {} OR locktime < {} AND lockhost = '{}' OR locktime < {} AND lockhost = 'rda_config')".format(ctime, dtime, lochost, rtime) + check_partition_locks(cnd, ltime, logact) # check partitions first + + pgrecs = PgDBI.pgmget("dsrqst", "rindex, lockhost, pid, locktime", cnd, logact) + cnt = (len(pgrecs['rindex']) if pgrecs else 0) + lcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + lmsg = "{}({}) at {} on {}".format(pgrec['lockhost'], pgrec['pid'], PgLOG.current_datetime(), PgLOG.PGLOG['HOSTNAME']) + ridx = pgrec['rindex'] + if CHKHOST['chkhost'] or pgrec['lockhost'] == lochost: + if PgLock.lock_request(ridx, 0) > 0: + PgLOG.pglog("Rqst{}: unlocked {}".format(ridx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + if(PgDBI.pgexec("UPDATE dsrqst set locktime = {} WHERE rindex = {} AND pid = {}".format(ltime, ridx, pgrec['pid']), logact) and + not PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst'".format(ridx))): + PgLOG.pglog("Rqst{}: time updated for {}".format(ridx, lmsg), PgLOG.LOGWRN|PgLOG.FRCLOG) + elif(not pgrec['lockhost'] or pgrec['lockhost'] == 'rda_config' or pgrec['lockhost'] == 'partition' and + not PgDBI.pgget('ptrqst', '', "rindex = {} AND pid > 0".format(ridx), logact)): + record = {'pid' : 0, 'lockhost' : ''} + if PgDBI.pgupdt("dsrqst", record, "rindex = {} AND pid = {}".format(ridx, pgrec['pid']), logact): + PgLOG.pglog("Rqst{}: unlocked {}".format(ridx, pgrec['lockhost'], pgrec['pid'], PgLOG.current_datetime(ltime)), PgLOG.LOGWRN) + lcnt += 1 + continue + elif (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + PgLOG.pglog("Rqst{}: time NOT updated for {} of {}".format(ridx, pgrec['lockhost'], pgrec['pid'], dscheck_runtime(pgrec['locktime'], ltime)), logact) + + RUNPIDS["{}{}".format(pgrec['lockhost'], pgrec['pid'])] = 1 + + if cnt > 1: PgLOG.pglog("{} of {} DSRQST records unlocked on {}".format(lcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# check long procssing reuqest partitions and unlock the processes that are aborted +# +def check_partition_locks(cnd, ltime, logact = 0): + + pgrecs = PgDBI.pgmget("ptrqst", "pindex, rindex, lockhost, pid, locktime", cnd, (logact&~PgLOG.LGEREX)) + cnt = (len(pgrecs['pindex']) if pgrecs else 0) + lcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + lmsg = "{}({}) at {} on {}".format(pgrec['lockhost'], pgrec['pid'], PgLOG.current_datetime(), PgLOG.PGLOG['HOSTNAME']) + pidx = pgrec['pindex'] + if CHKHOST['chkhost'] or pgrec['lockhost'] == PgLOG.PGLOG['HOSTNAME']: + if PgLock.lock_partition(pidx, 0) > 0: + PgLOG.pglog("RPT{}: unlocked {}".format(pidx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + if(PgDBI.pgexec("UPDATE ptrqst set locktime = {} WHERE pindex = {} AND pid = {}".format(ltime, pidx, pgrec['pid']), logact) and + PgDBI.pgexec("UPDATE dsrqst set locktime = {} WHERE rindex = {}".format(ltime, pgrec['rindex']), logact) and + not PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst' AND otype = 'P'".format(pidx))): + PgLOG.pglog("RPT{}: time updated for {}".format(pidx, lmsg), PgLOG.LOGWRN) + elif not pgrec['lockhost'] or pgrec['lockhost'] == 'rda_config': + record = {'pid' : 0, 'lockhost' : ''} + if PgDBI.pgupdt("ptrqst", record, "pindex = {} AND pid = {}".format(pidx, pgrec['pid']), logact): + PgLOG.pglog("RPT{}: unlocked {}".format(pidx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + elif (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + PgLOG.pglog("RPT{}: time NOT updated for {} of {}".format(pidx, dscheck_runtime(pgrec['locktime'], ltime), lmsg), logact) + + RUNPIDS["{}{}".format(pgrec['lockhost'], pgrec['pid'])] = 1 + + if cnt > 1: PgLOG.pglog("{} of {} DSRQST partitions unlocked on {}".format(lcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# check dsrqsts and purge them if done already +# +def purge_dsrqsts(cnd, logact = 0): + + (sdate, stime) = PgUtil.get_date_time() + cnd += "(status = 'P' AND (date_purge IS NULL OR date_purge < '{}' OR date_purge = '{}' AND time_purge < '{}')".format(sdate, sdate, stime) + cnd += " OR status = 'O' AND (date_purge < '{}' OR date_purge = '{}' AND time_purge < '{}')) ORDER BY rindex".format(sdate, sdate, stime) + pgrecs = PgDBI.pgmget("dsrqst", "rindex, dsid, email, specialist", cnd, logact) + cnt = (len(pgrecs['rindex']) if pgrecs else 0) + pgctl = {'qoptions' : "-l walltime=1:00:00"} + pcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + ridx = pgrec['rindex'] + if PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsrqst'".format(ridx), logact): continue + argv = "{} PR -RI {} -b -d".format(pgrec['dsid'], ridx) + add_one_dscheck(ridx, 'R', 'dsrqst', pgrec['dsid'], 'PR', '', + pgrec['specialist'], argv, pgrec['email'], pgctl, logact) + +# +# check dsrqsts and send saved email +# +def email_dsrqsts(cnd, logact = 0): + + emlact = PgLOG.LOGWRN|PgLOG.FRCLOG + if logact and (logact&PgLOG.EMEROL) == PgLOG.EMEROL: emlact |= PgLOG.EMEROL + cnd += "pid = 0 AND einfo IS NOT NULL" + pgrecs = PgDBI.pgmget("dsrqst", "rindex, ptcount, einfo", cnd, logact) + cnt = (len(pgrecs['rindex']) if pgrecs else 0) + ecnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + ridx = pgrec['rindex'] + if PgLock.lock_request(ridx, 1) <= 0: continue + einfo = verify_request_einfo(ridx, pgrec['ptcount'], pgrec['einfo'], logact) + if einfo: + sent = 1 if (PgLOG.send_customized_email("Rqst{}".format(ridx), einfo, emlact) and + PgDBI.pgexec("UPDATE dsrqst set einfo = NULL WHERE rindex = {}".format(ridx), logact)) else -1 + else: + sent = 0 + + PgLock.lock_request(ridx, 0) + if sent == -1: break + ecnt += sent + + if cnt > 1: PgLOG.pglog("{} of {} DSRQST emails sent on {}".format(ecnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# veriy email info for partition errors +# retrun None if not all partitions finished +# +def verify_request_einfo(ridx, ptcnt, einfo, logact = 0): + + # no further checking if no partitionseinfo is empty + if ptcnt < 2 or not einfo: return einfo + # partition processes are not all done yet + if PgDBI.pgget("ptrqst", "", "rindex = {} AND (pid > 0 OR status = 'R')".format(ridx), logact): return None + + pkey = ["", ""] + # einfo does not contain partition error key + if einfo.find(pkey[0]) < 0: return einfo + einfo = re.sub(pkey[0], '', einfo) + ecnt = PgDBI.pgget("ptrqst", "", "rindex = {} AND status = 'E'".format(ridx), logact) + cbuf = "{} of {}".format(ecnt, ptcnt) + einfo = re.sub(pkey[1], cbuf, einfo) + + return einfo + +# +# start recording due updates to checks +# +def start_dsupdts(cnd, logact = 0): + + ctime = PgUtil.curtime(1) + check_dsupdt_locks(cnd, logact) + if CHKHOST['chkhost']: return 0 + email_dsupdt_controls(cnd, logact) + email_dsupdts(cnd, logact) + + cnd += "pid = 0 and cntltime <= '{}' and action > '' AND einfo IS NULL ORDER by cntltime".format(ctime) + pgrecs = PgDBI.pgmget("dcupdt", "*", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + ucnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + if PgDBI.pgget("dscheck", "pid, lockhost", "oindex = {} AND command = 'dsupdt'".format(pgrec['cindex']), logact): continue + if pgrec['pindex'] and not PgOPT.valid_data_time(pgrec): continue + argv = "{} {} -CI {} -b -d".format(pgrec['dsid'], pgrec['action'], pgrec['cindex']) + if not add_one_dscheck(pgrec['cindex'], 'C', "dsupdt", pgrec['dsid'], pgrec['action'], + '', pgrec['specialist'], argv, None, pgrec, logact): break + ucnt += 1 + + if cnt > 1: PgLOG.pglog("update {} of {} DSUPDT controls on {}".format(ucnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + return ucnt + +# +# check if the parent update control is finished +# +def parent_not_finished(pgrec): + + freq = [0, 0, 0] + ms = re.match(r'^(\d+)([YMWDH])$', pgrec['frequency'], re.I) + if ms: + val = int(ms.group(1)) + unit = ms.group(2).upper() + if not val: return 0 + if unit == 'Y': + freq[0] = val + elif unit == 'M': + freq[1] = val + elif unit == 'W': + freq[2] = 7 * val + elif unit == 'D': + freq[2] = val + elif unit == 'H': # update frequency is hourly controlled + freq.append(val) + else: + ms = re.match(r'^(\d+)M/(\d+)', pgrec['frequency'], re.I) + if ms: + val = int(ms.group(1)) + nf = int(ms.group(2)) + if nf < 2 or nf > 10 or (30%nf): return 0 + freq = [0, val, 0, 0, 0, 0, nf] # number of fractions in a month + + dtime = PgUtil.adddatetime(pgrec['datatime'], freq[0], freq[1], freq[2], freq[3], freq[4], freq[5], freq[6]) + if PgDBI.pgget("dcupdt", "", "cindex = {} AND datatime < '{}'".format(pgrec['pindex'], dtime), PgOPT.PGOPT['extlog']): + return 1 + else: + return 0 + +# +# check long procssing updates and unlock the processes that are aborted +# +def check_dsupdt_locks(ocnd, logact = 0): + + ltime = int(time.time()) + lochost = PgLOG.PGLOG['HOSTNAME'] + dtime = ltime - PgSIG.PGSIG['DTIME'] + cnd = ocnd + "pid > 0 AND " + ctime = ltime - 4*PgSIG.PGSIG['CTIME'] + rtime = ltime - PgSIG.PGSIG['RTIME'] + if CHKHOST['chkhost']: + cnd += "lockhost {} AND chktime < {}".format(CHKHOST['hostcond'], dtime) + else: + cnd += "chktime > 0 AND (chktime < {} OR chktime < {} AND lockhost = '{}' OR chktime < {} AND lockhost = 'rda_config')".format(ctime, dtime, lochost, rtime) + + pgrecs = PgDBI.pgmget("dcupdt", "cindex, lockhost, pid, chktime", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + lcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + lmsg = "{}({}) at {} on {}".format(pgrec['lockhost'], pgrec['pid'], PgLOG.current_datetime(), PgLOG.PGLOG['HOSTNAME']) + idx = pgrec['cindex'] + if CHKHOST['chkhost'] or pgrec['lockhost'] == lochost: + if PgLock.lock_update_control(idx, 0) > 0: + PgLOG.pglog("UC{}: unlocked {}".format(idx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + if(PgDBI.pgexec("UPDATE dcupdt SET chktime = {} WHERE cindex = {} AND pid = {}".format(ltime, idx, pgrec['pid']), logact) and + not PgDBI.pgget("dscheck", "", "oindex = {} AND command = 'dsupdt'".format(idx))): + PgLOG.pglog("UC{}: time updated for {}".format(idx, lmsg), PgLOG.LOGWRN) + elif not pgrec['lockhost'] or pgrec['lockhost'] == 'rda_config': + record = {'pid' : 0, 'lockhost' : ''} + if PgDBI.pgupdt("dcupdt", record, "cindex = {} AND pid = {}".format(idx, pgrec['pid']), logact): + PgLOG.pglog("UC{}: unlocked {}".format(idx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + elif (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + PgLOG.pglog("UC{}: time NOT updated for {} of {}".format(idx, dscheck_runtime(pgrec['chktime'], ltime), lmsg), logact) + + RUNPIDS["{}{}".format(pgrec['lockhost'], pgrec['pid'])] = 1 + + if cnt > 1: PgLOG.pglog("{} of {} DSUPDT Controls unlocked on {}".format(lcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + + cnd = ocnd + "pid > 0 AND locktime > 0 AND " + if CHKHOST['chkhost']: + cnd += "hostname {} AND locktime < {}".format(CHKHOST['hostcond'], dtime) + else: + cnd += "(locktime < {} OR locktime < {} AND hostname = '{}' OR locktime < {} AND hostname = 'rda_config')".format(ctime, dtime, lochost, rtime) + + pgrecs = PgDBI.pgmget("dlupdt", "lindex, hostname, pid, locktime", cnd, logact) + cnt = (len(pgrecs['lindex']) if pgrecs else 0) + lcnt = 0 + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + lmsg = "{}({}) at {} on {}".format(pgrec['hostname'], pgrec['pid'], PgLOG.current_datetime(), PgLOG.PGLOG['HOSTNAME']) + idx = pgrec['lindex'] + if CHKHOST['chkhost'] or pgrec['hostname'] == lochost: + if PgLock.lock_update(idx, None, 0) > 0: + PgLOG.pglog("Updt{}: unlocked {}".format(idx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + PgDBI.pgexec("UPDATE dlupdt SET locktime = {} WHERE lindex = {} AND pid = {}".format(ltime, idx, pgrec['pid']), logact) + elif not pgrec['hostname'] or pgrec['hostname'] == 'rda_config': + record = {'pid' : 0, 'hostname' : ''} + if PgDBI.pgupdt("dlupdt", record, "lindex = {} AND pid = {}".format(idx, pgrec['pid']), logact): + PgLOG.pglog("Updt{}: unlocked {}".format(idx, lmsg), PgLOG.LOGWRN) + lcnt += 1 + continue + elif (logact&PgLOG.EMEROL) == PgLOG.EMEROL: + PgLOG.pglog("Updt{}: time NOT updated for {} of {}".format(idx, dscheck_runtime(pgrec['locktime'], ltime), lmsg), logact) + + RUNPIDS["{}{}".format(pgrec['hostname'], pgrec['pid'])] = 1 + + if cnt > 1: PgLOG.pglog("{} of {} DSUPDT Local Files unlocked on {}".format(lcnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# check dsupdts and send saved email +# +def email_dsupdt_controls(cnd, logact = 0): + + emlact = PgLOG.LOGWRN|PgLOG.FRCLOG + if logact and (logact&PgLOG.EMEROL) == PgLOG.EMEROL: emlact |= PgLOG.EMEROL + cnd += "pid = 0 AND einfo IS NOT NULL" + pgrecs = PgDBI.pgmget("dcupdt", "cindex", cnd, logact) + cnt = (len(pgrecs['cindex']) if pgrecs else 0) + ecnt = 0 + for i in range(cnt): + cidx = pgrecs['cindex'][i] + if PgLock.lock_update_control(cidx, 1) <= 0: continue + pgrec = PgDBI.pgget("dcupdt", "einfo", "cindex = {}".format(cidx), logact) + if pgrec['einfo']: + sent = 1 if (PgLOG.send_customized_email("UC{}".format(cidx), pgrec['einfo'], emlact) and + PgDBI.pgexec("UPDATE dcupdt set einfo = NULL WHERE cindex = {}".format(cidx), logact)) else -1 + else: + sent = 0 + + PgLock.lock_update_control(cidx, 0) + if sent == -1: break + ecnt += sent + + if cnt > 1: PgLOG.pglog("{} of {} DSUPDT Control emails sent on {}".format(ecnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# check dsupdts and send saved email +# +def email_dsupdts(cnd, logact = 0): + + emlact = PgLOG.LOGWRN|PgLOG.FRCLOG + if logact and (logact&PgLOG.EMEROL) == PgLOG.EMEROL: emlact |= PgLOG.EMEROL + cnd += "pid = 0 AND emnote IS NOT NULL" + pgrecs = PgDBI.pgmget("dlupdt", "lindex, cindex", cnd, logact) + cnt = (len(pgrecs['lindex']) if pgrecs else 0) + ecnt = 0 + for i in range(cnt): + idx = pgrecs['cindex'][i] + if idx > 0 and PgDBI.pgget("dcupdt", "", "cindex = {} AND pid > 0".format(idx), logact): continue + idx = pgrecs['lindex'][i] + if PgLock.lock_update(idx, None, 1) <= 0: continue + pgrec = PgDBI.pgget("dlupdt", "emnote", "lindex = {}".format(idx), logact) + if pgrec['emnote']: + sent = 1 if(PgLOG.send_customized_email("Updtidx", pgrec['emnote'], emlact) and + PgDBI.pgexec("UPDATE dlupdt set emnote = NULL WHERE lindex = {}".format(idx), logact)) else -1 + else: + sent = 0 + + PgLock.lock_update(idx, None, 0) + if sent == -1: break + ecnt += sent + + if cnt > 0: PgLOG.pglog("{} of {} DSUPDT emails sent on {}".format(ecnt, cnt, PgLOG.PGLOG['HOSTNAME']), PgLOG.WARNLG) + +# +# create an dscheck record for a given command +# +def add_one_dscheck(oindex, otype, cmd, dsid, action, workdir, specialist, argv, remail, btctl, logact = 0): + + cidx = 0 + + if len(argv) > 100: + argextra = argv[100:] + argv = argv[0:100] + else: + argextra = None + + record = {'command' : cmd, 'argv' : argv, 'specialist' : specialist, 'workdir' : workdir, + 'dsid' : dsid, 'action' : action, 'oindex' : oindex, 'otype' : otype} + (record['date'], record['time']) = PgUtil.get_date_time() + if argextra: record['argextra'] = argextra + if 'PI' in PgOPT.params: record['pindex'] = PgOPT.params['PI'][0] + if 'MC' in PgOPT.params and PgOPT.params['MC'][0] > 0: record['mcount'] = PgOPT.params['MC'][0] + record.update(PgCMD.get_batch_options(btctl)) + + if cmd == 'dsrqst' and remail: + record['remail'] = remail + if otype == 'P': + pgcnt = PgDBI.pgget("dscheck", "", "remail = '{}' AND otype = 'P'" .format(remail), logact) + if pgcnt >= get_user_limit(remail)[1]: return PgLOG.FAILURE + elif action != 'PR': + pgcnt = PgDBI.pgget("dscheck", "", "remail = '{}' AND otype = 'R'".format(remail), logact) + if pgcnt >= get_user_limit(remail)[0]: return PgLOG.FAILURE + + if oindex and otype: + pgrec = PgDBI.pgget('dscheck', '*', "oindex = {} AND otype = '{}'".format(oindex, otype), logact) + else: + pgrec = PgCMD.get_dscheck(cmd, argv, workdir, specialist, argextra, logact) + + if pgrec: + return PgLOG.pglog("Chk{}: {} added already {} {}".format(pgrec['cindex'], PgCMD.get_command_info(pgrec), pgrec['date'], pgrec['time']), PgLOG.LOGWRN|PgLOG.FRCLOG) + + cidx = PgDBI.pgadd("dscheck", record, logact|PgLOG.AUTOID) + if cidx: + PgLOG.pglog("Chk{}: {} added {} {}".format(cidx, PgCMD.get_command_info(record), record['date'], record['time']), PgLOG.LOGWRN|PgLOG.FRCLOG) + else: + if oindex and otype: + PgLOG.pglog("{}-{}-{}: Fail add check for {}".format(cmd, otype, oindex, specialist), PgLOG.LOGWRN|PgLOG.FRCLOG) + else: + PgLOG.pglog("{}: Fail add check for {}".format(cmd, specialist), PgLOG.LOGWRN|PgLOG.FRCLOG) + + time.sleep(PgSIG.PGSIG['ETIME']) + return PgLOG.FAILURE + + return PgLOG.SUCCESS + +# +# get dscheck status +# +def dscheck_status(stat): + + STATUS = { + 'C' : "Created", + 'D' : "Done", + 'E' : "Exit", + 'F' : "Finished", + 'H' : "Held", + 'I' : "Interrupted", + 'P' : "Pending", + 'Q' : "Queueing", + 'R' : "Run", + 'S' : "Suspended", + } + return (STATUS[stat] if stat in STATUS else "Unknown") + +# +# validate given daemon control indices +# +def validate_daemons(): + + if PgOPT.OPTS['DI'][2]&8: return # already validated + + dcnt = len(PgOPT.params['DI']) if 'DI' in PgOPT.params else 0 + if not dcnt: + if PgOPT.PGOPT['CACT'] == 'SD': + if 'ND' not in PgOPT.params: + PgOPT.action_error("Mode option -ND must be present to add new Daemon Control record") + dcnt = PgOPT.get_max_count("HN", "CM") + if dcnt > 0: + PgOPT.params['DI'] = [0]*dcnt + return + i = 0 + while i < dcnt: + val = PgOPT.params['DI'][i] + if val: + if not isinstance(val, int): + if re.match(r'^(!|<|>|<>)$', val): + if PgOPT.OPTS[PgOPT.PGOPT['CACT']][2] > 0: + PgOPT.action_error("Invalid condition '{}' of Daemon Control index".format(val)) + break + PgOPT.params['DI'][i] = int(val) + else: + PgOPT.params['DI'][i] = 0 + i += 1 + if i >= dcnt: # normal daemon control index given + for i in range(dcnt): + val = PgOPT.params['DI'][i] + if not val: + if PgOPT.PGOPT['CACT'] != 'SD': + PgOPT.action_error("Daemon Control Index 0 is not allowed\nUse Action SD with Mode option -ND to add new record") + elif not PgOPT.params['ND']: + PgOPT.action_error("Mode option -ND must be present to add new Daemon Control record") + continue + if i > 0 and val == PgOPT.params['DI'][i-1]: continue + pgrec = PgDBI.pgget("dsdaemon", "specialist", "dindex = {}".format(val), PgOPT.PGOPT['extlog']) + if not pgrec: + PgOPT.action_error("Daemon Control Index '{}' is not in RDADB".format(val)) + elif(PgOPT.OPTS[PgOPT.PGOPT['CACT']][2] > 0 and PgOPT.params['LN'] != pgrec['specialist'] and + PgLOG.PGLOG['CURUID'] != PgLOG.PGLOG['RDAUSER']): + PgOPT.action_error("{}: must be {}, owner of Daemon Control Index {}".format(PgOPT.params['LN'], pgrec['specialist'], val)) + else: # found none-equal condition sign + pgrec = PgDBI.pgmget("dsdaemon", "DISTINCT dindex", + PgDBI.get_field_condition("dindex", PgOPT.params['DI'], 0, 1), PgOPT.PGOPT['extlog']) + if not pgrec: PgOPT.action_error("No Daemon Control matches given Index condition") + PgOPT.params['DI'] = pgrec['dindex'] + + PgOPT.OPTS['DI'][2] |= 8 # set validated flag + +# +# validate given check indices +# +def validate_checks(): + + if (PgOPT.OPTS['CI'][2]&8) == 8: return # already validated + + if 'CI' in PgOPT.params: + cnt = len(PgOPT.params['CI']) + i = 0 + while i < cnt: + val = PgOPT.params['CI'][i] + if val: + if not isinstance(val, int): + if re.match(r'^(!|<|>|<>)$', val): + if PgOPT.OPTS[PgOPT.PGOPT['CACT']][2] > 0: + PgOPT.action_error("Invalid condition '{}' of Check index".format(val)) + break + PgOPT.params['CI'][i] = int(val) + else: + PgOPT.params['CI'][i] = 0 + i += 1 + if i >= cnt: # normal check index given + for i in range(cnt): + val = PgOPT.params['CI'][i] + if not val: PgOPT.action_error("Check Index 0 is not allowed") + if i > 0 and val == PgOPT.params['CI'][i-1]: continue + pgrec = PgDBI.pgget("dscheck", "specialist", "cindex = {}".format(val), PgOPT.PGOPT['extlog']) + if not pgrec: + PgOPT.action_error("Check Index '{}' is not in RDADB".format(val)) + elif(PgOPT.OPTS[PgOPT.PGOPT['CACT']][2] > 0 and PgOPT.params['LN'] != pgrec['specialist'] and + PgLOG.PGLOG['CURUID'] != PgLOG.PGLOG['RDAUSER']): + PgOPT.action_error("{}: must be {}, owner of Check Index {}".format(PgOPT.params['LN'], pgrec['specialist'], val)) + else: # found none-equal condition sign + pgrec = PgDBI.pgmget("dscheck", "cindex", PgDBI.get_field_condition("cindex", PgOPT.params['CI'], 0, 1), PgOPT.PGOPT['extlog']) + if not pgrec: PgOPT.action_error("No Check matches given Index condition") + PgOPT.params['CI'] = pgrec['cindex'] + + PgOPT.OPTS['CI'][2] |= 8 # set validated flag + +# +# validate given dataset IDs +# +def validate_datasets(): + + if PgOPT.OPTS['DS'][2]&8: return # already validated + + dcnt = len(PgOPT.params['DS']) + for i in range(dcnt): + dsid = PgOPT.params['DS'][i] + if not dsid: PgOPT.action_error("Empty Dataset ID is not allowed") + if i and dsid == PgOPT.params['DS'][i-1]: continue + if not PgDBI.pgget("dataset", "", "dsid = '{}'".format(dsid), PgOPT.PGOPT['extlog']): + PgOPT.action_error("Dataset '{}' is not in RDADB".format(dsid)) + + PgOPT.OPTS['DS'][2] |= 8 # set validated flag diff --git a/src/rda_python_template/__init__.py b/src/rda_python_dscheck/__init__.py similarity index 100% rename from src/rda_python_template/__init__.py rename to src/rda_python_dscheck/__init__.py diff --git a/src/rda_python_dscheck/dscheck.py b/src/rda_python_dscheck/dscheck.py new file mode 100644 index 0000000..ebbe9fb --- /dev/null +++ b/src/rda_python_dscheck/dscheck.py @@ -0,0 +1,671 @@ +#!/usr/bin/env python3 +# +################################################################################## +# +# Title: dscheck +# Author: Zaihua Ji, zji@ucar.edu +# Date: 09/28/2020 +# 2025-02-05 transferred to package rda_python_dscheck from +# https://github.com/NCAR/rda-utility-programs.git +# Purpose: python utility program to check and start command saved in dscheck +# +# Github: https://github.com/NCAR/rda-python-dscheck.git +# +################################################################################## +# +import os +import re +import sys +import time +from os import path as op +from rda_python_common import PgLOG +from rda_python_common import PgCMD +from rda_python_common import PgSIG +from rda_python_common import PgLock +from rda_python_common import PgUtil +from rda_python_common import PgFile +from rda_python_common import PgOPT +from rda_python_common import PgDBI +from . import PgCheck + +ALLCNT = 0 # global counting variables + +# +# main function to run dscheck +# +def main(): + + aname = 'dscheck' + PgOPT.parsing_input(aname) + PgCheck.check_dscheck_options(PgOPT.PGOPT['CACT'], aname) + start_action() + + if PgOPT.OPTS[PgOPT.PGOPT['CACT']][2]: PgLOG.cmdlog() # log end time if not getting action + + PgLOG.pgexit(0) + +# +# start action of dscheck +# +def start_action(): + + global ALLCNT + if PgOPT.PGOPT['CACT'] == 'AC': + add_check_info() + elif PgOPT.PGOPT['CACT'] == 'CH': + check_host_connection() + elif PgOPT.PGOPT['CACT'] == 'DL': + if 'CI' in PgOPT.params: + ALLCNT = len(PgOPT.params['CI']) + delete_check_info() + if 'DI' in PgOPT.params: + ALLCNT = len(PgOPT.params['DI']) + delete_daemon_info() + elif PgOPT.PGOPT['CACT'] == 'EC': + email_check_info() + elif PgOPT.PGOPT['CACT'] == 'GC': + get_check_info() + elif PgOPT.PGOPT['CACT'] == 'GD': + get_daemon_info() + elif PgOPT.PGOPT['CACT'] == "IC": + ALLCNT = len(PgOPT.params['CI']) + interrupt_dschecks() + elif PgOPT.PGOPT['CACT'] == 'PC': + PgCMD.set_batch_options(PgOPT.params, 2, 1) + if 'DM' in PgOPT.params: + ALLCNT = 0 + handle_dschecks() + else: + process_dschecks() + elif PgOPT.PGOPT['CACT'] == 'SD': + ALLCNT = len(PgOPT.params['DI']) + set_daemon_info() + elif PgOPT.PGOPT['CACT'] == 'SO': + PgCMD.set_batch_options(PgOPT.params, 2, 1) + process_dscheck_options() + elif PgOPT.PGOPT['CACT'] == "UL": + ALLCNT = len(PgOPT.params['CI']) if 'CI' in PgOPT.params else 0 + unlock_checks() + +# +# add a check for customized command +# +def add_check_info(): + + cmd = PgOPT.params['CM'].pop(0) + argstr = PgLOG.argv_to_string(PgOPT.params['CM'], 0) + if 'AV' in PgOPT.params: + if argstr: argstr += " " + argstr += PgLOG.argv_to_string(PgOPT.params['AV'], 0) + dsid = PgOPT.params['DS'][0] if 'DS' in PgOPT.params else None + action = PgOPT.params['AN'][0] if 'AN' in PgOPT.params else None + PgCMD.set_batch_options(PgOPT.params, 2, 1) + specialist = PgOPT.params['SN'][0] if 'SN' in PgOPT.params else PgOPT.params['LN'] + workdir = PgOPT.params['WD'][0] if 'WD' in PgOPT.params else PgLOG.PGLOG['CURDIR'] + PgCheck.add_one_dscheck(0, '', cmd, dsid, action, workdir, specialist, + argstr, None, None, PgOPT.PGOPT['extlog']) + +# +# delete dscheck daemon controls for given daemon control indices +# +def delete_daemon_info(): + + s = 's' if ALLCNT > 1 else '' + PgLOG.pglog("Delete {} dscheck daemon control{} ...".format(ALLCNT, s), PgLOG.WARNLG) + + delcnt = 0 + for i in range(ALLCNT): + delcnt += PgDBI.pgdel("dsdaemon", "dindex = {}".format(PgOPT.params['DI'][i]), PgOPT.PGOPT['extlog']) + PgLOG.pglog("{} of {} dscheck daemon control{} deleted".format(delcnt, ALLCNT, s), PgOPT.PGOPT['wrnlog']) + +# +# delete checks for given check indices +# +def delete_check_info(): + + s = 's' if ALLCNT > 1 else '' + PgLOG.pglog("Delete {} dscheck record{} ...".format(ALLCNT, s), PgLOG.WARNLG) + + delcnt = 0 + for i in range(ALLCNT): + cidx = PgLock.lock_dscheck(PgOPT.params['CI'][i], 2, PgOPT.PGOPT['extlog']) + if cidx <= 0: continue + delcnt += PgCMD.delete_dscheck(None, "cindex = {}".format(cidx), PgOPT.PGOPT['extlog']) + PgLOG.pglog("{} of {} check record{} deleted".format(delcnt, ALLCNT, s), PgOPT.PGOPT['wrnlog']) + +# +# email notice of check status for specialist +# +def email_check_info(): + + cnd = PgOPT.get_hash_condition("dscheck", None, None, 1) + pgrecs = PgDBI.pgmget("dscheck", "*", cnd + " ORDER BY cindex", PgOPT.PGOPT['extlog']) + + allcnt = (len(pgrecs['cindex']) if pgrecs else 0) + if not allcnt: return PgLOG.pglog("{}: No Check Information Found to send email for {}".format(PgLOG.PGLOG['CURUID'], cnd), PgLOG.LOGWRN) + if allcnt > 1: + s = 's' + ss = "are" + else: + s = '' + ss = "is" + subject = "{} active Check Record{}".format(allcnt, s) + mbuf = "{} {} listed:\n".format(subject, ss) + pgrecs = {'status' : get_check_status(pgrecs, allcnt)} + + for i in range(allcnt): + if i > 0: mbuf += PgLOG.PGLOG['SEPLINE'] + mbuf += build_check_message(PgUtil.onerecord(pgrecs, i)) + + if 'CC' in PgOPT.params: PgLOG.add_carbon_copy(PgOPT.params['CC']) + subject += " found" + PgLOG.send_email(subject, PgOPT.params['LN'], mbuf) + PgLOG.pglog("Email sent to {} With Subject '{}'".format(PgOPT.params['LN'], subject), PgLOG.LOGWRN) + +# +# build email message for a given check record +# +def build_check_message(pgrec): + + msg = "Check Index: {}\nCommand: {} {}".format(pgrec['cindex'], pgrec['command'], pgrec['argv']) + if pgrec['argextra']: msg += PgLOG.break_long_string(pgrec['argextra'], 100, "...", 1) + msg += ("\nWork Directory: {}\n".format(pgrec['workdir']) + + "Initial Execution: {} {} byb {}\n".format(pgrec['date'], pgrec['time'], pgrec['specialist']) + + "Current Status: {}\n".format(pgrec['status'])) + if pgrec['errmsg']: + msg += "Error Message: {}\n".format(pgrec['errmsg']) + elif not pgrec['pid']: + msg += "Error Message: Aborted abnormally\n"; + + return msg + +# +# get dscheck daemon control information +# +def get_daemon_info(): + + tname = "dsdaemon" + hash = PgOPT.TBLHASH[tname] + PgLOG.pglog("Get dscheck daemon control information from RDADB ...", PgLOG.WARNLG) + + oflds = lens = fnames = None + if 'FN' in PgOPT.params: fnames = PgOPT.params['FN'] + fnames = PgDBI.fieldname_string(fnames, PgOPT.PGOPT[tname], PgOPT.PGOPT[tname]) + onames = PgOPT.params['ON'] if 'ON' in PgOPT.params else "I" + qnames = fnames + PgOPT.append_order_fields(onames, fnames, tname) + condition = PgOPT.get_hash_condition(tname, None, None, 1); + if 'ON' in PgOPT.params and 'OB' in PgOPT.params: + oflds = PgOPT.append_order_fields(onames, None, tname) + else: + condition += PgOPT.get_order_string(onames, tname) + + pgrecs = PgDBI.pgmget(tname, PgOPT.get_string_fields(qnames, tname), condition, PgOPT.PGOPT['extlog']) + if pgrecs: + if 'OF' in PgOPT.params: lens = PgUtil.all_column_widths(pgrecs, fnames, hash) + if oflds: pgrecs = PgUtil.sorthash(pgrecs, fnames, hash, PgOPT.params['OB']) + + PgOPT.OUTPUT.write(PgOPT.get_string_titles(fnames, hash, lens) + "\n") + if pgrecs: + cnt = PgOPT.print_column_format(pgrecs, fnames, hash, lens) + s = 's' if cnt > 1 else '' + PgLOG.pglog("{} daemon control{} retrieved".format(cnt, s), PgOPT.PGOPT['wrnlog']) + else: + PgLOG.pglog("No daemon control information retrieved", PgOPT.PGOPT['wrnlog']) + +# +# get check information +# +def get_check_info(): + + tname = 'dscheck' + hash = PgOPT.TBLHASH[tname] + PgLOG.pglog("Get check information from RDADB ...", PgLOG.WARNLG) + + lens = oflds = fnames = None + if 'FN' in PgOPT.params: fnames = PgOPT.params['FN'] + fnames = PgDBI.fieldname_string(fnames, PgOPT.PGOPT[tname], PgOPT.PGOPT['chkall']) + onames = PgOPT.params['ON'] if 'ON' in PgOPT.params else "I" + condition = PgOPT.get_hash_condition(tname, None, None, 1); + if 'ON' in PgOPT.params and 'OB' in PgOPT.params: + oflds = PgOPT.append_order_fields(onames, None, tname) + else: + condition += PgOPT.get_order_string(onames, tname) + + pgrecs = PgDBI.pgmget(tname, "*", condition, PgOPT.PGOPT['extlog']) + if pgrecs: + if 'CS' in PgOPT.params: + pgrecs['status'] = get_check_status(pgrecs) + if fnames.find('U') < 0: fnames == 'U' + if 'FO' in PgOPT.params: lens = PgUtil.all_column_widths(pgrecs, fnames, hash) + if oflds: pgrecs = PgUtil.sorthash(pgrecs, oflds, hash, PgOPT.params['OB']) + + PgOPT.OUTPUT.write(PgOPT.get_string_titles(fnames, hash, lens) + "\n") + if pgrecs: + cnt = PgOPT.print_column_format(pgrecs, fnames, hash, lens) + s = 's' if cnt > 1 else '' + PgLOG.pglog("{} check record{} retrieved".format(cnt, s), PgOPT.PGOPT['wrnlog']) + else: + PgLOG.pglog("No check information retrieved", PgOPT.PGOPT['wrnlog']) + +# +# add or modify dscheck daemon control information +# +def set_daemon_info(): + + tname = "dsdaemon" + hash = PgOPT.TBLHASH[tname] + s = 's' if ALLCNT > 1 else '' + PgLOG.pglog("Set information of {} dscheck daemon control{} ...".format(ALLCNT, s), PgLOG.WARNLG) + + addcnt = modcnt = 0 + flds = PgOPT.get_field_keys(tname, None, 'I') + PgOPT.validate_multiple_values(tname, ALLCNT, flds) + + for i in range(ALLCNT): + didx = PgOPT.params['DI'][i] if 'DI' in PgOPT.params else 0 + if didx > 0: + cnd = "dindex = {}".format(didx) + pgrec = PgDBI.pgget(tname, "*", cnd, PgOPT.PGOPT['extlog']) + if not pgrec: PgOPT.action_error("Miss daemon record for " + cnd, 'SD') + else: + pgrec = None + + record = PgOPT.build_record(flds, pgrec, tname, i) + if record: + if 'priority' in record and (record['priority'] < 0 or record['priority'] > 10): + PgOPT.action_error("{}: Priority value must in range 0(highest) - 10(lowest)".format(record['priority']), 'SD') + + if pgrec: + modcnt += PgDBI.pgupdt(tname, record, cnd, PgOPT.PGOPT['extlog']) + else: + if 'specialist' not in record and PgOPT.params['LN'] != PgLOG.PGLOG['RDAUSER']: record['specialist'] = PgOPT.params['LN'] + didx = PgDBI.pgadd(tname, record, PgOPT.PGOPT['extlog']|PgLOG.AUTOID) + if didx: + PgLOG.pglog("Daemon Control Index {} added".format(didx), PgOPT.PGOPT['wrnlog']) + addcnt += 1 + + PgLOG.pglog("{}/{} of {} daemon control{} added/modified in RDADB!".format(addcnt, modcnt, ALLCNT, s), PgOPT.PGOPT['wrnlog']) + +# +# expand check status info +# +def get_check_status(pgrecs, cnt = 0): + + if not cnt: cnt = (len(pgrecs['cindex']) if pgrecs else 0) + stats = [None]*cnt + for i in range(cnt): + pgrec = PgUtil.onerecord(pgrecs, i) + if pgrec['pid']: + percent = complete_percentage(pgrec) + runhost = "" + if percent < 0: + stats[i] = "Pending" + else: + stats[i] = get_execution_string(pgrec['status'], pgrec['tcount']) + rtime = PgCheck.dscheck_runtime(pgrec['stttime']) + if rtime: stats[i] += " {}".format(rtime) + if percent > 0: stats[i] += ", {}% done".format(percent) + if pgrec['runhost']: runhost = pgrec['runhost'] + stats[i] += PgLock.lock_process_info(pgrec['pid'], pgrec['lockhost'], runhost) + else: + stats[i] = PgCheck.dscheck_status(pgrec['status']) + if pgrec['status'] == 'D' or pgrec['status'] == 'P': + runhost = (pgrec['runhost'] if pgrec['runhost'] else pgrec['lockhost']) + if runhost: stats[i] += " on " + runhost + elif pgrec['status'] == 'C' and pgrec['pindex']: + stats[i] = "Wait on CHK {}".format(pgrec['pindex']) + + return stats + +# +# get the percentage of the check job done +# +def complete_percentage(check): + + percent = 0 + + if check['bid'] and not check['stttime']: + percent = -1 + elif check['fcount'] > 0 and check['dcount']: + percent = int(100*check['dcount']/check['fcount']) + elif check['command'] == "dsrqst" and check['oindex']: + if check['otype'] == 'P': + percent = get_partition_percentage(check['oindex']) + else: + percent = get_dsrqst_percentage(check['oindex']) + + return (percent if percent < 100 else 99) + +# +# get a request percentage finished +# +def get_dsrqst_percentage(ridx): + + rcnd = "rindex = {}".format(ridx) + pgrqst = PgDBI.pgget("dsrqst", "fcount, pcount", rcnd) + if pgrqst: + fcnt = pgrqst['fcount'] if pgrqst['fcount'] else 0 + if fcnt < 1: fcnt = PgDBI.pgget("wfrqst", "", rcnd) + if fcnt > 0: + dcnt = pgrqst['pcount'] if pgrqst['pcount'] else 0 + if dcnt < 1: dcnt = PgDBI.pgget("wfrqst", "", rcnd + " AND status = 'O'") + if dcnt > 0: + percent = int(100*dcnt/fcnt) + if percent > 99: percent = 99 + return percent + return 0 + +# +# get a partition percentage finished +# +def get_partition_percentage(pidx, cidx = 0): + + pcnd = "pindex = {}".format(pidx) + pgrec = PgDBI.pgget('ptrqst', "fcount", pcnd) + if pgrec: + fcnt = pgrec['fcount'] if pgrec['fcount'] else 0 + if fcnt < 1: fcnt = PgDBI.pgget("wfrqst", "", pcnd) + if fcnt > 0: + dcnt = PgDBI.pgget("wfrqst", "", pcnd + " AND status = 'O'") + if dcnt > 0: + percent = int(100*dcnt/fcnt) + if percent > 99: percent = 99 + return percent + return 0 + +# +# get excecution string for give try count +# +def get_execution_string(stat, trycnt = 0): + + str = PgCheck.dscheck_status(stat) + if trycnt > 1: str += "({})".format(PgLOG.int2order(trycnt)) + + return str + +# +# interrupt checks for given dscheck indices +# +def interrupt_dschecks(): + + s = 's' if ALLCNT > 1 else '' + delcnt = 0 + for i in range(ALLCNT): + cidx = PgOPT.params['CI'][i] + cnd = "cindex = {}".format(cidx) + cstr = "Check Index {}".format(cidx) + pgrec = PgDBI.pgget("dscheck", "*", cnd, PgOPT.PGOPT['extlog']) + if not pgrec: PgLOG.pglog(cstr +": NOT in RDADB", PgOPT.PGOPT['extlog']) + pid = pgrec['pid'] + if pid == 0: + PgLOG.pglog(cstr + ": Check is not under process; no interruption", PgOPT.PGOPT['wrnlog']) + continue + + host = pgrec['lockhost'] + if not PgFile.local_host_action(host, "interrupt check", cstr, PgOPT.PGOPT['errlog']): continue + + opts = "-h {} -p {}".format(host, pid) + buf = PgLOG.pgsystem("rdaps " + opts, PgLOG.LOGWRN, 20) # 21 = 4 + 16 + if buf: + ms = re.match(r'^\s*(\w+)\s+', buf) + if ms: + uid = ms.group(1) + if uid != PgOPT.params['LN']: + PgLOG.pglog("{}: login name '{}'; must be '{}' to interrupt".format(cstr, PgOPT.params['LN'], uid), PgOPT.PGOPT['wrnlog']) + continue + if 'FI' not in PgOPT.params: + PgLOG.pglog("{}: locked by {}/{}; must add Mode option -FI (-ForceInterrupt) to interrupt".format(cstr, pid, host), PgOPT.PGOPT['wrnlog']) + continue + if not PgLOG.pgsystem("rdakill " + opts, PgLOG.LOGWRN, 7): + PgLOG.pglog("{}: Failed to interrupt Check locked by {}/{}".format(cstr, pid, host), PgOPT.PGOPT['errlog']) + continue + else: + PgLOG.pglog("{}: check process stopped for {}/{}".format(cstr, pid, host), PgOPT.PGOPT['wrnlog']) + + pgrec = PgDBI.pgget("dscheck", "*", cnd, PgOPT.PGOPT['extlog']) + if not pgrec['pid']: + if PgLock.lock_dscheck(cidx, 1, PgOPT.PGOPT['extlog']) <= 0: continue + elif pid != pgrec['pid'] or host != pgrec['lockhost']: + PgLOG.pglog("{}: Check is relocked by {}/{}".format(cstr, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['errlog']) + continue + + pgrec['status'] = 'I' + PgCMD.delete_dscheck(pgrec, None, PgOPT.PGOPT['extlog']) + if pgrec['command'] == 'dsupdt': + if pgrec['oindex']: + cnd = "cindex = {} AND pid = {} AND ".format(pgrec['oindex'], pid) + if PgDBI.pgexec("UPDATE dcupdt set pid = 0 WHERE {}lockhost = '{}'".format(cnd, host), PgOPT.PGOPT['extlog']): + PgLOG.pglog("Update Control Index {} unlocked".format(pgrec['oindex']), PgLOG.LOGWRN) + else: + cnd = "dsid = '{}' AND pid = {} AND ".format(pgrec['dsid'], pid) + + dlupdt = PgDBI.pgget("dlupdt", "lindex", "{}hostname = '{}'".format(cnd , host)) + if dlupdt and PgDBI.pgexec("UPDATE dlupdt set pid = 0 WHERE lindex = {}".format(dlupdt['lindex']), PgOPT.PGOPT['extlog']): + PgLOG.pglog("Update Local File Index {} unlocked".format(dlupdt['lindex']), PgLOG.LOGWRN) + + elif pgrec['command'] == 'dsrqst': + record = {'status' : 'I', 'pid' : 0} + if pgrec['otype'] == 'P': + table = "ptrqst" + field = "pindex" + msg = "Request Partition Index" + else: + table = "dsrqst" + field = "rindex" + msg = "Request Index" + + if pgrec['oindex']: + cnd = "{} = {} AND pid = {} AND lockhost = '{}'".format(field, pgrec['oindex'], pid, host) + else: + cnd = "dsid = '{}' AND pid = {} AND lockhost = '{}'".format(pgrec['dsid'], pid, host) + + if PgDBI.pgupdt(table, record, cnd, PgOPT.PGOPT['extlog']): + PgLOG.pglog("{} {} unlocked".format(msg, pgrec['oindex']), PgLOG.LOGWRN) + delcnt += 1 + + if ALLCNT > 1: PgLOG.pglog("{} of {} check{} interrupted".format(delcnt, ALLCNT, s), PgLOG.LOGWRN) + + +# +# unlock checks for given check indices +# +def unlock_checks(): + + if ALLCNT > 0: + s = 's' if ALLCNT > 1 else '' + PgLOG.pglog("Unlock {} check{} ...".format(ALLCNT, s), PgLOG.WARNLG) + modcnt = 0 + for cidx in PgOPT.params['CI']: + pgrec = PgDBI.pgget("dscheck", "pid, lockhost", "cindex = {}".format(cidx), PgOPT.PGOPT['extlog']) + if not pgrec: + PgLOG.pglog("Check {}: Not exists".format(cidx), PgOPT.PGOPT['errlog']) + elif not pgrec['pid']: + PgLOG.pglog("Check {}: Not locked".format(cidx), PgOPT.PGOPT['wrnlog']) + elif PgLock.lock_dscheck(cidx, -1, PgOPT.PGOPT['extlog']) > 0: + modcnt += 1 + PgLOG.pglog("Check {}: Unlocked {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog']) + elif(PgFile.check_host_down(None, pgrec['lockhost']) and + PgLock.lock_dscheck(cidx, -2, PgOPT.PGOPT['extlog']) > 0): + modcnt += 1 + PgLOG.pglog("Check {}: Force unlocked {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog']) + else: + PgLOG.pglog("Check {}: Unable to unlock {}/{}".format(cidx, pgrec['pid'], pgrec['lockhost']), PgOPT.PGOPT['wrnlog']) + + if ALLCNT > 1: PgLOG.pglog("{} of {} check{} unlocked from RDADB".format(modcnt, ALLCNT, s), PgLOG.LOGWRN) + else: + cnd = "lockhost = '{}' AND ".format(PgLOG.get_host(1)) + PgCheck.check_dsrqst_locks(cnd, PgOPT.PGOPT['extlog']) + PgCheck.check_dsupdt_locks(cnd, PgOPT.PGOPT['extlog']) + PgCheck.check_dscheck_locks(cnd, PgOPT.PGOPT['extlog']) + +# +# process the checks +# +def process_dschecks(): + + logact = PgLOG.LOGERR + + if PgLOG.PGLOG['CURUID'] == PgLOG.PGLOG['RDAUSER'] and (time.time()%(3*PgSIG.PGSIG['CTIME'])) < 60: + logact |= PgLOG.EMEROL + + cnd = PgOPT.get_hash_condition("dscheck", "ST", None, 1) + if cnd: cnd += " AND " + if 'SN' not in PgOPT.params and PgOPT.params['LN'] != PgLOG.PGLOG['RDAUSER']: + cnd += "specialist = '{}' AND ".format(PgOPT.params['LN']) + + if 'WR' in PgOPT.params: PgCheck.start_dsrqsts(cnd, logact) + if 'WU' in PgOPT.params: PgCheck.start_dsupdts(cnd, logact) + + acnd = PgOPT.get_hash_condition("dscheck", None, "ST", 1) + if acnd: acnd += " AND " + PgCheck.start_dschecks(cnd + acnd, logact) + + if PgLOG.PGLOG['ERRCNT']: send_error_email() + +# +# process the checks +# +def process_dscheck_options(): + + logact = PgLOG.LOGERR + + if PgLOG.PGLOG['CURUID'] == PgLOG.PGLOG['RDAUSER'] and (time.time()%(3*PgSIG.PGSIG['CTIME'])) < 60: + logact |= PgLOG.EMEROL + + cnd = PgOPT.get_hash_condition("dscheck", "ST", None, 1) + if cnd: cnd += " AND " + if 'SN' not in PgOPT.params and PgOPT.params['LN'] != PgLOG.PGLOG['RDAUSER']: + cnd += "specialist = '{}' AND ".format(PgOPT.params['LN']) + + acnd = PgOPT.get_hash_condition("dscheck", None, "ST", 1) + if acnd: acnd += " AND " + PgCheck.set_dscheck_options(PgLOG.get_host(1), cnd + acnd, logact) + + if PgLOG.PGLOG['ERRCNT']: send_error_email() + +# +# send an email notice to the running specialist +# +def send_email_notice(cmd, pgrec): + + s = 's' if pgrec['tcount'] > 1 else '' + msg = ("Check Index {} for command:\n {}\n".format(pgrec['cindex'], cmd) + + "under '{}' has be executed {} time{}.\n".format(pgrec['workdir'], pgrec['tcount'], s)) + if pgrec['errmsg']: + msg += "Error message from previous execution:\n {}\n".format(pgrec['errmsg']) + + msg += ("If there is any problem, please fix it, delete the dscheck record via " + + "'dscheck dl -ci '\nand restart the command.\n".format(pgrec['cindex'])) + + PgLOG.send_email("Check Index {} reprocessed {} time{}".format(pgrec['cindex'], pgrec['tcount'], s), None, msg) + +# +# rdadata daemon handles the daemon controls +# +def handle_dschecks(): + + logact = ccnt = rcnt = ucnt = 0 + PgLOG.PGLOG['NOQUIT'] = 1 + ctime = 4*PgSIG.PGSIG['CTIME'] + etime = ctime + + while not PgSIG.PGSIG['QUIT']: + if etime >= ctime: + logact = PgLOG.LGEREX|PgLOG.EMEROL + etime = 0 + else: + logact = PgLOG.LGEREX + + ncnt = 0 + cnt = PgCheck.start_dsrqsts("", logact) + ncnt += cnt + rcnt += cnt + cnt = PgCheck.start_dsupdts("", logact) + ncnt += cnt + ucnt += cnt + cnt = PgCheck.start_dschecks("", logact) + ncnt += cnt + ccnt += cnt + + if PgLOG.PGLOG['ERRCNT']: send_error_email() + if not ncnt: PgDBI.pgdisconnect(1) + + etime += PgSIG.sleep_daemon() + + PgLOG.PGLOG['NOQUIT'] = 0 + PgSIG.stop_daemon(prepare_quit(ccnt, rcnt, ucnt)) + +# +# send an error email to the specialist +# +def send_error_email(): + + msg = "Error message for DSCHECK on " + PgLOG.PGLOG['HOSTNAME'] + + PgLOG.set_email(msg, PgLOG.EMLTOP) + msg = PgLOG.send_email(msg) + PgLOG.pglog(msg, PgLOG.MSGLOG|PgLOG.FRCLOG) + +# +# prepare a summary string for quit +# +def prepare_quit(ccnt, rcnt, ucnt): + + msg = "" + if rcnt > 0: + s = 's' if rcnt > 1 else '' + msg = "{} dsrqst{}".format(rcnt, s) + if ccnt > 0: + if msg: msg += ", " + s = 's' if ccnt > 1 else '' + msg += "{} dscheck{}".format(ccnt, s) + if ucnt > 0: + if msg: msg += ", " + s = 's' if ucnt > 1 else '' + msg += "{} dsupdt{}".format(ucnt, s) + + return msg + +# +# check a daemon host if connectable +# +def check_host_connection(): + + tname = "dsdaemon" + hash = PgOPT.TBLHASH[tname] + condition = PgOPT.get_hash_condition(tname, None, "H", 1) + if 'HN' in PgOPT.params: + pgrecs = {'specialist' : [], 'hostname' : []} + spclsts = PgDBI.pgmget(tname, "DISTINCT specialist", condition, PgOPT.PGOPT['extlog']) + if spclsts: + for specialist in spclsts['specialist']: + for hostname in PgOPT.params['HN']: + pgrecs['specialist'].append(specialist) + pgrecs['hostname'].append(hostname) + else: + pgrecs = PgDBI.pgmget(tname, "DISTINCT specialist, hostname", condition, PgOPT.PGOPT['extlog']) + + cnt = len(pgrecs['specialist']) if pgrecs else 0 + if not cnt: + PgLOG.pglog("No daemon host found to check connectivity", PgLOG.LOGWRN) + return + if cnt > 1: PgLOG.pglog("Check {} daemon hosts for connectivity ...".format(cnt), PgLOG.WARNLG) + + for i in range(cnt): + specialist = pgrecs['specialist'][i] + hostname = pgrecs['hostname'][i] + cmd = "ssh {} ps".format(hostname) + if specialist != PgLOG.PGLOG['CURUID']: + if PgLOG.PGLOG['CURUID'] != PgLOG.PGLOG['RDAUSER']: + PgLOG.pglog("{}: Cannot check connection to '{}' for {}".format(PgLOG.PGLOG['CURUID'], hostname, specialist), PgLOG.LOGERR) + continue + else: + cmd = "pgstart_{} {}".format(specialist, cmd) + + PgLOG.pglog("Check conection to '{}' for {} ...".format(hostname, specialist), PgLOG.WARNLG) + PgLOG.pgsystem(cmd, PgLOG.LOGERR, 4, None, 15) + +# +# call main() to start program +# +if __name__ == "__main__": main() diff --git a/src/rda_python_dscheck/dscheck.usg b/src/rda_python_dscheck/dscheck.usg new file mode 100644 index 0000000..903cec1 --- /dev/null +++ b/src/rda_python_dscheck/dscheck.usg @@ -0,0 +1,737 @@ + +1 INTRODUCTION + +Program 'dscheck' is a utility to add, remove, view and process recorded commands +of other utility programs in Research Data Archive Management System (RDAMS). For +delayed execution, or called batch process, of utility programs, 'dsarch', 'dsupdt' +and 'dsrqst', the commands information and the directories where the commands are +initiated are saved into RDADB as check records. For any other specialist-defined +commands, they can be also put in delayed mode if the commands are added to +'dscheck' control via Action -AC (-AddCheck). + +The check records are processed automatically via a centralized 'dscheck' daemon, +although they can be processed manually on command line. During the execution of a +recorded command, the check record is locked in RDADB to prevent multiple executions +of the same command. The check records that retain command information are +automatically purged into check history when the commands are finished. + +When a recorded command fails due to failures of computer system, storage disk/tape, +or the Database Management System, the check record is normally purged with status 'E' +for error, unless check-reprocessing ability is built into the utility program, such +as 'dsarch'. For a check-reprocessing command, the check record is retained in RDADB +Until the command is processed successfully or the number of executions reaches the +try limits allowed. Utility programs 'dsrqst' and 'dsupdt' carry their own failure- +recovering ability and they do not need check-reprocessing. + +The purged check records are retained in RDADB. The check history can be viewed via +utility program 'viewcheckusage'. + +Program 'dscheck' supports the following major functions: + - Set daemon control records for individual specialists to configure how many + concurrent processes of a specified command can be executed on a given host, and + the host priorities to define the order of which host is picked for processing + a check. Without the daemon control information, a recorded command will not be + started automatically. + - ADD a check record for delayed command execution of any specialist-defined commands + - View utility command information currently saved in RDADB + - Email the current status of a specified utility command or a list of commands, + and include error messages if any, to a specialist + - Delete recorded command information, because the command is not needed anymore + - Unlock a given recorded command in case lock information was not cleaned + properly when the command was failed + - Interrupt a utility command that is under execution and kill recursively all + the associated child processes + - Add the due 'dsrqst' and 'dsupdt' actions into dscheck records + - Process commands that have been recorded into RDADB or the ones have failed + previously. + +The specialist who executes a utility command under 'dscheck' control remains +the exclusive owner of the check record in RDADB. This prevents the command to be +processed or deleted accidentally by other specialists. + +In the following sections, general usages of 'dscheck' are described first; and +detail descriptions of Action options are given; and finally Mode and Info +options are explained. + +2 GENERAL DSCHECK USAGE + + dscheck [Action Option] [Mode Options] [Info Options] + or + dscheck [-(IF|InputFile)] InputFileNames + +Quotes [] indicate optional. A pipeline '|' in parentheses as in format (A|B) +means either A or B can be used. The options applied to 'dscheck' are divided +into three categories, Action, Mode, and Information (Info for short) options. +Action options are used to specify what tasks this utility program to execute, +Mode options are used to modify behaviors of the actions, and the Info options +are used to pass information, one or multiple values, to run 'dscheck'. An option +can be given in form of either short name or long name, -DS or -Dataset for +example. Some options have alias names for convenience; -UnLock, for example, +is an alias option name for Mode option -UL (-UnLockCheck). Option names can be +given in either upper or lower cases, while the values following Info options +are case sensitive. + +Specify one of the Action options each time to execute 'dscheck'. Based on what +Action is chosen, some of the Info options are mandatory and others are optional +and certain Mode options can be applied to alter the behaviors of the Action. + +All options, except Info option -IF (-InputFile), can be given either on command +line or in input files. Input file names are presented per Info option -IF and +can only be provided on command line. Referring to description of Info option -IF +(-InputFile) for detail on how to present options in input files. One or multiple +input files, combined with options on command line, are allowed to run 'dscheck'. +The option name, -IF (-InputFile), itself can be omitted if a single input file is +given on command line and all other option information are provided inside the +input file. + +When information of daemons and checks are retrieved, Info options are used to +specify conditions for querying information from RDADB. Some special signs can be +used to further confine the information with special and complicated conditions; +they are '!', '<', '>' and '<>'. These special signs, if provided on command +line, must be quoted or escaped to avoid of being interpreted by Unix OS system. +The '!', or \!, means exclusion to the following value(s) and it must be the +first item following an Info option name, while '<' or '>' mean greater or less +than the following value and '<>' means between the following two values. +Combine '!' and '<', as syntax "'!' '<' OptionValue", to make a condition of +'larger than or equal to OptionValue'. + +Description of an individual option is displayed if 'dscheck' is issued on +command line as + + dscheck [Option] -(h|help) [Option] + +A description is displayed for an option given either before or after -(h|help). +If no option is specified or 'dscheck' is issued by itself, this whole document +is displayed per UNIX utility 'more'. A hard copy of this help document can be +printed from the saved file: ${DSSHOME}/dssdb/prog_usage/dscheck.usg. + +#The online HTML version of this document is available at +#http://dss.ucar.edu/internal/docs/dscheck/ + + +3 ACTION OPTIONS + +Action options are used to specify what task 'dscheck' executes. No values +are allowed to follow Action options. Multiple Action options provided +simultaneously are blocked. + +Based on the information being manipulated, the actions are divided into three +categories: + Daemon Control Actions - create, delete, modify and view daemon control information + in RDADB, of specified specialist, command and hostname + Check Actions - add, delete, unlock and view check information of the + active individual checks + Check Process Actions - process checks by starting commands on remote hosts as + configured in daemon controls and purge checks by recording + the commands and their execution information into check + history; interrupt the current executing commands by killing + the current process and its all children; and email status + of current checks + Daemon Host Connectivity - check connectivity of daemon hosts for specialists + +3.1 Daemon Control Actions + + A daemon control record for a command, a specialist and a hostname is used to + configure how many concurrent processes the specified command can be executed + for the specialist on specified hostname, and the priority the the hostname is + picked to start the command. A running centralized daemon reads this record + periodically in case the configuration is changed while the daemon is still + running, so that specialists can reset the values in daemon control records to + change the behave of dscheck daemon dynamically without shutting the daemon down. + + Daemon control information can be created, modified and viewed via Actions + included in this section: + Set Daemon Control - create and modify daemon control information for specified + specialists, commands and hostnames + Get Daemon Control - retrieve information of existing daemon controls + Delete Daemon Control - delete one or multiple daemon control records + +3.1.1 Set Daemon Control + -SD or -SetDaemon, creates and modifies daemon control information into + RDADB for given specialist login names, commands, and hostnames of computers on + which the check commands are processed. One or multiple records can be set each + time. + + dscheck -(SD|SetDaemon) [Mode Option] + [-(DI|DaemonIndex) controlIndices] + [-(CM|Command) UtilityProgramNames] # ALL for any command names + [-(SN|Specialist) DECSSpecialists] + [-(HN|HostName) HostMachineNames] + [-(MH|MatchHost) FlagToMatchHostname] + [-(PL|ProcessLimit) MaxNumberOfProcesses], + [-(PO|Priority) HostListOrder] + + Mode option that can be specified for this action include: + -(ND|NewDaemon) - sets a new daemon control record into RDADB + + If information of a daemon control exists already in RDADB for a given specialist, + a command and a hostname, the daemon control record is modified; otherwise, a new + daemon control record is added if daemon index is 0 and Mode option -ND (-NewDaemon) + is present. Combination of specialist login name, command name and hostname of + computer must be unique for for each daemon control record. + + Specify host name 'SLURM' for putting the command in the SLURM batch control system. If + a specified command name is not found in the daemon control, the general 'dscheck' + configuration for command name 'ALL' is used. + + For example, set daemon control information for schuster, all commands on SLURM hosts, + for maximum 4 checks can be processed at the same time with priority 1, the smaller + the number the higher the priority is, via input file daemon.ctl + + dsrqst SD -ND -IF daemon.ctl + +<> +DaemonIndex<:>Command<:>Specialist<:>Hostname<:>ProcessLimit<:>Priority<:> +0<:>schuster<:>ALL<:>SLURM<:>4<:>1<:> + + +3.1.2 Get Daemon Control + -GD or -GetDaemon, retrieves daemon control information for given commands, + specialists or hostnames. Daemon control information of specified specialists + are retrieved if the specialist login names are provided. Without specified + condition, only the daemon control records set for the specialist who runs + 'dscheck' are retrieved. + + dscheck -(GD|GetDaemon) [Mode Option] + [-(FN|FieldNames) FieldNameString] + [-(DI|DaemonIndex) controlIndices] + [-(CM|Command) UtilityProgramNames] + [-(SN|Specialist) DECSSpecialists] + [-(HN|HostName) HostMachineName] + [-(PL|ProcessLimit) MaxNumberOfProcesses] + [-(PO|Priority) ProcessPriority] + [-(OF|OutputFile) OutputFileName] + [-(DB|Debug) DebugModeInfo] + + Mode option that can be specified for getting check control Action: + -(FO|FormatOutput) - format the column output with a fix width for all values + of a given field + + Use Info option -FN (-FieldNames) to specify what daemon control fields to be + retrieved. It defaults to all available fields if option -FN is not provided. + + Valid field names of daemon controls and their corresponding Info option + names: + + Names Info Options Descriptions + I -(DI|DaemonIndex) daemon control index + C -(CM|Command) command names of utility programs + H -(HN|Hostname) computer hostname + M -(MH|MatchHost) Flage to control hostname match + S -(SN|Specialist) DECS specialist the daemon set for + P -(PL|ProcessLimit) Max check count to be processed at the same times + O -(PO|Priority) host priority a specified command to start on + + Daemon control information can be retrieved for specified specialist per option + -SN (-Specialist), and/or other conditions. Info option -SN, -CM and -HN accept + wildcard input of '%' for matching any number of characters. + + If daemon control index is not given, 'dscheck' gathers only the daemon control + records owned by the specialist who executes this getting daemon control Action. + To view daemon control records owned by another specialist, you need specify Info + option -SN (-Specialist). To view all control records, you simply provide option + -SN with value of 'ALL'. + + For example, to get all daemon control information currently set for you + + dscheck GD + + +3.1.3 Delete Daemon Control + -DL or -Delete (Alias: -RM, -Remove), deletes one or multiple daemon control records + from RDADB for given daemon control indices. + + dscheck -(DL|Delete) + -(DI|DaemonIndex) DaemonControlIndices + [-(DB|Debug) DebugModeInfo] + + Use this action to delete daemon control information. Delete a daemon + control record to remove the daemon specific configuration for a command, + a specialist and a hostname. + +3.2 Check Actions + Delayed mode command executions for due actions of 'dsupdt' and 'dsrqst' are + recorded automatically, while other commands, including 'dsarch' and specialist- + defined ones, can be manually added into 'dscheck'. Command information can be + added, viewed and manipulated via 'dscheck' actions: + Add Check - Add a new check record for a specified command + Get Check - get the command information recorded in check records + Delete Check - delete check records for no need of processing the commands + Unlock Check - unlock check records in case that its recorded command is aborted + without cleaning the lock + +3.2.1 Add Check + -AC or -AddCheck, adds check information for a delayed mode command execution. + + dscheck -(AC|AddCheck) [Mode Option] + -(CM|Command) CommandNames + [-(AV|ArgumentVector) ArgumentVectorString] + [-(SN|Specialist) SpecialistNames] + [-(HN|HostName) HostNames] + [-(DS|Dataset) DatasetIDs] + [-(AN|ActionName) ActionNames] + [-(PI|ParentIndex) ParentCheckIndex] + [-(PQ|PBSQueue) PBSBatchQueue] + [-(WD|WorkDir) WorkingDirectory] + [-(MC|MaxCount) MaxTryCount] + [-(MO|Modules) ModuleList] + [-(EV|Environments) EnvironmentPairList] + [-(QS|QsubOptions) PBSBatchOptions] + [-(OF|OutputFile) OutputFileName] + [-(DB|Debug) DebugModeInfo] + + Mode option that can be specified for adding check control Action: + -(AW|AnyWhere) - sets Working directory empty in check record to start processing + the check anywhere. + + Command name is mandatory for adding a new check for delayed mode execution. + Unless they are specified, the current specialist who adds the command is defaulted + as the owner of the added check record, and the current path is defaulted as the + working directory when the command is executed later. + + Specified addtional PBS batch options via Info option -QS (-QSubOptions) to add a check; + and specify a parent check index to put the current command on hold until the parent + check is finished. + + Commands containing redirections and pipes are not supported for delayed mode. + A simple shell script can be used to wrap a complicated command. + + For example, to list file names in the current directory with name containing 'test' + by catching the standard output into a log and display it on screen + +<> +#!/bin/sh +(ls -l | grep test) | tee test1.out + + For example, to list file names in the current directory with name containing 'test' + by catching the standard output and error into separate log files + +<> +#!/bin/sh +(ls -l | grep test) 1> test2.out 2>test2.err + + For example, to add testing command 'test2' into 'dscheck' for delayed mode execution on + SLURM + + dsheck AC -CM test2 -HN SLURM + + The command 'test2' must be executable at the current working directory on SLURM machines. + + +3.2.2 Get Check + -GC or -GetCheck, gets check information recorded in RDADB. + + dscheck -(GC|GetCheck) [Mode Options] + [-(FN|FieldNames) FieldNameString] + [-(CI|CheckIndex) CheckIndices] + [-(ON|OrderNames) OrderNameString] + [-(CM|Command) CommandNames] + [-(AV|ArgumentVector) ArgumentVectorString] + [-(SN|Specialist) SpecialistNames] + [-(DS|Dataset) DatasetIDs] + [-(AN|ActionName) ActionNames] + [-(CD|CheckDate) CommandDate] + [-(CT|CheckTime) CommandTime] + [-(WD|WorkDir) WorkingDirectory] + [-(PQ|PBSQueue) PBSBatchQueue] + [-(OF|OutputFile) OutputFileName] + [-(DB|Debug) DebugModeInfo] + + Mode options that can be specified for getting check Action: + -(CS|CheckStatus) - check and show detail information on check status + -(FO|FormatOutput) - format the column output with a fix width for all values + of a given field + + Use Info option -FN (-FieldNames) to specify what check fields to retrieve. + It defaults to "COVTUFJDNW" if -FN is not given. + + Valid check field names and their corresponding Info options: + + Names Info Options Descriptions + C -(CI|CheckIndex) check index + O -(CM|Command) original command name + V -(AV|ArgumentVector) argument line following command, up to 100 chars + T -(DS|Dataset) dataset ID, the original command run against + A -(AN|ActionName) action name for a given command + U -(ST|Status) check status for a recorded command + B -(DF|DownFlags) Storage system down flags: H-HPSS,D-DRDATA,G-GLADE,O-ObjectStore + P -(PQ|PBSQueue) PBS batch queue name: rda or htc + R -(PI|ParentIndex) parent check index the current one to wait on + F -(FC|FileCount) number of files need to be processed + J -(DC|DoneCount) number of files are processed already + K -(TC|TryCount) number of tries the command is executed + L -(MC|MaxCount) upper limit for number of command tries + Z -(SZ|DataSize) total tytes of data processed for the command + D -(CD|CheckDate) date the command is initially recorded + Y -(CT|CheckTime) time the command is initially recorded + H -(HN|HostName) host names the command can or cannot run on + N -(SN|Specialist) specialist login name who starts the command + W -(WD|WorkDir) working directory where the command is started + M -(MO|Modules) include modules to load to batch job script + I -(EV|Environments) include environment vairables to load to batch job script + Q -(QS|QsubOptions) additional PBS batch options for qsub + X -(AX|ArgumentExtra) additional argument line beyond 100 characters + E -(ER|ErrorMessage) error message from failed command + + Check information can be retrieved for specified check index per option + -CI (-CheckIndex). Without any condition, the check records owned by the + current specialist are retrieved. + + +3.2.3 Delete Check + -DL or -Delete, deletes one or multiple 'dscheck' records from RDADB + for given check indices. + + dscheck -(DL|Delete) + -(CI|CheckIndex) CheckIndices + [-(DB|Debug) DebugModeInfo] + + Use this action to delete check information. A deleted check record is saved + into check history for viewing later via utility program 'viewcheckusage'. + + +3.2.4 Unlock Check + -UL or -UnLockCheck, (Alias: -UnLock), unlocks check records that their + commands aborted abnormally during processes. + + Process ID and computer hostname are saved in a check record when the recorded + command is running. If the process aborts abnormally, the PID and hostname may + sometimes be not cleaned properly. Use this action to clean up the locking + information to allow the command to be reprocessed or purged. + + dscheck -(-UL|UnLockCheck) + -(CI|CheckIndex) CheckIndices + [-(DB|Debug) DebugModeInfo] + + It is mandatory to provide a check index to remove lock on a recorded command. + + +3.3 Check Process Actions + Delayed mode command executions recorded in RDADB are automatically started, + or restarted, by the common 'dscheck' daemon. Running commands can be interrupted + at any time and the child processed of the interrupted commands are also cleaned up. + The status of the current check records can be gathered and emailed to a specialist. + + Here are the actions for process checks: + Process Check - start a command from its information recorded in 'dscheck' + Interrupt Check - interrupt a running command for given check record and clean the + child processes + Email Check - email a specialist for the status of the current check records + +3.3.1 Process Check + -PC or -ProcessCheck, starts process commands in check records in daemon or + non-daemon modes. Check indices can be specified if in non-daemon mode. + + dscheck -(PC|ProcessCheck) [Mode Options] + [-(CI|CheckIndex) CheckIndices] + [-(DM|DaemonMode) (start|stop|logon|logoff)] + [-(LH|LocalHost) [LocalHostname]] + [-(WI|WaitInterval) WaitIntervalInSeconds] + [-(MT|MaxrunTime) MaximumRunTimeInSeconds] + [-(DB|Debug) DebugModeInfo] + + Mode options that can be specified for processing check Action: + -(BG|BackGround) - background process to turn off screen display for both + standard outputs and errors + -(CP|CheckPending) - Check and kill long pending checks + -(LO|LogOn) - log detail for daemon mode + -(NC|NoCommand) - does not issue remote commands if this Mode option is present + -(WU|WithdsUpdt) - in non-daemon mode, add check records for due 'dsupdt' actions + configured in update control records + -(WR|WithdsRqst) - in non-daemon mode, add check records for 'dsrqst' records + due to be built or purged + + In daemon mode, 'dscheck' sleeps two minutes (120 seconds as default), unless + provided differently per option -WI (-WaitInterval), between processing check + records. Every time it wakes up, 'dscheck' first tries to add the due update + controls for command 'dsupdt' and add requests due to be built or purged for + command 'dsrqst', and then starts, or restarts, commands recorded in check + records on a computer according the priorities configured in the daemon controls. + + In non-daemon mode, Mode options -WU and -WR must be present for due actions + of 'dsupdt' and 'dsrqst' to be added to check records. All current standing + check records, unless check indices are specified, are processed on machines + according to the configured information in daemon controls. + +3.3.2 Interrupt Check + -IC or -InterruptCheck, interrupts checks that their commands are currently running + and also kills recursively all the child processes that are running under the + commands. Locks are cleaned too when the checks are interrupted. + + dscheck -(-IC|InterruptCheck) [Mode Option] + -(CI|CheckIndex) CheckIndices + [-(DB|Debug) DebugModeInfo] + + Mode option that can be specified for this action: + -(FI|ForceInterrupt) - it must be present for this action to interrupt a check + that is under processing. + + It is mandatory to provide a check index to interrupt its command. + +3.3.3 Email Check + -EC or -EmailCheck, sends an email to a specialist for the current status of all check + records owned by the specialist, unless more specific conditions are provided. + + dscheck -(EC|EmailCheck) + [-(CI|CheckIndex) CheckIndices] + [-(CM|Command) CommandNames] + [-(AV|ArgumentVector) ArgumentVectorStrings] + [-(SN|Specialist) SpecialistNames] + [-(DS|Dataset) DatasetNames] + [-(CD|CheckDate) CommandDate] + [-(CT|CheckTime) CommandTime] + [-(WD|WorkDir) WorkingDirectory] + [-(CC|CarbonCopy) Cc'dEmailAddresses] + [-(DB|Debug) DebugModeInfo] + + Command error messages are included too if any error messages are recorded in + the check record. + +3.4 Daemon Host Connectivity + -CH or -CheckHost, checks connectivity of daemon hosts for specialists. + + dscheck -(CH|CheckHost) + [-(DI|DaemonIndex) controlIndices] + [-(CM|Command) UtilityProgramNames] + [-(SN|Specialist) DECSSpecialists] + [-(HN|HostName) HostMachineName] + [-(PL|ProcessLimit) MaxNumberOfProcesses] + [-(PO|Priority) ProcessPriority] + [-(DB|Debug) DebugModeInfo] + +3.5 Set Batch Options Dynamically + -SO or -SetOptions (Alias: -SetBatchOptions), processes batch options with + a leading '!' to build options dynamically. + + dscheck -(SO|SetOptions) + [-(HN|HostName) HostNames] + [-(PL|ProcessLimit) MaxNumberOfProcesses] + [-(MO|Modules) ModuleList] + [-(EV|Environments) EnvironmentPairList] + [-(QS|QsubOptions) PBSBatchOptions] + [-(DB|Debug) DebugModeInfo] + + +4 MODE OPTIONS + +Use proper Mode options to modify behaviors of Action options. Mode options +are all optional. No value is allowed to be passed in following any Mode option. + + -AW or -AnyWhere, for Action -AC (-AddCheck), sets Working directory empty in + cehck record to start processing the check anywhere. + + -BG or -BackGround (Alias: -b), background process. When it presents + screen display is turned off for both standard outputs and errors. -(CP|CheckPending) - Check and kill long pending checks + + -CP or -CheckPending, if present for Action -PC (-ProcessCheck), check and kill + long pending checks + + -CS or -CheckStatus, if present, displays detailed status information of the + recorded commands, including progress percentage for a running command and + error message for a failed command. + + -FI or -ForceInterrupt, if present, force interrupts a check that is still under + processing; otherwise a warning message will display. + + -FO or -FormatOutput, if present, formats column output results for get + actions. A same width, evaluated dynamically, is applied for all values of a + given field. + + -LO or -LogOn (Alias: -LoggingOn), turns detail logging on when 'dscheck' starts + in Daemon mode. The detail logging is off as default if this Mode option is + not present. + + -MD or -MyDataset, allows a specialist to manipulate check information of + a given dataset listed for another specialist. + + -NC or -NoCommand (Alias: -NoRemoteCommand), does not issue remote commands if + this Mode option is present for action -PC (-Processcheck). + + -ND or -NewDaemon, a new daemon control can only be added when this Mode option is + present and daemon control index is given as 0 when Action -SD (-SetDaemon) of + 'dscheck' is executed. This Mode option blocks mistakes of adding daemon control + records unintentionally. + + -NT or -NoTrim, skips trimming of spaces and comments from input values to + speed up reading input file(s). + + -WR or -WithdsRqst, adds requests due to be built or purged of command 'dsrqst' + for 'dscheck' Action -PC (-ProcessCheck) in non-daemon mode. + + -WU or -WithdsUpdt, adds due update controls of command 'dsrqst' for 'dscheck' + Action -PC (-ProcessCheck) in non-daemon mode. + +5 INFORMATION OPTIONS + +Information options are used to pass information, one or multiple values, into +'dscheck'. Two types of Info options are used: + Single-Value Info Options - pass a single value to 'dscheck' + Multi-Value Info Options - pass one or multiple values to 'dscheck' + +5.1 Single-Value Info Options + + A single-value Info option is used to pass one value into this application. + One value, and one only, must follow a single-value option; otherwise an + error message is displayed if no value or more than one value passed in. + + -DM or -DaemonMode, passes daemon mode values, start, stop, logon or logoff + to 'dscheck' Action -PC (-ProcessCheck) to start, to stop, to log detail or + not to log detail, correspondingly. + + -DV or -Divider (Alias: -Delimiter, -Separator), delimiter for separating + columns of multi-value Info options in input files. It is default to '<:>'. + + -ES or -EqualSign, for an equal sign of assigning one value to either a + single-value option or multi-value option in input files. It is defaulted + to '<=>'. + + -FN or -FieldNames, for a string of single letter field names. Values of + the selected fields are retrieved for check information per actions -GC + (-GetCheck). + + -LH or -LocalHost, specify a local hostname to processes checks on the host for + action -PC(-ProcessCheck). It defaults to '' to use the local host name. Specify + SLURM or PBS to process batch jobs. + + -MT or -MaxrunTime, specify the maxmum run time for deamon mode. It defaults to 0 + for unlimit time. For examples, 5000 means seconds, and 1D means 1 day for 86400 + seconds. + + -OF or -OutputFile, leads an output file name into which the output result + of this application is dumped. If this option is not given, the result is + displayed as standard output. + + -ON or -OrderNames, for a string of single letter field names use to order + the results of getting check information via action -GC (-GetCheck). + + -AO or -ActOption, for setting Action and Mode options in input files. It is + default to ''. + + -WI or -WaitInterval, defaults to 2 minutes (120 seconds). It is used as a time + intervals for the 'dscheck' daemon to sleep between processing check records. + +5.2 Multi-Value Info Options + + A multi-value Info option is used to pass multiple values for one Info option + into 'dscheck'. At lease one value must follow each multi-value option. + + -AN or -ActionName (Alias: -Action), specifies an action name for given command + name recorded in check record. + + -AV or -ArgumentVector, the space delimited argument vector string up to 100 + characters. It is quoted with single quotes '' for a individual argument + containing spaces. + + -AX or -ArgumentExtra, the space delimited argument vector string beyond 100 + characters. This field is not empty only if the argument vector string is + longer than 100 characters. + + -CC or -CarbonCopy, provides additional one or multiple email addresses on + command line to send Cc'd email notification of the check status. For + DECS specialist, login user names themselves are acceptable; otherwise full + email addresses are required for email domains other than 'ucar.edu'. + + -CD or -CheckDate, the check date of the recorded command is first processed. + + -CI or -CechkIndex, check record indices for commands recorded in RDADB. A check + record is automatically purged if the command is finished. + + -CM or -Command (Alias: -CommandName), the command name recorded in check record. + + -CT or -CheckTime, the check time of the recorded command is first processed. + + -DB or -Debug, turns on debug mode with specified information. This option + provides up to 3 values, they are Debug Level, debug log file path and debug + log file name. The debug level is mandatory for this option. It can be a + single integer value, for example, 1000 means to log debug messages for debug + levels 1 to 1000; or a range of values, for example, 200-1000 means to log + debug messages from debug levels 200 to 1000. The default debug file path is + '$DSSHOME}/dssdb/log' and the default debug file name is 'mydss.dbg'. Provides + the second and third values for this option to override the default ones + respectively. + + -DC or -DoneCount, the number of files that are processed successfully already. + + -DF or -DownFlags, storage system down flags. The current supported flags are: + H-HPSS, D-DRDATA, G-GLADE, O-ObjectStore. It can hold multiple flags, up to 5, + for all down storage systems. + + -DI or -DaemonIndex, daemon control indices. A daemon control record retains + configuration information for how many processes on what host and its priority + can a command be started for specified specialist. + + -DS or -Dataset, for dataset numbers, or called dataset IDs in form as [a-z]NNNNNN. + + -ER or -ErrorMessage, error message of a failed check command. + + -EV or -Environments, (Alias: -Envs), specifies environment variables, in form of + VarName=VarValue and separated by ',', needed to be set to execute a command as + a batch job. The environment varaibles will be set in the batch starting script. + + -FC or -FileCount, the number of files to be processed by a check command. + + -HN or -HostName, specify the host names the check can or cannot be processed on. + + -IF or -InputFile, for input file names; one or multiple file names may be + given on command line. Input files are used to hold all valid options and + the associated values of Info options that need to be passed in for + execution of 'dscheck'. + + In a input file, lines start with sign '#' are considered as comments; + Option Names can be given either short, long or alias names. Action and Mode + options are given in format of OptionName. Single value Assignment is + given in format of OptionName<=>OptionValue. One option is given on each line. + Different setting sign of Action and Mode options can be provided by Info + option -AO (-ActOption, default to ); and different equal sign of single + value assignment can be provided by Info option -ES, (-EqualSign, default to + '<=>'). Multi-value assignments can be given in columns delimited with + separator specified per option -SP (-Separator, default to '<:>'). It starts + with a column title line for multi-value option names and the rest holds + values corresponding to each column titles. The value information stops at + the end of the file or when a new column name line or another single value + assignment appears. If the last column is a multi-line value field, an + additional separator must be appended for each line, including the column + title line to end lines properly. + + -MC or -MaxCount, the maximum number of tries for a check command can be processed + if the command is failed. + + -MH or -MatchHost (Alias: -MatchHostname), flag to control hostname match. + 'G' - general match, including emty hostname specified, or exclusive hostnames + given but the hostname is not in the list, and 'M' - match only the hostname + specified is identical to the current host. + + -MO or -Modules, (Alias: -Mods), specifies module names, separated by ',', + needed to be loaded to execute a command as a batch job. The modules will + be set in the batch starting script. + + -PI or -ParentIndex, specifies a parent check index for the current check to + wait on. + + -PO or -Priority, specifies the priority of a given host so that the host is + picked in such an order to start a 'dscheck' process. + + -PL or -ProcessLimit, specifies how many processes can be started for specified + command and specialist on a given host; Work for Action -SO (-SetOptions) to limit + how many concurrent cron processes. + + -PQ or -PBSQueue, specifies PBS batch queue index the current check to submit to. + + -QS or -QsubOptions, (Alias: -PBSOptions), specifies options to execute a command + as a batch job via qsub on PBS nodes. The qsub options must be quoted when prsented + on command line, such as, -QS '-l walltime=12:00:00'. + + -SN or -Specialist, the specialist who runs the original command. This Info + option is working with Action -GC (-GetCheck) only to view check information + of a specified specialist. + + -ST or -Status, check command status for a check record. Include command + progress percentage if under process and error message for a failed command. + + -SZ or -DataSize, the total bytes of data is processed for the check record. + + -TC or -TryCount, the number of tries for a check command being processed. It + is up to the number of tries specified via info option -MC. + + -WD or -WorkDir (Alias: -WorkDirectory), the working directory where the + the recorded command is started. diff --git a/src/rda_python_template/hello_world.py b/src/rda_python_template/hello_world.py deleted file mode 100644 index 0382c56..0000000 --- a/src/rda_python_template/hello_world.py +++ /dev/null @@ -1,15 +0,0 @@ -# -# main function to run hello_world -# -def main(): - - str = get_string('Hua') - print(str) - -def get_string(name): - return name + ": Hello World!" - -# -# call main() to start program -# -if __name__ == "__main__": main() diff --git a/src/rda_python_template/hello_world.usg b/src/rda_python_template/hello_world.usg deleted file mode 100644 index ddee2ad..0000000 --- a/src/rda_python_template/hello_world.usg +++ /dev/null @@ -1 +0,0 @@ -Usage: hello_world diff --git a/tests/test_dscheck.py b/tests/test_dscheck.py new file mode 100644 index 0000000..0ce7780 --- /dev/null +++ b/tests/test_dscheck.py @@ -0,0 +1,6 @@ +# test_dscheck.py + +import pytest + +def test_something(): + pass diff --git a/tests/test_hello_world.py b/tests/test_hello_world.py deleted file mode 100644 index 0e2ce45..0000000 --- a/tests/test_hello_world.py +++ /dev/null @@ -1,11 +0,0 @@ -# test_hello_world.py - -import pytest -from src.rda_python_template.hello_world import get_string - -def test_get_string(): - assert get_string('Bob') == 'Bob: Hello World!' - -def test_raises_exception_on_non_string_arguments(): - with pytest.raises(TypeError): - get_string(9)