From 81736d1866f1dfefdf9bb599e4e1c5992c02ebbc Mon Sep 17 00:00:00 2001 From: XieTao Date: Sat, 21 May 2016 13:24:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=BA=20dp2Capo=20=E5=88=86=E5=8F=91=20MSMQ?= =?UTF-8?q?=20=E6=B6=88=E6=81=AF=E8=BF=87=E7=A8=8B=E5=8A=A0=E9=94=81?= =?UTF-8?q?=EF=BC=8C=E8=BF=99=E6=A0=B7=E5=8F=AF=E4=BB=A5=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E4=B8=A4=E4=B8=AA=20Instance=20=E9=83=BD=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=90=8C=E4=B8=80=E4=B8=AA=20dp2library=20=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E6=97=B6=E5=87=BA=E7=8E=B0=E7=9A=84=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DigitalPlatform.Common.csproj | 1 + DigitalPlatform.Common/RecordLock.cs | 378 ++++++++++++++++++ dp2Capo/Instance.cs | 16 +- dp2Capo/ServerInfo.cs | 5 +- 4 files changed, 398 insertions(+), 2 deletions(-) create mode 100644 DigitalPlatform.Common/RecordLock.cs diff --git a/DigitalPlatform.Common/DigitalPlatform.Common.csproj b/DigitalPlatform.Common/DigitalPlatform.Common.csproj index 4320d84f..dee8e5c3 100644 --- a/DigitalPlatform.Common/DigitalPlatform.Common.csproj +++ b/DigitalPlatform.Common/DigitalPlatform.Common.csproj @@ -46,6 +46,7 @@ + diff --git a/DigitalPlatform.Common/RecordLock.cs b/DigitalPlatform.Common/RecordLock.cs new file mode 100644 index 00000000..3bdc5bbc --- /dev/null +++ b/DigitalPlatform.Common/RecordLock.cs @@ -0,0 +1,378 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DigitalPlatform.Common +{ + // 记录锁集合 + public class RecordLockCollection + { +#if OLD_LOCK + ReaderWriterLock m_lock = new ReaderWriterLock(); +#else + ReaderWriterLockSlim m_lock = new ReaderWriterLockSlim(); +#endif + + int m_nLockTimeout = 5000; // 5000=5秒 + + Hashtable RecordLocks = new Hashtable(); + + public RecordLockCollection() + { + } + + public RecordLockCollection(int nTimeout) + { + m_nLockTimeout = nTimeout; + } + + public int LockTimeout + { + get + { + return m_nLockTimeout; + } + set + { + m_nLockTimeout = value; + } + } + + public int Count + { + get + { + return this.RecordLocks.Count; + } + } + + void _lockForWrite(int nTimeout) + { +#if OLD_LOCK + this.m_lock.AcquireWriterLock(nTimeout); +#else + if (this.m_lock.TryEnterWriteLock(nTimeout) == false) + throw new ApplicationException("锁定尝试中超时"); +#endif + } + + void _unlockForWrite() + { +#if OLD_LOCK + this.m_lock.ReleaseWriterLock(); +#else + this.m_lock.ExitWriteLock(); +#endif + } + + void _lockForRead(int nTimeout) + { +#if OLD_LOCK + this.m_lock.AcquireWriterLock(nTimeout); +#else + if (this.m_lock.TryEnterReadLock(nTimeout) == false) + throw new ApplicationException("锁定尝试中超时"); +#endif + } + + void _unlockForRead() + { +#if OLD_LOCK + this.m_lock.ReleaseReaderLock(); +#else + this.m_lock.ExitReadLock(); +#endif + } + + // 获得锁对象 + RecordLock GetLock(string strID, + bool bAutoCreate = true) + { + // 加写锁 + this._lockForWrite(m_nLockTimeout); + try + { + RecordLock reclock = (RecordLock)RecordLocks[strID]; + + if (reclock == null) + { + if (bAutoCreate == true) + { + reclock = new RecordLock(this.m_nLockTimeout); + reclock.m_strID = strID; + RecordLocks.Add(strID, reclock); + } + else + return null; + } + + Interlocked.Increment(ref reclock.m_nUseCount); + // Debug.WriteLine("record lock count " + RecordLocks.Count); + return reclock; + } + finally + { + this._unlockForWrite(); + } + } + + // 试图移走锁对象 + // 应当在RecordLock:m_lock解锁以后进行 + void TryRemoveLock(RecordLock reclock) + { + // 加写锁 + this._lockForWrite(m_nLockTimeout); + try + { + int nRet = Interlocked.Increment(ref reclock.m_nUseCount); + if (nRet == 1) // 说明增量以前为0 + { + this.RecordLocks.Remove(reclock.m_strID); + } + else + { + Interlocked.Decrement(ref reclock.m_nUseCount); + } + } + finally + { + this._unlockForWrite(); + } + + + } + + // 读锁定 + public void LockForRead(string strID) + { + LockForRead(strID, RecordLock.m_nLockTimeout); + } + + // 读锁定 + public void LockForRead(string strID, + int nTimeOut) + { + RecordLock reclock = GetLock(strID); + + // Interlocked.Increment(ref reclock.m_nUseCount); + + // 加读锁 + try + { + reclock._lockForRead(nTimeOut); + } + catch (Exception ex) + { + Interlocked.Decrement(ref reclock.m_nUseCount); + // 是否还要删除? + + throw ex; + } + } + + public void UnlockForRead(string strID) + { + RecordLock reclock = GetLock(strID, false); + + if (reclock == null) + throw new Exception("id '" + strID + "' 没有找到对应的记录锁"); + + try + { + reclock._unlockForRead(); + } + finally + { + + Interlocked.Decrement(ref reclock.m_nUseCount); + Interlocked.Decrement(ref reclock.m_nUseCount); + + TryRemoveLock(reclock); + } + } + + // 写锁定一批 ID + // parameters: + // nTimeOut 如果 == 0,表示使用缺省的 timeout 值 + public void LockForWrite(ref List ids, + int nTimeOut = 0) + { + if (nTimeOut == 0) + nTimeOut = RecordLock.m_nLockTimeout; + + ids.Sort(); + List succeeds = new List(); + bool bSucceed = false; + try + { + foreach (string id in ids) + { + LockForWrite(id, nTimeOut); + succeeds.Add(id); + } + bSucceed = true; + } + finally + { + // 如果中途遇到异常,要把已经枷锁成功的部分解锁 + if (bSucceed == false) + { + foreach (string id in succeeds) + { + UnlockForWrite(id); + } + } + } + } + + public void UnlockForWrite(List ids) + { + foreach (string id in ids) + { + UnlockForWrite(id); + } + } + + // 写锁定 + public void LockForWrite(string strID) + { + LockForWrite(strID, RecordLock.m_nLockTimeout); + } + + // 写锁定 + public void LockForWrite(string strID, + int nTimeOut) + { + RecordLock reclock = GetLock(strID); + + // Interlocked.Increment(ref reclock.m_nUseCount); + + // 加写锁 + try + { + reclock._lockForWrite(nTimeOut); + } + catch (Exception ex) + { + Interlocked.Decrement(ref reclock.m_nUseCount); + // 是否还要删除? + + throw ex; + } + } + + public void UnlockForWrite(string strID) + { + RecordLock reclock = GetLock(strID, false); + + if (reclock == null) + throw new Exception("id '" + strID + "' 没有找到对应的记录锁"); + + try + { + reclock._unlockForWrite(); + } + finally + { + Interlocked.Decrement(ref reclock.m_nUseCount); + Interlocked.Decrement(ref reclock.m_nUseCount); + + TryRemoveLock(reclock); + } + } + + // 输出到文本 + public string DumpText() + { + // 加读锁 + this._lockForRead(m_nLockTimeout); + try + { + string strResult = ""; + + foreach (string key in RecordLocks.Keys) + { + RecordLock onelock = (RecordLock)this.RecordLocks[key]; + + strResult += "id='" + onelock.m_strID + "' usecount='" + Convert.ToString(onelock.m_nUseCount) + "' hashcode='" + onelock.GetLockHashCode() + "'\r\n"; + } + + return strResult; + } + finally + { + this._unlockForRead(); + } + } + + } + + /// + /// 记录锁 + /// + public class RecordLock + { +#if OLD_LOCK + private ReaderWriterLock m_lock = new ReaderWriterLock(); +#else + private ReaderWriterLockSlim m_lock = new ReaderWriterLockSlim(); +#endif + internal static int m_nLockTimeout = 5000; // 5000=5秒 + + internal string m_strID; + internal int m_nUseCount = 0; + + public RecordLock(int nDefaultTimeout) + { + m_nLockTimeout = nDefaultTimeout; + } + + internal void _lockForWrite(int nTimeout) + { +#if OLD_LOCK + this.m_lock.AcquireWriterLock(nTimeout); +#else + if (this.m_lock.TryEnterWriteLock(nTimeout) == false) + throw new ApplicationException("锁定尝试中超时"); +#endif + } + + internal void _unlockForWrite() + { +#if OLD_LOCK + this.m_lock.ReleaseWriterLock(); +#else + this.m_lock.ExitWriteLock(); +#endif + } + + internal void _lockForRead(int nTimeout) + { +#if OLD_LOCK + this.m_lock.AcquireWriterLock(nTimeout); +#else + if (this.m_lock.TryEnterReadLock(nTimeout) == false) + throw new ApplicationException("锁定尝试中超时"); +#endif + } + + internal void _unlockForRead() + { +#if OLD_LOCK + this.m_lock.ReleaseReaderLock(); +#else + this.m_lock.ExitReadLock(); +#endif + } + + public int GetLockHashCode() + { + return this.m_lock.GetHashCode(); + } + } + +} diff --git a/dp2Capo/Instance.cs b/dp2Capo/Instance.cs index c23ce6ad..5aabd86c 100644 --- a/dp2Capo/Instance.cs +++ b/dp2Capo/Instance.cs @@ -282,6 +282,16 @@ public void Notify() if (_queue != null && this.MessageConnection.IsConnected) { + try + { + ServerInfo._recordLocks.LockForWrite(this._queue.Path); + } + catch(ApplicationException) + { + // 超时了 + return; + } + try { MessageEnumerator iterator = _queue.GetMessageEnumerator2(); @@ -290,7 +300,7 @@ public void Notify() Message message = iterator.Current; MessageRecord record = new MessageRecord(); - record.groups = new string[1]{"gn:_patronNotify"}; // gn 表示 group name + record.groups = new string[1] { "gn:_patronNotify" }; // gn 表示 group name record.data = (string)message.Body; record.format = "xml"; List records = new List { record }; @@ -326,6 +336,10 @@ public void Notify() // Program.WriteWindowsLog("Instance.Notify() 出现异常: " + ExceptionUtil.GetDebugText(ex)); this.WriteErrorLog("Instance.Notify() 出现异常: " + ExceptionUtil.GetDebugText(ex)); } + finally + { + ServerInfo._recordLocks.UnlockForWrite(this._queue.Path); + } } } diff --git a/dp2Capo/ServerInfo.cs b/dp2Capo/ServerInfo.cs index 3fe9661b..69f1aaf2 100644 --- a/dp2Capo/ServerInfo.cs +++ b/dp2Capo/ServerInfo.cs @@ -1,4 +1,5 @@ -using System; +using DigitalPlatform.Common; +using System; using System.Collections.Generic; using System.IO; using System.Linq; @@ -15,6 +16,8 @@ public static class ServerInfo // 管理线程 public static DefaultThread _defaultThread = new DefaultThread(); + public static RecordLockCollection _recordLocks = new RecordLockCollection(); + // 从数据目录装载全部实例定义,并连接服务器 public static void Initial(string strDataDir) {