Skip to content

Commit

Permalink
Clean up socket error handling during connection establishment.
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurschreiber committed Sep 14, 2024
1 parent 109c8ea commit 8be285a
Show file tree
Hide file tree
Showing 2 changed files with 600 additions and 145 deletions.
283 changes: 138 additions & 145 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1964,110 +1964,154 @@ class Connection extends EventEmitter {
* @private
*/
initialiseConnection() {
const controller = new AbortController();

setTimeout(() => {
const hostPostfix = this.config.options.port ? `:${this.config.options.port}` : `\\${this.config.options.instanceName}`;
// If we have routing data stored, this connection has been redirected
const server = this.routingData ? this.routingData.server : this.config.server;
const port = this.routingData ? `:${this.routingData.port}` : hostPostfix;
// Grab the target host from the connection configuration, and from a redirect message
// otherwise, leave the message empty.
const routingMessage = this.routingData ? ` (redirected from ${this.config.server}${hostPostfix})` : '';
const message = `Failed to connect to ${server}${port}${routingMessage} in ${this.config.options.connectTimeout}ms`;
this.debug.log(message);

controller.abort(new ConnectionError(message, 'ETIMEOUT'));
}, this.config.options.connectTimeout).unref();

const signal = controller.signal;

(async () => {
let port = this.config.options.port;
const timeoutController = new AbortController();

const connectTimer = setTimeout(() => {
const hostPostfix = this.config.options.port ? `:${this.config.options.port}` : `\\${this.config.options.instanceName}`;
// If we have routing data stored, this connection has been redirected
const server = this.routingData ? this.routingData.server : this.config.server;
const port = this.routingData ? `:${this.routingData.port}` : hostPostfix;
// Grab the target host from the connection configuration, and from a redirect message
// otherwise, leave the message empty.
const routingMessage = this.routingData ? ` (redirected from ${this.config.server}${hostPostfix})` : '';
const message = `Failed to connect to ${server}${port}${routingMessage} in ${this.config.options.connectTimeout}ms`;
this.debug.log(message);

timeoutController.abort(new ConnectionError(message, 'ETIMEOUT'));
}, this.config.options.connectTimeout);

try {
let signal = timeoutController.signal;

let port = this.config.options.port;

if (!port) {
try {
port = await instanceLookup({
server: this.config.server,
instanceName: this.config.options.instanceName!,
timeout: this.config.options.connectTimeout,
signal: signal
});
} catch (err: any) {
if (signal.aborted) {
throw signal.reason;
}

throw new ConnectionError(err.message, 'EINSTLOOKUP', { cause: err });
}
}

if (!port) {
let socket;
try {
port = await instanceLookup({
server: this.config.server,
instanceName: this.config.options.instanceName!,
timeout: this.config.options.connectTimeout,
signal: signal
});
socket = await this.connectOnPort(port, this.config.options.multiSubnetFailover, signal, this.config.options.connector);
} catch (err: any) {
if (signal.aborted) {
throw signal.reason;
}

throw new ConnectionError(err.message, 'EINSTLOOKUP', { cause: err });
}
}

let socket;
try {
socket = await this.connectOnPort(port, this.config.options.multiSubnetFailover, signal, this.config.options.connector);
} catch (err: any) {
if (signal.aborted) {
throw signal.reason;
throw this.wrapSocketError(err);
}

throw this.wrapSocketError(err);
}
const controller = new AbortController();
const onError = (err: Error) => {
controller.abort(err);
};
const onClose = () => {
this.debug.log('connection to ' + this.config.server + ':' + this.config.options.port + ' closed');
};
const onEnd = () => {
this.debug.log('socket ended');

this.socketHandlingForSendPreLogin(socket);
this.sendPreLogin();
const error: ErrorWithCode = new Error('socket hang up');
error.code = 'ECONNRESET';
controller.abort(this.wrapSocketError(error));
};

this.transitionTo(this.STATE.SENT_PRELOGIN);
await this.performSentPrelogin(signal);
socket.once('error', onError);
socket.once('close', onClose);
socket.once('end', onEnd);

this.sendLogin7Packet();
signal = AbortSignal.any([signal, controller.signal]);

try {
const { authentication } = this.config;
switch (authentication.type) {
case 'token-credential':
case 'azure-active-directory-password':
case 'azure-active-directory-msi-vm':
case 'azure-active-directory-msi-app-service':
case 'azure-active-directory-service-principal-secret':
case 'azure-active-directory-default':
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_FEDAUTH);
this.routingData = await this.performSentLogin7WithFedAuth(signal);
break;
case 'ntlm':
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_NTLM);
this.routingData = await this.performSentLogin7WithNTLMLogin(signal);
break;
default:
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_STANDARD_LOGIN);
this.routingData = await this.performSentLogin7WithStandardLogin(signal);
break;
}
} catch (err: any) {
if (isTransientError(err)) {
this.debug.log('Initiating retry on transient error');
this.transitionTo(this.STATE.TRANSIENT_FAILURE_RETRY);
this.performTransientFailureRetry();
return;
}
try {
socket.setKeepAlive(true, KEEP_ALIVE_INITIAL_DELAY);

this.messageIo = new MessageIO(socket, this.config.options.packetSize, this.debug);
this.messageIo.on('secure', (cleartext) => { this.emit('secure', cleartext); });

this.socket = socket;

this.closed = false;
this.debug.log('connected to ' + this.config.server + ':' + this.config.options.port);

this.sendPreLogin();

this.transitionTo(this.STATE.SENT_PRELOGIN);
await this.performSentPrelogin(signal);

this.sendLogin7Packet();

try {
const { authentication } = this.config;
switch (authentication.type) {
case 'token-credential':
case 'azure-active-directory-password':
case 'azure-active-directory-msi-vm':
case 'azure-active-directory-msi-app-service':
case 'azure-active-directory-service-principal-secret':
case 'azure-active-directory-default':
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_FEDAUTH);
this.routingData = await this.performSentLogin7WithFedAuth(signal);
break;
case 'ntlm':
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_NTLM);
this.routingData = await this.performSentLogin7WithNTLMLogin(signal);
break;
default:
this.transitionTo(this.STATE.SENT_LOGIN7_WITH_STANDARD_LOGIN);
this.routingData = await this.performSentLogin7WithStandardLogin(signal);
break;
}
} catch (err: any) {
if (isTransientError(err)) {
this.debug.log('Initiating retry on transient error');
this.transitionTo(this.STATE.TRANSIENT_FAILURE_RETRY);
this.performTransientFailureRetry();
return;
}

throw err;
}

throw err;
}
// If routing data is present, we need to re-route the connection
if (this.routingData) {
this.transitionTo(this.STATE.REROUTING);
this.performReRouting();
return;
}

// If routing data is present, we need to re-route the connection
if (this.routingData) {
this.transitionTo(this.STATE.REROUTING);
this.performReRouting();
return;
}
this.transitionTo(this.STATE.LOGGED_IN_SENDING_INITIAL_SQL);
await this.performLoggedInSendingInitialSql(signal);
} finally {
socket.removeListener('error', onError);
socket.removeListener('close', onClose);
socket.removeListener('end', onEnd);
}

this.transitionTo(this.STATE.LOGGED_IN_SENDING_INITIAL_SQL);
await this.performLoggedInSendingInitialSql(signal);
socket.on('error', this._onSocketError);
socket.on('close', this._onSocketClose);
socket.on('end', this._onSocketEnd);

this.transitionTo(this.STATE.LOGGED_IN);
this.transitionTo(this.STATE.LOGGED_IN);

process.nextTick(() => {
this.emit('connect');
});
process.nextTick(() => {
this.emit('connect');
});
} finally {
clearTimeout(connectTimer);
}
})().catch((err) => {
this.transitionTo(this.STATE.FINAL);

Expand Down Expand Up @@ -2119,21 +2163,6 @@ class Connection extends EventEmitter {
return new TokenStreamParser(message, this.debug, handler, this.config.options);
}

socketHandlingForSendPreLogin(socket: net.Socket) {
socket.on('error', this._onSocketError);
socket.on('close', this._onSocketClose);
socket.on('end', this._onSocketEnd);
socket.setKeepAlive(true, KEEP_ALIVE_INITIAL_DELAY);

this.messageIo = new MessageIO(socket, this.config.options.packetSize, this.debug);
this.messageIo.on('secure', (cleartext) => { this.emit('secure', cleartext); });

this.socket = socket;

this.closed = false;
this.debug.log('connected to ' + this.config.server + ':' + this.config.options.port);
}

wrapWithTls(socket: net.Socket, signal: AbortSignal): Promise<tls.TLSSocket> {
signal.throwIfAborted();

Expand Down Expand Up @@ -3609,75 +3638,39 @@ Connection.prototype.STATE = {
enter: function() {
this.initialiseConnection();
},
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
SENT_PRELOGIN: {
name: 'SentPrelogin',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
REROUTING: {
name: 'ReRouting',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
TRANSIENT_FAILURE_RETRY: {
name: 'TRANSIENT_FAILURE_RETRY',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
SENT_TLSSSLNEGOTIATION: {
name: 'SentTLSSSLNegotiation',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
SENT_LOGIN7_WITH_STANDARD_LOGIN: {
name: 'SentLogin7WithStandardLogin',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
SENT_LOGIN7_WITH_NTLM: {
name: 'SentLogin7WithNTLMLogin',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
SENT_LOGIN7_WITH_FEDAUTH: {
name: 'SentLogin7WithFedauth',
events: {
socketError: function() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
LOGGED_IN_SENDING_INITIAL_SQL: {
name: 'LoggedInSendingInitialSql',
events: {
socketError: function socketError() {
this.transitionTo(this.STATE.FINAL);
}
}
events: {}
},
LOGGED_IN: {
name: 'LoggedIn',
Expand Down
Loading

0 comments on commit 8be285a

Please sign in to comment.