Skip to content

Commit

Permalink
Merge pull request #2 from NCAR/init-with-rda-ispd-python
Browse files Browse the repository at this point in the history
remove root file PgDBI.py and Fix PgUtil.py
  • Loading branch information
zaihuaji authored Jan 8, 2025
2 parents 46a8248 + a9f3fc5 commit a3d7229
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 2,270 deletions.
2,225 changes: 0 additions & 2,225 deletions PgDBI.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/rda_python_common/PgCMD.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def get_partition_control(pgpart, pgrqst = None, pgctl = None, logact = 0):
if not pgctl:
if not pgrqst and pgpart['rindex']:
pgrqst = PgDBI.pgget("dsrqst", "dsid, gindex, cindex, rqsttype", "rindex = {}".format(pgpart['rindex']), logact)
if pgrqst: pgctl = get_dsrqst_control(dsrqst, logact)
if pgrqst: pgctl = get_dsrqst_control(pgrqst, logact)

return pgctl

Expand Down
26 changes: 9 additions & 17 deletions src/rda_python_common/PgDBI.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def starttran():
else:
try:
pgdb.isolation_level
except OperationalError as e:
except PgSQL.OperationalError as e:
pgconnect(0, 0, False)
if pgdb.closed:
pgconnect(0, 0, False)
Expand Down Expand Up @@ -413,9 +413,9 @@ def add_new_table(tname, pre = None, suf = None, logact = 0):
def valid_table(tname, pre = None, suf = None, logact = 0):

if pre:
tbname = '{}_{}'.format(pre, tbname)
tbname = '{}_{}'.format(pre, tname)
elif suf:
tbname = '{}_{}'.format(tbname, suf)
tbname = '{}_{}'.format(tname, suf)
else:
tbname = tname
if tbname in ADDTBLS: return tbname
Expand Down Expand Up @@ -1141,7 +1141,7 @@ def pghupdt(tablename, record, cnddict, logact = PGDBI['ERRLOG']):
break
pgcnt += 1

if PgLOG.PGLOG['DBGLEVEL']: PgLOG.pgdbg(1000, "pgmupdt: {}/{} record(s) updated to {}".format(count, cntrow, tablename))
if PgLOG.PGLOG['DBGLEVEL']: PgLOG.pgdbg(1000, "pghupdt: {}/{} record(s) updated to {}".format(ucnt, tablename))
if(logact&PgLOG.ENDLCK):
endtran()
elif curtran:
Expand Down Expand Up @@ -1191,7 +1191,7 @@ def pgmupdt(tablename, records, cnddicts, logact = PGDBI['ERRLOG']):

pgcur.close()

if PgLOG.PGLOG['DBGLEVEL']: PgLOG.pgdbg(1000, "pgmupdt: {}/{} record(s) updated to {}".format(ucnt, cntrow, tablename))
if PgLOG.PGLOG['DBGLEVEL']: PgLOG.pgdbg(1000, "pgmupdt: {} record(s) updated to {}".format(ucnt, tablename))
if(logact&PgLOG.ENDLCK):
endtran()
elif curtran:
Expand Down Expand Up @@ -1264,7 +1264,7 @@ def pghdel(tablename, cnddict, logact = PGDBI['ERRLOG']):
if not cnddict or isinstance(cnddict, int): PgLOG.pglog("Miss condition dict to delete from " + tablename, logact)
sqlstr = prepare_delete(tablename, None, list(cnddict))

values = tuple(cnddicts.values())
values = tuple(cnddict.values())
if PgLOG.PGLOG['DBGLEVEL']: PgLOG.pgdbg(1000, "Delete from {} for {}".format(tablename, values))

dcnt = pgcnt = 0
Expand Down Expand Up @@ -1306,7 +1306,7 @@ def pgmdel(tablename, cnddicts, logact = PGDBI['ERRLOG']):
values = list(zip(*v))
if PgLOG.PGLOG['DBGLEVEL']:
for row in values:
PgLOG.pgdbg(1000, "Delete from {} for {}".format(tablenames, row))
PgLOG.pgdbg(1000, "Delete from {} for {}".format(tablename, row))

dcnt = pgcnt = 0
while True:
Expand Down Expand Up @@ -1411,7 +1411,7 @@ def pgcheck(tablename, logact = 0):
def check_user_uid(userno, date = None):

if not userno: return 0
if tyep(userno) is str: userno = int(userno)
if type(userno) is str: userno = int(userno)

if date is None:
datecond = "until_date IS NULL"
Expand All @@ -1433,7 +1433,7 @@ def check_user_uid(userno, date = None):
pgrec = ucar_user_info(userno)
if not pgrec: pgrec = {'userno' : userno, 'stat_flag' : 'M'}
uid = pgadd("dssdb.user", pgrec, (PGDBI['EXITLG']|PgLOG.AUTOID))
if uid: PgLOG.pglog("{}: Scientist ID Added as user.uid = {}".format(useno, uid), PgLOG.LGWNEM)
if uid: PgLOG.pglog("{}: Scientist ID Added as user.uid = {}".format(userno, uid), PgLOG.LGWNEM)

return uid

Expand Down Expand Up @@ -1629,14 +1629,6 @@ def check_cdp_wuser(username):
pgrec = pgget("wuser", "wuid", "cdpname = '{}'".format(username), PGDBI['EXITLG'])
if pgrec: return pgrec['wuid']

# missing wuser record add one in
pgrec = get_cdp_user(None, None, username)
if not pgrec:
if username not in LMISSES:
PgLOG.pglog("Missing CDP User '{}'".format(username), PgLOG.LGWNEM)
LMISSES['username'] = 1
return 0

idrec = pgget("wuser", "wuid", "email = '{}'".format(pgrec['email']), PGDBI['EXITLG'])
wuid = idrec['wuid'] if idrec else 0
if wuid > 0:
Expand Down
13 changes: 6 additions & 7 deletions src/rda_python_common/PgFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ def move_object_file(tofile, fromfile, tobucket, frombucket, logact = 0):
return PgLOG.FAILURE

cmd = "{} mv -b {} -db {} -k {} -dk {}".format(OBJCTCMD, frombucket, tobucket, fromfile, tofile)
ucmd = "{} gm -k {} -b {}".format(OBJCTCMD, fromfile, bucket)
ucmd = "{} gm -k {} -b {}".format(OBJCTCMD, fromfile, frombucket)
ubuf = PgLOG.pgsystem(ucmd, PgLOG.LOGWRN, CMDRET)
if ubuf and re.match(r'^\{', ubuf): cmd += " -md '{}'".format(ubuf)

Expand Down Expand Up @@ -1146,7 +1146,7 @@ def make_one_backup_directory(dir, odir, endpoint = None, logact = 0):
if not endpoint: endpoint = PgLOG.PGLOG['BACKUPEP']
info = check_backup_file(dir, endpoint, 0, logact)
if info:
if info['isfile']: return errlog("{}-{}: is file, cannot make backup directory".format(enpdpoint, dir), 'B', 1, logact)
if info['isfile']: return errlog("{}-{}: is file, cannot make backup directory".format(endpoint, dir), 'B', 1, logact)
return PgLOG.SUCCESS
elif info != None:
return PgLOG.FAILURE
Expand All @@ -1166,10 +1166,9 @@ def make_one_backup_directory(dir, odir, endpoint = None, logact = 0):
if syserr:
if syserr.find("No such file or directory") > -1:
ret = make_one_backup_directory(op.dirname(dir), odir, endpoint, logact)
if ret == PgLOG.SUCCESS or loop or opt&64 == 0: break
if ret == PgLOG.SUCCESS or loop: break
time.sleep(PgSIG.PGSIG['ETIME'])
else:
if opt&64 == 0: return PgLOG.FAILURE
errmsg = "Error Execute: {}\n{}".format(cmd, syserr)
(hstat, msg) = host_down_status('', QHOSTS[endpoint], 1, logact)
if hstat: errmsg += "\n" + msg
Expand Down Expand Up @@ -1293,7 +1292,7 @@ def change_local_group(file, ngrp = None, ogrp = None, logname = None, logact =
if ngid == ogid: return PgLOG.SUCCESS

try:
os.chown(file, nuid, ngid)
os.chown(file, ouid, ngid)
except Exception as e:
return errlog(str(e), 'L', 1, logact)

Expand Down Expand Up @@ -1705,7 +1704,7 @@ def remote_file_stat(line, opt):
if opt&17:
mdate = PgUtil.format_date(items[2], "YYYY-MM-DD", "YYYY/MM/DD")
mtime = items[3]
if PgLOG.PGLOG['GMTZ']: (mdate, mtime) = PgUtil.addhour(date, time, PgLOG.PGLOG['GMTZ'])
if PgLOG.PGLOG['GMTZ']: (mdate, mtime) = PgUtil.addhour(mdate, mtime, PgLOG.PGLOG['GMTZ'])
if opt&1:
info['date_modified'] = mdate
info['time_modified'] = mtime
Expand Down Expand Up @@ -1892,7 +1891,7 @@ def backup_file_stat(line, opt):
mtime = items[5]
ms = re.match(r'^(\d+:\d+:\d+)', mtime)
if ms: mtime = ms.group(1)
if PgLOG.PGLOG['GMTZ']: (mdate, mtime) = PgUtil.addhour(date, time, PgLOG.PGLOG['GMTZ'])
if PgLOG.PGLOG['GMTZ']: (mdate, mtime) = PgUtil.addhour(mdate, mtime, PgLOG.PGLOG['GMTZ'])
if opt&1:
info['date_modified'] = mdate
info['time_modified'] = mtime
Expand Down
2 changes: 1 addition & 1 deletion src/rda_python_common/PgLock.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def lock_partition(pidx, dolock, logact = 0):
if logout: PgLOG.pglog(pinfo + ": error update lock", logout)
pidx = -pidx
else:
PgLOG.pglog("{}: Relocked {}/{}".format(pinfom, lkrec['pid'], lkrec['lockhost']), logout)
PgLOG.pglog("{}: Relocked {}/{}".format(pinfo, lkrec['pid'], lkrec['lockhost']), logout)
pidx = -pidx

return end_db_transaction(pidx)
Expand Down
8 changes: 4 additions & 4 deletions src/rda_python_common/PgOPT.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,14 +1240,14 @@ def compress_files(files, formats, count):
strcmp = 'Compress'
actcmp = 1
fmtcnt = len(formats)
if not fmtcont: return files # just in case
if not fmtcnt: return files # just in case
s = 's' if count > 1 else ''
PgLOG.pglog("{}ing {} File{} for {} ...".format(strcmp, count, s, params['DS']), PGOPT['wrnlog'])
cmpcnt = 0
for i in range(count):
fmt = formats[i] if(i < fntcmt and formats[i]) else formats[0]
(ofile, fmt) = PgFile.compress_local_file(files[i], fmt, cmpact, MUYOPT['extlog'])
if ofiles != files[i]:
fmt = formats[i] if(i < fmtcnt and formats[i]) else formats[0]
(ofile, fmt) = PgFile.compress_local_file(files[i], fmt, actcmp, PGOPT['extlog'])
if ofile != files[i]:
files[i] = ofile
cmpcnt += 1

Expand Down
14 changes: 7 additions & 7 deletions src/rda_python_common/PgPGS.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ def pgs_sql_file(tablenames, fields, condition = None):

sqlfile = "{}/pgs{}.sql".format(PGPGS['SQLPATH'], os.getpid())

sqlstr = "SELECT {}\nFROM {}".format(fields, tablename)
sqlstr = "SELECT {}\nFROM {}".format(fields, tablenames)
if condition:
if re.match(r'^\s*(ORDER|GROUP|HAVING)\s', condition, re.I):
slqstr += "\n{}".format(condition)
sqlstr += "\n{}".format(condition)
else:
sqlstr += "\nWHERE {}".format(condition)
asqlstr += ";\n"
sqlstr += ";\n"
try:
SQL = open(sqlfile, 'w')
SQL.write(sqlstr)
close(SQL)
SQL.close()
except Exception as e:
PgLOG.pglog("Error Open '{}': {}".format(sqlfile, str(e)), PgLOG.LGWNEX)

Expand All @@ -63,7 +63,7 @@ def pgsget(tablenames, fields, condition = None, logact = 0):
for line in re.split(r'\n', sqlout):
vals = re.split(r'\s*\|\s+', line)
if colcnt: # gather data
record = dict(zip(filds, vals))
record = dict(zip(fields, vals))
break
else: # gather field names
flds = vals
Expand All @@ -73,7 +73,7 @@ def pgsget(tablenames, fields, condition = None, logact = 0):

if PgLOG.PGLOG['DBGLEVEL']:
if record:
PgLOG.pgdbg(1000, "pgsget: 1 record retrieved from {}:\n{}".format(tablename, str(record)))
PgLOG.pgdbg(1000, "pgsget: 1 record retrieved from {}:\n{}".format(tablenames, str(record)))
else:
PgLOG.pgdbg(1000, "pgsget: 0 record retrieved from " + tablenames)

Expand All @@ -91,7 +91,7 @@ def pgsget(tablenames, fields, condition = None, logact = 0):
def pgsmget(tablenames, fields, condition = None, logact = 0):

sqlfile = pgs_sql_file(tablenames, fields, condition)
sqlout = pgsystem("psql {} < {}".format(PGPGS['PGSSERV'], sqlfile), logact, 273+1024) # 1+16+256
sqlout = PgLOG.pgsystem("psql {} < {}".format(PGPGS['PGSSERV'], sqlfile), logact, 273+1024) # 1+16+256

rowcnt = colcnt = 0
records = {}
Expand Down
10 changes: 5 additions & 5 deletions src/rda_python_common/PgSIG.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def validate_multiple_process(aname, plimit, uname = None, sargv = None, logact
if pcnt >= plimit:
msg = aname
if sargv: msg += ' ' + sargv
msg += ": already running in {} processes on {}".format(pid, PgLOG.PGLOG['HOSTNAME'])
msg += ": already running in {} processes on {}".format(pcnt, PgLOG.PGLOG['HOSTNAME'])
if uname: msg += ' By ' + uname
PgLOG.pglog(msg + ', Quit Now', logact)
sys.exit(0)
Expand Down Expand Up @@ -1044,9 +1044,8 @@ def check_background(bcmd, bid = 0, logact = PgLOG.LOGWRN, dowait = 0):

if logact&PgLOG.EXITLG: logact &= ~PgLOG.EXITLG
if not bid and bcmd: bid = bcmd2cbid(bcmd)
i = 0
bcnt = i = 0
while True:
bcnt = 0;
if bid:
if check_process(bid): # process is not done yet
if bcmd:
Expand All @@ -1064,8 +1063,9 @@ def check_background(bcmd, bid = 0, logact = PgLOG.LOGWRN, dowait = 0):
del CBIDS[bid]

if not (bcnt and dowait): break
show_wait_message(i, "{}: wait {}/{} background processes".format(PGSIG['DSTR'], pcnt, PGSIG['MPROC']), logact, dowait)
show_wait_message(i, "{}: wait {}/{} background processes".format(PGSIG['DSTR'], bcnt, PGSIG['MPROC']), logact, dowait)
i += 1
bcnt = 0

return bcnt

Expand Down Expand Up @@ -1135,7 +1135,7 @@ def show_wait_message(loop, msg, logact = PgLOG.LOGWRN, dowait = 0):
@contextmanager
def pgtimeout(seconds = 0, logact = 0):

if not seconds: seconds = PGLOG['TIMEOUT']
if not seconds: seconds = PgLOG.PGLOG['TIMEOUT']
signal.signal(signal.SIGALRM, raise_pgtimeout)
signal.alarm(seconds)
try:
Expand Down
2 changes: 1 addition & 1 deletion src/rda_python_common/PgSplit.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def pgadd_wfile(dsid, wfrec, logact = PgLOG.LOGERR, getid = None):
def pgmadd_wfile(dsid, wfrecs, logact = PgLOG.LOGERR, getid = None):

records = {'wfile' : wfrecs['wfile'],
'dsid' : (wfrecs['dsid'] if 'dsid' in wfrecs else [dsid]*len(records['wfile']))}
'dsid' : (wfrecs['dsid'] if 'dsid' in wfrecs else [dsid]*len(wfrecs['wfile']))}
wret = PgDBI.pgmadd('wfile', records, logact, 'wid')
wcnt = wret if isinstance(wret, int) else len(wret)
if wcnt:
Expand Down
4 changes: 2 additions & 2 deletions src/rda_python_common/PgUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def fmtdatehour(yr, mn, dy, hr, tofmt = None):
hr += 24
dy -= 1
elif hr > 23:
while ht > 23:
while hr > 23:
hr -= 24
dy += 1

Expand Down Expand Up @@ -1479,7 +1479,7 @@ def addtime(sdate, stime, h, m, s):
def addintervals(sdatetime, intv, opt = 1):

if not isinstance(sdatetime, str): sdatetime = str(sdatetime)
if not intervals: return sdatetime
if not intv: return sdatetime
tv = [0]*7
i = 0
for v in intv:
Expand Down

0 comments on commit a3d7229

Please sign in to comment.