-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtpo_common.py
1287 lines (1091 loc) · 51 KB
/
tpo_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
# tpo_common.py: Python module with common utilities mostly for debugging,
# mostly superseded by debug.py, system.py, and misc_util.py
#
# sample usage:
# import tpo_common as tpo
# tpo.debug_print("start: " + tpo.debug_timestamp(), 3)
#
#------------------------------------------------------------------------
#
# Note:
# - *** Obsolete: use system.py, debug.py, etc. instead.
# - Use Emacs's query-replace-regexp (e.g., via M-#) to interactively convert debug_format to debug.trace_fmt:
# from: debug_format(\(\"[^\"]*\"\), \([0-9]*\),
# to: debug.trace_fmt(\2, \1,")
# - The TPO prefix was used originally to avoid conflicts. Some of the older scripts
# rely upon this, but it will be eventually phased out.
# - This redefines some built-in functions (e.g., format and exit), so
# you shouldn't import using 'import *'.
# - Debugging output and error messages are converted to UTF8 before output
# via s=unicode(s, "utf8"), which is same as s=codecs.encode(s, "utf8").
# - Debugging code is "conditionally compiled" to avoid overhead of passing
# arguments to functions that do nothing in release code.
# - Includes support for overriding stderr (e.g., for use with embedded apps as in adhoc/optimize_company_extraction.py's invocation of extract_company_info.py).
#
# TODO:
# - Change all functions to using docstrings for comments already!!!
# - Add support for using pprint module
# - Put a version of assertion() here (i.e., from glue_helpers.py).
# - Extend getenv support to indicate user-vs-devel options.
# - Remove obsolete functions (e.g., get_current_function_name).
# - Use debug_trace in place of debug_print in code run often.
#
#------------------------------------------------------------------------
#
"""Common utility functions"""
# Load required libraries
#
# - For Debugging timestamp purposes (e.g., OUTPUT_DEBUG_TIMESTAMPS support)
from datetime import datetime
import atexit
#
# - The usuals:
import sys
import os
import re
# - Others
import inspect
import logging
import pickle
from six import string_types
import time
import types
# Load OrderedDict (added in Python 2.7)
try:
    from collections import OrderedDict
except ImportError:
    # Fall back to the external backport, and failing that to a stub
    try:
        from ordereddict import OrderedDict
    except ImportError:
        def OrderedDict(*_args, **_kwargs):
            """Legacy stub: always fails, as no OrderedDict implementation could be imported"""
            # Note: raised explicitly (rather than via assert False) so that the
            # failure is not stripped under python -O; arbitrary arguments are
            # accepted so that the explanation is shown instead of a TypeError.
            raise AssertionError("requires ordereddict package (or python 2.7+)")
# Module-wide settings (defaults)
debug_level = 0                 # debug tracing occurs for traces at or below this level
output_timestamps = False       # prefix trace output with a timestamp?
use_logging = False             # also send traces to the logging facility?
skip_format_warning = False     # suppress warning about format() without namespace?
stderr = sys.stderr             # file handle used for error messages and traces

# Numeric trace levels for use with debug_print, etc.
(LEVEL1, LEVEL2, LEVEL3, LEVEL4, LEVEL5, LEVEL6) = (1, 2, 3, 4, 5, 6)

# Symbolic aliases for the debugging levels (by convention)
ALWAYS = 0
CRITICAL = ALWAYS
(ERROR, WARNING, USUAL, DETAILED) = (1, 2, 3, 4)
(VERBOSE, QUITE_DETAILED, QUITE_VERBOSE) = (5, 6, 7)
DEFAULT_DEBUG_LEVEL = ALWAYS

# Other constants
# TODO: use sys.version_info.major, etc.
# Note: flag is set when both the major version is at most 2 and the minor version at most 5
USE_SIMPLE_FORMAT = not ((sys.version_info[0] > 2) or (sys.version_info[1] > 5))
#------------------------------------------------------------------------
# Debugging functions
#
# Notes:
# - These are no-op's unless __debug__ is True.
# - Running python with the -O (optimized) option ensures that __debug__ is False.
#
if __debug__:
def set_debug_level(level):
    """Make LEVEL the new module-wide debugging level (i.e., the debug_level global)"""
    global debug_level
    debug_level = level
def debugging_level():
    """Return the debugging level currently in effect (i.e., the debug_level global)"""
    # Note: no global declaration needed as the variable is only read
    return debug_level
def debug_trace_without_newline(text, *args, **kwargs):
    """Write TEXT to stderr without trailing newline, provided the debugging level
    is at least the LEVEL keyword argument (1 by default); any ARGS are used for
    the %-placeholders in TEXT.
    Notes: the text is passed through _normalize_unicode before output (i.e.,
    encoded as UTF8 if under Python 2.x); also, the interpolation exceptions
    listed below are reported on stderr rather than raised (to encourage more tracing)."""
    # Warning: To avoid recursion don't call other user functions here unless
    # they don't use tracing (e.g., _normalize_unicode).
    # TODO: work out shorter name (e.g., debug_trace_no_eol)
    # TODO: rework debug_print to safely handle templates by default (e.g., sanity
    # check that warns when TEXT has {var}-style templates and suggests debug_format)
    threshold = kwargs.get('level', 1)
    if not (debug_level >= threshold):
        return

    # Trace the invocation itself when at extreme debugging levels
    if debug_level >= 96:
        stderr.write("debug_trace_without_newline(text=%s, level=%s, args=%s)" %
                     (_normalize_unicode(text), threshold, [_normalize_unicode(v) for v in args]))

    # Interpolate the optional arguments
    if args:
        try:
            text = (text % args)
        except (AttributeError, IndexError, NameError, TypeError, ValueError):
            print_stderr("Exception in debug_print: " + str(sys.exc_info()))

    # Output optional timestamp (e.g., for quick-n-dirty profiling), with the date portion removed
    # TODO: make date-stripping optional
    if output_timestamps:
        time_proper = re.sub(r"^\d+-\d+-\d+\s*", "", debug_timestamp())
        stderr.write("[%s] " % time_proper)

    # Output the text proper, optionally echoing to the logging facility
    output = _normalize_unicode(text)
    stderr.write(output)
    if use_logging:
        logging.debug(output)
    return
def debug_trace(text, *args, **kwargs):
    """Output TEXT (interpolated with ARGS) plus newline to stderr if the debugging
    level is at least the LEVEL keyword argument (1 by default).
    Note: To avoid needless evaluation, arguments should be specified
    as distinct parameters rather than using string format operator (%)."""
    # Note: Implemented in terms of debug_trace_without_newline to keep timestamp support in one place.
    # TODO: add skip_newline option
    debug_trace_without_newline(text, *args, **kwargs)
    # Terminate the line under the same condition used for the text itself
    # TODO: assertion(isinstance(level, int))
    if debug_level >= kwargs.get('level', 1):
        stderr.write("\n")
def debug_print_without_newline(text, level=1):
    """Thin wrapper passing TEXT and LEVEL along to debug_trace_without_newline (q.v.)
    Note: Consider using debug_trace_without_newline directly."""
    outcome = debug_trace_without_newline(text, level=level)
    return outcome
def debug_print(text, level=1, skip_newline=False):
    """Output TEXT to stderr if the debugging level is at least LEVEL, adding
    a newline unless SKIP_NEWLINE.
    Notes:
    - Implemented in terms of debug_trace_without_newline to keep timestamp support in one place.
    - Consider using debug_trace instead (to avoid string interpolation overhead)."""
    # TODO: assertion(isinstance(level, int))
    debug_trace_without_newline(text, level=level)
    text_was_output = (debug_level >= level)
    if text_was_output and (not skip_newline):
        stderr.write("\n")
def debug_format(text, level=1, skip_newline=False, **namespace):
    """Version of debug_print that expands TEXT using format (q.v.), with the
    placeholder values given by the NAMESPACE keywords. Nothing is formatted
    or output unless the debugging level is at least LEVEL; a newline is
    added unless SKIP_NEWLINE.
    Note: Exceptions are ignored (to encourage tracing, not discourage), via
    format's ignore_exception option, unless the debugging level is
    QUITE_DETAILED or higher."""
    # NOTE: String values from namespace need to be in UTF-8 format.
    # TODO: rename as debug_print_format as not just formatting the text
    assert isinstance(level, int)
    if debug_level >= level:
        ignore_exc = (debug_level < QUITE_DETAILED)
        # Note: indirect_caller is set as format is being invoked on behalf of
        # the caller of this function.
        expanded = format(text, indirect_caller=True,
                          ignore_exception=ignore_exc, **namespace)
        # Note: LEVEL is passed along so that the text is not filtered again
        # using debug_print's default level of 1 (which silently dropped
        # level-0 traces whenever the debugging level was 0).
        debug_print(expanded, level=level, skip_newline=skip_newline)
    return
def debug_timestamp():
    """Return the current date and time as a string, for use in debugging traces"""
    # EX: debug_timestamp() => "2015-01-18 16:39:45.224768"
    # TODO: use format compatible with logging (e.g., comma in place of period before microseconds)
    current_time = datetime.now()
    return to_string(current_time)
def debug_raise(level=1):
    """Re-raise the exception being handled if the debugging level is at least LEVEL"""
    # Note: Intended for use in except clause to produce full stacktrace when debugging.
    # TODO: Have version that just prints complete stacktrace (i.e., without breaking).
    if not (debug_level >= level):
        return
    raise                               # pylint: disable=misplaced-bare-raise
def trace_array(array, level=VERBOSE, label=None):
    """Output the (list) ARRAY to stderr on a single line if the debugging level
    is at least LEVEL, using "LABEL: " as optional prefix. Each item is
    shown as "index: item"."""
    assert isinstance(level, int)
    if debug_level >= level:
        prefix = ("%s: " % label) if label else ""
        # Note: joined in one step rather than via repeated string concatenation
        entries = " ".join(["%d: %s" % (i, item) for (i, item) in enumerate(array)])
        # Note: LEVEL is passed along so that the output is not filtered again
        # at debug_print's default level of 1 (e.g., when LEVEL is 0).
        debug_print(prefix + entries, level=level)
    return
def trace_object(obj, level=VERBOSE, label=None, show_private=None, show_methods_etc=None, indent="", max_value_len=1024):
    """Traces out OBJ instance to stderr if at debugging LEVEL or higher,
    optionally preceded by LABEL, using INDENT, and limiting value lengths to MAX_VALUE_LEN.
    Attributes starting with underscore are omitted unless SHOW_PRIVATE (by
    default, only shown if LEVEL is 6 or higher); likewise, methods, functions
    and modules are omitted unless SHOW_METHODS_ETC (by default, LEVEL 7 or higher).
    Attributes whose access fails are shown with value ???."""
    # based on http://stackoverflow.com/questions/192109/is-there-a-function-in-python-to-print-all-the-current-properties-and-values-of
    debug_format("trace_object(_, lvl={lvl}, lab={lab}, prv={prv}, mth={mth}, ind={ind})", 7,
                 lvl=level, lab=label, prv=show_private, mth=show_methods_etc, ind=indent)
    assert isinstance(level, int)
    if debug_level >= level:
        # Note: LEVEL is passed to the debug_print calls for the output proper,
        # so that it is not filtered again at the default level of 1.
        debug_print("%s%s: {" % (indent, label if label else obj), level=level)
        if show_private is None:
            show_private = (level >= 6)
        if show_methods_etc is None:
            show_methods_etc = (level >= 7)
        method_types = (
            types.MethodType, types.FunctionType,
            types.BuiltinFunctionType, types.BuiltinMethodType,
            types.ModuleType)
        for attr in dir(obj):
            # Note: unable to filter properties directly (so try/except handling added)
            try:
                ## TAKE2:
                ## if re.search("property|\?\?\?", str(getattr(type(obj), attr, "???"))):
                ##     debug_print("%s: property" % attr, 7)
                ##     continue
                debug_print("%s\tattr: name: %s" % (indent, attr), 8)
                if (attr[0] != '_') or show_private:
                    attr_value = getattr(obj, attr)
                    debug_trace("%s\tattr: type: %s",
                                indent, type(attr_value), level=8)
                    # Note: the two method tests were formerly combined in negated
                    # form via 'or', which made the filter always succeed (i.e.,
                    # methods were never omitted).
                    is_method_etc = (isinstance(attr_value, method_types)
                                     or ("method-wrapper" in str(type(attr_value))))
                    if (not is_method_etc) or show_methods_etc:
                        value_text = to_string(attr_value)
                        if len(value_text) > max_value_len:
                            attr_value = value_text[:max_value_len] + "..."
                        debug_print("%s\t%s = %r" % (indent, attr, attr_value), level=level)
            except (KeyError, ValueError, AttributeError):
                debug_print("%s\t%s = ???" % (indent, attr), level=level)
        debug_print("%s}" % indent, level=level)
    return
def trace_value(value, level=5, label=None):
    """Traces out VALUE to stderr if at debugging LEVEL or higher with optional '<LABEL>: ' prefix.
    @Note: If value is a list or dict, each entry is output on a separate line
    (within braces); other values are output as is."""
    assert isinstance(level, int)
    if debug_level >= level:
        prefix = ("%s: " % label) if label else ""
        value_spec = value
        if isinstance(value, list):
            # Note: each entry is newline terminated, as in the dict case (formerly
            # the closing brace was put on the same line as the last entry)
            entries = ["\t%s: %s\n" % (i, v) for (i, v) in enumerate(value)]
            value_spec = "{\n" + "".join(entries) + "}\n"
        elif isinstance(value, dict):
            entries = []
            for key in value.keys():
                # Note: Exception handling used in case value access leads to error,
                # such as with hash tables having Mako placeholders for undefined values.
                try:
                    key_value = value[key]
                    debug_print("key=%s key_value=%s" % (key, key_value), 9)
                except NameError:
                    print_stderr("Exception in trace_value getting value for key %s" % key)
                    key_value = "n/a"
                entries.append("\t%s: %s\n" % (key, key_value))
            value_spec = "{\n" + "".join(entries) + "}\n"
        # Note: LEVEL is passed along so that the output is not filtered again
        # at debug_print's default level of 1 (e.g., when LEVEL is 0).
        debug_print("%s%s" % (prefix, value_spec), level=level)
    return
def trace_current_context(level=QUITE_DETAILED, label=None,
                          show_methods_etc=False):
    """Traces out context of the caller (global and local variables), with output
    prefixed by "LABEL context" (e.g., "current context: {\nglobals: ...}").
    Notes: By default the debugging level must be quite-detailed (6).
    If the debugging level exceeds LEVEL by more than one, the entire stack frame
    is traced via trace_object; otherwise, just its string rendition is shown.
    Also, methods are omitted by default (see SHOW_METHODS_ETC)."""
    caller_frame = None
    if label is None:
        label = "current"
    try:
        caller_frame = inspect.currentframe().f_back
    except (AttributeError, KeyError, ValueError):
        debug_print("Exception during trace_current_context: " +
                    to_string(sys.exc_info()), 5)

    # Output header followed by the frame (either in detail or summarized)
    debug_format("{label} context: {{", level, label=label)
    margin = " "
    trace_entire_frame = ((debugging_level() - level) > 1)
    if trace_entire_frame:
        trace_object(caller_frame, (level + 2), "frame", indent=margin,
                     show_methods_etc=show_methods_etc)
    else:
        debug_format("frame = {f}", level, f=caller_frame)

    # Output the global bindings and then the local ones
    if caller_frame:
        for section in ("globals", "locals"):
            trace_object(getattr(caller_frame, "f_" + section), level, section,
                         indent=margin, show_methods_etc=show_methods_etc)
    debug_trace("}", level=level)
    return
def during_debugging(expression=True):
    """Returns EXPRESSION (True by default) if the debugging level is positive and False otherwise
    Note: The expression is not considered in non-debug mode"""
    if debugging_level() > 0:
        return expression
    return False
else:
def set_debug_level(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debugging_level(*_args, **_kwargs):
    """Non-debug stub: the level is always 0"""
    return 0


def debug_trace_without_newline(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debug_trace(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debug_print_without_newline(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debug_print(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debug_format(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def debug_timestamp(*_args, **_kwargs):
    """Non-debug stub: the timestamp is always empty"""
    return ""


def debug_raise(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def trace_array(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def trace_object(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def trace_value(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def trace_current_context(*_args, **_kwargs):
    """Non-debug stub: ignores arguments and does nothing"""


def during_debugging(*_args, **_kwargs):
    """Non-debug stub: always False"""
    return False
def debugging(level=(DEFAULT_DEBUG_LEVEL + 1)):
    """Indicates whether the debugging level is at least LEVEL (1 by default)"""
    current_level = debugging_level()
    return current_level >= level
def detailed_debugging():
    """Indicates whether the debugging level is at least DETAILED"""
    current_level = debugging_level()
    return current_level >= DETAILED
def verbose_debugging():
    """Indicates whether the debugging level is at least VERBOSE"""
    current_level = debugging_level()
    return current_level >= VERBOSE
#------------------------------------------------------------------------
# General utility functions
def to_string(obj):
    """Returns OBJ itself if already a string and otherwise its %s-based rendition"""
    # Note: str() not used in order to properly handle unicode
    # EX: to_string(123) => "123"
    # EX: to_string(u"\u1234") => u"\u1234"
    # EX: to_string(None) => "None"
    # TODO: have specialization that returns "" for None
    if isinstance(obj, string_types):
        return obj
    # Note: singleton tuple used so that tuple objects get rendered as a whole
    return "%s" % (obj,)
def normalize_unicode(text, encoding="utf8"):
    """Converts Unicode TEXT to ENCODING (normally UTF-8) if necessary, such as
    to prepare for output, tracing the conversion at high debugging levels.
    Note: this is a no-op in Python 3 or higher"""
    converted = _normalize_unicode(text, encoding)
    debug_format("normalize_unicode({t}, {e}) => {r}", 10,
                 t=text, e=encoding, r=converted)
    debug_format("\ttypes: in={it} out={ot}", 11,
                 it=type(text), ot=type(converted))
    return converted
#
def _normalize_unicode(text, encoding="utf8"):
    """Implementation of normalize_unicode (q.v.): encodes TEXT using ENCODING
    if a unicode object under Python 2 and otherwise returns it unchanged"""
    # EX: normalize_unicode(u'\u1234') => '\xe1\x88\xb4'
    # Note: As this is used in support for debug_print, no tracing is done.
    # TODO: work out more-intuitive name
    under_python2 = (sys.version_info[0] < 3)
    if under_python2 and isinstance(text, unicode):     # pylint: disable=undefined-variable
        return text.encode(encoding)
    return text
def ensure_unicode(text, encoding="utf8"):
    """Ensures TEXT is a unicode object, decoding via ENCODING (e.g., UTF8) if needed,
    and tracing the conversion at high debugging levels.
    Note: this is a no-op in Python 3 or higher"""
    # EX: ensure_unicode('\xe1\x88\xb4') => u'\u1234'
    converted = _ensure_unicode(text, encoding)
    debug_format("ensure_unicode({t}) => {r}", 10,
                 t=text, r=converted)
    debug_format("\ttypes: in={it} out={ot}", 11,
                 it=type(text), ot=type(converted))
    return converted
#
def _ensure_unicode(text, encoding="utf8"):
    """Implementation of ensure_unicode (q.v.): decodes TEXT using ENCODING
    unless already unicode when under Python 2; otherwise returns it unchanged"""
    # Note: As this is used in support for debug_print, no tracing is done.
    under_python2 = (sys.version_info[0] < 3)
    if under_python2 and (not isinstance(text, unicode)):   # pylint: disable=undefined-variable
        return unicode(text, encoding)                      # pylint: disable=undefined-variable
    return text
def print_stderr(text, **namespace):
    """Write TEXT plus newline to the module's error stream (stderr global),
    first expanding it via format when NAMESPACE keywords are supplied"""
    # Note: ensures text encoded as UTF8 if under Python 2.x
    message = format(text, **namespace) if namespace else text
    stderr.write(normalize_unicode(message) + "\n")
def redirect_stderr(filename):
    """Makes the module's error stream (stderr global) a new file FILENAME opened for writing
    Note: asserts that the stream has not already been redirected."""
    global stderr
    assert (stderr == sys.stderr)
    # Note: the handle is intentionally left open (see restore_stderr)
    redirection = open(filename, "w")
    stderr = redirection
    assert stderr
def restore_stderr():
    """Closes the handle used for redirection and makes system stderr the module's error stream again
    Note: asserts that the stream is currently redirected."""
    global stderr
    assert (stderr != sys.stderr)
    redirection = stderr
    redirection.close()
    stderr = sys.stderr
def exit(message, **namespace):                 # pylint: disable=redefined-builtin
    """Output error MESSAGE to stderr (followed by blank line) and then exit via
    sys.exit, first expanding MESSAGE via format when NAMESPACE keywords are supplied"""
    # NOTE(review): sys.exit is invoked without status, so the process exit
    # code is 0 even though an error was reported -- confirm this is intended.
    text = format(message, **namespace) if namespace else message
    print_stderr(text + "\n")
    return sys.exit()
def setenv(var, value):
    """Set environment VAR to string version of VALUE (or to empty string if None)"""
    debug_print("setenv('%s', '%s')" % (var, value), 6)
    if value is None:
        os.environ[var] = ""
    else:
        os.environ[var] = to_string(value)
def chomp(text, line_separator=os.linesep):
    """Returns TEXT without its trailing occurrence of LINE_SEPARATOR (if any)"""
    # EX: chomp("abc\n") => "abc"
    # EX: chomp("http://localhost/", "/") => "http://localhost"
    trimmed = text
    if text.endswith(line_separator):
        keep_len = len(text) - len(line_separator)
        trimmed = text[:keep_len]
    debug_format("chomp({t}, {sep}) => {r}", 8,
                 t=text, sep=line_separator, r=trimmed)
    return trimmed
#-------------------------------------------------------------------------------
# Environment accessors and setters
def getenv(var):
    """Return value of environment VAR (as given by os.getenv), tracing the lookup"""
    setting = os.getenv(var)
    debug_print("getenv('%s') => '%s'" % (var, setting), 8)
    return setting
# Registry of environment options: description and default value by variable name
env_options = {}
env_defaults = {}
#
def register_env_option(var, description, default):
    """Record environment VAR as an option having DESCRIPTION (empty string if
    missing) and DEFAULT, using the env_options and env_defaults tables"""
    debug_format("register_env_option({v}, {d})", 7, v=var, d=description)
    # Note: no global declarations needed as the tables are just updated in place
    env_options[var] = description if description else ""
    env_defaults[var] = default
def get_registered_env_options():
    """Returns list with names of the environment options registered via
    register_env_option, excluding those with empty or blank descriptions"""
    option_names = [name for (name, description) in env_options.items()
                    if (description and description.strip())]
    # Note: the namespace is given explicitly so that format need not derive it
    # from the local and global variables (inefficient and leads to a warning trace).
    debug_format("get_registered_env_options() => {option_names}", 5,
                 option_names=option_names)
    return option_names
def get_environment_option_descriptions(include_all=None, include_default=None, indent=" "):
    """Returns list of (option, description) tuples for the registered environment
    options. Options without descriptions are omitted unless INCLUDE_ALL (by
    default, only included with verbose debugging). Unless INCLUDE_DEFAULT is
    false, each description is followed by INDENT and the default value in
    parentheses (or n/a if None), with any newlines shown escaped."""
    debug_format("env_options={eo}", 5, eo=env_options)
    debug_format("env_defaults={ed}", 5, ed=env_defaults)
    if include_all is None:
        include_all = verbose_debugging()
    if include_default is None:
        include_default = True

    def _format_env_option(opt):
        """Returns OPT description and optionally default value (if INCLUDE_DEFAULT)"""
        # Note: the namespace is given explicitly so that format need not derive
        # it from the local and global variables (inefficient and leads to a warning trace).
        debug_format("_format_env_option({opt})", 7, opt=opt)
        desc_spec = env_options.get(opt, "_")
        default_spec = ""
        if include_default:
            default_value = env_defaults.get(opt, None)
            has_default = (default_value is not None)
            # Note: singleton tuple used so that tuple defaults are rendered as
            # a whole (rather than being treated as the interpolation arguments)
            default_spec = ("(%s)" % (default_value,)) if has_default else "n/a"
            default_spec = default_spec.replace("\n", "\\n")
        return (opt, desc_spec + indent + default_spec)

    option_descriptions = [_format_env_option(opt) for opt in env_options
                           if (env_options[opt] or include_all)]
    debug_format("get_environment_option_descriptions() => {od}", 5,
                 od=option_descriptions)
    return option_descriptions
def formatted_environment_option_descriptions(sort=False, include_all=None, indent="\t"):
    """Returns string with the environment options and their descriptions (as given
    by get_environment_option_descriptions), optionally SORTED. The option is
    separated from its description by INDENT, and the entries are separated
    by newline plus INDENT."""
    option_info = get_environment_option_descriptions(include_all)
    if sort:
        option_info = sorted(option_info)
    entries = []
    for (opt, desc) in option_info:
        entries.append("%s%s%s" % (opt, indent, (desc if desc else "n/a")))
    descriptions = ("\n%s" % indent).join(entries)
    debug_format("formatted_environment_option_descriptions() => {d}", 6,
                 d=descriptions)
    return descriptions
def getenv_value(var, default=None, description=None, export=None):
    """Returns value for environment variable VAR, using DEFAULT (which can be None)
    if not set or empty. The variable is registered along with DESCRIPTION and
    DEFAULT via register_env_option. If EXPORT specified, the environment entry
    will be created if necessary (useful during sub-script invocation)."""
    # Note: Use one of the specialized variants (e.g., getenv_text, getenv_boolean, or getenv_number).
    value = getenv(var)
    if not value:
        debug_print("getenv_value: no value for %s" % var, 7)
        value = default
        # Note: an EXPORT of None is treated the same as False
        if export:
            setenv(var, value)
    register_env_option(var, description, default)
    debug_print("getenv_value('%s') => %s" % (var, value), 6)
    return value
def getenv_text(var, default="", description=None):
    """Returns textual value for environment variable VAR (or string version of DEFAULT);
    the result is the empty string when the value resolves to None"""
    # Note: This is a simple wrapper around getenv_value using lower tracing level (as former is intended as helper for specialized types given below).
    value = getenv_value(var, default, description=description)
    if value is None:
        text_value = ""
    else:
        text_value = to_string(value)
    debug_print("getenv_text('%s', '%s') => '%s'" % (var, default, text_value), 4)
    return text_value
def getenv_boolean(var, default=False, description=None):
    """Returns boolean flag based on environment VAR (or DEFAULT value which can be None).
    Note: "0" or "False" (in any case) is interpreted as False, and any other
    non-empty value as True; a warning is issued if DEFAULT is neither boolean nor None."""
    value_text = getenv_text(var, default, description=description)
    # TODO: gh.assertion(type(default) == bool or default is None)
    default_ok = ((default is None) or isinstance(default, bool))
    if not default_ok:
        print_stderr("Warning: unexpected default type to getenv_boolean: " +
                     "{t} vs. bool or None; var={v}", t=type(default), v=var)
    # TODO: assert(isinstance(value_text, str))
    if not value_text:
        bool_value = default
    else:
        is_false_spec = ((value_text.lower() == "false") or (value_text == "0"))
        bool_value = (not is_false_spec)
    debug_print("getenv_boolean(%s, %s) => %s" % (var, default, bool_value), 4)
    return bool_value
def getenv_number(var, default=-1, description=None, integral=False):
    """Returns number based on environment VAR (or DEFAULT value which can be None):
    an integer if INTEGRAL and otherwise a float. If the text value cannot be
    converted, the error is reported on stderr and DEFAULT is returned as is."""
    num_value = default
    value_text = getenv_text(var, default, description=description)
    # TODO: assert(isinstance(value_text, str))
    # TODO: make sure misc_utils version reconciled (and rework tpo_common to use latter!)
    if value_text:
        # Note: safe_int/_float not used so that exception info always displayed
        try:
            num_value = int(value_text) if integral else float(value_text)
        except (TypeError, ValueError):
            # Note: only conversion errors are handled (formerly a bare except,
            # which also swallowed KeyboardInterrupt and SystemExit)
            print_stderr(format("Exception converting env number for {v} ('{val}'): {exc}",
                                v=var, val=value_text, exc=str(sys.exc_info())))
    debug_print("getenv_number(%s, %s) => %s" % (var, default, num_value), 4)
    return num_value
def getenv_integer(var, default=-1, description=None):
    """Version of getenv_number with integral conversion. Note: returns integer unless None is default."""
    number = getenv_number(var, default, description=description, integral=True)
    return number
def getenv_real(var, default=-1, description=None):
    """Version of getenv_number with floating point conversion. Note: returns float unless None is default."""
    number = getenv_number(var, default, description=description, integral=False)
    return number
def getenv_int(var, default=-1, description=None):
    """Shorter name for getenv_integer (q.v.)"""
    number = getenv_integer(var, default, description=description)
    return number
def getenv_float(var, default=-1, description=None):
    """Shorter name for getenv_real (q.v.)"""
    # Note: invokes getenv_number directly, relying on its floating point default
    number = getenv_number(var, default, description=description)
    return number
def getenv_bool(var, default=False, description=None):
    """Shorter name for getenv_boolean (q.v.)"""
    flag = getenv_boolean(var, default, description=description)
    return flag
def get_current_function_name():
    """Returns the name of the function invoking this one, using ??? if it cannot be determined"""
    # Note: Based on http://code.activestate.com/recipes/66062-determining-current-function-name.
    # Also see http://docs.python.org/2/reference/datamodel.html and http://docs.python.org/2/library/inspect.html.
    # TODO: remove as no longer used???
    name = "???"
    try:
        caller_frame = sys._getframe(1)         # pylint: disable=protected-access
        name = caller_frame.f_code.co_name
    except (AttributeError, ValueError):
        print_stderr("Exception in get_current_function_name: " + str(sys.exc_info()))
    debug_print("get_current_function_name() => %s" % name, 5)
    return name
def get_property_value(obj, property_name, default_value=None):
    """Returns value of attribute PROPERTY_NAME for OBJ, or DEFAULT_VALUE if there is no such attribute"""
    # EX: import datetime; (get_property_value(datetime.date.today(), 'year', -1) >= 2012) => True
    # EX: get_property_value(datetime.date.today(), 'minute', -1) => -1
    if hasattr(obj, property_name):
        value = getattr(obj, property_name)
    else:
        value = default_value
    arg_spec = to_string((obj, property_name, default_value))
    debug_print("get_property_value%s => %s" % (arg_spec, value), level=4)
    return value
def simple_format(text, namespace):
    """Resolve format-style specifications within TEXT (i.e., expressions within
    braces), using bindings from NAMESPACE. Doubled braces are converted into
    single ones afterwards. Resolution stops at the first expression that cannot
    be evaluated (or that yields None)."""
    # Note: This only supports evaluation of the text within braces (e.g., format specifiers not handled)
    # Warning: The expressions are evaluated via eval, so TEXT must not come from untrusted input.
    result = text

    # Replace all placeholders via eval of expression
    while True:
        # Find next variable reference (excluding those preceded by a brace)
        #                  +no brace+ {expressn}
        match = re.search("(?:[^{]|^)({([^{}]+)})", result)
        if not match:
            break
        (placeholder, expression) = match.groups()

        # Determine value and update the text
        value = None
        try:
            # note: no global namespace specified (but should be part of local one as with format below)
            value = eval(expression, None, namespace)   # pylint: disable=eval-used
        except (IndexError, NameError, SyntaxError, TypeError, ValueError):
            debug_print("Exception during eval: " + str(sys.exc_info()), 6)
        if value is None:
            debug_print("Unable to resolve replacement '%s' in '%s'" % (expression, result), 2)
            break
        result = result.replace(placeholder, str(value))

    # Handle common escapes
    for (escaped, brace) in (("{{", "{"), ("}}", "}")):
        result = result.replace(escaped, brace)
    debug_print("simple_format('%s', [namespace]) => %s" % (text, result), 6)
    debug_print("\tnamespace: %s" % namespace, 7)
    return result
# Source locations (file:line) already warned about regarding implicit format namespace
warned_about_namespace = {}
#
def format(text, indirect_caller=False, ignore_exception=False, **namespace):  # pylint: disable=redefined-builtin
    """Formats TEXT using the NAMESPACE keywords for the placeholder values.
    If no keywords are specified, the values are taken from the global and local
    variables of the caller (which is convenient but un-pythonic and a bit
    inefficient), or from those of the caller's caller if INDIRECT_CALLER.
    Formatting exceptions are traced and then re-raised, unless IGNORE_EXCEPTION
    (to support debug_format), in which case the empty string is returned."""
    # Notes:
    # - Arguments need to be in UTF-8 if the template text is an ascii string.
    # - The technique for resolving the locals is based on Stack Overflow:
    #   http://stackoverflow.com/questions/6618795/get-locals-from-calling-namespace-in-python
    # - This conflicts with the format builtin, but that is only a potential
    #   issue if imported via 'from tpo_common import *'. The standard function
    #   can be accessed via __builtin__.format (or via builtins under Python 3).
    # - Under Python 2, string values (and renditions of lists and hashes) in the
    #   namespace are converted to match the type of the template text.
    # Warning: don't call other user functions except debug_print or helper
    # functions that don't trace (e.g., _ensure_unicode).
    if debug_level >= 99:
        debug_trace("format(%s, %s, %s, %s)",
                    text, indirect_caller, ignore_exception, namespace)
    # TODO: make unicode conversion optional
    frame = None
    result = ""
    try:
        if not namespace:
            # Derive namespace from the variables of the (indirect) caller
            # Note: frame lookup must be done here (i.e., not in a helper function)
            frame = inspect.currentframe().f_back
            if indirect_caller:
                frame = frame.f_back
            if not skip_format_warning:
                # Warn about the practice, but just once per source location
                filename = frame.f_globals.get("__file__", "???")
                if filename.endswith(".pyc"):
                    filename = filename[:-1]
                source_line_key = "{f}:{l}".format(f=filename, l=frame.f_lineno)
                if source_line_key not in warned_about_namespace:
                    debug_trace("Warning: deriving namespace from locals and " +
                                "globals at %s: inefficient and un-pythonic!",
                                source_line_key, level=WARNING)
                    warned_about_namespace[source_line_key] = True
            trace_object(frame, 9, "frame")
            namespace = frame.f_globals.copy()
            namespace.update(frame.f_locals)
        else:
            # Note: copy made as values might get converted below
            namespace = namespace.copy()
        if USE_SIMPLE_FORMAT:
            result = simple_format(text, namespace)
        else:
            # Make sure values to output are UTF-8 unless text is unicode
            # Note: only converts strings, lists, and hashes (e.g., omitting user objects and numbers)
            if sys.version_info[0] < 3:
                debug_trace("type(text)=%s", type(text), level=91)
                debug_trace("namespace=%s", namespace, level=94)
                transform_fn = _normalize_unicode if (not isinstance(text, unicode)) else _ensure_unicode  # pylint: disable=undefined-variable
                for k in namespace:
                    if isinstance(namespace[k], string_types):
                        namespace[k] = transform_fn(namespace[k])
                    elif isinstance(namespace[k], (list, dict)):
                        namespace[k] = transform_fn(to_string(namespace[k]))
            # Do the actual conversion
            result = text.format(**namespace)
    except (AttributeError, IndexError, KeyError, TypeError, UnicodeDecodeError, ValueError):
        # Note: IndexError and TypeError are included, as str.format raises
        # these for positional placeholders (e.g., "{0}") and for invalid
        # indexing within placeholders (e.g., "{x[0]}" with integer x); formerly,
        # they were raised even when exceptions were to be ignored.
        debug_print("Exception during format(%s,...): %s" % (text, str(sys.exc_info())), 5)
        if not ignore_exception:
            raise
    finally:
        # Remove frame reference (to avoid reference cycles)
        if frame:
            del frame
    # Note: the values are passed as trace arguments rather than concatenated, so
    # that text which is not a string no longer leads to an exception here.
    debug_trace("txt: %s", text, level=91)
    debug_trace("res: %s", result, level=91)
    debug_trace("format(%s,...) => %s", text, result, level=91)
    return result
def init_logging():
    """Configures the logging module: DEBUG threshold if detailed debugging is
    enabled and INFO otherwise"""
    debug_print("init_logging", 4)
    # TODO: use mapping from symbolic LEVEL user option (e.g., via getenv)
    if detailed_debugging():
        threshold = logging.DEBUG
    else:
        threshold = logging.INFO
    logging.basicConfig(level=threshold,
                        format='%(asctime)s : %(levelname)s : %(message)s')
    # Sanity check: only shows up when the DEBUG threshold is in effect
    logging.debug("init_logging")
def load_object(filename):
    """Load object data from FILENAME in pickle format

    The file is opened in binary mode, as required by pickle under Python 3,
    and it is closed even if the unpickling fails.
    Returns the unpickled object. Exceptions from open or pickle.load are
    propagated to the caller.
    Warning: unpickling can execute arbitrary code, so only use on trusted files."""
    debug_print("Loading object from %s" % filename, 3)
    with open(filename, 'rb') as f:
        object_data = pickle.load(f)
    return object_data
def store_object(filename, object_data):
    """Store OBJECT_DATA in FILENAME using pickle format

    The file is opened in binary mode, as required by pickle under Python 3,
    and it is closed even if the pickling fails.
    Exceptions from open or pickle.dump are propagated to the caller."""
    debug_print("Saving object to %s" % filename, 3)
    with open(filename, 'wb') as f:
        pickle.dump(object_data, f)
    return
def dump_stored_object(object_filename, dump_filename, level=1):
    """Traces out object stored in OBJECT_FILENAME to DUMP_FILENAME

    The object is loaded via load_object and then traced at trace LEVEL, using
    trace_value for dictionaries and lists and trace_object for anything else.
    The stderr redirection is always undone, even if loading or tracing fails."""
    # Note: convenience function for interactive usage
    redirect_stderr(dump_filename)
    try:
        obj = load_object(object_filename)
        if isinstance(obj, (dict, list)):
            trace_value(obj, level)
        else:
            trace_object(obj, level)
    finally:
        # Make sure that an exception doesn't leave stderr redirected
        restore_stderr()
    return
def create_lookup_table(filename, use_linenum=False):
    """Builds hash from the tab-separated lines of FILENAME (one key/value pair
    per line), with the keys lowercased. If USE_LINENUM, lines without a tab
    are also included, using the 1-based line number as the key, and the result
    is an OrderedDict.
    Note: Other lines (e.g., those with multiple tabs) are ignored with a warning,
    and IOError or ValueError during reading just produces a warning, so the
    result might be partial."""
    # TODO: rename as create_lookup_hash? (see ShelveLookup.from_hash in table_lookup.py)
    # TODO: refine exception in except
    debug_print("create_lookup_table(%s)" % filename, 4)
    lookup_hash = OrderedDict() if use_linenum else {}
    try:
        with open(filename) as table_file:
            for line_num, raw_line in enumerate(table_file, 1):
                line = raw_line.strip("\n")
                fields = line.split("\t")
                num_fields = len(fields)
                if num_fields == 2:
                    # Regular key/value entry
                    lookup_hash[fields[0].lower()] = fields[1]
                elif use_linenum and (num_fields == 1):
                    # Value-only entry: line number serves as implicit key
                    lookup_hash[line_num] = fields[0]
                else:
                    debug_print("Warning: Ignoring entry at line %d (%s): %s" % (line_num, filename, line), 3)
    except (IOError, ValueError):
        debug_print("Warning: Exception creating lookup table from %s: %s" % (filename, str(sys.exc_info())), 2)
    debug_print("create_lookup_table => %s" % lookup_hash, 8)
    return lookup_hash
def lookup_key(table, key, default):
    """Returns the TABLE entry for the lowercase version of KEY, or DEFAULT if
    there is no such entry"""
    normalized_key = key.lower()
    result = table.get(normalized_key, default)
    debug_format("lookup_key(_, {k}, {d}) => {r}", 6,
                 k=key, d=default, r=result)
    return result
def create_boolean_lookup_table(filename):
    """Create lookup hash table from string keys to boolean occurrence indicator.
    Each line of FILENAME is a key (stripped of surrounding whitespace), which
    is mapped to True.
    Note: The keys are made lowercase. Exceptions from opening or reading the
    file are propagated, but the file is closed in any case."""
    # TODO: allow for tab-delimited value to be ignored
    debug_print("create_boolean_lookup_table(%s)" % filename, 4)
    lookup_hash = {}
    with open(filename) as f:
        for line in f:
            key = line.strip().lower()
            lookup_hash[key] = True
    debug_print("create_boolean_lookup_table => %s" % lookup_hash, 8)
    return lookup_hash
def normalize_frequencies(dictionary):
    """Divides each value of DICTIONARY by the total of all values (in place)
    Note: The values are left as is unless the total is positive."""
    total = 0
    for frequency in dictionary.values():
        total += frequency
    if not total > 0:
        # Nothing to scale by (e.g., empty hash or all zero counts)
        return
    for key in dictionary:
        dictionary[key] /= total
    return
def sort_frequencies(dictionary):
    """Returns list with the keys of DICTIONARY ordered from highest to lowest
    frequency (i.e., by descending value)"""
    sorted_keys = list(dictionary.keys())
    sorted_keys.sort(key=lambda entry: dictionary[entry], reverse=True)
    debug_format("sort_frequencies({d}) => {r}", 7,
                 d=dictionary, r=sorted_keys)
    return sorted_keys
def sort_weighted_hash(weights, max_num=None, reverse=None):
    """Sorts the entries in WEIGHTS hash by weight, returning list of
    (key, weight) tuples with at most MAX_NUM entries (all if None).
    Note: sorted in descending order unless REVERSE is specified as false"""
    limit = len(weights) if (max_num is None) else max_num
    descending = True if (reverse is None) else reverse
    ranked_keys = sorted(weights.keys(), key=lambda entry: weights[entry],
                         reverse=descending)
    top_values = []
    for key in ranked_keys[:limit]:
        top_values.append((key, weights[key]))
    debug_format("sort_weighted_hash(_, _) => {r}", 5, r=top_values)
    return top_values
def format_freq_hash(freqs, label, max_num=None, prec=0, indent="\t"):
    """Returns string listing the top entries in FREQS hash (by descending value),
    preceded by LABEL on a separate line. Up to MAX_NUM entries are included
    (all if None), each as INDENT, key, tab, and the value as formatted by
    round_num with PREC.
    Note: The result is passed through normalize_unicode (e.g., for printing)."""
    limit = len(freqs) if (max_num is None) else max_num
    ranked_keys = sorted(freqs.keys(), key=lambda entry: freqs[entry],
                         reverse=True)
    entry_lines = []
    for key in ranked_keys[:limit]:
        entry_lines.append(indent + key + "\t" + round_num(freqs[key], prec))
    listing = label + '\n' + "\n".join(entry_lines)
    return normalize_unicode(listing)
def union(list1, list2):
    """Returns copy of LIST1 followed by the items of LIST2 that are not in
    LIST1 (preserving order)
    Note: Duplicates within LIST1 are retained, and an item occurring several
    times in LIST2 but not in LIST1 is added each time."""
    # TODO: rework in terms of built-in set operations???
    merged = list1[:]
    for candidate in (entry for entry in list2 if entry not in list1):
        merged.append(candidate)
    debug_format("union({l1}, {l2}) => {r}", 7, l1=list1, l2=list2, r=merged)
    return merged