Commit 2509df8f by 宋毅

tj

parent b9920929
......@@ -67,12 +67,12 @@ class ConsumerBase {
}
var failQueuedName = "SYTXFAIL-SYTXPUBLIC-MSGQ";
actionBody.queuedName = queuedName;
actionBody.counter = actionBody.counter ? actionBody.counter + 1 : 1;
actionBody.counter = actionBody.counter ? actionBody.counter : 1;
console.log(actionBody, "...........................................44.......");
await this.redisClient.lpushData(failQueuedName, actionBody);
return;
}
if (actionBody.counter === 4) {
if (actionBody.counter > 4) {
await this.pushFailureLogDao.addOpFailureLogs("推送失败", actionBody, execResult);
return;
}
......
......@@ -12,7 +12,7 @@ class PublicFailConsumer extends ConsumerBase {
async subDoConsumer(queuedName, actionBody) {
console.log(actionBody, "......publicFailConsumer.....");
actionBody.counter = actionBody.counter + 1;
var exTime = actionBody.counter * 60;
var exTime = actionBody.counter * 10;
var mathStr = await this.getUidInfo(3, 10);
var scoreValue = Number(moment().format("YYYYMMDDHHmmssSSS") + mathStr);
......
......@@ -29,13 +29,13 @@ class RedisClient {
const setLock = async (key, expires = 5) => {
// 这里锁名修改成taskLocks
const lock = `cron:recyclePool:taskLocks:${key}`;
const res = await redisServer.set(lock, key, 'EX', expires, 'NX');
const res = await this.client.set(lock, key, 'EX', expires, 'NX');
return res;
};
const delLock = async (key) => {
const lock = `cron:recyclePool:taskLocks:${key}`;
await redisServer.del(lock);
await this.client.del(lock);
};
const self = this;
// 监听回调
......@@ -61,13 +61,19 @@ class RedisClient {
// 锁会在 5 秒后过期
const lock = await setLock(opResult[1]);
if (lock) {
var opData = await self.zremrangebyscoreData(opResult[0], opResult[1], opResult[1]);
await self.lpushData(opData.queuedName, opData);
await self.zremRangebyscoreData(opData.queuedName, opResult[1], opResult[1]);
var opData = await self.zrangeList(opResult[0], opResult[1], opResult[1]);
if (!opData || opData.length == 0) {
delLock(opResult[1]);
return;
}
var opDataItem = JSON.parse(opData[0]);
await self.lpushData(opDataItem.queuedName, opDataItem);
await self.zremRangebyscoreData(opResult[0], opResult[1], opResult[1]);
delLock(opResult[1])
}
} catch (error) {
//TODO:日志处理
console.log(error.stack, ".....message......................error...........");
}
});
......
......@@ -18,7 +18,7 @@ var settings = {
consumerName: ENVINPUT.CONSUMER_NAME,
queuedName: ENVINPUT.QUEUED_NAME,
basepath: path.normalize(path.join(__dirname, '../..')),
port: process.env.NODE_PORT || 4016,
port: process.env.NODE_PORT || 8080,
redis: function () {
if (this.env == "dev") {
console.log("dev.........................................................");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment