當前位置:開發者網絡 >> 技術教程 >> .NET教程 >> C#語言 >> 內容
精彩推薦
分類最新教程
分類熱點教程
    
[C#]I/O完成端口的類定義和測試實例
作者:未知
日期:2005-04-19
人氣:
投稿:(轉貼)
來源:未知
字體:
收藏:加入瀏覽器收藏
以下正文:
從William Kennedy那裡整理過來的,不同之處在於他自己定義了一個Overlapped,而我們這裡直接使用
System.Threading.NativeOverlapped.
附一段我以前的Win32下的IOCP文檔,如果您瞭解IOCP也可以直接跳過看後面的C#測試示範:

 

整理者:鄭昀@UltraPower

 

我們採用的是I/O Complete Port(以下簡稱IOCP)處理機制。

簡單的講,當服務應用程序初始化時,它應該先創建一個I/O CP。我們在請求到來後,將得到的數據打包用PostQueuedCompletionStatus發送到IOCP中。這時需要創建一些個線程(7個線程/每CPU,再多就沒有意義了)來處理發送到IOCP端口的消息。實現步驟大致如下:

1     先在主線程中調用CreateIoCompletionPort創建IOCP。

CreateIoCompletionPort的前三個參數只在把設備同Complete Port相關聯時才有用。

此時我們只需傳遞INVALID_HANDLE_VALUE,NULL和0即可。

第四個參數告訴端口同時能運行的最多線程數,這裡設置為0,表示默認為當前計算機的CPU數目。

2     我們的ThreadFun線程函數執行一些初始化之後,將進入一個循環,該循環會在服務進程終止時才結束。

在循環中,調用GetQueuedCompletionStatus,這樣就把當前線程的ID放入一個等待線程隊列中,I/O CP內核對象就總能知道哪個線程在等待處理完成的I/O請求。

如果在IDLE_THREAD_TIMEOUT規定的時間內I/O CP上還沒有出現一個Completion Packet,則轉入下一次循環。在這裡我們設置的IDLE_THREAD_TIMEOUT為1秒。

 

當端口的I/O完成隊列中出現一項時,完成端口就喚醒等待線程隊列中的這個線程,該線程將得到完成的I/O項中的信息:       傳輸的字節數、完成鍵和OVERLAPPED結構的地址。

 

在我們的程序中可以用智能指針或者BSTR或者int來接受這個OVERLAPPED結構的地址的值,從而得到消息;然後在這個線程中處理消息。

GetQueuedCompletionStatus的第一個參數hCompletionPort指出了要監視哪一個端口,這裡我們傳送先前從CreateIoCompletionPort返回的端口句柄。

 

需要注意的是:

第一,   線程池的數目是有限制的,和CPU數目有關係。

第二,   IOCP是一種較為完美的睡眠/喚醒 線程機制;線程當前沒有任務要處理時,就進入睡眠狀態,從而不佔用CPU資源,直到被內核喚醒;

第三,   最近一次剛執行完的線程,下次任務來的時候還會喚醒它;所以有可能比較少被調用的線程以後被調用的幾率也少。

 

測試代碼:

using System;
using System.Threading;  // Included for the Thread.Sleep call
using Continuum.Threading;
using System.Runtime.InteropServices;

namespace IOCPDemo
{
    //=============================================================================
    /**//// <summary> Sample class for the threading class </summary>
    public class UtilThreadingSample
    {
        //*****************************************************************************   
        /**//// <summary> Test Method </summary>
        static void Main()
        {
            // Create the MSSQL IOCP Thread Pool
            IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
      
            //for(int i =1;i<10000;i++)
            {
                pThreadPool.PostEvent(1234);
            }
      
            Thread.Sleep(100);
      
            pThreadPool.Dispose();
        }
    
        //********************************************************************
        /**//// <summary> Function to be called by the IOCP thread pool.  Called when
        ///           a command is posted for processing by the SocketManager </summary>
        /// <param name="iValue"> The value provided by the thread posting the event </param>
        static public void IOCPThreadFunction(int iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue.ToString());
                Thread.Sleep(3000);
            }
      
            catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }

}


類代碼:
using System;
using System.Threading;
using System.Runtime.InteropServices;

namespace IOCPThreading
{
    [StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]

    public sealed class IOCPThreadPool
    {
        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean CloseHandle(UInt32 hObject);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

        private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
        private const UInt32 INIFINITE = 0xffffffff;
        private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;
        public delegate void USER_FUNCTION(int iValue);
        private UInt32 m_hHandle;
        private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }

        private Int32 m_uiMaxConcurrency;

        private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }


        private Int32 m_iMinThreadsInPool;

        private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

        private Int32 m_iMaxThreadsInPool;

        private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }


        private Object m_pCriticalSection;

        private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }


        private USER_FUNCTION m_pfnUserFunction;

        private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }


        private Boolean m_bDisposeFlag;

        /**//// <summary> SimType: Flag to indicate if the class is disposing </summary>

        private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

        private Int32 m_iCurThreadsInPool;

        /**//// <summary> SimType: The current number of threads in the thread pool </summary>

        public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }

        /**//// <summary> SimType: Increment current number of threads in the thread pool </summary>

        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }

        /**//// <summary> SimType: Decrement current number of threads in the thread pool </summary>

        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }


        private Int32 m_iActThreadsInPool;

        /**//// <summary> SimType: The current number of active threads in the thread pool </summary>

        public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }

        /**//// <summary> SimType: Increment current number of active threads in the thread pool </summary>

        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }

        /**//// <summary> SimType: Decrement current number of active threads in the thread pool </summary>

        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }


        private Int32 m_iCurWorkInPool;

        /**//// <summary> SimType: The current number of Work posted in the thread pool </summary>

        public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }

        /**//// <summary> SimType: Increment current number of Work posted in the thread pool </summary>

        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }

        /**//// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>

        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }

        public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            try
            {
                // Set initial class state

                GetMaxConcurrency   = iMaxConcurrency;

                GetMinThreadsInPool = iMinThreadsInPool;

                GetMaxThreadsInPool = iMaxThreadsInPool;

                GetUserFunction     = pfnUserFunction;


                // Init the thread counters

                GetCurThreadsInPool = 0;

                GetActThreadsInPool = 0;

                GetCurWorkInPool    = 0;


                // Initialize the Monitor Object

                GetCriticalSection = new Object();


                // Set the disposing flag to false

                IsDisposed = false;


                unsafe
                {

                    // Create an IO Completion Port for Thread Pool use
                    GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);

                }


                // Test to make sure the IO Completion Port was created

                if (GetHandle == 0)

                    throw new Exception("Unable To Create IO Completion Port");


                // Allocate and start the Minimum number of threads specified

                Int32 iStartingCount = GetCurThreadsInPool;

        

                ThreadStart tsThread = new ThreadStart(IOCPFunction);

                for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                {

                    // Create a thread and start it

                    Thread thThread = new Thread(tsThread);

                    thThread.Name = "IOCP " + thThread.GetHashCode();

                    thThread.Start();


                    // Increment the thread pool count

                    IncCurThreadsInPool();

                }

            }


            catch
            {

                throw new Exception("Unhandled Exception");

            }

        }

        ~IOCPThreadPool()
        {

            if (!IsDisposed)

                Dispose();

        }

        public void Dispose()
        {

            try
            {

                // Flag that we are disposing this object

                IsDisposed = true;


                // Get the current number of threads in the pool

                Int32 iCurThreadsInPool = GetCurThreadsInPool;


                // Shutdown all thread in the pool

                for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                {
                    unsafe
                    {

                        bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);

                    }

                }


                // Wait here until all the threads are gone

                while (GetCurThreadsInPool != 0) Thread.Sleep(100);


                unsafe
                {

                    // Close the IOCP Handle
                    CloseHandle(GetHandle);

                }

            }

            catch
            {

            }

        }
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;

            Int32  iValue;

            try
            {
                while (true)
                {

                    unsafe
                    {

                        System.Threading.NativeOverlapped* pOv;


                        // Wait for an event

                        GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);
                    }

                    // Decrement the number of events in queue

                    DecCurWorkInPool();


                    // Was this thread told to shutdown

                    if (iValue == SHUTDOWN_IOCPTHREAD)

                        break;


                    // Increment the number of active threads

                    IncActThreadsInPool();


                    try
                    {
                        // Call the user function
                        GetUserFunction(iValue);

                    }

                    catch(Exception ex)
                    {
                        throw ex;
                    }


                    // Get a lock

                    Monitor.Enter(GetCriticalSection);


                    try
                    {

                        // If we have less than max threads currently in the pool

                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {

                            // Should we add a new thread to the pool

                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {

                                if (IsDisposed == false)
                                {

                                    // Create a thread and start it

                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);

                                    Thread thThread = new Thread(tsThread);

                                    thThread.Name = "IOCP " + thThread.GetHashCode();

                                    thThread.Start();


                                    // Increment the thread pool count

                                    IncCurThreadsInPool();

                                }

                            }

                        }

                    }