Friday, September 24, 2010

C# Producer-Consumer Threading

I needed to query a large amount of data from a database without freezing the user interface. With some help from Stack Overflow, I solved it with the producer-consumer model.

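The flow is: the user interface needs to query some data >> the producer produces a delegate (roughly the C# counterpart of a C++ function pointer) >> the consumer executes the resulting pile of queries (which are really just functions) >> once all the related queries have finished, the relevant control on the main thread (the interface's thread) is invoked to refresh, so that it displays the data the queries brought back.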
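The flow above can be sketched in a few lines. This is only a rough illustration and not the QueryThread class shown later in this post: it swaps in BlockingCollection<T> (available from .NET 4) for the hand-written queue, and the names QueryWorkerSketch and onAllDone are hypothetical. The onAllDone callback stands in for the "refresh the control" step.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: one background consumer that runs queued delegates.
public class QueryWorkerSketch
{
    private readonly BlockingCollection<Action> _queries = new BlockingCollection<Action>();
    private readonly Action _onAllDone;   // stand-in for refreshing the UI control
    private int _pending;                 // queries enqueued but not yet finished

    public QueryWorkerSketch(Action onAllDone)
    {
        _onAllDone = onAllDone;
        Thread consumer = new Thread(Consume);
        consumer.IsBackground = true;     // do not keep the process alive
        consumer.Start();
    }

    // Producer side: called from the UI thread, returns immediately.
    public void Enqueue(Action query)
    {
        Interlocked.Increment(ref _pending);
        _queries.Add(query);
    }

    // Consumer side: blocks while the queue is empty, runs queries in order.
    private void Consume()
    {
        foreach (Action query in _queries.GetConsumingEnumerable())
        {
            query();
            // Fires each time the queue drains, so it may run more than once.
            if (Interlocked.Decrement(ref _pending) == 0)
            {
                _onAllDone();
            }
        }
    }
}
```

In a real form, onAllDone would have to marshal back to the UI thread before touching any control, since it runs on the consumer thread.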

e.g. a label that needs to display a user's name

labelA.Text = queryUserName(userId, labelA);

public static string queryUserName(uint id, Control control)
{
    QueryFunction threadStart = delegate { StaticRes.queryUserNameThread(id, control); };
    // Enqueue takes (Delegate, UInt64); using the user id as the item id is an assumption
    Program.g_QueryThread.Enqueue(threadStart, id);
    return "Querying User Name...";
}

StaticRes.queryUserNameThread is the function that actually queries the data, and it is the part that takes a relatively long time.
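The body of queryUserNameThread is not shown in this post. Since it runs on the consumer thread, it cannot set the label's text directly and has to hand the result back to the interface's thread. A minimal sketch of that last step, assuming Windows Forms, might look like this (UiMarshal and SetText are hypothetical names, not part of the original code):

```csharp
using System;
using System.Windows.Forms;

// Hypothetical helper: set a control's text from any thread.
public static class UiMarshal
{
    public static void SetText(Control control, string text)
    {
        if (control.InvokeRequired)
        {
            // Called from a worker thread: queue the update on the UI thread.
            control.BeginInvoke(new Action(() => control.Text = text));
        }
        else
        {
            // Already on the UI thread (or no handle created yet).
            control.Text = text;
        }
    }
}
```

A worker function would then end with something like UiMarshal.SetText(control, userName) once the database call returns.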

Below is the implementation of class QueryThread:

public delegate void QueryFunction();

/// <summary>
/// Act as a consumer to the queries produced by the DataGridViewCustomCell
/// </summary>
public class QueryThread
{
    private struct QueueItem
    {
        public Delegate _target;
        public UInt64 _id;

        public QueueItem(Delegate target, UInt64 id)
        {
            _target = target;
            _id = id;
        }
    }
    private Object _syncEvents = new Object();
    private Queue<QueueItem> _queryQueue = new Queue<QueueItem>();
    private EventWaitHandle _waitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);

    Producer queryProducer;
    Consumer queryConsumer;

    public QueryThread()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents, _waitHandle);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

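    public void Enqueue(Delegate item, UInt64 id)
    {
        // Note: _queryQueue is modified here without taking _syncEvents,
        // while the consumer dequeues under that lock.
        QueueItem queueItem = new QueueItem(item, id);
        _queryQueue.Enqueue(queueItem);

        // wake the producer thread, which is blocked in _handle.WaitOne()
        _waitHandle.Set();
    }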
}

Next come the producer and consumer threads.
The producer mainly fires the event that makes the consumer thread start digesting the queries.
The consumer simply executes the query functions that have been enqueued.

class Producer
{
    private readonly Queue<QueueItem> _queue;
    private Object _sync;
    private EventWaitHandle _handle;

    public Producer(Queue<QueueItem> q, Object sync, EventWaitHandle waitHandle)
    {
        _queue = q;
        _sync = sync;
        _handle = waitHandle;
    }
    
    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                //wait until item is enqueued
                _handle.WaitOne();

                //enqueued, tell worker thread
                if (_queue.Count > 0)
                {
                    Monitor.Pulse(_sync);
                    Monitor.Wait(_sync,0);
                }
            }
        }
    }
}

class Consumer
{
    private readonly Queue<QueueItem> _queue;
    private Object _sync;

    public Consumer(Queue<QueueItem> q, Object sync)
    {
        _queue = q;
        _sync = sync;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            Delegate query;
            while (true)
            {
                while (_queue.Count == 0)
                {
                    if (Program.g_CustomDialog.Visible == true)
                    {
                        Program.g_CustomDialog.DialogResult = DialogResult.OK;
                    }

                    Monitor.Wait(_sync);

                }

                QueueItem item = _queue.Dequeue();
                query = item._target;
                query.DynamicInvoke(null);
            }
        }
    }
}

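Later, someone on Stack Overflow pointed out that a while(true) loop inside a lock can easily lead to deadlock.
Since my threading implementation is fairly simple, I left it unchanged; I'll just keep it in mind next time.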
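For comparison, here is a rough sketch of one common way to avoid holding the lock for the whole loop: take the lock only while touching the queue, and run the query outside it. This is not the code used in this post; SimpleQueryQueue is a hypothetical name, and the sketch drops the separate producer thread and the EventWaitHandle in favour of Monitor.Wait and Monitor.Pulse alone.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: a single worker thread fed through a locked queue.
public class SimpleQueryQueue
{
    private readonly object _sync = new object();
    private readonly Queue<Action> _queue = new Queue<Action>();

    public SimpleQueryQueue()
    {
        Thread worker = new Thread(Run);
        worker.IsBackground = true;
        worker.Start();
    }

    public void Enqueue(Action query)
    {
        lock (_sync)
        {
            _queue.Enqueue(query);
            Monitor.Pulse(_sync);          // wake the worker if it is waiting
        }
    }

    private void Run()
    {
        while (true)
        {
            Action query;
            lock (_sync)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_sync);   // releases _sync while waiting
                }
                query = _queue.Dequeue();
            }
            query();                       // run the slow query outside the lock
        }
    }
}
```

Because the lock is released before query() runs, a caller of Enqueue never has to wait for a long database call to finish.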

MSDN's tutorial on C# threading: How to: Synchronize a Producer and a Consumer Thread
But don't follow its implementation; take a look here instead.
