diff --git a/DigitalPlatform.Common/DigitalPlatform.Common.csproj b/DigitalPlatform.Common/DigitalPlatform.Common.csproj
index 4320d84f..dee8e5c3 100644
--- a/DigitalPlatform.Common/DigitalPlatform.Common.csproj
+++ b/DigitalPlatform.Common/DigitalPlatform.Common.csproj
@@ -46,6 +46,7 @@
+
diff --git a/DigitalPlatform.Common/RecordLock.cs b/DigitalPlatform.Common/RecordLock.cs
new file mode 100644
index 00000000..3bdc5bbc
--- /dev/null
+++ b/DigitalPlatform.Common/RecordLock.cs
@@ -0,0 +1,378 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DigitalPlatform.Common
+{
+ // 记录锁集合
+ public class RecordLockCollection
+ {
+#if OLD_LOCK
+ ReaderWriterLock m_lock = new ReaderWriterLock();
+#else
+ ReaderWriterLockSlim m_lock = new ReaderWriterLockSlim();
+#endif
+
+ int m_nLockTimeout = 5000; // 5000=5秒
+
+ Hashtable RecordLocks = new Hashtable();
+
+ public RecordLockCollection()
+ {
+ }
+
+ public RecordLockCollection(int nTimeout)
+ {
+ m_nLockTimeout = nTimeout;
+ }
+
+ public int LockTimeout
+ {
+ get
+ {
+ return m_nLockTimeout;
+ }
+ set
+ {
+ m_nLockTimeout = value;
+ }
+ }
+
+ public int Count
+ {
+ get
+ {
+ return this.RecordLocks.Count;
+ }
+ }
+
+ void _lockForWrite(int nTimeout)
+ {
+#if OLD_LOCK
+ this.m_lock.AcquireWriterLock(nTimeout);
+#else
+ if (this.m_lock.TryEnterWriteLock(nTimeout) == false)
+ throw new ApplicationException("锁定尝试中超时");
+#endif
+ }
+
+ void _unlockForWrite()
+ {
+#if OLD_LOCK
+ this.m_lock.ReleaseWriterLock();
+#else
+ this.m_lock.ExitWriteLock();
+#endif
+ }
+
+ void _lockForRead(int nTimeout)
+ {
+#if OLD_LOCK
+ this.m_lock.AcquireWriterLock(nTimeout);
+#else
+ if (this.m_lock.TryEnterReadLock(nTimeout) == false)
+ throw new ApplicationException("锁定尝试中超时");
+#endif
+ }
+
+ void _unlockForRead()
+ {
+#if OLD_LOCK
+ this.m_lock.ReleaseReaderLock();
+#else
+ this.m_lock.ExitReadLock();
+#endif
+ }
+
+ // 获得锁对象
+ RecordLock GetLock(string strID,
+ bool bAutoCreate = true)
+ {
+ // 加写锁
+ this._lockForWrite(m_nLockTimeout);
+ try
+ {
+ RecordLock reclock = (RecordLock)RecordLocks[strID];
+
+ if (reclock == null)
+ {
+ if (bAutoCreate == true)
+ {
+ reclock = new RecordLock(this.m_nLockTimeout);
+ reclock.m_strID = strID;
+ RecordLocks.Add(strID, reclock);
+ }
+ else
+ return null;
+ }
+
+ Interlocked.Increment(ref reclock.m_nUseCount);
+ // Debug.WriteLine("record lock count " + RecordLocks.Count);
+ return reclock;
+ }
+ finally
+ {
+ this._unlockForWrite();
+ }
+ }
+
+ // 试图移走锁对象
+ // 应当在RecordLock:m_lock解锁以后进行
+ void TryRemoveLock(RecordLock reclock)
+ {
+ // 加写锁
+ this._lockForWrite(m_nLockTimeout);
+ try
+ {
+ int nRet = Interlocked.Increment(ref reclock.m_nUseCount);
+ if (nRet == 1) // 说明增量以前为0
+ {
+ this.RecordLocks.Remove(reclock.m_strID);
+ }
+ else
+ {
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+ }
+ }
+ finally
+ {
+ this._unlockForWrite();
+ }
+
+
+ }
+
+ // 读锁定
+ public void LockForRead(string strID)
+ {
+ LockForRead(strID, RecordLock.m_nLockTimeout);
+ }
+
+ // 读锁定
+ public void LockForRead(string strID,
+ int nTimeOut)
+ {
+ RecordLock reclock = GetLock(strID);
+
+ // Interlocked.Increment(ref reclock.m_nUseCount);
+
+ // 加读锁
+ try
+ {
+ reclock._lockForRead(nTimeOut);
+ }
+ catch (Exception ex)
+ {
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+ // 是否还要删除?
+
+ throw ex;
+ }
+ }
+
+ public void UnlockForRead(string strID)
+ {
+ RecordLock reclock = GetLock(strID, false);
+
+ if (reclock == null)
+ throw new Exception("id '" + strID + "' 没有找到对应的记录锁");
+
+ try
+ {
+ reclock._unlockForRead();
+ }
+ finally
+ {
+
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+
+ TryRemoveLock(reclock);
+ }
+ }
+
+ // 写锁定一批 ID
+ // parameters:
+ // nTimeOut 如果 == 0,表示使用缺省的 timeout 值
+ public void LockForWrite(ref List ids,
+ int nTimeOut = 0)
+ {
+ if (nTimeOut == 0)
+ nTimeOut = RecordLock.m_nLockTimeout;
+
+ ids.Sort();
+ List succeeds = new List();
+ bool bSucceed = false;
+ try
+ {
+ foreach (string id in ids)
+ {
+ LockForWrite(id, nTimeOut);
+ succeeds.Add(id);
+ }
+ bSucceed = true;
+ }
+ finally
+ {
+ // 如果中途遇到异常,要把已经枷锁成功的部分解锁
+ if (bSucceed == false)
+ {
+ foreach (string id in succeeds)
+ {
+ UnlockForWrite(id);
+ }
+ }
+ }
+ }
+
+ public void UnlockForWrite(List ids)
+ {
+ foreach (string id in ids)
+ {
+ UnlockForWrite(id);
+ }
+ }
+
+ // 写锁定
+ public void LockForWrite(string strID)
+ {
+ LockForWrite(strID, RecordLock.m_nLockTimeout);
+ }
+
+ // 写锁定
+ public void LockForWrite(string strID,
+ int nTimeOut)
+ {
+ RecordLock reclock = GetLock(strID);
+
+ // Interlocked.Increment(ref reclock.m_nUseCount);
+
+ // 加写锁
+ try
+ {
+ reclock._lockForWrite(nTimeOut);
+ }
+ catch (Exception ex)
+ {
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+ // 是否还要删除?
+
+ throw ex;
+ }
+ }
+
+ public void UnlockForWrite(string strID)
+ {
+ RecordLock reclock = GetLock(strID, false);
+
+ if (reclock == null)
+ throw new Exception("id '" + strID + "' 没有找到对应的记录锁");
+
+ try
+ {
+ reclock._unlockForWrite();
+ }
+ finally
+ {
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+ Interlocked.Decrement(ref reclock.m_nUseCount);
+
+ TryRemoveLock(reclock);
+ }
+ }
+
+ // 输出到文本
+ public string DumpText()
+ {
+ // 加读锁
+ this._lockForRead(m_nLockTimeout);
+ try
+ {
+ string strResult = "";
+
+ foreach (string key in RecordLocks.Keys)
+ {
+ RecordLock onelock = (RecordLock)this.RecordLocks[key];
+
+ strResult += "id='" + onelock.m_strID + "' usecount='" + Convert.ToString(onelock.m_nUseCount) + "' hashcode='" + onelock.GetLockHashCode() + "'\r\n";
+ }
+
+ return strResult;
+ }
+ finally
+ {
+ this._unlockForRead();
+ }
+ }
+
+ }
+
+ ///
+ /// 记录锁
+ ///
+ public class RecordLock
+ {
+#if OLD_LOCK
+ private ReaderWriterLock m_lock = new ReaderWriterLock();
+#else
+ private ReaderWriterLockSlim m_lock = new ReaderWriterLockSlim();
+#endif
+ internal static int m_nLockTimeout = 5000; // 5000=5秒
+
+ internal string m_strID;
+ internal int m_nUseCount = 0;
+
+ public RecordLock(int nDefaultTimeout)
+ {
+ m_nLockTimeout = nDefaultTimeout;
+ }
+
+ internal void _lockForWrite(int nTimeout)
+ {
+#if OLD_LOCK
+ this.m_lock.AcquireWriterLock(nTimeout);
+#else
+ if (this.m_lock.TryEnterWriteLock(nTimeout) == false)
+ throw new ApplicationException("锁定尝试中超时");
+#endif
+ }
+
+ internal void _unlockForWrite()
+ {
+#if OLD_LOCK
+ this.m_lock.ReleaseWriterLock();
+#else
+ this.m_lock.ExitWriteLock();
+#endif
+ }
+
+ internal void _lockForRead(int nTimeout)
+ {
+#if OLD_LOCK
+ this.m_lock.AcquireWriterLock(nTimeout);
+#else
+ if (this.m_lock.TryEnterReadLock(nTimeout) == false)
+ throw new ApplicationException("锁定尝试中超时");
+#endif
+ }
+
+ internal void _unlockForRead()
+ {
+#if OLD_LOCK
+ this.m_lock.ReleaseReaderLock();
+#else
+ this.m_lock.ExitReadLock();
+#endif
+ }
+
+ public int GetLockHashCode()
+ {
+ return this.m_lock.GetHashCode();
+ }
+ }
+
+}
diff --git a/dp2Capo/Instance.cs b/dp2Capo/Instance.cs
index c23ce6ad..5aabd86c 100644
--- a/dp2Capo/Instance.cs
+++ b/dp2Capo/Instance.cs
@@ -282,6 +282,16 @@ public void Notify()
if (_queue != null
&& this.MessageConnection.IsConnected)
{
+ try
+ {
+ ServerInfo._recordLocks.LockForWrite(this._queue.Path);
+ }
+ catch(ApplicationException)
+ {
+ // 超时了
+ return;
+ }
+
try
{
MessageEnumerator iterator = _queue.GetMessageEnumerator2();
@@ -290,7 +300,7 @@ public void Notify()
Message message = iterator.Current;
MessageRecord record = new MessageRecord();
- record.groups = new string[1]{"gn:_patronNotify"}; // gn 表示 group name
+ record.groups = new string[1] { "gn:_patronNotify" }; // gn 表示 group name
record.data = (string)message.Body;
record.format = "xml";
List records = new List { record };
@@ -326,6 +336,10 @@ public void Notify()
// Program.WriteWindowsLog("Instance.Notify() 出现异常: " + ExceptionUtil.GetDebugText(ex));
this.WriteErrorLog("Instance.Notify() 出现异常: " + ExceptionUtil.GetDebugText(ex));
}
+ finally
+ {
+ ServerInfo._recordLocks.UnlockForWrite(this._queue.Path);
+ }
}
}
diff --git a/dp2Capo/ServerInfo.cs b/dp2Capo/ServerInfo.cs
index 3fe9661b..69f1aaf2 100644
--- a/dp2Capo/ServerInfo.cs
+++ b/dp2Capo/ServerInfo.cs
@@ -1,4 +1,5 @@
-using System;
+using DigitalPlatform.Common;
+using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -15,6 +16,8 @@ public static class ServerInfo
// 管理线程
public static DefaultThread _defaultThread = new DefaultThread();
+ public static RecordLockCollection _recordLocks = new RecordLockCollection();
+
// 从数据目录装载全部实例定义,并连接服务器
public static void Initial(string strDataDir)
{