Skip to content

Commit

Permalink
Merge pull request #80 from ng-galien/develop
Browse files Browse the repository at this point in the history
Fix authentication node not created
  • Loading branch information
ng-galien authored Oct 30, 2024
2 parents a06a938 + 9ffcc8c commit fd57848
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 59 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ng-galien/node-red-pulsar",
"version": "1.1.4",
"version": "1.1.5",
"description": "Node-RED nodes for Apache Pulsar",
"repository": {
"type": "git",
Expand Down
2 changes: 1 addition & 1 deletion src/PulsarDefinition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface NoAuthentication { blank: true }

export type AuthenticationImpl = AuthenticationToken | AuthenticationOauth2 | AuthenticationTls | NoAuthentication

export interface PulsarAuthentication extends PulsarAuthenticationProperties, NodeRED.NodeDef { }
export interface PulsarAuthenticationConfig extends PulsarAuthenticationProperties, NodeRED.NodeDef { }

//Client
export interface PulsarClientProperties {
Expand Down
74 changes: 41 additions & 33 deletions src/authentication/pulsar-authentication.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,14 @@
import * as NodeRED from "node-red";
import {AuthenticationImpl, PulsarAuthentication, PulsarAuthenticationId} from "../PulsarDefinition";
import {AuthenticationImpl, PulsarAuthenticationConfig, PulsarAuthenticationId} from "../PulsarDefinition";
import {AuthenticationOauth2, AuthenticationTls, AuthenticationToken} from "pulsar-client";
import {loadToken, parseToken} from "../Token";

type RuntimeNode = NodeRED.Node<AuthenticationImpl>

export = (RED: NodeRED.NodeAPI): void => {
RED.nodes.registerType(PulsarAuthenticationId,
function (this: NodeRED.Node<AuthenticationImpl>, config: PulsarAuthentication): void {
async function resolveAuthentication(config: PulsarAuthentication): Promise<AuthenticationImpl> {
switch (config.authType) {
case 'Token':
if(!config.jwtToken) {
return {}
}
const token = parseToken(config.jwtToken)
return new AuthenticationToken({token: await loadToken(token)})
case 'Oauth2':
if(!config.oauthType || !config.oauthIssuerUrl) {
return {}
}
return new AuthenticationOauth2({
type: config.oauthType,
issuer_url: config.oauthIssuerUrl,
client_id: config.oauthClientId,
client_secret: config.oauthClientSecret,
private_key: config.oauthPrivateKey,
audience: config.oauthAudience,
scope: config.oauthScope
})
case 'TLS':
if(!config.tlsCertificatePath || !config.tlsPrivateKeyPath) {
return {}
}
return new AuthenticationTls({
certificatePath: config.tlsCertificatePath,
privateKeyPath: config.tlsPrivateKeyPath
})
}
}
function (this: RuntimeNode, config: PulsarAuthenticationConfig): void {
RED.nodes.createNode(this, config)
resolveAuthentication(config).then(
(auth) => {
this.credentials = auth
Expand All @@ -49,3 +20,40 @@ export = (RED: NodeRED.NodeAPI): void => {
}
)
}

async function resolveAuthentication(config: PulsarAuthenticationConfig): Promise<AuthenticationImpl> {
switch (config.authType) {
case 'Token':
return createAuthenticationToken(config)
case 'Oauth2':
if(!config.oauthType || !config.oauthIssuerUrl) {
return {}
}
return new AuthenticationOauth2({
type: config.oauthType,
issuer_url: config.oauthIssuerUrl,
client_id: config.oauthClientId,
client_secret: config.oauthClientSecret,
private_key: config.oauthPrivateKey,
audience: config.oauthAudience,
scope: config.oauthScope
})
case 'TLS':
if(!config.tlsCertificatePath || !config.tlsPrivateKeyPath) {
return {}
}
return new AuthenticationTls({
certificatePath: config.tlsCertificatePath,
privateKeyPath: config.tlsPrivateKeyPath
})
}
}

async function createAuthenticationToken(config: PulsarAuthenticationConfig): Promise<AuthenticationImpl> {
if(!config.jwtToken) {
return {}
}
const token = parseToken(config.jwtToken)
const tokenContent = await loadToken(token)
return new AuthenticationToken({token: tokenContent})
}
2 changes: 1 addition & 1 deletion src/client/pulsar-client.html.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function displayTlsFields(show: boolean): void {
* Registration of the 'pulsar-client' type with its configuration.
*/
RED.nodes.registerType<PulsarClientEditorConfig>(CLIENT_ID, {
category: 'config',
category: PULSAR_CONFIG,
icon: 'font-awesome/fa-server',
color: PULSAR_COLOR,
defaults: {
Expand Down
38 changes: 18 additions & 20 deletions src/client/pulsar-client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import * as NodeRED from 'node-red'
import {
AuthenticationOauth2,
AuthenticationTls,
AuthenticationToken,
Client,
ClientConfig,
LogLevel
Expand All @@ -15,16 +12,15 @@ import {
} from "../PulsarDefinition";
import {parseBoolean, parseNumber, parseNonEmptyString} from "../PulsarConfig";

type ClientAuthentication = AuthenticationToken | AuthenticationOauth2 | AuthenticationTls | undefined


type RuntimeNode = NodeRED.Node<Client>

type AuthenticationNode = NodeRED.Node<AuthenticationImpl>

function createPulsarConfigNode(auth: AuthenticationNode, config: PulsarClientConfig): ClientConfig {
function createPulsarConfigNode(auth: AuthenticationImpl | undefined, config: PulsarClientConfig): ClientConfig {
return {
serviceUrl: config.serviceUrl,
authentication: resolveAuthentication(auth),
authentication: auth,
operationTimeoutSeconds: parseNumber(config.operationTimeoutSeconds),
ioThreads: parseNumber(config.ioThreads),
messageListenerThreads: parseNumber(config.messageListenerThreads),
Expand All @@ -34,16 +30,17 @@ function createPulsarConfigNode(auth: AuthenticationNode, config: PulsarClientCo
tlsValidateHostname: parseBoolean(config.tlsValidateHostname),
tlsAllowInsecureConnection: parseBoolean(config.tlsAllowInsecureConnection),
statsIntervalInSeconds: parseNumber(config.statsIntervalInSeconds),
listenerName: parseNonEmptyString(config.listenerName)
listenerName: parseNonEmptyString(config.listenerName),
}
}

export = (RED: NodeRED.NodeAPI): void => {
RED.nodes.registerType(PulsarClientId,
function (this: RuntimeNode, config: PulsarClientConfig): void {
const authNode = RED.nodes.getNode(config.authenticationNodeId) as NodeRED.Node<AuthenticationImpl>
RED.nodes.createNode(this, config)
const client = createClient(this, createPulsarConfigNode(authNode, config))
this.debug('Creating pulsar client')
const authentication = getAuthentication(RED, config)
const client = createClient(this, createPulsarConfigNode(authentication, config))
if(client) {
this.credentials = client
mapClientLoginToNode(this)
Expand All @@ -57,19 +54,16 @@ export = (RED: NodeRED.NodeAPI): void => {
)
}

function getAuthentication(RED: NodeRED.NodeAPI, config: PulsarClientConfig): AuthenticationImpl | undefined {
if(!config.authenticationNodeId) {
return undefined
}

/**
* Resolves the authentication based on the provided Node.
*
* @param {NodeRED.Node<AuthenticationImpl>} [node] - The Node containing the authentication information.
* @return {ClientAuthentication} - The resolved ClientAuthentication. Returns undefined if no authentication is provided or if the authentication is NoAuthentication.
*/
function resolveAuthentication(node?: NodeRED.Node<AuthenticationImpl>): ClientAuthentication {
if(!node) {
const authNode = RED.nodes.getNode(config.authenticationNodeId) as NodeRED.Node<AuthenticationImpl>
if(!authNode) {
return undefined
}
const auth = node.credentials
//If the authentication is NoAuthentication, return undefined
const auth = authNode.credentials
if((auth as NoAuthentication).blank) {
return undefined
}
Expand Down Expand Up @@ -107,6 +101,10 @@ function createClient(node: NodeRED.Node<Client>, clientConfig: ClientConfig): C
* @return {void}
*/
function closeClient(node: NodeRED.Node<Client>, done: () => void): void {
if(!node) {
done()
return
}
const client = node.credentials
if (client) {
client.close().then(() => {
Expand Down
38 changes: 35 additions & 3 deletions test/node-red/Nodes.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
// @ts-ignore
import helper, {TestFlowsItem} from "node-red-node-test-helper";
import pulsarClientNode from "../../src/client/pulsar-client";
import pulsarAuthenticationNode from "../../src/authentication/pulsar-authentication";
import pulsarSchemaNode from "../../src/schema/pulsar-schema";
import pulsarProducerNode from "../../src/producer/pulsar-producer";
import pulsarConsumerNode from "../../src/consumer/pulsar-consumer";
import pulsarReaderNode from "../../src/reader/pulsar-reader";
import Pulsar, {Client, SchemaInfo} from 'pulsar-client';
import Pulsar, {AuthenticationToken, Client, SchemaInfo} from 'pulsar-client';
import {GenericContainer, StartedTestContainer, Wait} from "testcontainers";
// @ts-ignore
import {
AuthenticationImpl,
PulsarAuthenticationConfig,
PulsarAuthenticationId,
PulsarClientConfig,
PulsarClientId,
PulsarConsumerConfig, PulsarConsumerId,
Expand All @@ -24,6 +29,10 @@ import {expect} from "chai";

const logger = new Logger();


const jwtToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"


describe('Pulsar Integration Test', function () {

const topic = "it-" + v4();
Expand Down Expand Up @@ -56,6 +65,17 @@ describe('Pulsar Integration Test', function () {
})
});

it('Authentication should be loaded', async function () {
await helper.load([pulsarAuthenticationNode], [authenticationConf()]);
const node = helper.getNode("authentication") as Node<AuthenticationImpl>;
node.should.not.be.null;
const auth: AuthenticationImpl = node.credentials;
auth.should.not.be.null;
expect(auth).to.be.an.instanceOf(AuthenticationToken);
const expectedAuth: AuthenticationToken = new AuthenticationToken({token: jwtToken});
expect(expectedAuth).to.be.eql(auth);
})

it('Schema should be loaded', async function () {
await helper.load([pulsarSchemaNode], [schemaConf()]);
const node = helper.getNode("schema") as Node<Pulsar.SchemaInfo>;
Expand Down Expand Up @@ -183,6 +203,7 @@ describe('Pulsar Integration Test', function () {

});

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function createPulsarContainer() {
logger.info("Starting pulsar container")
return new GenericContainer("apachepulsar/pulsar")
Expand All @@ -192,11 +213,13 @@ async function createPulsarContainer() {
.start()
}

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function stopPulsarContainer(container: StartedTestContainer) {
console.log("Stopping pulsar container")
return container.stop()
}

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function createTopic(broker: StartedTestContainer, topic: string) {
const config = {
headers: {
Expand All @@ -217,12 +240,21 @@ const jsonSchema = {

helper.init(require.resolve('node-red'));

// @ts-ignore
function clientConf(broker: StartedTestContainer): TestFlowsItem<PulsarClientConfig> {
return {
id: "client",
type: PulsarClientId,
serviceUrl: brokerUrl(broker)
serviceUrl: brokerUrl(broker),
authenticationNodeId: "authentication",
}
}

function authenticationConf(): TestFlowsItem<PulsarAuthenticationConfig> {
return {
id: "authentication",
type: PulsarAuthenticationId,
authType: "Token",
jwtToken: jwtToken
}
}

Expand Down

0 comments on commit fd57848

Please sign in to comment.