diff --git a/dp2Message/BaseMsgHandler.cs b/dp2Message/BaseMsgHandler.cs index b69c08d7..81703bfa 100644 --- a/dp2Message/BaseMsgHandler.cs +++ b/dp2Message/BaseMsgHandler.cs @@ -91,7 +91,7 @@ void _channels_AddMessage(object sender, AddMessageEventArgs e) if (e.Records != null) { - DoMessage(e.Records,"addMessage"); + DoMessage(e.Records, "addMessage"); } } @@ -100,33 +100,74 @@ public async void DoLoadMessage() { if (_inGetMessage > 0) return; - _inGetMessage++; try { string strGroupName = "_patronNotify";//""; - string strError = ""; - - CancellationToken cancel_token = new CancellationToken(); - string id = Guid.NewGuid().ToString(); - GetMessageRequest request = new GetMessageRequest(id, - "", - strGroupName, // "" 表示默认群组 - "", - "", - 0, - -1); try { MessageConnection connection = await this._channels.GetConnectionAsync( this._dp2mserverUrl, - ""); - MessageResult result = await connection.GetMessageAsync( - request, - OutputMessage, - new TimeSpan(0, 1, 0), - cancel_token); + ""); + /* +我提前说一下:小批循环获取,如果按照一个条件,不在中途删除服务器端的消息,应该是这样循环:start=0,count=100; start=101,count=100,... +如果你每次中途都主动删除(或者失效)刚处理的这小批消息,循环就是这样了:start=0,count=100; start=0,count=100;... 明白么?分号位置是要做删除调用的 +当然如果请求的条件发生变化了,就不算同一批了 +多次调用只是为了避免突破内存空间问题,脑子里要清楚这个原则。 + */ + int batchNo = 1;//用于输出,看结果对不对 + long totalCount = -1;//用于输出,看结果对不对 -1表示未赋值 + + int start = 0; + int count = 10; + REDO: + CancellationToken cancel_token = new CancellationToken(); + string id = Guid.NewGuid().ToString(); + GetMessageRequest request = new GetMessageRequest(id, + "", + strGroupName, // "" 表示默认群组 + "", + "", + start, + count); + + // 改为同步小批循环获取。原来回调函数方案删除或失效有问题,也占响应时间 + GetMessageResult result = connection.GetMessage(request, + new TimeSpan(0, 1, 0), + cancel_token); + if (result.Value == -1) + { + goto ERROR1; + } + if (result.Value == 0) + { + goto END1; + } + + // 用于测试,第一次返回的记为总记录数 + if (totalCount == -1) + totalCount = result.Value; + + // 做事,发送消息给微信,里面用了expire,所以下次的start位置不变 + if (result.Results != null && result.Results.Count > 0) + { + this.DoMessage(result.Results, "getMessage"); + } + + // 继续获取消息 + if (result.Results != null && result.Results.Count > 0 + && result.Value > result.Results.Count) + { + //start += result.Results.Count; + + batchNo++; //用于输出,测试 + goto REDO; + } + + END1: + // 输出一次分批获取的情况 + this.WriteErrorLog("总记录数[" + totalCount + "],分为[" + batchNo + "]批获取完:)"); } catch (AggregateException ex) { @@ -155,7 +196,7 @@ void OutputMessage(long totalCount, string errorInfo, string errorCode) { - if (totalCount == -1) // todo 什么情况下-1 + if (totalCount == -1) // todo 什么情况下-1ko { StringBuilder text = new StringBuilder(); text.Append("***\r\n"); @@ -168,7 +209,7 @@ void OutputMessage(long totalCount, if (records != null && records.Count > 0) { - DoMessage(records,"getMessage"); + DoMessage(records, "getMessage"); } } @@ -198,27 +239,28 @@ private bool AddMsgToHashTable(string msgId) /// 实际处理通知消息 /// /// - private void DoMessage(IList records,string from) + private void DoMessage(IList records, string from) { - try + lock (msgLocker) { - if (records == null || records.Count == 0) - return; - - List delIds = new List(); - foreach (MessageRecord record in records) + try { - // 先检查一下是不是_patronNotify组消息,因为addMessage会得到账户配置的所有组的消息,getMessage没关系只获取_patronNotify群消息 - bool bPatronNotifyGroup = this.CheckIsNotifyGroup(record.groups); - if (bPatronNotifyGroup == false) - { - continue; - } + if (records == null || records.Count == 0) + return; - // getMessage与addMessage处理消息都会走到这里,对这段代码加锁,以保证不会重发消息。 - lock (msgLocker) + List delIds = new List(); + foreach (MessageRecord record in records) { - this.WriteErrorLog("这次是["+from+"]传过来的消息。"); + // 先检查一下是不是_patronNotify组消息,因为addMessage会得到账户配置的所有组的消息,getMessage没关系只获取_patronNotify群消息 + bool bPatronNotifyGroup = this.CheckIsNotifyGroup(record.groups); + if (bPatronNotifyGroup == false) + { + continue; + } + + // getMessage与addMessage处理消息都会走到这里,对这段代码加锁,以保证不会重发消息。 + + this.WriteErrorLog("这次是[" + from + "]传过来的消息。"); // 从已处理消息队列里查重,如果是前面处理过的,则不再处理 bool bSended = this.checkMsgIsDone(record.id); @@ -240,25 +282,26 @@ private void DoMessage(IList records,string from) // 加到已处理消息队列里 this.AddMsgToHashTable(record.id); + + + // 加到删除列表 + delIds.Add(record.id); } - // 加到删除列表 - delIds.Add(record.id); + //删除处理过的消息 + if (delIds.Count > 0) + { + string strError = ""; + int nRet = this.DeleteMessage(delIds, out strError); + if (nRet == -1) + throw new Exception(strError); + } } - - //删除处理过的消息 - if (delIds.Count > 0) + catch (Exception ex) { - string strError = ""; - int nRet = this.DeleteMessage(delIds, out strError); - if (nRet == -1) - throw new Exception(strError); + this.WriteErrorLog(ex.Message); } } - catch (Exception ex) - { - this.WriteErrorLog(ex.Message); - } } @@ -319,7 +362,7 @@ int DeleteMessage(List idList, out string strError) records.Add(record); } - SetMessageRequest param = new SetMessageRequest("delete", + SetMessageRequest param = new SetMessageRequest("expire", //delete "", records);