multithreading - TThreadedQueue access violation with multiple consumers
A couple of days ago a user (@victoria) gave me an example of how to send an email to multiple recipients by putting each message (TIdMessage) in a queue and then sending them from several threads.
She said the example was going to present trouble when more than 1 thread accesses the queue.
She had her reasons, and she was right: in my tests an access violation happened when I used more than 1 thread (without resorting to the Sleep() function).
I have read about a possible bug in the TThreadedQueue class, as can be seen here, but I did not understand a thing.
My question: is the example below fixable, or has TThreadedQueue had the supposed bug since its creation?
uses
  System.Types, System.Classes, Generics.Collections, IdMessage;

type
  TThreadItem = class;
  TThreadList = TObjectList<TThreadItem>;
  TMessageItem = TIdMessage;
  TMessageQueue = TThreadedQueue<TMessageItem>;

  TThreadPool = class
  private
    FQueue: TMessageQueue;
    FThreads: TThreadList;
  public
    constructor Create(Count: Integer);
    destructor Destroy; override;
    procedure Shutdown;
    property Queue: TMessageQueue read FQueue;
  end;

  TThreadItem = class(TThread)
  private
    FQueue: TMessageQueue;
  protected
    procedure Execute; override;
  public
    constructor Create(Queue: TMessageQueue); reintroduce;
  end;

implementation

{ TThreadPool }

constructor TThreadPool.Create(Count: Integer);
var
  I: Integer;
  Thread: TThreadItem;
begin
  inherited Create;
  { create the thread queue to wait for push and pop of items for an infinite
    time; that's useful for thread sleeping }
  FQueue := TMessageQueue.Create;
  FThreads := TThreadList.Create;
  for I := 0 to Count - 1 do
  begin
    Thread := TThreadItem.Create(FQueue);
    FThreads.Add(Thread);
  end;
end;

destructor TThreadPool.Destroy;
begin
  Shutdown;
  FThreads.Free;
  FQueue.Free;
  inherited;
end;

procedure TThreadPool.Shutdown;
var
  Thread: TThreadItem;
  Message: TMessageItem;
begin
  { signal the threads for termination }
  for Thread in FThreads do
    Thread.Terminate;
  { shut down the queue to "unlock" the sleeping threads }
  FQueue.DoShutDown;
  { free the unprocessed enqueued message items }
  Message := FQueue.PopItem;
  while Assigned(Message) do
  begin
    Message.Free;
    Message := FQueue.PopItem;
  end;
end;

{ TThreadItem }

constructor TThreadItem.Create(Queue: TMessageQueue);
begin
  inherited Create;
  FQueue := Queue;
end;

procedure TThreadItem.Execute;
var
  Message: TMessageItem;
begin
  { <- create and set up the Indy sending object here }
  try
    while not Terminated do
    begin
      { here we'll wait for an infinite time for an item or until the queue
        is shut down; you should consider checking for the error state }
      if FQueue.PopItem(Message) = wrSignaled then
      try
        { <- send the message through the Indy sending object here }
      finally
        Message.Free;
      end;
    end;
  finally
    { <- destroy the Indy sending object here }
  end;
end;

{ possible usage: }
Pool := TThreadPool.Create(2); { <- create 2 threads }
for I := 0 to 99 do
begin
  Message := TMessageItem.Create(nil);
  Message.Subject := 'message subject';
  ...
  Pool.Queue.PushItem(Message);
end;
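
The comment in Execute says the result of PopItem should be checked. Below is a minimal sketch of what that could look like. It is not a fix for the access violation and not the original author's code. TConsumer, TWorkQueue, RunSketch and the timeout values are hypothetical names and numbers chosen for illustration, plain TObject stands in for TIdMessage, and the unit names assume Delphi XE2 or later. It also assumes that a pop released by DoShutDown may return without an item, so the item is guarded with Assigned before use.

```pascal
program PopItemResultSketch;
{$APPTYPE CONSOLE}
uses
  System.SysUtils, System.Classes, System.Types, System.SyncObjs,
  System.Generics.Collections;

type
  TWorkQueue = TThreadedQueue<TObject>;

  { hypothetical stand-in for TThreadItem }
  TConsumer = class(TThread)
  private
    FQueue: TWorkQueue;
  protected
    procedure Execute; override;
  public
    constructor Create(AQueue: TWorkQueue);
  end;

var
  Processed: Integer = 0;

constructor TConsumer.Create(AQueue: TWorkQueue);
begin
  FQueue := AQueue; { set before the thread is started }
  inherited Create(False);
end;

procedure TConsumer.Execute;
var
  Item: TObject;
begin
  while not Terminated do
  begin
    Item := nil;
    case FQueue.PopItem(Item) of
      wrSignaled:
        begin
          { assumption: after DoShutDown a pop may return no item }
          if not Assigned(Item) then
            Break;
          try
            TInterlocked.Increment(Processed); { real work would go here }
          finally
            Item.Free;
          end;
        end;
      wrTimeout:
        ; { nothing arrived in time; loop and re-check Terminated }
    else
      Break; { any other result: stop without touching Item }
    end;
  end;
end;

{ returns how many items were either processed or freed as leftovers }
function RunSketch(ConsumerCount, ItemCount: Integer): Integer;
var
  Queue: TWorkQueue;
  Consumers: TObjectList<TConsumer>;
  Consumer: TConsumer;
  Item: TObject;
  I, Leftover: Integer;
begin
  Processed := 0;
  Leftover := 0;
  { queue depth 10, push timeout 1000 ms, pop timeout 100 ms }
  Queue := TWorkQueue.Create(10, 1000, 100);
  Consumers := TObjectList<TConsumer>.Create;
  try
    for I := 1 to ConsumerCount do
      Consumers.Add(TConsumer.Create(Queue));
    for I := 1 to ItemCount do
    begin
      Item := TObject.Create;
      if Queue.PushItem(Item) <> wrSignaled then
      begin
        Item.Free; { the push failed, so the caller still owns the item }
        Inc(Leftover);
      end;
    end;
    for Consumer in Consumers do
      Consumer.Terminate;
    Queue.DoShutDown;
    for Consumer in Consumers do
      Consumer.WaitFor;
    { free whatever the consumers did not get to }
    Item := Queue.PopItem;
    while Assigned(Item) do
    begin
      Item.Free;
      Inc(Leftover);
      Item := Queue.PopItem;
    end;
  finally
    Consumers.Free;
    Queue.Free;
  end;
  Result := Processed + Leftover;
end;

begin
  Writeln('items accounted for: ', RunSketch(2, 100));
end.
```

The finite pop timeout is a design choice of this sketch, not something from the example above: it lets each consumer wake up and re-check Terminated even if no shutdown signal reaches it. On a Delphi version affected by the supposed TThreadedQueue bug, this sketch may still fail in the same way as the original.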