Commit 28ad33e2 by 蒋勇

d

parent d0b7d08a
......@@ -8,9 +8,13 @@ class CtlBase {
this.cacheManager = system.getObject("db.common.cacheManager");
this.logClient = system.getObject("util.logClient");
}
setSocketServer (socketserver) {
this.socketserver = socketserver
}
static getServiceName (ClassObj) {
return ClassObj["name"].substring(0, ClassObj["name"].lastIndexOf("Ctl")).toLowerCase() + "Sve";
let x = ClassObj["name"].substring(0, ClassObj["name"].lastIndexOf("Ctl")).toLowerCase() + "Sve";
console.log("kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk", x)
return x;
}
async update (pobj, qobj, req) {
const up = await this.service.update(pobj);
......
var system=require("../../system")
var settings=require("../../../config/settings");
const CtlBase = require("../ctl.base");
var system = require("../../../system")
const CtlBase = require("../../ctl.base");
const uuidv4 = require('uuid/v4');
class MsgHistoryCtl extends CtlBase{
constructor(){
super(CtlBase.getServiceName(MsgHistoryCtl));
}
class MsgHistoryCtl extends CtlBase {
constructor() {
super("msg", CtlBase.getServiceName(MsgHistoryCtl));
}
async chanMsg (msg) {
const from = msg.from;
const to = msg.to;
const msgContent = msg.content;
var msgH = {
toType: msg.type ? msg.type : "single",
senderId: msg.from,
sender: msg.nickNameForm,
fromHeadUrl: msg.headUrlFrom,
targetId: msg.to,
target: msg.nickNameTo,
toHeadUrl: msg.headUrlTo,
content: msg.content
};
}
}
module.exports=MsgHistoryCtl;
module.exports = MsgHistoryCtl;
var system = require("../../system")
var settings = require("../../../config/settings");
const CtlBase = require("../ctl.base");
var system = require("../../../system")
const CtlBase = require("../../ctl.base");
const uuidv4 = require('uuid/v4');
class MsgnoticeCtl extends CtlBase {
constructor() {
super(CtlBase.getServiceName(MsgnoticeCtl));
super("msg", CtlBase.getServiceName(MsgnoticeCtl));
}
/**
......@@ -12,14 +11,12 @@ class MsgnoticeCtl extends CtlBase {
* @param {*} p
* @param {*} msgHandler
*/
async findOnlinesByStr (p, msgHandler) {
async findOnlinesByStr (p) {
let s = p.likestr
if (s && s != "") {
if (s) {
console.log(s, "xxxxxxxxxxxxxxxxxxxx")
let d = await this.service.findOnlinesByStr(s)
let rtn = d.map((item) => {
return msgHandler.server.uinfos[item]
})
return system.getResult(rtn)
return system.getResult(d)
} else {
return system.getResult(null)
}
......
const fs=require("fs");
const settings=require("../../../../config/settings");
class CacheManager{
constructor(){
//await this.buildCacheMap();
this.buildCacheMap();
}
buildCacheMap(){
var self=this;
self.doc={};
var cachePath=settings.basepath+"/app/base/db/cache/";
const files=fs.readdirSync(cachePath);
if(files){
files.forEach(function(r){
var classObj=require(cachePath+"/"+r);
self[classObj.name]=new classObj();
var refTmp=self[classObj.name];
if(refTmp.prefix){
self.doc[refTmp.prefix]=refTmp.desc;
}
else{
console.log("请在"+classObj.name+"缓存中定义prefix");
}
});
}
}
}
module.exports=CacheManager;
// var cm= new CacheManager();
// cm["InitGiftCache"].cacheGlobalVal("hello").then(function(){
// cm["InitGiftCache"].cacheGlobalVal().then(x=>{
// console.log(x);
// });
// });
......@@ -3,11 +3,12 @@ const settings = require("../../../../config/settings");
const appconfig = system.getSysConfig();
module.exports = (db, DataTypes) => {
return db.define("msghistory", {
msgType: {
toType: {
type: DataTypes.ENUM,
allowNull: false,
values: Object.keys(appconfig.pdict.msgType),
},
ctype: DataTypes.STRING,
sender: DataTypes.STRING,
senderId: DataTypes.INTEGER,
fromHeadUrl: DataTypes.STRING,
......@@ -18,6 +19,8 @@ module.exports = (db, DataTypes) => {
type: DataTypes.TEXT('long'),
allowNull: false,
},//需要在后台补充
fileSize: DataTypes.INTEGER,
fileName: DataTypes.STRING,
isRead: {//是否已经读过
type: DataTypes.BOOLEAN,
defaultValue: false,
......
const system=require("../../../system");
const ServiceBase=require("../../sve.base");
class MsgHistoryService extends ServiceBase{
constructor(){
super(ServiceBase.getDaoName(MsgHistoryService));
this.msgnoticeDao = system.getObject("db.msg.msgnoticeDao");
this.userDao = system.getObject("db.auth.userDao");
this.redisClient = system.getObject("util.redisClient");
const system = require("../../../system");
const ServiceBase = require("../../sve.base");
class MsgHistoryService extends ServiceBase {
constructor() {
super("msg", ServiceBase.getDaoName(MsgHistoryService));
this.msgnoticeDao = system.getObject("db.msg.msgnoticeDao");
this.redisClient = system.getObject("util.redisClient");
}
async saveMsg(msg) {
async saveMsg (msg) {
var self = this;
console.log("save msg ", msg);
// 事务
await this.db.transaction(async function (t){
// 1.保存聊天信息
msg = await self.dao.create(msg, t);
// 2.保存好友信息
await self.msgnoticeDao.saveNotice(msg, t);
await this.db.transaction(async function (t) {
// 1.保存聊天信息
msg = await self.dao.create(msg, t);
// 2.保存好友信息
await self.msgnoticeDao.saveNotice(msg, t);
});
return msg;
}
async pushBusinessLicenseMsg(senderId, targetId, businessLicense_id) {
if(!businessLicense_id) {
async pushBusinessLicenseMsg (senderId, targetId, businessLicense_id) {
if (!businessLicense_id) {
return 0;
}
var notice = await this.msgnoticeDao.findOne({fromId : senderId, toId : targetId});
if(notice && notice.businessLicense_id == businessLicense_id) {
var notice = await this.msgnoticeDao.findOne({ fromId: senderId, toId: targetId });
if (notice && notice.businessLicense_id == businessLicense_id) {
return 0;
}
......@@ -39,13 +38,13 @@ class MsgHistoryService extends ServiceBase{
var target = targetUser.app_id + "¥" + targetUser.id + "¥" + targetUser.headUrl;
var msg = {
msgType: "mryzLicense",
sender:sender,
senderId:senderId,
target:target,
targetId:targetId,
content:businessLicense_id,
isRead:false,
businessLicense_id:businessLicense_id
sender: sender,
senderId: senderId,
target: target,
targetId: targetId,
content: businessLicense_id,
isRead: false,
businessLicense_id: businessLicense_id
}
var obj = await this.saveMsg(msg);
......@@ -60,24 +59,24 @@ class MsgHistoryService extends ServiceBase{
return 1;
}
async getChatList(senderId, targetId, maxId, pageSize) {
async getChatList (senderId, targetId, maxId, pageSize) {
let sql = "SELECT * FROM `msghistory` WHERE id < :maxId AND ((senderId = :senderId AND targetId = :targetId) OR (targetId = :senderId AND senderId = :targetId)) ORDER BY id DESC LIMIT :pageSize "
let params = {senderId:senderId, targetId: targetId, maxId: maxId, pageSize: pageSize};
let params = { senderId: senderId, targetId: targetId, maxId: maxId, pageSize: pageSize };
var list = await this.dao.customQuery(sql, params);
if(!list || list.length == 0) {
if (!list || list.length == 0) {
return [];
}
var licenseIds = [];
var msgIds = [];
list.forEach(item => {
if(item.msgType == 'mryzLicense') {
if (item.msgType == 'mryzLicense') {
licenseIds.push(Number(item.businessLicense_id));
}
msgIds.push(item.id);
});
if(licenseIds.length > 0) {
if (licenseIds.length > 0) {
let licenseSql = "SELECT * FROM yz_business_license WHERE id IN (" + licenseIds.join(",") + ") ";
var licenseList = await this.businesslicenseDao.customQuery(licenseSql);
......@@ -86,60 +85,60 @@ class MsgHistoryService extends ServiceBase{
licenseMap["id" + item.id] = item;
});
list.forEach(item => {
if(item.msgType == 'mryzLicense') {
if (item.msgType == 'mryzLicense') {
item.businessLicense = licenseMap['id' + item.businessLicense_id];
}
});
}
var self = this;
setTimeout(function(){
setTimeout(function () {
self.setRead(senderId, targetId, list);
}, 1000);
return list;
}
async setRead(senderId, targetId, list) {
if(!list || list.length == 0) {
async setRead (senderId, targetId, list) {
if (!list || list.length == 0) {
return;
}
var target = await this.userDao.findById(targetId);
if(!target) {
if (!target) {
return;
}
var pushIds = [];
for(var item of list) {
if(item.isRead || senderId != item.targetId) {
for (var item of list) {
if (item.isRead || senderId != item.targetId) {
continue;
}
pushIds.push(item.id);
}
if(pushIds.length == 0) {
if (pushIds.length == 0) {
return;
}
this.dao.updateByWhere({isRead: true}, {where:{id:{[this.db.Op.in]:pushIds}}});
this.dao.updateByWhere({ isRead: true }, { where: { id: { [this.db.Op.in]: pushIds } } });
var channel = target.app_id + "¥" + target.id;
var rs = await this.redisClient.publish(channel, JSON.stringify({type:"readmsg", data : pushIds}));
var rs = await this.redisClient.publish(channel, JSON.stringify({ type: "readmsg", data: pushIds }));
console.log(rs, "------------------------------------------ publish result ");
}
async readMsg(userId, id) {
async readMsg (userId, id) {
var msg = await this.dao.findById(id);
if(!msg || userId != msg.targetId) {
if (!msg || userId != msg.targetId) {
return 0;
}
msg.isRead = true;
await msg.save();
var user = await this.userDao.findById(msg.senderId);
if(!user) {
if (!user) {
return 0;
}
var channel = user.app_id + "¥" + user.id;
return await this.redisClient.publish(channel, JSON.stringify({type:"readmsg", data : [msg.id]}));
return await this.redisClient.publish(channel, JSON.stringify({ type: "readmsg", data: [msg.id] }));
}
}
module.exports=MsgHistoryService;
module.exports = MsgHistoryService;
......@@ -2,14 +2,26 @@ const system = require("../../../system");
const ServiceBase = require("../../sve.base");
class MsgNoticeService extends ServiceBase {
constructor() {
super(ServiceBase.getDaoName(MsgNoticeService));
this.userDao = system.getObject("db.auth.userDao");
super("msg", ServiceBase.getDaoName(MsgNoticeService));
this.msghistoryDao = system.getObject("db.msg.msghistoryDao");
this.redisClient = system.getObject("util.redisClient");
}
/**
* 模糊查找在线人员,为了客户端添加联系人
* @param {*} str
*/
async findOnlinesByStr (str) {
let rtn = await this.redisClient.sscan('_onlines', str + "*")
return rtn
let jsons = rtn.map((item) => {
let onlinep = item.split("|")
return {
uid: onlinep[3],
userName: onlinep[0],
nickName: onlinep[1],
headUrl: onlinep[2],
}
})
return jsons
}
getApp (appkey) {
return this.cacheManager["AppCache"].cacheApp(appkey);
......@@ -125,4 +137,8 @@ class MsgNoticeService extends ServiceBase {
return count;
}
}
module.exports = MsgNoticeService;
\ No newline at end of file
module.exports = MsgNoticeService;
// new MsgNoticeService().findOnlinesByStr("").then(r=>{
// console.log(r)
// })
\ No newline at end of file
......@@ -3,7 +3,8 @@ var system = require('../base/system');
var redisClient = system.getObject("util.redisClient");
const redisAdapter = require('socket.io-redis');
const uuidv4 = require('uuid/v4');
// const notifyCtl = system.getObject("web.socketNotifyCtl");
const msghistoryCtl = system.getObject("web.msg.msghistoryCtl");
const msgnoticeCtl = system.getObject("web.msg.msgnoticeCtl");
// const msgHistoryService = system.getObject("service.msghistorySve");
class RoomSet {
constructor(server) {
......@@ -68,7 +69,8 @@ class RoomSet {
//发送新登录用户的广播
if (roomname == ukencstr) {
let countonlines = await redisClient.sadd(this.onlinekey, [ukencstr])
let onlineperson = this.server.buildOnlineValue(ukencstr)
let countonlines = await redisClient.sadd(this.onlinekey, [onlineperson])
console.log("current lines....", countonlines)
redisClient.publish("brc", "new user login....online count:" + countonlines)
}
......@@ -148,7 +150,7 @@ class MsgHandler {
// let headUrlTo = self.server.uinfos[to].headUrl
var msgH = {
msgType: msg.type ? msg.type : "single",
senderId: msg.from
senderId: msg.from,
sender: msg.nickNameForm,
fromHeadUrl: msg.headUrlFrom,
targetId: msg.to,
......@@ -167,9 +169,11 @@ class MsgHandler {
//响应消息处理
this.client.on("replymsg", (msg, fn) => {
var p = null;
console.log("web." + msg.pkgname + "." + msg.cls, "mmmmmmmmmmmmmmmmmmmmmm")
var invokeObj = system.getObject("web." + msg.pkgname + "." + msg.cls);
console.log(invokeObj, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
if (invokeObj[msg.method]) {
p = invokeObj[msg.method].apply(invokeObj, [msg.data, self]);
p = invokeObj[msg.method].apply(invokeObj, [msg.data]);
}
p.then(r => {
fn(r);
......@@ -203,10 +207,16 @@ class SocketServer {
var u = uuid.replace(/\-/g, "");
return u;
}
buildOnlineValue (ukencstr) {
let onlineperson = `${this.uinfos[ukencstr].userName
}|${this.uinfos[ukencstr].nickName}|${this.uinfos[ukencstr].headUrl}|${ukencstr}`
return onlineperson
}
init () {
var self = this;
//挂载到web应用的控制器,开放消息服务器的rest接口调用
//notifyCtl.setSocketServer(self);
msghistoryCtl.setSocketServer(self);
msgnoticeCtl.setSocketServer(self);
//初始化房间集合
this.rooms = new RoomSet(self)
//订阅广播频道
......@@ -263,15 +273,17 @@ class SocketServer {
console.log(ukencstr, "ddddddddddddd")
if (ukencstr && ukencstr != "undefined") {
console.log(ukencstr, "will delete.....")
delete self.users[ukencstr]
delete self.uinfos[ukencstr]
//删除在线人数
let currentpersonstr = self.buildOnlineValue(ukencstr)
await redisClient.srem(self.rooms.onlinekey, currentpersonstr)
//断开链接时,从自己私人房间退出
self.rooms.exitRoom(ukencstr, ukencstr)
//取消当前人在当前节点,对其它房间的订阅,但不清除曾经加入过的房间,除非显式退出房间
self.rooms.unsubscribeOwnedRooms(ukencstr)
//删除在线人数
await redisClient.srem(self.rooms.onlinekey, ukencstr)
//当前在线人数
let c = await redisClient.scard(self.rooms.onlinekey)
......@@ -280,6 +292,9 @@ class SocketServer {
//删除私人房间uk
delete self.socketidMap[client.id]
delete self.users[ukencstr]
delete self.uinfos[ukencstr]
}
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment