Commit b98fc42a by 蒋勇

d

parent b11fde54
...@@ -12,14 +12,16 @@ class TaskBase{ ...@@ -12,14 +12,16 @@ class TaskBase{
} }
async beforeTask(params){ async beforeTask(params){
var self=this; var self=this;
//订阅任务频道
await this.redisClient.subscribe(this.TASK_CHANNEL); await this.redisClient.subscribe(this.TASK_CHANNEL);
//频道事件处理函数
this.redisClient.subclient.on("message", async function (channel, message) { this.redisClient.subclient.on("message", async function (channel, message) {
await self.taskHandle(channel, message); await self.taskHandle(channel, message);
}); });
await this.subBeforeTask(params); await this.subBeforeTask(params);
} }
async taskHandle(channel, message){ async taskHandle(channel, message){
await this.subBeforeTask(channel, message); await this.subTaskHandle(channel, message);
} }
async subTaskHandle(channel, message){ async subTaskHandle(channel, message){
console.log(channel, message); console.log(channel, message);
...@@ -47,6 +49,7 @@ class TaskBase{ ...@@ -47,6 +49,7 @@ class TaskBase{
} }
} catch (e) { } catch (e) {
await this.redisClient.unsubscribe(this.TASK_CHANNEL); await this.redisClient.unsubscribe(this.TASK_CHANNEL);
console.log(e);
//日志记录 //日志记录
console.log(JSON.stringify({ console.log(JSON.stringify({
optitle:this.serviceName+",任务执行存在错误", optitle:this.serviceName+",任务执行存在错误",
...@@ -88,5 +91,10 @@ class TaskBase{ ...@@ -88,5 +91,10 @@ class TaskBase{
} }
return null; return null;
} }
sleep(milliSeconds) {
var startTime = new Date().getTime();
while (new Date().getTime() < startTime + milliSeconds);
}
} }
module.exports=TaskBase; module.exports=TaskBase;
...@@ -2,6 +2,7 @@ const TaskBase=require("../../task.base"); ...@@ -2,6 +2,7 @@ const TaskBase=require("../../task.base");
class TestTask extends TaskBase{ class TestTask extends TaskBase{
constructor(){ constructor(){
super(TaskBase.getServiceName(TestTask)); super(TaskBase.getServiceName(TestTask));
this.cacheData=[];
} }
async subBeforeTask(params){ async subBeforeTask(params){
console.log("前置操作......",this.serviceName); console.log("前置操作......",this.serviceName);
...@@ -9,6 +10,24 @@ class TestTask extends TaskBase{ ...@@ -9,6 +10,24 @@ class TestTask extends TaskBase{
//console.log(this.cacheManager); //console.log(this.cacheManager);
this.isDaemon=true; this.isDaemon=true;
} }
async subTaskHandle(channel, message){
var x=await this.redisClient.rpop(message);
if(x){
console.log("cache"+x);
this.cacheData.push(x);
}
setImmediate(()=>{
this.paseXls();
});
}
async paseXls(){
var pv=this.cacheData.pop();
if(pv){
console.log(pv);
this.sleep(5000);
this.paseXls();
}
}
async subDoTask(params){ async subDoTask(params){
console.log(params); console.log(params);
console.log("TestTask1....."); console.log("TestTask1.....");
......
...@@ -49,9 +49,14 @@ class RedisClient { ...@@ -49,9 +49,14 @@ class RedisClient {
return this.subclient.unsubscribeAsync(channel); return this.subclient.unsubscribeAsync(channel);
} }
async publish(channel, msg) { async publish(channel, msg) {
console.log(channel + ":" + msg); console.log("publish--"+channel + ":" + msg);
return this.client.publishAsync(channel, msg); return this.client.publishAsync(channel, msg);
} }
async notifyConsume(channel, consumetarget,val) {
await this.client.rpush(consumetarget,val);
console.log("publish--"+channel + ":" + consumetarget);
return this.client.publishAsync(channel, consumetarget);
}
async rpush(key, val) { async rpush(key, val) {
return this.client.rpushAsync(key, val); return this.client.rpushAsync(key, val);
} }
...@@ -146,15 +151,7 @@ class RedisClient { ...@@ -146,15 +151,7 @@ class RedisClient {
} }
} }
module.exports = RedisClient; module.exports = RedisClient;
// var client=new RedisClient();
// (async ()=>{
// await client.rpush("tasklist","xxx");
// await client.rpush("tasklist","xxx");
// var len=await client.llen("tasklist");
// //await client.clearlist("tasklist");
// len=await client.llen("tasklist");
// console.log(len);
// })()
// client.keys('*').then(s=>{ // client.keys('*').then(s=>{
// console.log(s); // console.log(s);
......
var RC=require("./redisClient.js");
var client=new RC();
(async ()=>{
for(i=0;i<100;i++){
await client.notifyConsume("k8stask","docurls",i);
}
})()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment