Commit 54820a9d by 蒋勇

d

parents 76de4d8f 1bccc0e0
const system = require("../system")
const settings = require("../../config/settings.js");
class CacheBase {
constructor() {
this.redisClient = system.getObject("util.redisClient");
this.desc = this.desc();
this.prefix = this.prefix();
this.cacheCacheKeyPrefix = "sadd_children_appkeys:" + settings.appKey + "_cachekey";
this.isdebug = this.isdebug();
}
isdebug() {
return false;
}
desc() {
throw new Error("子类需要定义desc方法,返回缓存描述");
}
prefix() {
throw new Error("子类需要定义prefix方法,返回本缓存的前缀");
}
async cache(inputkey, val, ex, ...items) {
const cachekey = this.prefix + inputkey;
var cacheValue = await this.redisClient.get(cachekey);
if (!cacheValue || cacheValue == "undefined" || cacheValue == "null" || this.isdebug) {
var objvalstr = await this.buildCacheVal(cachekey, inputkey, val, ex, ...items);
if (!objvalstr) {
return null;
}
if (ex) {
await this.redisClient.setWithEx(cachekey, objvalstr, ex);
} else {
await this.redisClient.set(cachekey, objvalstr);
}
//缓存当前应用所有的缓存key及其描述
this.redisClient.sadd(this.cacheCacheKeyPrefix, [cachekey + "|" + this.desc]);
return JSON.parse(objvalstr);
} else {
return JSON.parse(cacheValue);
}
}
async invalidate(inputkey) {
const cachekey = this.prefix + inputkey;
this.redisClient.delete(cachekey);
return 0;
}
async buildCacheVal(cachekey, inputkey, val, ex, ...items) {
throw new Error("子类中实现构建缓存值的方法,返回字符串");
}
}
module.exports = CacheBase;
const CacheBase = require("../cache.base");
const system = require("../../system");
class ApiAccessControlCache extends CacheBase {
constructor() {
super();
}
desc() {
return "API访问控制缓存";
}
prefix() {
return "api_access_control_:";
}
async buildCacheVal(cachekey, inputkey, val, ex, ...items) {
return val;
}
}
module.exports = ApiAccessControlCache;
const CacheBase=require("../cache.base");
const system=require("../../system");
const settings = require("../../../config/settings");
class ApiAccessKeyCache extends CacheBase{
constructor(){
super();
this.restS=system.getObject("util.restClient");
}
desc(){
return "应用中缓存访问token";
}
prefix(){
return "g_accesskey_";
}
async buildCacheVal(cachekey,inputkey,val,ex,...items){
var acckapp=await this.restS.execPost({appkey:settings.appKey,secret:settings.secret},settings.paasUrl()+"api/auth/accessAuth/getAccessKey");
var s=acckapp.stdout;
if(s){
var tmp=JSON.parse(s);
if(tmp.status==0){
return JSON.stringify(tmp.data);
}
}
return null;
}
}
module.exports=ApiAccessKeyCache;
const CacheBase=require("../cache.base");
const system=require("../../system");
const settings = require("../../../config/settings");
//缓存首次登录的赠送的宝币数量
class ApiAccessKeyCheckCache extends CacheBase{
constructor(){
super();
this.restS=system.getObject("util.restClient");
}
desc(){
return "应用中来访访问token缓存";
}
prefix(){
return "g_accesskeycheck_";
}
async buildCacheVal(cachekey,inputkey,val,ex,...items){
var cacheManager=system.getObject("db.common.cacheManager");
//当来访key缓存不存在时,需要去开放平台检查是否存在来访key缓存
var acckapp=await cacheManager["ApiAccessKeyCache"].cache(settings.appKey,null,ex);//先获取本应用accessKey
var checkresult=await this.restS.execPostWithAK({checkAccessKey:inputkey},settings.paasUrl()+"api/auth/accessAuth/authAccessKey",acckapp.accessKey);
if(checkresult.status==0){
var s=checkresult.data;
return JSON.stringify(s);
}else{
await cacheManager["ApiAccessKeyCache"].invalidate(settings.appKey);
var acckapp=await cacheManager["ApiAccessKeyCache"].cache(settings.appKey,null,ex);//先获取本应用accessKey
var checkresult=await this.restS.execPostWithAK({checkAccessKey:inputkey},settings.paasUrl()+"api/auth/accessAuth/authAccessKey",acckapp.accessKey);
var s=checkresult.data;
return JSON.stringify(s);
}
}
}
module.exports=ApiAccessKeyCheckCache;
const CacheBase = require("../cache.base");
const system = require("../../system");
class MagCache extends CacheBase {
constructor() {
super();
this.prefix = "magCache";
}
desc() {
return "缓存管理";
}
prefix() {
return "magCache:";
}
async buildCacheVal(cachekey, inputkey, val, ex, ...items) {
return val;
}
async getCacheSmembersByKey(key) {
return this.redisClient.smembers(key);
}
async delCacheBySrem(key, value) {
return this.redisClient.srem(key, value)
}
async keys(p) {
return this.redisClient.keys(p);
}
async get(k) {
return this.redisClient.get(k);
}
async del(k) {
return this.redisClient.delete(k);
}
async clearAll() {
console.log("xxxxxxxxxxxxxxxxxxxclearAll............");
return this.redisClient.flushall();
}
}
module.exports = MagCache;
const fs=require("fs");
const settings=require("../../../../config/settings");
class CacheManager{
constructor(){
//await this.buildCacheMap();
this.buildCacheMap();
}
buildCacheMap(){
var self=this;
self.doc={};
var cachePath=settings.basepath+"/app/base/db/cache/";
const files=fs.readdirSync(cachePath);
if(files){
files.forEach(function(r){
var classObj=require(cachePath+"/"+r);
self[classObj.name]=new classObj();
var refTmp=self[classObj.name];
if(refTmp.prefix){
self.doc[refTmp.prefix]=refTmp.desc;
}
else{
console.log("请在"+classObj.name+"缓存中定义prefix");
}
});
}
}
}
module.exports=CacheManager;
// var cm= new CacheManager();
// cm["InitGiftCache"].cacheGlobalVal("hello").then(function(){
// cm["InitGiftCache"].cacheGlobalVal().then(x=>{
// console.log(x);
// });
// });
const system=require("./system") const system=require("../system")
class TaskBase{ class TaskBase{
constructor(className){ constructor(className){
// this.redisClient=system.getObject("util.redisClient"); this.redisClient=system.getObject("util.redisClient");
this.serviceName=className; this.serviceName=className;
this.isThrough=false; this.isThrough=false;
<<<<<<< HEAD:taskexecutor/app/base/task.base.js
this.isDaemon=false; this.isDaemon=false;
=======
this.cacheManager=system.getObject("db.common.cacheManager");
this.db = system.getObject("db.common.connection").getCon();
>>>>>>> 1bccc0e0e608e84ed0a0ddd5bc63e6a076047c01:taskexecutor/app/base/db/task.base.js
} }
async beforeTask(params){ async beforeTask(params){
console.log("前置操作......",this.serviceName); console.log("前置操作......",this.serviceName);
...@@ -58,5 +63,21 @@ class TaskBase{ ...@@ -58,5 +63,21 @@ class TaskBase{
static getServiceName(ClassObj){ static getServiceName(ClassObj){
return ClassObj["name"]; return ClassObj["name"];
} }
async apiCallWithAk(url,params){
var acckapp=await this.cacheManager["ApiAccessKeyCache"].cache(settings.appKey);
var acck=acckapp.accessKey;
//按照访问token
var restResult=await this.restS.execPostWithAK(params,url,acck);
if(restResult){
if(restResult.status==0){
var resultRtn=restResult.data;
return resultRtn;
}else{
await this.cacheManager["ApiAccessKeyCache"].invalidate(settings.appKey);
return null;
}
}
return null;
}
} }
module.exports=TaskBase; module.exports=TaskBase;
...@@ -6,8 +6,10 @@ class TestTask extends TaskBase{ ...@@ -6,8 +6,10 @@ class TestTask extends TaskBase{
async beforeTask(params){ async beforeTask(params){
console.log("前置操作......",this.serviceName); console.log("前置操作......",this.serviceName);
//this.isThrough=true; //this.isThrough=true;
//console.log(this.cacheManager);
} }
async subDoTask(params){ async subDoTask(params){
console.log(params);
console.log("TestTask1....."); console.log("TestTask1.....");
} }
} }
......
const TaskBase=require("../../task.base");
const system = require("../../../system");
var settings = require("../../../../config/settings");
class TmtransferTask extends TaskBase{
constructor(){
super(TaskBase.getServiceName(TmtransferTask));
this.restclient = system.getObject("util.restClient");
this.transferurl = settings.reqTransferurl();
}
async beforeTask(params){
console.log("前置操作......",this.serviceName);
//this.isThrough=true;
//console.log(this.cacheManager);
}
async subDoTask(params){
console.log(params);
console.log("TestTask1.....");
var url = this.transferurl + "api/transfer/tradeApi/uppaymentstatus";
var transferinfo = await this.execlient.execPost(null, url);
var result = JSON.parse(transferinfo.stdout)
return result;
}
}
module.exports=TmtransferTask;
const System = require("../system");
const redis = require("redis");
const settings = require("../../config/settings");
const bluebird = require("bluebird");
bluebird.promisifyAll(redis);
// const logCtl=System.getObject("web.oplogCtl");
class RedisClient {
constructor() {
const redisConfig = settings.redis();
this.client = redis.createClient({
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
db: redisConfig.db,
retry_strategy: function (options) {
// if (options.error && options.error.code === 'ECONNREFUSED') {
// // End reconnecting on a specific error and flush all commands with
// // a individual error
// return new Error('The server refused the connection');
// }
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands
// with a individual error
return new Error('Retry time exhausted');
}
if (options.attempt > 10) {
// End reconnecting with built in error
return 10000;
}
// reconnect after
return Math.min(options.attempt * 100, 3000);
}
});
// return client.multi().get('foo').execAsync().then(function(res) {
// console.log(res); // => 'bar'
// });
this.client.on("error", function (err) {
console.log("Error " + err);
// //日志记录
// logCtl.error({
// optitle:"redis this.client.on异常:",
// op:"base/utils/redisClient/this.client.on",
// content:err,
// clientIp:""
// });
});
this.subclient = this.client.duplicate();
this.subclient.on("error", function (err) {
console.log("Error " + err);
// //日志记录
// logCtl.error({
// optitle:"redis this.subclient.on异常:",
// op:"base/utils/redisClient/this.subclient.on",
// content:err,
// clientIp:""
// });
});
var self = this;
this.subclient.on("message", async function (channel, message) {
console.log(channel, '------------- redis message ------------------- ');
if (self.taskmanager) {
if (channel == "task") {
if (message == "newtask") {
(async (that) => {
var msg2 = await that.rpop("tasklist");
if (msg2) {
console.log("taskName+++++" + msg2);
var msgs2 = msg2.split("_");
var action = msgs2[0];
var taskName = msgs2[1];
var exp = msgs2[2];
await that.taskmanager.addTask(taskName, exp);
}
})(self)
} else {
(async (msg, that) => {
var msgs = msg.split("_");
var action = msgs[0];
if (action == "delete") {
var taskName = msgs[1];
await that.taskmanager.deleteTask(taskName);
}
})(message, self);
}
}
}
if (self.chatserver) {
if (channel != "task") {
var message = JSON.parse(message);
console.log(message, "------------------------------------------ publish message");
if (channel == "brc") {//如果是广播频道,则发送广播到客户端
self.chatserver.server.emit("brc", message);
} else if (self.chatserver.users[channel]) {
if (message.type) {
self.chatserver.users[channel].client.emit(message.type, message.data);
} else {
//持久化
self.chatserver.users[channel].client.emit("chatmsg", message);
}
}
}
}
});
}
async subscribe(channel, chatserver) {
if (!this.chatserver) {
this.chatserver = chatserver;
}
return this.subclient.subscribeAsync(channel);
}
async unsubscribe(channel) {
//this.chatserver=null;
return this.subclient.unsubscribeAsync(channel);
}
async subscribeTask(channel, taskmanager) {
if (!this.taskmanager) {
this.taskmanager = taskmanager;
}
return this.subclient.subscribeAsync(channel);
}
async publish(channel, msg) {
console.log(channel + ":" + msg);
return this.client.publishAsync(channel, msg);
}
async rpush(key, val) {
return this.client.rpushAsync(key, val);
}
async llen(key) {
return this.client.llenAsync(key);
}
async rpushWithEx(key, val, t) {
var p = this.rpush(key, val);
this.client.expire(key, t);
return p;
}
async rpop(key) {
return this.client.rpopAsync(key);
}
async lpop(key) {
return this.client.lpopAsync(key);
}
async lrem(key, val) {
return this.client.lremAsync(key, 1, val);
}
async ltrim(key, s, e) {
return this.client.ltrimAsync(key, s, e);
}
async clearlist(key) {
await this.client.ltrim(key, -1, -1);
await this.client.ltrim(key, 1, -1);
return 0;
}
async flushall() {
console.log("sss");
return this.client.flushallAsync();
}
async keys(p) {
return this.client.keysAsync(p);
}
async set(key, val) {
if (typeof val == "undefined" || typeof key == "undefined") {
console.log("......................cache val undefined");
console.log(key);
return null;
}
return this.client.setAsync(key, val);
}
async setWithEx(key, val, t) {
var p = this.client.setAsync(key, val);
this.client.expire(key, t);
return p;
}
async get(key) {
return this.client.getAsync(key);
}
async delete(key) {
return this.client.delAsync(key);
}
async hmset(key, jsonObj) {
return this.client.hmsetAsync(key, jsonObj);
}
async hmsetWithEx(key, jsonObj, t) {
var p = this.client.hmsetAsync(key, jsonObj);
this.client.expire(key, t);
return p;
}
async hgetall(key) {
return this.client.hgetallAsync(key);
}
async hincrby(key, f, n) {
return this.client.hincrbyAsync(key, f, n);
}
async sadd(key, vals) {
await this.client.saddAsync(key, ...vals);
return this.scard(key);
}
async scard(key) {
return this.client.scardAsync(key);
}
async srem(key, val) {
return this.client.sremAsync(key, val);
}
async sismember(key, val) {
return this.client.sismemberAsync(key, val);
}
async smembers(key) {
return this.client.smembersAsync(key);
}
async exists(key) {
return this.client.existsAsync(key);
}
async incr(key) {
return this.client.incrAsync(key);
}
}
module.exports = RedisClient;
// var client=new RedisClient();
// (async ()=>{
// await client.rpush("tasklist","xxx");
// await client.rpush("tasklist","xxx");
// var len=await client.llen("tasklist");
// //await client.clearlist("tasklist");
// len=await client.llen("tasklist");
// console.log(len);
// })()
// client.keys('*').then(s=>{
// console.log(s);
// });
// let clients = {};
// clients.watcher = redis.createClient({ ... } );
// clients.alterer = clients.watcher.duplicate();
// client.sadd("h",["ok","jy","ok"]).then(function(r){
// console.log(r);
// });
// client.smembers("h").then(function(r){
// console.log(r);
// });
// client.sismember("h","ok").then(function(r){
// console.log(r);
// });
// console.dir(client);ti.exec( callback )回调函数参数err:返回null或者Array,出错则返回对应命令序列链中发生错误的错误信息,这个数组中最后一个元素是源自exec本身的一个EXECABORT类型的错误
// r.set("hello","oooo").then(function(result){
// console.log(result);
// });
// r.get("hello").then(function(result){
// console.log(result);
// });
// client.hmset("user_1",{name:"jy",age:13}).then(function(r){
// console.log(r);
//
// });
// client.hincrby("user_1","age",2).then(function(r){
// console.log(r);
// setTimeout(function(){
// client.hgetall("user_1").then(function(u){
// console.log(u);
// });
// },3000);
// });
...@@ -5,32 +5,154 @@ const querystring = require('querystring'); ...@@ -5,32 +5,154 @@ const querystring = require('querystring');
var settings=require("../../config/settings"); var settings=require("../../config/settings");
class RestClient{ class RestClient{
constructor(){ constructor(){
this.cmdGetPattern = "curl {-G} -k -d '{data}' {url}";
this.cmdPostPattern="curl -k -H 'Content-type: application/json' -d '{data}' '{url}'";
this.cmdPostPatternWithAK="curl -k -H 'Content-type: application/json' -H 'AccessKey:{ak}' -d '{data}' {url}";
this.cmdDownLoadFilePattern="curl -G -o {fileName} {url}"; this.cmdDownLoadFilePattern="curl -G -o {fileName} {url}";
this.cmdPostPattern2="curl -k -H 'Content-type: application/x-www-form-urlencoded' -d '{data}' {url}";
// form-data形式post data参数类型 md5=2&data=1
this.cmdPostPattern5="curl -k --data '{data}' {url}";
}
FetchGetCmd(subData, url) {
var cmd = this.cmdGetPattern.replace(/\{\-G\}/g, "-G").replace(
/\{data\}/g, subData).replace(/\{url\}/g, url);
return cmd;
}
FetchPostCmd(subData, url) {
var data=JSON.stringify(subData);
var cmd= this.cmdPostPattern.replace(/\{data\}/g,
data).replace(/\{url\}/g, url);
return cmd;
}
FetchPostCmdWithAK(subData, url,acck) {
var data=JSON.stringify(subData);
var cmd= this.cmdPostPatternWithAK.replace(/\{data\}/g,
data).replace(/\{url\}/g, url).replace(/\{ak\}/g,acck);
return cmd;
}
FetchPostCmd2(subData, url) {
var data=subData;
var cmd= this.cmdPostPattern2.replace(/\{data\}/g,
data).replace(/\{url\}/g, url);
return cmd;
}
FetchPostCmd3(subData, url) {
var data=subData;
var cmd= this.cmdPostPattern3.replace(/\{data\}/g,
data).replace(/\{url\}/g, url);
return cmd;
}
FetchGetCmd3(url) {
var cmd = this.cmdGetPattern3.replace(/\{\-G\}/g, "-G").replace(/\{url\}/g, url);
return cmd;
}
FetchPostCmd4(subData, url) {
var data=subData;
var cmd= this.cmdPostPattern4.replace(/\{data\}/g,
data).replace(/\{url\}/g, url);
return cmd;
}
FetchPostCmd5(subData, url) {
var data=subData;
var cmd= this.cmdPostPattern5.replace(/\{data\}/g,
data).replace(/\{url\}/g, url);
return cmd;
} }
FetchDownLoadCmd(outfname,url) { FetchDownLoadCmd(outfname,url) {
// console.log(this.cmdPattern); // console.log(this.cmdPattern);
var cmd = this.cmdDownLoadFilePattern.replace(/\{fileName\}/g, outfname).replace( var cmd = this.cmdDownLoadFilePattern.replace(/\{fileName\}/g, outfname).replace(
/\{url\}/g, url); /\{url\}/g, url);
return cmd; return cmd;
}
async exec(cmd,options) {
//await后面表达式返回的promise对象,是then的语法糖,await返回then函数的返回值
//异常需要try/catch自己捕获或外部catch捕获
if(options){
const { stdout, stderr } = await exec(cmd,options);
return { stdout, stderr };
}else{
const { stdout, stderr } = await exec(cmd);
return { stdout, stderr };
}
} }
async exec(cmd) {
//await后面表达式返回的promise对象,是then的语法糖,await返回then函数的返回值
//异常需要try/catch自己捕获或外部catch捕获
const { stdout, stderr } = await exec(cmd,{
maxBuffer:1024*1024*15
});
return { stdout, stderr };
}
async execDownload(url,outfname){ async execDownload(url,outfname){
let cmd=this.FetchDownLoadCmd(outfname,url); let cmd=this.FetchDownLoadCmd(outfname,url);
var result=await this.exec(cmd); var result=await this.exec(cmd);
return result; return result;
} }
async execGet(subData, url){
let cmd=this.FetchGetCmd(subData,url);
var result=await this.exec(cmd);
return result;
}
async execGet2(subData, url){
var data=querystring.stringify(subData);
let cmd=this.FetchGetCmd(data,url);
var result=await this.exec(cmd);
return result;
}
async execPost(subData, url){
let cmd=this.FetchPostCmd(subData,url);
var result=await this.exec(cmd,{
maxBuffer: 10000 * 1024
});
return result;
}
async execPostWithAK(subData, url,ak){
let cmd=this.FetchPostCmdWithAK(subData,url,ak);
var result=await this.exec(cmd,{
maxBuffer:1024*1024*15
});
var rtn=result.stdout;
if(rtn){
return JSON.parse(rtn);
}else{
return null;
}
}
async execPost2(subData, url){
let cmd=this.FetchPostCmd2(subData,url);
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
async execPost3(subData, url){
let cmd=this.FetchPostCmd3(subData,url);
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
async execGet3(url){
let cmd=this.FetchGetCmd3(url);
console.log("execGet3-----01");
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
async execPostESignBao(subData, url){
let cmd=this.FetchPostCmd4(subData,url);
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
async execPostForm(subData, url){
let cmd=this.FetchPostCmd5(subData,url);
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
async execCustomPostESignBao(cmd){
console.log(cmd);
var result=await this.exec(cmd);
return result;
}
test(){
console.log("hello");
}
} }
module.exports=RestClient; module.exports=RestClient;
// var x=new RestClient();
// x.execGet("","http://www.163.com").then(function(r){
// console.log(r.stdout);
// console.log(r.stderr);
// });
...@@ -6,9 +6,8 @@ var settings = { ...@@ -6,9 +6,8 @@ var settings = {
db: 11, db: 11,
}, },
database: function () { database: function () {
var args = process.argv;
return { return {
dbname: "alop", dbname: "tasks",
user: "write", user: "write",
password: "write", password: "write",
config: { config: {
......
...@@ -3,6 +3,6 @@ var basepath=path.normalize(path.join(__dirname, '../..')); ...@@ -3,6 +3,6 @@ var basepath=path.normalize(path.join(__dirname, '../..'));
var settings = { var settings = {
db:path.join(basepath,"app/base/db/impl"), db:path.join(basepath,"app/base/db/impl"),
util:path.join(basepath,"app/base/utils"), util:path.join(basepath,"app/base/utils"),
task:path.join(basepath,"app/base/task"), task:path.join(basepath,"app/base/db/task"),
}; };
module.exports = settings; module.exports = settings;
...@@ -7,8 +7,13 @@ var ENVINPUT={ ...@@ -7,8 +7,13 @@ var ENVINPUT={
REDIS_HOST:process.env.REDIS_HOST, REDIS_HOST:process.env.REDIS_HOST,
REDIS_PORT:process.env.REDIS_PORT, REDIS_PORT:process.env.REDIS_PORT,
REDIS_PWD:process.env.REDIS_PWD, REDIS_PWD:process.env.REDIS_PWD,
<<<<<<< HEAD
REDIS_DB:process.env.TASK_REDIS_DB, REDIS_DB:process.env.TASK_REDIS_DB,
DB_NAME:process.env.TASK_DB_NAME, DB_NAME:process.env.TASK_DB_NAME,
=======
DB_NAME:process.env.TASK_DB_NAME,
REDIS_DB:process.env.TASK_REDIS_DB,
>>>>>>> 1bccc0e0e608e84ed0a0ddd5bc63e6a076047c01
APP_ENV:process.env.APP_ENV?process.env.APP_ENV:"dev" APP_ENV:process.env.APP_ENV?process.env.APP_ENV:"dev"
}; };
var settings = { var settings = {
...@@ -29,6 +34,7 @@ var settings = { ...@@ -29,6 +34,7 @@ var settings = {
}, },
database:function(){ database:function(){
if(this.env=="dev"){ if(this.env=="dev"){
console.log("===========================");
var localsettings=require("./localsettings"); var localsettings=require("./localsettings");
return localsettings.database; return localsettings.database;
}else{ }else{
......
const system=require("./app/base/system"); const system=require("./app/base/system");
const fs=require("fs"); const fs=require("fs");
var dbf=system.getObject("db.common.connection"); //数据库支持暂缓支持
var downClient=system.getObject("util.restClient"); // var dbf=system.getObject("db.common.connection");
con=dbf.getCon(); // con=dbf.getCon();
var taskName = process.env.TASK_NAME; var taskName = process.env.TASK_NAME;
var params= process.env.TASK_PARAM; var params= process.env.TASK_PARAM;
var port=process.env.TASK_PORT; var port=process.env.TASK_PORT;
if(taskName){ if(taskName){
var t=system.getObject("task.test."+taskName); var task=system.getObject("task."+taskName);
(async()=>{ (async()=>{
await t.doTask(params); await task.doTask(params);
console.log("process over............"); console.log("process over............");
})(); })();
}else{ }else{
......
...@@ -12,5 +12,8 @@ ...@@ -12,5 +12,8 @@
"glob": "^7.1.4", "glob": "^7.1.4",
"mysql2": "^1.5.3", "mysql2": "^1.5.3",
"sequelize": "^4.37.8" "sequelize": "^4.37.8"
},
"devDependencies": {
"redis": "^2.8.0"
} }
} }
大家关注:
任务开发说明(建议任务通过restapi去调用各应用的接口服务)
分支:taskexecutor 开发说明见readme.txt
1、各个应用实现任务的目录 taskexecutor/app/base/db/task
按照各自应用名称建立任务子目录,在子目录内实现任务实现类
需要继承TaskBase
2、TaskBase实现集成如下功能
A、缓存管理器
B、数据库连接器
C、三个方法
beforeTask(params)------前置
subDoTask(params)------任务主要逻辑
postTask(params)---------后置
3、任务部署:支持两种类型的任务(通过配置文件配置,联系上线人员)
3.1:Cron定时任务
3.2 : 非定时任务
A-一次性任务
B-指定固定执行次数任务,依次非并发执行
C-指定并发执行个数任务,保证一个完成即可
D-既有执行次数,又有并发个数
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment