Mail Archives: cygwin/2001/11/13/09:43:11
--0-1236313688-1005662374=:69172
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
Thanks for the help inre: getting a linkable pthreads library. I have
1.3.5 installed and can link (using pthreads, pthread_cond, and pthread_mutex
calls). However, I can't get pthread_create to call back into my startup
function...
I'm writing a QueueProcessor (producer-consumer pattern) that accepts
implementations of a Task interface. The QueueProcessor is essentially
a thread pulling Tasks off an STL list + the cond/mutex code to handle the
wait/notify inter-thread communication.
Attached is the source for the QueueProcessor/Task and a test driver. I've
also attached the output to illustrate the behavior. Is there something I'm
doing wrong in the pthread_create call? Notice that none of the messages in
the static driveQueue() call appear, nor anything from
QueueProcessor::processTasks(). The cond/mutex code
QueueProcessor::appendTask() and ::processTasks() is purely conjecture, as I
have no experience w/ POSIX thread wait/notify patterns (too used to the more
OO Java approach).
Ideas? Any good online pthreads references/faq's that you'd recommend?
regards,
Evan
***
Here's the output:
------------------
QueueProcessor sucessfully constructed.
--> pthread_create()
<-- pthread_create()
QueueProcessor started successfully.
--> joining against the queue thread...
<-- join() returned... This shouldn't happen!!!!
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
QueueProcessor shut down successfully.
--- Robert Collins <robert DOT collins AT itdomain DOT com DOT au> wrote:
> From: "Robert Collins" <robert DOT collins AT itdomain DOT com DOT au>
> To: "Evan Pollan" <evan_pollan AT yahoo DOT com>,
> "Gerrit P. Haase" <cygwin AT cygwin DOT com>
> Subject: Re: Cygwin && C++ POSIX threads support?
> Date: Sun, 11 Nov 2001 15:15:22 +1100
>
> ----- Original Message -----
> From: "Evan Pollan" <evan_pollan AT yahoo DOT com>
> To: "Gerrit P. Haase" <cygwin AT cygwin DOT com>
> Sent: Sunday, November 11, 2001 3:12 PM
> Subject: Re: Cygwin && C++ POSIX threads support?
>
>
> > Great -- I'll pick up the kit tommorow over a more suitable
> > connection. What's the preferred threading library? POSIX? Or
> > something a little more tuned to the Win32 env?
> >
>
> Just link with no explicit library - the pthreads functions are in libc.
>
> Rob
>
__________________________________________________
Do You Yahoo!?
Find the one for you at Yahoo! Personals
http://personals.yahoo.com
--0-1236313688-1005662374=:69172
Content-Type: text/plain; name="QueueProcessor.h"
Content-Description: QueueProcessor.h
Content-Disposition: inline; filename="QueueProcessor.h"
#include <iostream>
#include <list>
#include <pthread.h>
#define ProcessingException 37
//-----------------------------------------------------------------------
// C-style call to drive the QueueProcessor
//-----------------------------------------------------------------------
extern "C" void* driveQueue (void*);
//-----------------------------------------------------------------------
// Task
//-----------------------------------------------------------------------
class Task {
public:
Task () {};
virtual void execute () throw (int) = 0;
virtual char* getDescription () {
return "BaseTask";
};
friend ostream& operator<< (ostream& stream, Task* t) {
stream << "Task[" << t->getDescription() << "]";
return stream;
};
};
//-----------------------------------------------------------------------
// QueueProcessor
//-----------------------------------------------------------------------
class QueueProcessor {
private:
bool _stopped;
pthread_cond_t _condition;
pthread_mutex_t _mutex;
pthread_t _thread;
list<Task*>* _queue;
public:
QueueProcessor ();
~QueueProcessor ();
void appendTask (Task*);
void start ();
void stop ();
void processTasks ();
};
--0-1236313688-1005662374=:69172
Content-Type: text/plain; name="QueueProcessor.cpp"
Content-Description: QueueProcessor.cpp
Content-Disposition: inline; filename="QueueProcessor.cpp"
#include "QueueProcessor.h"
//-----------------------------------------------------------------------
// C-style call to drive the QueueProcessor
//-----------------------------------------------------------------------
void* execute(void* args) {
cout << "> Call into driveQueue()\n";
QueueProcessor* p = (QueueProcessor*)args;
p->processTasks();
cout << "< QueueProcessor::processTasks() returned.\n";
return NULL;
}
//-----------------------------------------------------------------------
// QueueProcessor
//-----------------------------------------------------------------------
QueueProcessor::QueueProcessor () {
pthread_cond_init(&_condition, NULL);
pthread_mutex_init(&_mutex, NULL);
_stopped = false;
_queue = new list<Task*>();
cout << "QueueProcessor sucessfully constructed.\n";
}
QueueProcessor::~QueueProcessor () {
pthread_cond_destroy(&_condition);
pthread_mutex_destroy(&_mutex);
delete _queue;
cout << "QueueProcessor sucessfully destroyed.\n";
}
void QueueProcessor::appendTask (Task* t) {
if (!_stopped) {
pthread_mutex_lock(&_mutex);
_queue->push_back(t);
pthread_cond_signal(&_condition);
pthread_mutex_unlock(&_mutex);
cout << "Appended "<< t << "\n";
} else {
cout << "WARNING: " << t << " added after the QueueProcessor was ";
cout << "stopped.\n";
}
}
void QueueProcessor::start () {
cout << " --> pthread_create()\n";
int status = pthread_create(&_thread, NULL, execute, NULL);
cout << " <-- pthread_create()\n";
if (status != 0) {
cout << "ERROR: Thread creation failed (status=" << status << ").\n";
} else {
cout << "QueueProcessor started successfully.\n";
}
void* threadExitStatus;
cout << " --> joining against the queue thread...\n";
pthread_join(&_thread, &threadExitStatus);
cout << " <-- join() returned... This shouldn't happen!!!!\n";
}
void QueueProcessor::stop () {
pthread_mutex_lock(&_mutex);
_stopped = true;
pthread_cond_signal(&_condition);
pthread_mutex_unlock(&_mutex);
void* threadExitStatus;
pthread_join(&_thread, &threadExitStatus);
cout << "QueueProcessor shut down successfully.\n";
}
void QueueProcessor::processTasks () {
cout << "--> processTasks()\n";
Task* t;
while (!_stopped) {
cout << "----> !_stopped\n";
pthread_mutex_lock(&_mutex);
while (_queue->size() == 0 && !_stopped) {
pthread_cond_wait(&_condition, &_mutex);
}
pthread_mutex_unlock(&_mutex);
if (_queue->size()>0 && !_stopped) {
t=_queue->front();
_queue->pop_front();
try {
t->execute();
} catch (int i) {
if (i == ProcessingException) {
cout << "Couldn't process "<< t << "\n";
} else {
throw i;
}
}
}
}
cout << "<-- processTasks()\n";
}
--0-1236313688-1005662374=:69172
Content-Type: text/plain; name="Driver.cpp"
Content-Description: Driver.cpp
Content-Disposition: inline; filename="Driver.cpp"
#include "QueueProcessor.h"
class TestTask: public Task {
private:
int _size;
public:
TestTask (int i) {
_size = i;
}
char* getDescription () {
return "TestTask";
}
void execute () {
cout << "Executing TestTask(" << _size << ")\n";
}
};
int main (int, char*[]) {
// Create and startup the QueueProcessor
QueueProcessor* p = new QueueProcessor();
p->start();
// Add the tasks, allowing them to be processed asynchronously by the
// QueueProcessor
Task* t;
for (int i=0; i<10; i++) {
t = new TestTask(i);
p->appendTask(t);
}
// Shutdown the QueueProcessor
p->stop();
}
--0-1236313688-1005662374=:69172
Content-Type: text/plain; charset=us-ascii
--
Unsubscribe info: http://cygwin.com/ml/#unsubscribe-simple
Bug reporting: http://cygwin.com/bugs.html
Documentation: http://cygwin.com/docs.html
FAQ: http://cygwin.com/faq/
--0-1236313688-1005662374=:69172--
- Raw text -