From 93b40940da7ae2afd9de98810028905c2ea91fbb Mon Sep 17 00:00:00 2001 From: Chris Marshall Date: Tue, 14 Mar 2017 11:33:55 -0400 Subject: [PATCH] Raise InvalidCredentialsError from C extension --- giraffez/__init__.py | 7 +- giraffez/cmd.py | 19 +++-- giraffez/connection.py | 3 +- giraffez/core.py | 4 +- giraffez/errors.py | 160 +-------------------------------------- giraffez/export.py | 36 +++++---- giraffez/mload.py | 54 +++++-------- giraffez/src/cmdobject.c | 16 ++-- giraffez/tptmodule.cc | 1 - 9 files changed, 69 insertions(+), 231 deletions(-) diff --git a/giraffez/__init__.py b/giraffez/__init__.py index 005030e..9095fb6 100644 --- a/giraffez/__init__.py +++ b/giraffez/__init__.py @@ -29,7 +29,7 @@ """ __title__ = 'giraffez' -__version__ = '2.0.0-dev0' +__version__ = '2.0.0beta1' __authors__ = ['Christopher Marshall', 'Kyle Travis'] __license__ = 'Apache 2.0' __all__ = ['Export', 'MLoad', 'Load', 'Cmd', 'Config', 'Secret'] @@ -53,14 +53,13 @@ http://www.capitalone.io/giraffez/intro.html#environment. """.format(error.msg)) +from ._cli import TeradataError as TeradataCLIError +from ._tpt import TeradataError as TeradataPTError from .cmd import TeradataCmd as Cmd from .config import Config from .constants import SILENCE, VERBOSE, DEBUG, INFO from .errors import ( - GeneralError, GiraffeError, - MultiLoadError, - TeradataError, GiraffeTypeError, GiraffeEncodeError, InvalidCredentialsError, diff --git a/giraffez/cmd.py b/giraffez/cmd.py index ff10139..1e9b89c 100644 --- a/giraffez/cmd.py +++ b/giraffez/cmd.py @@ -14,11 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._cli import Cmd, RequestEnded, StatementEnded, TeradataError - from .constants import * from .errors import * +from ._cli import Cmd, RequestEnded, StatementEnded, TeradataError + from .connection import Connection from .fmt import truncate from .logging import log @@ -144,7 +144,6 @@ class TeradataCmd(Connection): :param int log_level: Specify the desired level of output from the job. Possible values are :code:`giraffez.SILENCE` :code:`giraffez.INFO` (default), :code:`giraffez.VERBOSE` and :code:`giraffez.DEBUG` - :param bool panic: If :code:`True` stop executing commands and return after the first error. :param str config: Specify an alternate configuration file to be read from, when previous paramaters are omitted. :param str key_file: Specify an alternate key file to use for configuration decryption :param string dsn: Specify a connection name from the configuration file to be @@ -153,8 +152,7 @@ class TeradataCmd(Connection): locks the connection used in the configuration file. This can be unlocked using the command :code:`giraffez config --unlock ` changing the connection password, or via the :meth:`~giraffez.config.Config.unlock_connection` method. - :param string mload_session: Identifies mload sessions to suppress duplicate log output. - Used internally only. + :param string silent: Suppress log output. Used internally only. :raises `giraffez.errors.InvalidCredentialsError`: if the supplied credentials are incorrect :raises `giraffez.errors.TeradataError`: if the connection cannot be established @@ -187,17 +185,18 @@ def close(self): if getattr(self, 'cmd', None): self.cmd.close() - def execute(self, command, sanitize=True, multi_statement=False, prepare_only=False, - silent=False, coerce_floats=True, parse_dates=False): - # TODO: Improve/fix this docstring + def execute(self, command, coerce_floats=True, parse_dates=False, multi_statement=False, + sanitize=True, silent=False, prepare_only=False): """ Execute commands using CLIv2. :param str command: The SQL command to be executed - :param bool sanitize: Whether or not to call :func:`~giraffez.sql.prepare_statement` on the command + :param bool coerce_floats: Coerce Teradata decimal types into Python floats + :param bool parse_dates: Parses Teradata datetime types into Python datetimes :param bool multi_statement: Execute in multi-statement mode - :param bool prepare_only: Only prepare the command (no results) + :param bool sanitize: Whether or not to call :func:`~giraffez.sql.prepare_statement` on the command :param bool silent: Silence console logging (within this function only) + :param bool prepare_only: Only prepare the command (no results) :return: the results of each statement in the command :rtype: :class:`~giraffez.types.Rows` :raises `giraffez.errors.TeradataError`: if the query is invalid diff --git a/giraffez/connection.py b/giraffez/connection.py index bbcf12f..79ffd85 100644 --- a/giraffez/connection.py +++ b/giraffez/connection.py @@ -19,6 +19,7 @@ from .constants import * from .errors import * +from ._cli import InvalidCredentialsError from .config import Config from .logging import log from .utils import show_warning, suppress_context @@ -69,7 +70,7 @@ def __init__(self, host=None, username=None, password=None, log_level=INFO, conf log.info("Connection", "Connection to '{}' established successfully.".format(self.dsn)) except InvalidCredentialsError as error: if self.protect: - Config.lock_connection(self.config, self.dsn) + Config.lock_connection(self.config, self.dsn, self.key) raise error def _connect(self, host, username, password): diff --git a/giraffez/core.py b/giraffez/core.py index f7004e5..6167d0f 100644 --- a/giraffez/core.py +++ b/giraffez/core.py @@ -19,6 +19,8 @@ from .constants import * from .errors import * +from . import _cli +from . import _tpt from .config import Config, message_write_default from .encrypt import create_key_file from .io import home_file @@ -100,7 +102,7 @@ def run(self, test_args=None): self.run() else: raise error - except InvalidCredentialsError as error: + except (_cli.InvalidCredentialsError, _tpt.InvalidCredentialsError) as error: if args.protect: Config.lock_connection(args.conf, args.dsn, args.key) raise error diff --git a/giraffez/errors.py b/giraffez/errors.py index 4b5aa3d..e24e8c0 100644 --- a/giraffez/errors.py +++ b/giraffez/errors.py @@ -14,52 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .utils import suppress_context - - class GiraffeError(Exception): """ Baseclass for all giraffez errors. """ -class GeneralError(GiraffeError): - """ - General giraffez error. - """ - -class GiraffeNotFound(GiraffeError): - """ - Raised when giraffez C modules not found. - """ - -class TeradataCLIv2NotFound(GiraffeNotFound): - """ - Unable to import giraffez._cli. This indicates that either the - giraffez C extensions did not compile correctly, or more likely, - there is an issue with the environment or installation of the - Teradata dependencies. The Teradata Call-Level Interface Version 2 - requires several environment variables to be set to find the shared - library files and error message catalog. - - For more information, refer to this section in the giraffez - documentation: - http://www.capitalone.io/giraffez/intro.html#environment. - """ - -class TeradataPTAPINotFound(GiraffeNotFound): - """ - Unable to import giraffez._tpt. This indicates that either the - giraffez C extensions did not compile correctly, or more likely, - there is an issue with the environment or installation of the - Teradata dependencies. The Teradata Parallel Transporter API - requires several environment variables to be set to find the shared - library files and error message catalog. - - For more information, refer to this section in the giraffez - documentation: - http://www.capitalone.io/giraffez/intro.html#environment. - """ - class GiraffeTypeError(GiraffeError): """ Baseclass for all giraffez type errors. @@ -95,6 +54,10 @@ class ConfigReadOnly(ConfigurationError): Raised when a write is attempted on a configuration file was opened in read mode. """ +class InvalidCredentialsError(ConfigurationError): + """ + Raised when connection credentials are incorrect. + """ class ConnectionLock(ConfigurationError): """ @@ -105,118 +68,3 @@ class ConnectionLock(ConfigurationError): def __init__(self, dsn): super(ConnectionLock, self).__init__(("Connection {0} is currently locked. please update " "credentials and run:\n\tgiraffez config --unlock {0}").format(dsn)) - -class TeradataError(Exception): - """ - Baseclass for all Teradata errors. This exception represents any - error that originates from Teradata CLIv2/TPTAPI. - - This will attempt to parse error codes from the message in the case - that the error is a Teradata CLIv2 error. - """ - - def __init__(self, message, code=None): - super(TeradataError, self).__init__(message) - - if isinstance(message, Exception): - #message = getattr(message, 'message', '') - message = str(message) - - - if code is None: - if isinstance(message, (list, tuple)): - code, message = message[0], message[1] - else: - if ":" in message: - parts = [x.strip() for x in message.split(":", 1)] - try: - code = int(parts[0]) - message = parts[1] - except ValueError as error: - code = None - message = "Unable to parse CLI error code/message" - else: - code = None - message = "Unable to parse CLI error code/message" - if code == ObjectDoesNotExist.code: - raise suppress_context(ObjectDoesNotExist(message, code)) - elif code == ObjectNotTable.code: - raise suppress_context(ObjectNotTable(message, code)) - elif code == ObjectNotView.code: - raise suppress_context(ObjectNotView(message, code)) - elif code == TransactionAborted.code: - raise suppress_context(TransactionAborted(message, code)) - elif code == CannotReleaseMultiLoad.code: - raise suppress_context(CannotReleaseMultiLoad(message, code)) - elif code == MultiLoadTableExists.code: - raise suppress_context(MultiLoadTableExists(message, code)) - elif code == MultiLoadWorkTableNotFound.code: - raise suppress_context(MultiLoadWorkTableNotFound(message, code)) - elif code == InvalidCredentialsError.code: - raise suppress_context(InvalidCredentialsError(message, code)) - - self.message = message - self.code = code - - def __repr__(self): - return "{}: {}".format(self.code, self.message) - -class ObjectDoesNotExist(TeradataError): - """ - Teradata object does not exist. - """ - code = 3807 - -class ObjectNotTable(TeradataError): - """ - Teradata object not a table. - """ - code = 3853 - -class ObjectNotView(TeradataError): - """ - Teradata object not a view. - """ - code = 3854 - -class TransactionAborted(TeradataError): - """ - Teradata transaction aborted. - """ - code = 2631 - -class MultiLoadError(TeradataError): - """ - General MultiLoad error class. - """ - -class CannotReleaseMultiLoad(MultiLoadError): - """ - Raised when MultiLoad cannot be released because it is in the - apply phase. - """ - code = 2572 - -class MultiLoadTableExists(MultiLoadError): - """ - Raised when MultiLoad worktables exist. - """ - code = 2574 - -class MultiLoadLocked(MultiLoadError): - """ - Raised when MultiLoad appears to be locked. Used to determine when - MultiLoad should be released. - """ - -class MultiLoadWorkTableNotFound(MultiLoadLocked): - """ - Raised when MultiLoad worktable is missing during restart. - """ - code = 2583 - -class InvalidCredentialsError(TeradataError): - """ - Raised when connection credentials are incorrect. - """ - code = 8017 diff --git a/giraffez/export.py b/giraffez/export.py index 90000fd..55e2c3f 100644 --- a/giraffez/export.py +++ b/giraffez/export.py @@ -19,6 +19,7 @@ from .constants import * from .errors import * +from ._tpt import InvalidCredentialsError from .config import Config from .connection import Connection from .encoders import dict_to_json, TeradataEncoder @@ -45,15 +46,6 @@ class TeradataExport(Connection): :param str host: Omit to read from :code:`~/.girafferc` configuration file. :param str username: Omit to read from :code:`~/.girafferc` configuration file. :param str password: Omit to read from :code:`~/.girafferc` configuration file. - :param str delimiter: The delimiter to use when exporting with the 'text' - encoding. Defaults to '|' - :param str null: The string to use to represent a null value with the - 'text' encoding. Defaults to 'NULL' - :param str encoding: The encoding format in which to export the data. - Defaults to 'text'. Possible values are 'text' - delimited text - output, 'dict' - to output each row as a :code:`dict` object mapping column names - to values, 'json' - to output each row as a JSON encoded string, and - 'archive' to output in giraffez archive format :param int log_level: Specify the desired level of output from the job. Possible values are :code:`giraffez.SILENCE` :code:`giraffez.INFO` (default), :code:`giraffez.VERBOSE` and :code:`giraffez.DEBUG` @@ -65,6 +57,7 @@ class TeradataExport(Connection): locks the connection used in the configuration file. This can be unlocked using the command :code:`giraffez config --unlock ` changing the connection password, or via the :meth:`~giraffez.config.Config.unlock_connection` method. + :param bool coerce_floats: Coerce Teradata decimal types into Python floats :raises `giraffez.errors.InvalidCredentialsError`: if the supplied credentials are incorrect :raises `giraffez.errors.TeradataError`: if the connection cannot be established @@ -76,7 +69,7 @@ class TeradataExport(Connection): with giraffez.Export('dbc.dbcinfo') as export: print(export.header) - for row in export.results(): + for row in export.values(): print(row) """ @@ -155,6 +148,8 @@ def header(self): """ if self.query is None: raise GiraffeError("Must set target table or query.") + if not self.delimiter: + self.delimiter = DEFAULT_DELIMITER return self.delimiter.join(self.columns.names) def initiate(self): @@ -205,7 +200,17 @@ def query(self, query): else: self._query = statement self.initiated = False - self.export.set_query(statement) + # Since CLIv2 is used in set_query (instead of relying on the + # colums from the TPT Export driver) and set_query will always + # happen before calls to initiate, set_query will always fail + # with InvalidCredentialsError before initiate despite initiate + # presumably failing after this point as well. + try: + self.export.set_query(statement) + except InvalidCredentialsError as error: + if args.protect: + Config.lock_connection(args.conf, args.dsn, args.key) + raise error def archive(self, writer): if 'b' not in writer.mode: @@ -266,14 +271,7 @@ def fetchall(self): if self.query is None: raise GiraffeError("Must set target table or query.") if not self.initiated: - # TODO: this may not be necessary, InvalidCredentialsError - # is being checked in the base class. - try: - self.initiate() - except InvalidCredentialsError as error: - if self.protect: - Config.lock_connection(self.config, self.dsn) - raise suppress_context(error) + self.initiate() while True: data = self.export.get_buffer() if not data: diff --git a/giraffez/mload.py b/giraffez/mload.py index 384d27f..e2c8ed6 100644 --- a/giraffez/mload.py +++ b/giraffez/mload.py @@ -17,7 +17,7 @@ import struct import threading -from ._tpt import MLoad, TeradataError, EncoderError +from ._tpt import EncoderError, InvalidCredentialsError, MLoad, TeradataError from .constants import * from .errors import * @@ -69,10 +69,9 @@ class TeradataMLoad(Connection): is complete. """ checkpoint_interval = DEFAULT_CHECKPOINT_INTERVAL - _valid_encodings = {"str", "archive", "dict"} def __init__(self, table=None, host=None, username=None, password=None, log_level=INFO, - config=None, key_file=None, dsn=None, protect=False, cleanup=False): + config=None, key_file=None, dsn=None, protect=False, coerce_floats=False, cleanup=False): super(TeradataMLoad, self).__init__(host, username, password, log_level, config, key_file, dsn, protect) @@ -80,23 +79,18 @@ def __init__(self, table=None, host=None, username=None, password=None, log_leve self._columns = None self._table_name = None self._encoding = None + self._delimiter = DEFAULT_DELIMITER + self._null = DEFAULT_NULL - #: Flags tracking the important stages of MultiLoad to ensure - #: that the job is shutdown smoothly (if possible). self.initiated = False - self.end_acquisition = False + self.coerce_floats = coerce_floats self.perform_cleanup = cleanup self.applied_count = 0 self.error_count = 0 self.preprocessor = lambda s: s - - self.coerce_floats = False if table is not None: self.table = table - self._delimiter = DEFAULT_DELIMITER - self._null = DEFAULT_NULL - self.preprocessor = lambda s: s def _connect(self, host, username, password): self.mload = MLoad(host, username, password) @@ -132,7 +126,6 @@ def _update_error_count(self): def _end_acquisition(self): log.info("MLoad", "Ending acquisition phase ...") self.mload.end_acquisition() - self.end_acquisition = True log.info("MLoad", "Acquisition phase ended.") def checkpoint(self): @@ -161,9 +154,6 @@ def cleanup(self): def close(self): log.info("MLoad", "Closing Teradata PT connection ...") - #if not self.end_acquisition and self.initiated: - #log.info("MLoad", "Acquisition phase was not called before closing.") - #self._end_acquisition() self.mload.close() log.info("MLoad", "Teradata PT request complete.") @@ -190,13 +180,9 @@ def columns(self): @columns.setter def columns(self, field_names): - # TODO: check if list of strings if not isinstance(field_names, list): raise GiraffeError("Must set .columns property as type ") self._columns = field_names - #if not self.table: - #raise GiraffeError("Table name not set") - #self._columns.set_filter(field_names) @property def delimiter(self): @@ -237,7 +223,7 @@ def finish(self): self._apply_rows() return self.exit_code - def from_file(self, filename, table=None, null=DEFAULT_NULL, delimiter=None, panic=True, quotechar='"'): + def from_file(self, filename, table=None, null=None, delimiter=None, panic=True, quotechar='"'): """ Load from a file into the target table, handling each step of the load process. @@ -277,24 +263,21 @@ def from_file(self, filename, table=None, null=DEFAULT_NULL, delimiter=None, pan if not table: raise GiraffeError("Table must be set or specified to load a file.") self.table = table - + if null is not None: + self.null = null + if delimiter is not None: + self.delimiter = delimiter with Reader(filename, delimiter=delimiter, quotechar=quotechar) as f: if isinstance(f, ArchiveFileReader): self.mload.set_encoding(ROW_ENCODING_RAW) self.columns = f.header self.preprocessor = lambda s: s - self.null = null - if delimiter is None: - delimiter = "|" - self.delimiter = delimiter self.initiate() i = 0 for i, line in enumerate(f, 1): self.load_row(line, panic=panic) - #self.load_row(b"whoops", panic=panic) if i % self.checkpoint_interval == 1: log.info("\rMLoad", "Processed {} rows".format(i), console=True) - # TODO: get error info and catch error 10517: UPDATE_OPERATOR: TPT10510: Error Limit has been reached or exceeded. checkpoint_status = self.checkpoint() exit_code = self.exit_code if exit_code != 0: @@ -307,12 +290,17 @@ def initiate(self): raise GiraffeError("Table must be set prior to initiating.") if self.initiated: raise GiraffeError("Already initiated connection.") - if self.perform_cleanup: - self.cleanup() - elif any(filter(lambda x: self.mload.exists(x), self.tables)): - raise GiraffeError("Cannot continue without dropping previous job tables. Exiting ...") - log.info("MLoad", "Initiating Teradata PT request (awaiting server) ...") - self.mload.initiate(self.table, self.columns) + try: + if self.perform_cleanup: + self.cleanup() + elif any(filter(lambda x: self.mload.exists(x), self.tables)): + raise GiraffeError("Cannot continue without dropping previous job tables. Exiting ...") + log.info("MLoad", "Initiating Teradata PT request (awaiting server) ...") + self.mload.initiate(self.table, self.columns) + except InvalidCredentialsError as error: + if args.protect: + Config.lock_connection(args.conf, args.dsn, args.key) + raise error self.mload.set_delimiter(self.delimiter) self.mload.set_null(self.null) log.info("MLoad", "Teradata PT request accepted.") diff --git a/giraffez/src/cmdobject.c b/giraffez/src/cmdobject.c index 1d6dc25..082206a 100644 --- a/giraffez/src/cmdobject.c +++ b/giraffez/src/cmdobject.c @@ -35,7 +35,11 @@ PyObject* check_parcel_error(TeradataConnection *conn) { Py_RETURN_NONE; case PclFAILURE: err = tdcli_read_failure(conn->dbc->fet_data_ptr); - PyErr_Format(TeradataError, "%d: %s", err->Code, err->Msg); + if (err->Code == TD_ERROR_INVALID_USER) { + PyErr_Format(InvalidCredentialsError, "%d: %s", err->Code, err->Msg); + } else { + PyErr_Format(TeradataError, "%d: %s", err->Code, err->Msg); + } return NULL; case PclERROR: err = tdcli_read_error(conn->dbc->fet_data_ptr); @@ -206,7 +210,11 @@ PyObject* teradata_handle_record(TeradataEncoder *e, const uint32_t parcel_t, un return NULL; case PclFAILURE: err = tdcli_read_failure((char*)*data); - PyErr_Format(TeradataError, "%d: %s", err->Code, err->Msg); + if (err->Code == TD_ERROR_INVALID_USER) { + PyErr_Format(InvalidCredentialsError, "%d: %s", err->Code, err->Msg); + } else { + PyErr_Format(TeradataError, "%d: %s", err->Code, err->Msg); + } return NULL; case PclERROR: err = tdcli_read_error((char*)*data); @@ -320,11 +328,9 @@ static PyObject* Cmd_set_encoding(Cmd *self, PyObject *args) { uint32_t new_settings = 0; uint32_t settings; PyObject *null = NULL, *delimiter = NULL; - if (!PyArg_ParseTuple(args, "i|OO", &settings, &null, &delimiter)) { return NULL; } - if (settings & ROW_RETURN_MASK) { new_settings = (self->encoder->Settings & ~ROW_RETURN_MASK) | settings; } @@ -334,14 +340,12 @@ static PyObject* Cmd_set_encoding(Cmd *self, PyObject *args) { if (settings & DECIMAL_RETURN_MASK) { new_settings = (self->encoder->Settings & ~DECIMAL_RETURN_MASK) | settings; } - if (encoder_set_encoding(self->encoder, new_settings) != 0) { PyErr_Format(PyExc_ValueError, "Encoder set_encoding failed, bad encoding '0x%06x'.", settings); return NULL; } Py_RETURN_ERROR(encoder_set_null(self->encoder, null)); Py_RETURN_ERROR(encoder_set_delimiter(self->encoder, delimiter)); - Py_RETURN_NONE; } diff --git a/giraffez/tptmodule.cc b/giraffez/tptmodule.cc index 9c081e3..7dbc208 100644 --- a/giraffez/tptmodule.cc +++ b/giraffez/tptmodule.cc @@ -41,7 +41,6 @@ MOD_INIT(_tpt) MOD_DEF(m, "_tpt", "", module_methods); - // TODO: should also check if these are ready? giraffez_types_import(); if (m == NULL) {