Skip to content

Commit 1e98ee6

Browse files
committed
Transition from ES-->OS for Opensearch consolidation
1 parent 8b12c3e commit 1e98ee6

10 files changed

+86
-103
lines changed

ReadMe.md

+3-5
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,9 @@ The following parameters can be set in config files or in env variables:
5959
- DYNAMODB.AWS_WRITE_UNITS: The DynamoDB table write unit configuration, default is 2
6060
- DYNAMODB.TIMEOUT: The timeout setting used in health check
6161
- SCOPES: The M2M scopes, refer `config/default.js` for more information
62-
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
63-
- ES.API_VERSION: Elasticsearch API version, default value is '6.8'
64-
- ES.ES_INDEX: Elasticsearch index name for resources, default value is 'resources'
65-
- ES.ES_TYPE: Elasticsearch index type for resources, default value is '_doc'
66-
- ES.ES_REFRESH: Elasticsearch force refresh flag, default value is 'true'
62+
- OS.HOST: Opensearch host, default value is 'localhost:9200'
63+
- OS.OS_INDEX: Opensearch index name for resources, default value is 'resources'
64+
- OS.OS_REFRESH: Opensearch force refresh flag, default value is 'true'
6765
- BUSAPI_URL: the bus api, default value is 'https://api.topcoder-dev.com/v5'
6866
- KAFKA_ERROR_TOPIC: Kafka error topic, default value is 'common.error.reporting',
6967
- KAFKA_MESSAGE_ORIGINATOR: the Kafka message originator, default value is 'resources-api'

config/default.js

+5-7
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@ module.exports = {
4444
TIMEOUT: process.env.DYNAMODB_TIMEOUT || 10000
4545
},
4646

47-
ES: {
48-
// above AWS_REGION is used if we use AWS ES
49-
HOST: process.env.ES_HOST || 'localhost:9200',
50-
API_VERSION: process.env.ES_API_VERSION || '6.8',
51-
ES_INDEX: process.env.ES_INDEX || 'resources',
52-
ES_TYPE: process.env.ES_TYPE || '_doc', // ES 6.x accepts only 1 Type per index and it's mandatory to define it
53-
ES_REFRESH: process.env.ES_REFRESH || 'true'
47+
OS: {
48+
// above AWS_REGION is used if we use AWS OS
49+
HOST: process.env.OS_HOST || 'localhost:9200',
50+
OS_INDEX: process.env.OS_INDEX || 'resources',
51+
OS_REFRESH: process.env.OS_REFRESH || 'true'
5452
},
5553

5654
SCOPES: {

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"config": "^3.0.1",
3939
"cors": "^2.7.1",
4040
"dynamoose": "^1.7.2",
41-
"elasticsearch": "^16.1.1",
41+
"@opensearch-project/opensearch": "^2.11.0",
4242
"express": "^4.16.4",
4343
"express-interceptor": "^1.2.0",
4444
"get-parameter-names": "^0.3.0",

src/common/helper.js

+16-19
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ const errors = require('./errors')
1313
const logger = require('./logger')
1414
const m2mAuth = require('tc-core-library-js').auth.m2m
1515
const AWS = require('aws-sdk')
16-
const elasticsearch = require('elasticsearch')
16+
const opensearch = require('@opensearch-project/opensearch')
1717
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
1818
const busApi = require('tc-bus-api-wrapper')
1919
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
2020
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
2121

22-
// Elasticsearch client
23-
let esClient
22+
// Opensearch client
23+
let osClient
2424

2525
/**
2626
* Check the error is custom error.
@@ -436,32 +436,29 @@ async function getAllPages (url, query) {
436436
}
437437

438438
/**
439-
* Get ES Client
440-
* @return {Object} Elasticsearch Client Instance
439+
* Get OS Client
440+
* @return {Object} Opensearch Client Instance
441441
*/
442-
function getESClient () {
443-
if (esClient) {
444-
return esClient
442+
function getOSClient () {
443+
if (osClient) {
444+
return osClient
445445
}
446-
const esHost = config.get('ES.HOST')
446+
const osHost = config.get('OS.HOST')
447447
// AWS ES configuration is different from other providers
448-
if (/.*amazonaws.*/.test(esHost)) {
449-
esClient = elasticsearch.Client({
450-
apiVersion: config.get('ES.API_VERSION'),
451-
hosts: esHost,
452-
connectionClass: require('http-aws-es'), // eslint-disable-line global-require
448+
if (/.*amazonaws.*/.test(osHost)) {
449+
osClient = new opensearch.Client({
450+
hosts: osHost,
453451
amazonES: {
454452
region: config.get('DYNAMODB.AWS_REGION'),
455453
credentials: new AWS.EnvironmentCredentials('AWS')
456454
}
457455
})
458456
} else {
459-
esClient = new elasticsearch.Client({
460-
apiVersion: config.get('ES.API_VERSION'),
461-
hosts: esHost
457+
osClient = new opensearch.Client({
458+
node: osHost
462459
})
463460
}
464-
return esClient
461+
return osClient
465462
}
466463

467464
/**
@@ -539,7 +536,7 @@ module.exports = {
539536
isCustomError,
540537
setResHeaders,
541538
getAllPages,
542-
getESClient,
539+
getOSClient,
543540
checkAgreedTerms,
544541
postRequest,
545542
getMemberById,

src/init-es.js

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,26 @@ const config = require('config')
1010
const logger = require('./common/logger')
1111
const helper = require('./common/helper')
1212

13-
const client = helper.getESClient()
13+
const client = helper.getOSClient()
1414

1515
const initES = async () => {
1616
if (process.argv.length === 3 && process.argv[2] === 'force') {
17-
logger.info(`Delete index ${config.ES.ES_INDEX} if any.`)
17+
logger.info(`Delete index ${config.OS.OS_INDEX} if any.`)
1818
try {
19-
await client.indices.delete({ index: config.ES.ES_INDEX })
19+
await client.indices.delete({ index: config.OS.OS_INDEX })
2020
} catch (err) {
2121
// ignore
2222
}
2323
}
2424

25-
const exists = await client.indices.exists({ index: config.ES.ES_INDEX })
25+
const exists = await client.indices.exists({ index: config.OS.OS_INDEX })
2626
if (exists) {
27-
logger.info(`The index ${config.ES.ES_INDEX} exists.`)
27+
logger.info(`The index ${config.OS.OS_INDEX} exists.`)
2828
} else {
29-
logger.info(`The index ${config.ES.ES_INDEX} will be created.`)
29+
logger.info(`The index ${config.OS.OS_INDEX} will be created.`)
3030

3131
const body = { mappings: {} }
32-
body.mappings[config.get('ES.ES_TYPE')] = {
32+
body.mappings['_doc'] = {
3333
properties: {
3434
id: { type: 'keyword' },
3535
memberHandle: {
@@ -40,7 +40,7 @@ const initES = async () => {
4040
}
4141

4242
await client.indices.create({
43-
index: config.ES.ES_INDEX,
43+
index: config.OS.OS_INDEX,
4444
body
4545
})
4646
}

src/services/CleanUpService.js

+6-7
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@ const helper = require('../common/helper')
99
const logger = require('../common/logger')
1010

1111
/**
12-
* Delete the Resource from the ES by the given id
12+
* Delete the Resource from the OS by the given id
1313
* @param id the resource id
1414
* @returns {Promise<void>}
1515
*/
16-
const deleteFromESById = async (id) => {
16+
const deleteFromOSById = async (id) => {
1717
// delete from ES
18-
const esClient = await helper.getESClient()
19-
await esClient.delete({
20-
index: config.ES.ES_INDEX,
21-
type: config.ES.ES_TYPE,
18+
const osClient = await helper.getOSClient()
19+
await osClient.delete({
20+
index: config.OS.OS_INDEX,
2221
id: id,
2322
refresh: 'true' // refresh ES so that it is effective for read operations instantly
2423
})
@@ -86,7 +85,7 @@ const cleanUpTestData = async () => {
8685
for (const res of resources) {
8786
logger.info('Resource to be deleted', res.id)
8887
await deleteFromDBById('Resource', res.id)
89-
await deleteFromESById(res.id)
88+
await deleteFromOSById(res.id)
9089
}
9190
logger.info('ResourceRole to be deleted', roleId)
9291
await deleteFromDBById('ResourceRole', roleId)

src/services/ResourceService.js

+34-40
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,10 @@ async function createResource (currentUser, resource) {
374374
createdBy: currentUser.handle || currentUser.sub
375375
}, resource))
376376

377-
// Create resources in ES
378-
const esClient = await helper.getESClient()
379-
await esClient.create({
380-
index: config.ES.ES_INDEX,
381-
type: config.ES.ES_TYPE,
377+
// Create resources in OS
378+
const osClient = await helper.getOSClient()
379+
await osClient.index({
380+
index: config.OS.OS_INDEX,
382381
id: ret.id,
383382
body: _.pick(ret, payloadFields),
384383
refresh: 'true' // refresh ES so that it is visible for read operations instantly
@@ -463,13 +462,12 @@ async function deleteResource (currentUser, resource) {
463462

464463
await ret.delete()
465464

466-
// delete from ES
467-
const esClient = await helper.getESClient()
468-
await esClient.delete({
469-
index: config.ES.ES_INDEX,
470-
type: config.ES.ES_TYPE,
465+
// delete from OS
466+
const osClient = await helper.getOSClient()
467+
await osClient.delete({
468+
index: config.OS.OS_INDEX,
471469
id: ret.id,
472-
refresh: 'true' // refresh ES so that it is effective for read operations instantly
470+
refresh: 'true' // refresh OS so that it is effective for read operations instantly
473471
})
474472

475473
logger.debug(`Deleted resource, posting to Bus API: ${JSON.stringify(_.pick(ret, payloadFields))}`)
@@ -521,12 +519,12 @@ async function listChallengesByMember (memberId, criteria) {
521519
}
522520

523521
if (criteria.useScroll) {
524-
docs = await searchESWithScroll(mustQuery)
522+
docs = await searchOSWithScroll(mustQuery)
525523
} else if (perPage * page <= config.MAX_ELASTIC_SEARCH_RECORDS_SIZE) {
526-
docs = await searchES(mustQuery, perPage, page)
524+
docs = await searchOS(mustQuery, perPage, page)
527525
} else {
528526
throw new errors.BadRequestError(`
529-
ES pagination params:
527+
OS pagination params:
530528
page ${page},
531529
perPage: ${perPage}
532530
exceeds the max search window:${config.MAX_ELASTIC_SEARCH_RECORDS_SIZE}`
@@ -554,11 +552,10 @@ listChallengesByMember.schema = {
554552
}).required()
555553
}
556554

557-
async function searchESWithScroll (mustQuery) {
555+
async function searchOSWithScroll (mustQuery) {
558556
const scrollTimeout = '1m'
559-
const esQuery = {
560-
index: config.get('ES.ES_INDEX'),
561-
type: config.get('ES.ES_TYPE'),
557+
const osQuery = {
558+
index: config.get('OS.OS_INDEX'),
562559
size: 10000,
563560
body: {
564561
query: {
@@ -570,8 +567,8 @@ async function searchESWithScroll (mustQuery) {
570567
scroll: scrollTimeout
571568
}
572569

573-
const esClient = await helper.getESClient()
574-
const searchResponse = await esClient.search(esQuery)
570+
const osClient = await helper.getOSClient()
571+
const searchResponse = await osClient.search(osQuery)
575572

576573
// eslint-disable-next-line camelcase
577574
const { _scroll_id, hits } = searchResponse
@@ -581,7 +578,7 @@ async function searchESWithScroll (mustQuery) {
581578
let scrollId = _scroll_id
582579

583580
while (hits.hits.length < totalHits) {
584-
const nextScrollResponse = await esClient.scroll({
581+
const nextScrollResponse = await osClient.scroll({
585582
scroll: scrollTimeout,
586583
scroll_id: scrollId
587584
})
@@ -590,7 +587,7 @@ async function searchESWithScroll (mustQuery) {
590587
hits.hits = [...hits.hits, ...nextScrollResponse.hits.hits]
591588
}
592589

593-
await esClient.clearScroll({
590+
await osClient.clearScroll({
594591
body: {
595592
// eslint-disable-next-line camelcase
596593
scroll_id: [_scroll_id]
@@ -606,18 +603,17 @@ async function searchESWithScroll (mustQuery) {
606603
}
607604

608605
/**
609-
* Execute ES query
606+
* Execute OS query
610607
* @param {Object} mustQuery the query that will be sent to ES
611608
* @param {Number} perPage number of search result per page
612609
* @param {Number} page the current page
613-
* @returns {Object} doc from ES
610+
* @returns {Object} doc from OS
614611
*/
615-
async function searchES (mustQuery, perPage, page, sortCriteria) {
616-
let esQuery
612+
async function searchOS (mustQuery, perPage, page, sortCriteria) {
613+
let osQuery
617614
if (sortCriteria) {
618-
esQuery = {
619-
index: config.get('ES.ES_INDEX'),
620-
type: config.get('ES.ES_TYPE'),
615+
osQuery = {
616+
index: config.get('OS.OS_INDEX'),
621617
size: perPage,
622618
from: perPage * (page - 1), // Es Index starts from 0
623619
body: {
@@ -630,9 +626,8 @@ async function searchES (mustQuery, perPage, page, sortCriteria) {
630626
}
631627
}
632628
} else {
633-
esQuery = {
634-
index: config.get('ES.ES_INDEX'),
635-
type: config.get('ES.ES_TYPE'),
629+
osQuery = {
630+
index: config.get('OS.OS_INDEX'),
636631
size: perPage,
637632
from: perPage * (page - 1), // Es Index starts from 0
638633
body: {
@@ -644,11 +639,11 @@ async function searchES (mustQuery, perPage, page, sortCriteria) {
644639
}
645640
}
646641
}
647-
logger.debug(`ES Query ${JSON.stringify(esQuery)}`)
648-
const esClient = await helper.getESClient()
642+
logger.debug(`OS Query ${JSON.stringify(osQuery)}`)
643+
const osClient = await helper.getOSClient()
649644
let docs
650645
try {
651-
docs = await esClient.search(esQuery)
646+
docs = await osClient.search(osQuery)
652647
} catch (e) {
653648
// Catch error when the ES is fresh and has no data
654649
logger.info(`Query Error from ES ${JSON.stringify(e)}`)
@@ -675,9 +670,8 @@ async function getResourceCount (challengeId, roleId) {
675670
must.push({ term: { 'roleId.keyword': roleId } })
676671
}
677672

678-
const esQuery = {
679-
index: config.get('ES.ES_INDEX'),
680-
type: config.get('ES.ES_TYPE'),
673+
const osQuery = {
674+
index: config.get('OS.OS_INDEX'),
681675
size: 0,
682676
body: {
683677
query: {
@@ -695,10 +689,10 @@ async function getResourceCount (challengeId, roleId) {
695689
}
696690
}
697691

698-
const esClient = await helper.getESClient()
692+
const osClient = await helper.getOSClient()
699693
let result
700694
try {
701-
result = await esClient.search(esQuery)
695+
result = await osClient.search(osQuery)
702696
} catch (err) {
703697
logger.error(`Get Resource Count Error ${JSON.stringify(err)}`)
704698
throw err

test/common/testHelper.js

+3-4
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,12 @@ async function clearDependencies () {
170170
}
171171

172172
/**
173-
* Clear the ES documents.
173+
* Clear the OS documents.
174174
*/
175175
async function initES () {
176-
const client = helper.getESClient()
176+
const client = helper.getOSClient()
177177
await client.deleteByQuery({
178-
index: config.ES.ES_INDEX,
179-
type: config.ES.ES_TYPE,
178+
index: config.OS.OS_INDEX,
180179
body: {
181180
query: {
182181
match_all: {}

0 commit comments

Comments
 (0)