Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES --> OS migration for Opensearch consolidation #152

Merged
merged 14 commits into from
Jan 9, 2025
8 changes: 3 additions & 5 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ The following parameters can be set in config files or in env variables:
- DYNAMODB.AWS_WRITE_UNITS: The DynamoDB table write unit configuration, default is 2
- DYNAMODB.TIMEOUT: The timeout setting used in health check
- SCOPES: The M2M scopes, refer `config/default.js` for more information
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
- ES.API_VERSION: Elasticsearch API version, default value is '6.8'
- ES.ES_INDEX: Elasticsearch index name for resources, default value is 'resources'
- ES.ES_TYPE: Elasticsearch index type for resources, default value is '_doc'
- ES.ES_REFRESH: Elasticsearch force refresh flag, default value is 'true'
- OS.HOST: OpenSearch host, default value is 'localhost:9200'
- OS.OS_INDEX: OpenSearch index name for resources, default value is 'resources'
- OS.OS_REFRESH: OpenSearch force refresh flag, default value is 'true'
- BUSAPI_URL: the bus api, default value is 'https://api.topcoder-dev.com/v5'
- KAFKA_ERROR_TOPIC: Kafka error topic, default value is 'common.error.reporting'
- KAFKA_MESSAGE_ORIGINATOR: the Kafka message originator, default value is 'resources-api'
Expand Down
12 changes: 5 additions & 7 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ module.exports = {
TIMEOUT: process.env.DYNAMODB_TIMEOUT || 10000
},

ES: {
// above AWS_REGION is used if we use AWS ES
HOST: process.env.ES_HOST || 'localhost:9200',
API_VERSION: process.env.ES_API_VERSION || '6.8',
ES_INDEX: process.env.ES_INDEX || 'resources',
ES_TYPE: process.env.ES_TYPE || '_doc', // ES 6.x accepts only 1 Type per index and it's mandatory to define it
ES_REFRESH: process.env.ES_REFRESH || 'true'
OS: {
// above AWS_REGION is used if we use AWS OS
HOST: process.env.OS_HOST || 'localhost:9200',
OS_INDEX: process.env.OS_INDEX || 'resources',
OS_REFRESH: process.env.OS_REFRESH || 'true'
},

SCOPES: {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"config": "^3.0.1",
"cors": "^2.7.1",
"dynamoose": "^1.7.2",
"elasticsearch": "^16.1.1",
"@opensearch-project/opensearch": "^2.11.0",
"express": "^4.16.4",
"express-interceptor": "^1.2.0",
"get-parameter-names": "^0.3.0",
Expand Down
35 changes: 16 additions & 19 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ const errors = require('./errors')
const logger = require('./logger')
const m2mAuth = require('tc-core-library-js').auth.m2m
const AWS = require('aws-sdk')
const elasticsearch = require('elasticsearch')
const opensearch = require('@opensearch-project/opensearch')
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

// Elasticsearch client
let esClient
// Opensearch client
let osClient

/**
* Check the error is custom error.
Expand Down Expand Up @@ -436,32 +436,29 @@ async function getAllPages (url, query) {
}

/**
 * Get OS Client
 *
 * Creates the OpenSearch client on the first call and caches it in the
 * module-level `osClient`, so later calls return the same instance.
 *
 * @return {Object} Opensearch Client Instance
 */
function getOSClient () {
  if (osClient) {
    return osClient
  }
  const osHost = config.get('OS.HOST')
  const isAwsHost = /.*amazonaws.*/.test(osHost)
  // The OpenSearch client parses `node` as a URL, so a bare `host:port` value
  // (such as the default 'localhost:9200') needs an explicit scheme.
  const node = /^https?:\/\//i.test(osHost)
    ? osHost
    : `${isAwsHost ? 'https' : 'http'}://${osHost}`

  if (isAwsHost) {
    // AWS-hosted OpenSearch configuration is different from other providers:
    // requests must be SigV4-signed. The legacy `amazonES` option belonged to
    // the `http-aws-es` connection class and is not an option of this client,
    // so the signer shipped with the OpenSearch package is used instead.
    const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws') // eslint-disable-line global-require
    const signerOptions = AwsSigv4Signer({
      region: config.get('DYNAMODB.AWS_REGION'),
      service: 'es',
      // Same credential source as before (AWS_* environment variables), but a
      // failure to load them is surfaced instead of sending unsigned requests.
      getCredentials: () => new Promise((resolve, reject) => {
        const credentials = new AWS.EnvironmentCredentials('AWS')
        credentials.get((err) => (err ? reject(err) : resolve(credentials)))
      })
    })
    osClient = new opensearch.Client(Object.assign({ node }, signerOptions))
  } else {
    osClient = new opensearch.Client({ node })
  }
  return osClient
}

/**
Expand Down Expand Up @@ -539,7 +536,7 @@ module.exports = {
isCustomError,
setResHeaders,
getAllPages,
getESClient,
getOSClient,
checkAgreedTerms,
postRequest,
getMemberById,
Expand Down
16 changes: 8 additions & 8 deletions src/init-es.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ const config = require('config')
const logger = require('./common/logger')
const helper = require('./common/helper')

const client = helper.getESClient()
const client = helper.getOSClient()

const initES = async () => {
if (process.argv.length === 3 && process.argv[2] === 'force') {
logger.info(`Delete index ${config.ES.ES_INDEX} if any.`)
logger.info(`Delete index ${config.OS.OS_INDEX} if any.`)
try {
await client.indices.delete({ index: config.ES.ES_INDEX })
await client.indices.delete({ index: config.OS.OS_INDEX })
} catch (err) {
// ignore
}
}

const exists = await client.indices.exists({ index: config.ES.ES_INDEX })
const exists = await client.indices.exists({ index: config.OS.OS_INDEX })
if (exists) {
logger.info(`The index ${config.ES.ES_INDEX} exists.`)
logger.info(`The index ${config.OS.OS_INDEX} exists.`)
} else {
logger.info(`The index ${config.ES.ES_INDEX} will be created.`)
logger.info(`The index ${config.OS.OS_INDEX} will be created.`)

const body = { mappings: {} }
body.mappings[config.get('ES.ES_TYPE')] = {
body.mappings['_doc'] = {
properties: {
id: { type: 'keyword' },
memberHandle: {
Expand All @@ -40,7 +40,7 @@ const initES = async () => {
}

await client.indices.create({
index: config.ES.ES_INDEX,
index: config.OS.OS_INDEX,
body
})
}
Expand Down
13 changes: 6 additions & 7 deletions src/services/CleanUpService.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@ const helper = require('../common/helper')
const logger = require('../common/logger')

/**
* Delete the Resource from the ES by the given id
* Delete the Resource from the OS by the given id
* @param id the resource id
* @returns {Promise<void>}
*/
const deleteFromESById = async (id) => {
const deleteFromOSById = async (id) => {
// delete from OS
const esClient = await helper.getESClient()
await esClient.delete({
index: config.ES.ES_INDEX,
type: config.ES.ES_TYPE,
const osClient = await helper.getOSClient()
await osClient.delete({
index: config.OS.OS_INDEX,
id: id,
refresh: 'true' // refresh ES so that it is effective for read operations instantly
})
Expand Down Expand Up @@ -86,7 +85,7 @@ const cleanUpTestData = async () => {
for (const res of resources) {
logger.info('Resource to be deleted', res.id)
await deleteFromDBById('Resource', res.id)
await deleteFromESById(res.id)
await deleteFromOSById(res.id)
}
logger.info('ResourceRole to be deleted', roleId)
await deleteFromDBById('ResourceRole', roleId)
Expand Down
82 changes: 39 additions & 43 deletions src/services/ResourceService.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ async function getResources (currentUser, challengeId, roleId, memberId, memberH
})

const sortCriteria = [{ [sortBy]: { 'order': sortOrder } }]
const docs = await searchES(mustQuery, perPage, page, sortCriteria)
let docs = await searchOS(mustQuery, perPage, page, sortCriteria)
docs = docs.body

// Extract data from hits
const allResources = _.map(docs.hits.hits, item => item._source)
Expand Down Expand Up @@ -374,11 +375,10 @@ async function createResource (currentUser, resource) {
createdBy: currentUser.handle || currentUser.sub
}, resource))

// Create resources in ES
const esClient = await helper.getESClient()
await esClient.create({
index: config.ES.ES_INDEX,
type: config.ES.ES_TYPE,
// Create resources in OS
const osClient = await helper.getOSClient()
await osClient.index({
index: config.OS.OS_INDEX,
id: ret.id,
body: _.pick(ret, payloadFields),
refresh: 'true' // refresh ES so that it is visible for read operations instantly
Expand Down Expand Up @@ -463,13 +463,12 @@ async function deleteResource (currentUser, resource) {

await ret.delete()

// delete from ES
const esClient = await helper.getESClient()
await esClient.delete({
index: config.ES.ES_INDEX,
type: config.ES.ES_TYPE,
// delete from OS
const osClient = await helper.getOSClient()
await osClient.delete({
index: config.OS.OS_INDEX,
id: ret.id,
refresh: 'true' // refresh ES so that it is effective for read operations instantly
refresh: 'true' // refresh OS so that it is effective for read operations instantly
})

logger.debug(`Deleted resource, posting to Bus API: ${JSON.stringify(_.pick(ret, payloadFields))}`)
Expand Down Expand Up @@ -521,17 +520,18 @@ async function listChallengesByMember (memberId, criteria) {
}

if (criteria.useScroll) {
docs = await searchESWithScroll(mustQuery)
docs = await searchOSWithScroll(mustQuery)
} else if (perPage * page <= config.MAX_ELASTIC_SEARCH_RECORDS_SIZE) {
docs = await searchES(mustQuery, perPage, page)
docs = await searchOS(mustQuery, perPage, page).body
} else {
throw new errors.BadRequestError(`
ES pagination params:
OS pagination params:
page ${page},
perPage: ${perPage}
exceeds the max search window:${config.MAX_ELASTIC_SEARCH_RECORDS_SIZE}`
)
}
logger.debug(`Docs from OS: ${JSON.stringify(docs)}`)

// Extract data from hits
let result = _.map(docs.hits.hits, item => item._source)
Expand All @@ -554,11 +554,10 @@ listChallengesByMember.schema = {
}).required()
}

async function searchESWithScroll (mustQuery) {
async function searchOSWithScroll (mustQuery) {
const scrollTimeout = '1m'
const esQuery = {
index: config.get('ES.ES_INDEX'),
type: config.get('ES.ES_TYPE'),
const osQuery = {
index: config.get('OS.OS_INDEX'),
size: 10000,
body: {
query: {
Expand All @@ -570,18 +569,17 @@ async function searchESWithScroll (mustQuery) {
scroll: scrollTimeout
}

const esClient = await helper.getESClient()
const searchResponse = await esClient.search(esQuery)

const osClient = await helper.getOSClient()
let searchResponse = await osClient.search(osQuery)
// eslint-disable-next-line camelcase
const { _scroll_id, hits } = searchResponse
const { _scroll_id, hits } = searchResponse.body
const totalHits = hits.total

// eslint-disable-next-line camelcase
let scrollId = _scroll_id

while (hits.hits.length < totalHits) {
const nextScrollResponse = await esClient.scroll({
const nextScrollResponse = await osClient.scroll({
scroll: scrollTimeout,
scroll_id: scrollId
})
Expand All @@ -590,7 +588,7 @@ async function searchESWithScroll (mustQuery) {
hits.hits = [...hits.hits, ...nextScrollResponse.hits.hits]
}

await esClient.clearScroll({
await osClient.clearScroll({
body: {
// eslint-disable-next-line camelcase
scroll_id: [_scroll_id]
Expand All @@ -606,18 +604,17 @@ async function searchESWithScroll (mustQuery) {
}

/**
* Execute ES query
* Execute OS query
* @param {Object} mustQuery the query that will be sent to OS
* @param {Number} perPage number of search result per page
* @param {Number} page the current page
* @returns {Object} doc from ES
* @returns {Object} doc from OS
*/
async function searchES (mustQuery, perPage, page, sortCriteria) {
let esQuery
async function searchOS (mustQuery, perPage, page, sortCriteria) {
let osQuery
if (sortCriteria) {
esQuery = {
index: config.get('ES.ES_INDEX'),
type: config.get('ES.ES_TYPE'),
osQuery = {
index: config.get('OS.OS_INDEX'),
size: perPage,
from: perPage * (page - 1), // Es Index starts from 0
body: {
Expand All @@ -630,9 +627,8 @@ async function searchES (mustQuery, perPage, page, sortCriteria) {
}
}
} else {
esQuery = {
index: config.get('ES.ES_INDEX'),
type: config.get('ES.ES_TYPE'),
osQuery = {
index: config.get('OS.OS_INDEX'),
size: perPage,
from: perPage * (page - 1), // Es Index starts from 0
body: {
Expand All @@ -644,11 +640,11 @@ async function searchES (mustQuery, perPage, page, sortCriteria) {
}
}
}
logger.debug(`ES Query ${JSON.stringify(esQuery)}`)
const esClient = await helper.getESClient()
logger.debug(`OS Query ${JSON.stringify(osQuery)}`)
const osClient = await helper.getOSClient()
let docs
try {
docs = await esClient.search(esQuery)
docs = await osClient.search(osQuery)
} catch (e) {
// Catch error when the OS index is fresh and has no data
logger.info(`Query Error from ES ${JSON.stringify(e)}`)
Expand All @@ -675,9 +671,8 @@ async function getResourceCount (challengeId, roleId) {
must.push({ term: { 'roleId.keyword': roleId } })
}

const esQuery = {
index: config.get('ES.ES_INDEX'),
type: config.get('ES.ES_TYPE'),
const osQuery = {
index: config.get('OS.OS_INDEX'),
size: 0,
body: {
query: {
Expand All @@ -695,10 +690,11 @@ async function getResourceCount (challengeId, roleId) {
}
}

const esClient = await helper.getESClient()
const osClient = await helper.getOSClient()
let result
try {
result = await esClient.search(esQuery)
result = await osClient.search(osQuery)
result = result.body
} catch (err) {
logger.error(`Get Resource Count Error ${JSON.stringify(err)}`)
throw err
Expand Down
7 changes: 3 additions & 4 deletions test/common/testHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ async function clearDependencies () {
}

/**
* Clear the ES documents.
* Clear the OS documents.
*/
async function initES () {
const client = helper.getESClient()
const client = helper.getOSClient()
await client.deleteByQuery({
index: config.ES.ES_INDEX,
type: config.ES.ES_TYPE,
index: config.OS.OS_INDEX,
body: {
query: {
match_all: {}
Expand Down
Loading