Skip to content

Commit

Permalink
feat: move common adapter stuff to base class (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmvilas authored Jul 16, 2021
1 parent a6f2209 commit 0c5966f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
9 changes: 4 additions & 5 deletions src/adapters/mqtt/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class MqttAdapter extends Adapter {

_connect () {
return new Promise((resolve) => {
const channelNames = this.parsedAsyncAPI.channelNames()
const subscribedChannels = channelNames.filter(chan => this.parsedAsyncAPI.channel(chan).hasPublish())
const subscribedChannels = this.getSubscribedChannels()
const serverBinding = this.AsyncAPIServer.binding('mqtt')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(sec => {
const secName = Object.keys(sec.json())[0]
Expand Down Expand Up @@ -53,7 +52,7 @@ class MqttAdapter extends Adapter {
this.client.on('connect', () => {
if (!this.firstConnect) {
this.firstConnect = true
this.emit('connect', { name: this.name(), adapter: this, connection: this.client, channels: channelNames })
this.emit('connect', { name: this.name(), adapter: this, connection: this.client, channels: this.channelNames })
}

if (Array.isArray(subscribedChannels)) {
Expand All @@ -77,14 +76,14 @@ class MqttAdapter extends Adapter {
this.client.on('reconnect', () => {
this.emit('reconnect', {
connection: this.client,
channels: channelNames,
channels: this.channelNames,
})
})

this.client.on('close', () => {
this.emit('close', {
connection: this.client,
channels: channelNames,
channels: this.channelNames,
})
})

Expand Down
3 changes: 1 addition & 2 deletions src/adapters/socket.io/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class SocketIOAdapter extends Adapter {

_connect () {
return new Promise((resolve) => {
const channelNames = this.parsedAsyncAPI.channelNames()
const serverUrl = new URL(this.serverUrlExpanded)
const asyncapiServerPort = serverUrl.port || 80
const optionsPort = this.glee.options?.websocket?.port
Expand Down Expand Up @@ -49,7 +48,7 @@ class SocketIOAdapter extends Adapter {
}

this.server.on('connect', (socket) => {
this.emit('server:ready', { name: this.name(), adapter: this, connection: socket, channels: channelNames })
this.emit('server:ready', { name: this.name(), adapter: this, connection: socket, channels: this.channelNames })

socket.onAny((eventName, payload) => {
const msg = this._createMessage(eventName, payload)
Expand Down
3 changes: 1 addition & 2 deletions src/adapters/ws/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class WebSocketsAdapter extends Adapter {

_connect () {
return new Promise((resolve, reject) => {
const channelNames = this.parsedAsyncAPI.channelNames()
const serverUrl = new URL(this.serverUrlExpanded)
const wsHttpServer = this.glee.options?.websocket?.httpServer || http.createServer()
const asyncapiServerPort = serverUrl.port || 80
Expand All @@ -33,7 +32,7 @@ class WebSocketsAdapter extends Adapter {
}

const servers = {}
channelNames.forEach(channelName => {
this.channelNames.forEach(channelName => {
servers[channelName] = new WebSocket.Server({ noServer: true })
})

Expand Down
17 changes: 17 additions & 0 deletions src/lib/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class GleeAdapter extends EventEmitter {
this.AsyncAPIServer = server

this.parsedAsyncAPI = parsedAsyncAPI
this.channelNames = this.parsedAsyncAPI.channelNames()
this.connections = []

const uriTemplateValues = {}
Expand Down Expand Up @@ -102,6 +103,22 @@ class GleeAdapter extends EventEmitter {
})
}

/**
* Returns a list of the channels a given adapter has to subscribe to.
*
* @return {Promise}
*/
getSubscribedChannels() {
return this.channelNames
.filter(channelName => {
const channel = this.parsedAsyncAPI.channel(channelName)
if (!channel.hasPublish()) return false

const channelServers = channel.publish().ext('x-servers') || this.parsedAsyncAPI.serverNames()
return channelServers.includes(this.serverName)
})
}

/**
* Connects to the remote server.
*
Expand Down

0 comments on commit 0c5966f

Please sign in to comment.