Skip to content

Commit

Permalink
Merge pull request #42 from LeoPlatform/feature/invoke-bot
Browse files Browse the repository at this point in the history
Feature/invoke bot
  • Loading branch information
czirker authored May 9, 2022
2 parents b833cdd + 82da159 commit fe77f01
Show file tree
Hide file tree
Showing 8 changed files with 1,116 additions and 77 deletions.
11 changes: 11 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module.exports = {
"env": {
"node": true,
"mocha": true,
"commonjs": true,
},
"extends": ["eslint:recommended"],
"rules": {
"space-before-function-paren": "off"
}
}
276 changes: 220 additions & 56 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const fs = require('fs')
const validate = require('./lib/validate')
const compileLeo = require('./lib/leo')
const utils = require('./lib/utils')
const { generateConfig, getConfigFullPath, populateEnvFromConfig } = require('./lib/generateConfig')

// TODO: sls create - Place tempates in memorable cdn location like https://dsco.io/aws-nodejs-leo-microservice
// TODO: sls create bot - Place all templates in memorable cdn location, and publish them, but also create the schortcuts like `sls create bot --name my-bot-name`
Expand All @@ -18,7 +19,7 @@ const utils = require('./lib/utils')
// TODO: test validation phase - that it complains there are no valid sections

class ServerlessLeo {
constructor (serverless, options) {
constructor(serverless, options) {
this.serverless = serverless
this.options = options
this.provider = this.serverless.getProvider('aws')
Expand Down Expand Up @@ -76,7 +77,7 @@ class ServerlessLeo {
shortcut: 'b',
type: 'string'
},
functionName: {
function: {
usage: 'Specify the name of the function for the bot',
shortcut: 'f',
type: 'string'
Expand All @@ -85,17 +86,84 @@ class ServerlessLeo {
usage: 'Specify the name of the bot',
shortcut: 'n',
type: 'string'
},
runner: {
usage: 'Way to invoke the bot. node|serverless|sls default: node',
shortcut: 'r',
type: 'string',
default: 'node'
},
mockDir: {
usage: 'Directory of the mock data',
shortcut: 'md',
type: 'string'
},
mockFlag: {
usage: 'mock data using default dir .mock-data',
shortcut: 'm',
type: 'boolean'
},
workflow: {
usage: 'Invoke down stream bots',
shortcut: 'w',
type: 'boolean'
},
actualSource: {
usage: 'Source Bot will read from bus when mocking',
shortcut: 's',
type: 'boolean'
}

// TODO flag to mock source only

// TODO way to link multiple projects for workflow
}
},
'generate-config': {
usage: 'Run a leo bot locally',
lifecycleEvents: [
'run'
],
options: {
file: {
usage: 'Specify the name of the bot',
shortcut: 'f',
type: 'string'
}
}
},
'watch-config': {
usage: 'Run a leo bot locally',
lifecycleEvents: [
'run'
],
options: {
file: {
usage: 'Specify the name of the bot',
shortcut: 'f',
type: 'string'
}
}
}
}

serverless.configSchemaHandler.defineFunctionEvent(serverless.service.provider.name, 'leo', {
type: 'object',
properties: {
cron: { type: 'string' },
destination: { type: 'string' }
},
required: [],
additionalProperties: true
})

Object.assign(
this,
validate,
compileLeo
)

let state = {}
this.hooks = {
'create:bot:copy-template': () => {
let {
Expand All @@ -122,7 +190,7 @@ class ServerlessLeo {
source,
destination
} = this.serverless.pluginManager.cliOptions

const replacements = [
['NAME_TOKEN', name],
['SOURCE_TOKEN', source || `${name}_source`],
Expand All @@ -132,6 +200,20 @@ class ServerlessLeo {
return Promise.resolve()
},
'before:package:cleanup': () => BbPromise.bind(this).then(this.gatherBots),
'before:webpack:compile:compile': () => {
return this.hooks['before:package:createDeploymentArtifacts']()
},
'before:package:createDeploymentArtifacts': () => {
let opts = { ...this.serverless.pluginManager.cliOptions }
let file = getConfigFullPath(this.serverless, opts.file)
if (state.generatedConfig || !fs.existsSync(file)) {
return BbPromise.resolve()
}
state.generatedConfig = true
return BbPromise.bind(this)
.then(() => generateConfig(file))
.then((d) => populateEnvFromConfig(this.serverless, file, d))
},
'after:package:compileFunctions': () => {
this.validated = this.validate()
if (this.validated.errors.length > 0) {
Expand All @@ -146,67 +228,149 @@ class ServerlessLeo {
return BbPromise.bind(this)
.then(this.compileLeo)
},
'invoke-bot:leo-local': () => {
const { functionName, name, botNumber = 0 } = this.serverless.pluginManager.cliOptions
const lambdaName = functionName || name
const regex = new RegExp(lambdaName)
const functions = Object.keys(this.serverless.service.functions)
const matchingFunctions = functions.filter(i => regex.test(i))
let functionKey
if (matchingFunctions.length > 1) {
functionKey = matchingFunctions.find(i => i === lambdaName)
if (!functionKey) {
throw new Error('Multiple matches found for bot name/lambda, please be more specific.')
'invoke-bot:leo-local': async () => {
let opts = { ...this.serverless.pluginManager.cliOptions }
let webpackPlugin = this.serverless.pluginManager.plugins.find(s => s.constructor.name === 'ServerlessWebpack')

// Setup the node runner
if (opts.runner === 'node' && (this.serverless.service.provider.runtime || '').match(/^nodejs/)) {
// Try and find the tsconfig build directory
let tsConfigPath = path.resolve(process.cwd(), 'tsconfig.json')
if (fs.existsSync(tsConfigPath)) {
let tsConfig = {}
try {
tsConfig = require(tsConfigPath)
} catch (err) {
// remove any trailing commas and try to parse it again
let tsConfigContent = fs.readFileSync(tsConfigPath).toString()
.replace(/[\n\r]+[ \t]*/g, '').replace(/,([}\]])/, '$1')

tsConfig = JSON.parse(tsConfigContent)
}

// Set serverless directory to the tsconfig output directory
let outDir = (tsConfig.compilerOptions || {}).outDir || '.'
this.serverless.serviceDir = path.resolve(this.serverless.serviceDir, outDir)
}
} else if (matchingFunctions.length === 1) {
functionKey = matchingFunctions[0]

// Remove any webpack local invoke hooks
// We are bypassing webpack and running the code directly via node
let beforeInvokeHook = (this.serverless.pluginManager.hooks['before:invoke:local:invoke'] || [])
this.serverless.pluginManager.hooks['before:invoke:local:invoke'] = beforeInvokeHook.filter(s => s.pluginName !== 'ServerlessWebpack')
webpackPlugin = null
} else {
throw new Error('Could not match bot name/lambda in serverless defined functions.')
// If they have already build the project disable webpack builds
if (webpackPlugin != null) {
// Build once and then disable webpack builds
execSync('serverless webpack')
this.serverless.service.custom.webpack = Object.assign(this.serverless.service.custom.webpack || {}, { noBuild: true })
}
}
const pathSegments = this.serverless.service.functions[functionKey].handler.split(/\//)
pathSegments[pathSegments.length - 1] = 'serverless.yml'
const serverlessYml = fs.readFileSync(path.join(...pathSegments)).toString()
const serverlessJson = utils.ymlToJson(serverlessYml)
// Build the event to invoke the lambda with
let event
let eventIndex = 0
if (serverlessJson[functionKey].events.length === 1) {
event = serverlessJson[functionKey].events[0].leo
} else {
let filteredEvents = serverlessJson[functionKey].events.filter((event, index) => {
if (Object.values(event.leo).some(leoKey => name === leoKey)) {
eventIndex = index
return true

// Support mock data streams
if (opts.mockDir || opts.mockFlag) {
process.env.RSTREAMS_MOCK_DATA = path.resolve(process.cwd(), options.mockDir || '.mock-data')
}

// Mark Source queue from the first bot as reading from the actual bus
// Only applies if mocking and actualSource is enabled
if ((opts.mockDir || opts.mockFlag) && opts.actualSource) {
let event = utils.buildBotInvocationEvent(this.serverless, this.serverless.pluginManager.cliOptions)
let queue = event.queue || event.source
if (queue != null) {
process.env[`RSTREAMS_MOCK_DATA_Q_${queue}`] = 'passthrough'
}
}

let invokedBots = new Set()
let queuesThatGotData = new Set()
let botsToInvoke = [{ function: opts.function, name: opts.name, botNumber: opts.botNumber }]

let serviceDir = this.serverless.serviceDir

for (let functionData of botsToInvoke) {
// Service directory may have been changed from a previous bot invoke, just reset it back
this.serverless.serviceDir = serviceDir
let functionKey = functionData.function
let event = utils.buildBotInvocationEvent(this.serverless, functionData)

this.serverless.cli.log(`\nInvoking local lambda ${functionKey} with data: ${JSON.stringify(event)}`)

// Setup the function to run

// Change global options for other plugins
this.options.function = functionKey
this.options.data = JSON.stringify(event)

// Fix webpack references if used
if (webpackPlugin != null) {
webpackPlugin.options.function = functionKey
if (opts.workflow) {
// Need to reset webpack config require because entries get set once
// for the first function and then cause an error for subsquent calls
// So it needs to re import the file for each function
let file = webpackPlugin.configuration && (webpackPlugin.configuration.config || webpackPlugin.configuration.webpackConfig)
if (typeof file === 'string') {
let webpackConfigPath = path.join(this.serverless.config.servicePath, file)
delete require.cache[require.resolve(webpackConfigPath)]
}
}
})
if (filteredEvents.length === 1) {
event = filteredEvents[0].leo
} else {
filteredEvents = serverlessJson[functionKey].events.filter((event, index) => {
if (Object.values(event.leo).some(leoKey => new RegExp(name).test(leoKey))) {
eventIndex = index
return true
}

// Clean Env Vars
let func = this.serverless.service.getFunction(functionKey)
utils.removeExternallyProvidedServerlessEnvironmentVariables(this.serverless, func)

// Invoke the function
await this.serverless.pluginManager.spawn('invoke:local')

// Add down stream bots to invoke list
if (opts.workflow) {
invokedBots.add(functionKey)

// Get list of queues with new data
let queuesWithNewData = []
Object.keys(process.env).forEach(k => {
// mock-wrapper will flag a queue that gets new data
// by adding an env var `RSTREAMS_MOCK_DATA_Q_${queue}`
let [, queue] = k.match(/^RSTREAMS_MOCK_DATA_Q_(.*)$/) || []

if (queue != null && !queuesThatGotData.has(queue)) {
// If it is a queue that hasn't received data already
// add it to the list and mark it
queuesThatGotData.add(queue)
queuesWithNewData.push(queue)
}
})
if (filteredEvents.length === 1) {
event = filteredEvents[0].leo
}

// Get any bots that are triggered by the new queues
let bots = utils.getBotsTriggeredFromQueues(this.serverless, queuesWithNewData)
.map(f => ({ function: f.function }))
.filter(f => !invokedBots.has(f.function))

// Add the bots to the list to invoke
botsToInvoke.push(...bots)
}
}
if (!event) {
throw new Error('Could not match the bot name with the bot configurations')
}
const botInfo = utils.getBotInfo(this.serverless.service.service, this.serverless.service.provider.stage, functionKey, serverlessJson[functionKey].events, eventIndex, event, botNumber)
event.botId = botInfo.id
event.__cron = {
id: botInfo.id,
iid: '0',
ts: Date.now(),
force: true
}
this.serverless.cli.log(`Invoking local lambda ${functionKey} with data: ${JSON.stringify(event)}`)
const environmentSetString = Object.entries(process.env).filter(i => !/ /.test(i[1])).map(([key, value]) => ` -e ${key}="${value}"`).join('')
return execSync(`serverless invoke local -f ${functionKey} -d ${JSON.stringify(JSON.stringify(event))}${environmentSetString}`, { stdio: 'inherit' })
},

'watch-config:run': () => {
let opts = { ...this.serverless.pluginManager.cliOptions }
let file = getConfigFullPath(this.serverless, opts.file)

fs.watch(file, {
}, (eventType, filename) => {
try {
generateConfig(file)
} catch (err) {
this.serverless.cli.error(err)
}
})
},
'generate-config:run': () => {
let opts = { ...this.serverless.pluginManager.cliOptions }
let file = getConfigFullPath(this.serverless, opts.file)
generateConfig(file)
}
}
}
Expand Down
Loading

0 comments on commit fe77f01

Please sign in to comment.