Skip to content

Commit

Permalink
改为小批循环获取消息
Browse files Browse the repository at this point in the history
  • Loading branch information
renyh1013 committed May 24, 2016
1 parent e4b5d82 commit 7d535d2
Showing 1 changed file with 94 additions and 51 deletions.
145 changes: 94 additions & 51 deletions dp2Message/BaseMsgHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void _channels_AddMessage(object sender, AddMessageEventArgs e)

if (e.Records != null)
{
DoMessage(e.Records,"addMessage");
DoMessage(e.Records, "addMessage");
}
}

Expand All @@ -100,33 +100,74 @@ public async void DoLoadMessage()
{
if (_inGetMessage > 0)
return;

_inGetMessage++;
try
{
string strGroupName = "_patronNotify";//"<default>";

string strError = "";

CancellationToken cancel_token = new CancellationToken();
string id = Guid.NewGuid().ToString();
GetMessageRequest request = new GetMessageRequest(id,
"",
strGroupName, // "" 表示默认群组
"",
"",
0,
-1);
try
{
MessageConnection connection = await this._channels.GetConnectionAsync(
this._dp2mserverUrl,
"");
MessageResult result = await connection.GetMessageAsync(
request,
OutputMessage,
new TimeSpan(0, 1, 0),
cancel_token);
"");
/*
我提前说一下:小批循环获取,如果按照一个条件,不在中途删除服务器端的消息,应该是这样循环:start=0,count=100; start=101,count=100,...
如果你每次中途都主动删除(或者失效)刚处理的这小批消息,循环就是这样了:start=0,count=100; start=0,count=100;... 明白么?分号位置是要做删除调用的
当然如果请求的条件发生变化了,就不算同一批了
多次调用只是为了避免突破内存空间问题,脑子里要清楚这个原则。
*/
int batchNo = 1;//用于输出,看结果对不对
long totalCount = -1;//用于输出,看结果对不对 -1表示未赋值

int start = 0;
int count = 10;
REDO:
CancellationToken cancel_token = new CancellationToken();
string id = Guid.NewGuid().ToString();
GetMessageRequest request = new GetMessageRequest(id,
"",
strGroupName, // "" 表示默认群组
"",
"",
start,
count);

// 改为同步小批循环获取。原来回调函数方案删除或失效有问题,也占响应时间
GetMessageResult result = connection.GetMessage(request,
new TimeSpan(0, 1, 0),
cancel_token);
if (result.Value == -1)
{
goto ERROR1;
}
if (result.Value == 0)
{
goto END1;
}

// 用于测试,第一次返回的记为总记录数
if (totalCount == -1)
totalCount = result.Value;

// 做事,发送消息给微信,里面用了expire,所以下次的start位置不变
if (result.Results != null && result.Results.Count > 0)
{
this.DoMessage(result.Results, "getMessage");
}

// 继续获取消息
if (result.Results != null && result.Results.Count > 0
&& result.Value > result.Results.Count)
{
//start += result.Results.Count;

batchNo++; //用于输出,测试
goto REDO;
}

END1:
// 输出一次分批获取的情况
this.WriteErrorLog("总记录数[" + totalCount + "],分为[" + batchNo + "]批获取完:)");
}
catch (AggregateException ex)
{
Expand Down Expand Up @@ -155,7 +196,7 @@ void OutputMessage(long totalCount,
string errorInfo,
string errorCode)
{
if (totalCount == -1) // todo 什么情况下-1
if (totalCount == -1) // todo 什么情况下-1ko
{
StringBuilder text = new StringBuilder();
text.Append("***\r\n");
Expand All @@ -168,7 +209,7 @@ void OutputMessage(long totalCount,

if (records != null && records.Count > 0)
{
DoMessage(records,"getMessage");
DoMessage(records, "getMessage");
}
}

Expand Down Expand Up @@ -198,27 +239,28 @@ private bool AddMsgToHashTable(string msgId)
/// 实际处理通知消息
/// </summary>
/// <param name="records"></param>
private void DoMessage(IList<MessageRecord> records,string from)
private void DoMessage(IList<MessageRecord> records, string from)
{
try
lock (msgLocker)
{
if (records == null || records.Count == 0)
return;

List<string> delIds = new List<string>();
foreach (MessageRecord record in records)
try
{
// 先检查一下是不是_patronNotify组消息,因为addMessage会得到账户配置的所有组的消息,getMessage没关系只获取_patronNotify群消息
bool bPatronNotifyGroup = this.CheckIsNotifyGroup(record.groups);
if (bPatronNotifyGroup == false)
{
continue;
}
if (records == null || records.Count == 0)
return;

// getMessage与addMessage处理消息都会走到这里,对这段代码加锁,以保证不会重发消息。
lock (msgLocker)
List<string> delIds = new List<string>();
foreach (MessageRecord record in records)
{
this.WriteErrorLog("这次是["+from+"]传过来的消息。");
// 先检查一下是不是_patronNotify组消息,因为addMessage会得到账户配置的所有组的消息,getMessage没关系只获取_patronNotify群消息
bool bPatronNotifyGroup = this.CheckIsNotifyGroup(record.groups);
if (bPatronNotifyGroup == false)
{
continue;
}

// getMessage与addMessage处理消息都会走到这里,对这段代码加锁,以保证不会重发消息。

this.WriteErrorLog("这次是[" + from + "]传过来的消息。");

// 从已处理消息队列里查重,如果是前面处理过的,则不再处理
bool bSended = this.checkMsgIsDone(record.id);
Expand All @@ -240,25 +282,26 @@ private void DoMessage(IList<MessageRecord> records,string from)

// 加到已处理消息队列里
this.AddMsgToHashTable(record.id);


// 加到删除列表
delIds.Add(record.id);
}

// 加到删除列表
delIds.Add(record.id);
//删除处理过的消息
if (delIds.Count > 0)
{
string strError = "";
int nRet = this.DeleteMessage(delIds, out strError);
if (nRet == -1)
throw new Exception(strError);
}
}

//删除处理过的消息
if (delIds.Count > 0)
catch (Exception ex)
{
string strError = "";
int nRet = this.DeleteMessage(delIds, out strError);
if (nRet == -1)
throw new Exception(strError);
this.WriteErrorLog(ex.Message);
}
}
catch (Exception ex)
{
this.WriteErrorLog(ex.Message);
}
}


Expand Down Expand Up @@ -319,7 +362,7 @@ int DeleteMessage(List<string> idList, out string strError)
records.Add(record);
}

SetMessageRequest param = new SetMessageRequest("delete",
SetMessageRequest param = new SetMessageRequest("expire", //delete
"",
records);

Expand Down

0 comments on commit 7d535d2

Please sign in to comment.