任务执行的顺序是什么?
任务默认遵循带优先级的先进先出执行顺序。不过也修改sort参数来改变执行规则。
lockLimit和maxConcurrency的区别是什么?
agenda会将任务一个接一个的上锁,并在mongodb数据库中设置lockedAt参数,并创建Job类实例缓存到_lockedJobs数组中,这个过程默认没有限制,但可以使用lockLimit进行限制,如果所有任务都需要被执行,则agenda会尝试上锁所有任务,并将任务从_lockedJobs数组中发到_runningJobs数组中,这些任务运行后会执行用户的代码,这可以通过maxConcurrency参数进行限制。
但如果许多任务在同一时间执行,你或许会发现他们甚至没有被加载进来,因为他们会尽可能多地给任务上锁,尽管没有足够的并发来处理他们。里有调整lockLimit和maxConcurrency参数来解决这个问题。
agenda/save-job.js
如果任务已经存在则更新数据库中的信息,如果不存在则插入一条记录。(其他无关代码省略)
module.exports = async function(job) { try { // 保存任务信息,并设置最后修改者为当前agenda队列进程 const props = job.toJSON(); props.lastModifiedBy = this._name; // 获取当前时间并设置默认查询选项 const now = new Date(); const protect = {}; let update = {$set: props}; // 如果当前任务已经有id,则修改相关属性,例如谁最后一次修改的它,并返回 if (id) { const result = await this._collection.findOneAndUpdate( {_id: id}, update, {returnOriginal: false} ); return processDbResult(job, result); } // 否则,直接插入一条 const result = await this._collection.insertOne(props); return processDbResult(job, result); } catch (error) { throw error; } }; //一个判断任务是否应该立即执行的方法 const processDbResult = (job, result) => { let res = result.ops ? result.ops : result.value; if (res) { // 如果是个数组,则只取第一个 if (Array.isArray(res)) { res = res[0]; } job.attrs.nextRunAt = res.nextRunAt; // 如果当前时间大于任务下一次应该执行的时间,则立即执行任务 if (job.attrs.nextRunAt && job.attrs.nextRunAt < this._nextScanAt) { processJobs.call(this, job); } } // Return the Job instance return job; };
agenda/find-and-lock-next-job.js
const JOB_PROCESS_WHERE_QUERY = { $and: [{ name: jobName, disabled: {$ne: true} }, { $or: [{ lockedAt: {$eq: null}, nextRunAt: {$lte: this._nextScanAt} }, { lockedAt: {$lte: lockDeadline} }] }] }; const JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}}; const JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort}; const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY);