Skip to content

Commit 21ce56a

Browse files
committed
feat(binding-opcua): fix connection mutualisation bug
- and refactor connection management
1 parent 100a9fd commit 21ce56a

File tree

2 files changed

+186
-83
lines changed

2 files changed

+186
-83
lines changed

packages/binding-opcua/src/opcua-protocol-client.ts

Lines changed: 119 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,6 @@ interface OPCUAConnection {
9595
namespaceArray?: string[];
9696
}
9797

98-
type Resolver = (...arg: [...unknown[]]) => void;
99-
100-
interface OPCUAConnectionEx extends OPCUAConnection {
101-
pending?: Resolver[];
102-
}
103-
10498
function _variantToJSON(variant: Variant, contentType: string) {
10599
contentType = contentType.split(";")[0];
106100

@@ -118,95 +112,130 @@ function _variantToJSON(variant: Variant, contentType: string) {
118112
}
119113

120114
export class OPCUAProtocolClient implements ProtocolClient {
121-
private _connections: Map<string, OPCUAConnectionEx> = new Map<string, OPCUAConnectionEx>();
115+
private _connections = new Map<string, Promise<OPCUAConnection>>();
122116

123117
private _securityMode: MessageSecurityMode = MessageSecurityMode.None;
124118
private _securityPolicy: SecurityPolicy = SecurityPolicy.None;
125119
private _useAutoChannel: boolean = false;
126120
private _userIdentity: UserIdentityInfo = <AnonymousIdentity>{ type: UserTokenType.Anonymous };
127121

128-
private async _withConnection<T>(form: OPCUAForm, next: (connection: OPCUAConnection) => Promise<T>): Promise<T> {
129-
const endpoint = form.href;
130-
const matchesScheme: boolean = endpoint?.match(/^opc.tcp:\/\//) != null;
131-
if (!matchesScheme) {
132-
debug(`invalid opcua:endpoint ${endpoint} specified`);
133-
throw new Error("Invalid OPCUA endpoint " + endpoint);
134-
}
135-
let c: OPCUAConnectionEx | undefined = this._connections.get(endpoint);
136-
if (!c) {
137-
const clientCertificateManager = await CertificateManagerSingleton.getCertificateManager();
138-
139-
if (this._useAutoChannel) {
140-
if (this._securityMode === MessageSecurityMode.Invalid) {
141-
const { messageSecurityMode, securityPolicy } = await findMostSecureChannel(endpoint);
142-
this._securityMode = messageSecurityMode;
143-
this._securityPolicy = securityPolicy;
144-
}
122+
/**
123+
* return the number of active connections to an OPCUA Server
124+
*/
125+
public get connectionCount() {
126+
return this._connections.size;
127+
}
128+
private async _createConnection(endpoint: string): Promise<OPCUAConnection> {
129+
debug(`_createConnection: creating new connection to ${endpoint}`);
130+
131+
const clientCertificateManager = await CertificateManagerSingleton.getCertificateManager();
132+
133+
let securityMode = this._securityMode;
134+
let securityPolicy = this._securityPolicy;
135+
136+
if (this._useAutoChannel) {
137+
if (securityMode === MessageSecurityMode.Invalid) {
138+
const mostSecure = await findMostSecureChannel(endpoint);
139+
securityMode = mostSecure.messageSecurityMode;
140+
securityPolicy = mostSecure.securityPolicy;
141+
debug(
142+
`_createConnection: auto-selected security mode ${MessageSecurityMode[securityMode]} with policy ${securityPolicy}`
143+
);
145144
}
146-
const client = OPCUAClient.create({
147-
endpointMustExist: false,
148-
connectionStrategy: {
149-
maxRetry: 1,
150-
},
151-
securityMode: this._securityMode,
152-
securityPolicy: this._securityPolicy,
153-
clientCertificateManager,
154-
});
155-
client.on("backoff", () => {
156-
debug(`connection:backoff: cannot connection to ${endpoint}`);
157-
});
145+
}
158146

159-
c = {
160-
client,
161-
pending: [] as Resolver[],
162-
} as OPCUAConnectionEx; // but incomplete still
147+
const client = OPCUAClient.create({
148+
endpointMustExist: false,
149+
connectionStrategy: {
150+
maxRetry: 1,
151+
},
152+
securityMode,
153+
securityPolicy,
154+
clientCertificateManager,
155+
});
163156

164-
this._connections.set(endpoint, c);
165-
try {
166-
await client.connect(endpoint);
167-
} catch (err) {
168-
const errMessage = "Cannot connected to endpoint " + endpoint + "\nmsg = " + (<Error>err).message;
169-
debug(errMessage);
170-
throw new Error(errMessage);
157+
client.on("backoff", () => {
158+
debug(`connection:backoff: cannot connect to ${endpoint}`);
159+
});
160+
161+
try {
162+
await client.connect(endpoint);
163+
debug(`_createConnection: client connected to ${endpoint}`);
164+
165+
// adjust with private key
166+
if (this._userIdentity.type === UserTokenType.Certificate && !this._userIdentity.privateKey) {
167+
const internalKey = readPrivateKey(client.clientCertificateManager.privateKey);
168+
const privateKeyPem = coercePrivateKeyPem(internalKey);
169+
this._userIdentity.privateKey = privateKeyPem;
171170
}
172-
try {
173-
// adjust with private key
174-
if (this._userIdentity.type === UserTokenType.Certificate && !this._userIdentity.privateKey) {
175-
const internalKey = readPrivateKey(client.clientCertificateManager.privateKey);
176-
const privateKeyPem = coercePrivateKeyPem(internalKey);
177-
this._userIdentity.privateKey = privateKeyPem;
178-
}
179-
const session = await client.createSession(this._userIdentity);
180-
c.session = session;
181-
182-
const subscription = await session.createSubscription2({
183-
maxNotificationsPerPublish: 100,
184-
publishingEnabled: true,
185-
requestedLifetimeCount: 100,
186-
requestedPublishingInterval: 250,
187-
requestedMaxKeepAliveCount: 10,
188-
priority: 1,
189-
});
190-
c.subscription = subscription;
171+
const session = await client.createSession(this._userIdentity);
172+
debug(`_createConnection: session created for ${endpoint}`);
173+
174+
const subscription = await session.createSubscription2({
175+
maxNotificationsPerPublish: 100,
176+
publishingEnabled: true,
177+
requestedLifetimeCount: 100,
178+
requestedPublishingInterval: 250,
179+
requestedMaxKeepAliveCount: 10,
180+
priority: 1,
181+
});
182+
debug(`_createConnection: subscription created for ${endpoint}`);
183+
184+
return { client, session, subscription };
185+
} catch (err) {
186+
// Make sure to disconnect if any post-connection step fails
187+
await client.disconnect();
188+
const errMessage = `Failed to establish a full connection to ${endpoint}: ${(err as Error).message}`;
189+
debug(errMessage);
190+
throw new Error(errMessage);
191+
}
192+
}
191193

192-
const p = c.pending;
193-
c.pending = undefined;
194-
p && p.forEach((t) => t());
194+
private async _withConnection<T>(form: OPCUAForm, next: (connection: OPCUAConnection) => Promise<T>): Promise<T> {
195+
const href = form.href;
196+
if (!href) {
197+
const err = new Error("Invalid OPCUA endpoint: href is missing in form");
198+
debug(err.message);
199+
throw err;
200+
}
195201

196-
this._connections.set(endpoint, c);
197-
} catch (err) {
198-
await client.disconnect();
199-
const errMessage = "Cannot handle session on " + endpoint + "\nmsg = " + (<Error>err).message;
200-
debug(errMessage);
201-
throw new Error(errMessage);
202+
// Use modern URL API and ensure path is included for endpoint uniqueness
203+
let endpoint: string;
204+
try {
205+
const parsedUrl = new URL(href);
206+
if (parsedUrl.protocol !== "opc.tcp:") {
207+
throw new Error(`Unsupported protocol: ${parsedUrl.protocol}`);
202208
}
209+
// We use the full href as the canonical endpoint identifier, without the query and fragment
210+
parsedUrl.hash = "";
211+
parsedUrl.search = "";
212+
endpoint = parsedUrl.href;
213+
} catch (err) {
214+
debug(`Invalid OPCUA endpoint href: ${href}. Error: ${(err as Error).message}`);
215+
throw new Error(`Invalid OPCUA endpoint: ${href}`);
203216
}
204-
if (c.pending) {
205-
await new Promise((resolve) => {
206-
c?.pending?.push(resolve);
217+
218+
let connectionPromise = this._connections.get(endpoint);
219+
220+
if (!connectionPromise) {
221+
debug(`_withConnection: no cached connection for ${endpoint}. Creating a new one.`);
222+
connectionPromise = this._createConnection(endpoint);
223+
this._connections.set(endpoint, connectionPromise);
224+
225+
// If the connection fails, remove the rejected promise from the cache
226+
// to allow future retries.
227+
connectionPromise.catch((err) => {
228+
debug(`_withConnection: connection to ${endpoint} failed. Evicting from cache. Error: ${err.message}`);
229+
if (this._connections.get(endpoint) === connectionPromise) {
230+
this._connections.delete(endpoint);
231+
}
207232
});
233+
} else {
234+
debug(`_withConnection: using cached connection promise for ${endpoint}`);
208235
}
209-
return next(c);
236+
237+
const connection = await connectionPromise;
238+
return next(connection);
210239
}
211240

212241
private async _withSession<T>(form: OPCUAForm, next: (session: ClientSession) => Promise<T>): Promise<T> {
@@ -475,11 +504,18 @@ export class OPCUAProtocolClient implements ProtocolClient {
475504

476505
async stop(): Promise<void> {
477506
debug("stop");
478-
for (const connection of this._connections.values()) {
479-
await connection.subscription.terminate();
480-
await connection.session.close();
481-
await connection.client.disconnect();
507+
// Wait for all connection promises to resolve before trying to close them.
508+
const connections = await Promise.all(this._connections.values());
509+
for (const connection of connections) {
510+
try {
511+
await connection.subscription.terminate();
512+
await connection.session.close();
513+
await connection.client.disconnect();
514+
} catch (err) {
515+
debug(`Error while stopping a connection: ${(err as Error).message}`);
516+
}
482517
}
518+
this._connections.clear();
483519
await CertificateManagerSingleton.releaseCertificateManager();
484520
}
485521

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/********************************************************************************
2+
* Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License v. 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and
10+
* Document License (2015-05-13) which is available at
11+
* https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document.
12+
*
13+
* SPDX-License-Identifier: EPL-2.0 OR W3C-20150513
14+
********************************************************************************/
15+
16+
import debugFactory from "debug";
17+
import { expect } from "chai";
18+
import { OPCUAServer } from "node-opcua-server";
19+
20+
import { startServer } from "./fixture/basic-opcua-server";
21+
import { OPCUAForm, OPCUAProtocolClient } from "../src/opcua-protocol-client";
22+
23+
const debug = debugFactory("binding-opcua:test");
24+
25+
describe("OPCUA Protocol Client", function () {
26+
this.timeout(20000);
27+
28+
let opcuaServer: OPCUAServer;
29+
let endpoint: string;
30+
before(async () => {
31+
opcuaServer = await startServer();
32+
endpoint = opcuaServer.getEndpointUrl();
33+
debug(`endpoint = ${endpoint}`);
34+
});
35+
after(async () => {
36+
await opcuaServer.shutdown();
37+
});
38+
39+
it("should read temperature and pressure property without recreating a connection", async () => {
40+
const client = new OPCUAProtocolClient();
41+
42+
try {
43+
const form1: OPCUAForm = {
44+
href: `${endpoint}?id=ns=1;s=Temperature`,
45+
contentType: "application/json",
46+
};
47+
48+
await client.readResource(form1);
49+
expect(client.connectionCount).equals(1);
50+
51+
await client.readResource(form1);
52+
expect(client.connectionCount).equals(1);
53+
54+
const form2: OPCUAForm = {
55+
href: `${endpoint}?id=ns=1;s=Pressure`,
56+
contentType: "application/json",
57+
};
58+
59+
await client.readResource(form2);
60+
61+
// connection should be reused, eventhough hrefs differ
62+
expect(client.connectionCount).equals(1);
63+
} finally {
64+
await client.stop();
65+
}
66+
});
67+
});

0 commit comments

Comments
 (0)