Commit b6681af9 authored by Thomas Grill's avatar Thomas Grill
Browse files

template fixes to threaded code

parent 855f6049
......@@ -106,16 +106,18 @@ public:
}
};
template<typename T=void>
FLEXT_TEMPLATE
struct ThrRegistry
{
static ThrFinder< PooledLifo<thr_entry,1,10> > pending;
static ThrFinder< TypedLifo<thr_entry> > active,stopped;
typedef ThrFinder< PooledLifo<thr_entry,1,10> > RegPooledLifo;
typedef ThrFinder< TypedLifo<thr_entry> > RegFinder;
static RegPooledLifo pending;
static RegFinder active,stopped;
};
template<typename T> ThrFinder< PooledLifo<thr_entry,1,10> > ThrRegistry<T>::pending;
template<typename T> ThrFinder< TypedLifo<thr_entry> > ThrRegistry<T>::active;
template<typename T> ThrFinder< TypedLifo<thr_entry> > ThrRegistry<T>::stopped;
FLEXT_TEMPIMPL(FLEXT_TEMPINST(ThrRegistry)::RegPooledLifo ThrRegistry)::pending;
FLEXT_TEMPIMPL(FLEXT_TEMPINST(ThrRegistry)::RegFinder ThrRegistry)::active;
FLEXT_TEMPIMPL(FLEXT_TEMPINST(ThrRegistry)::RegFinder ThrRegistry)::stopped;
class ThrId
......@@ -134,7 +136,7 @@ public:
}
};
template<typename T=void>
FLEXT_TEMPLATE
struct ThrVars {
// this should _definitely_ be a hashmap....
// \TODO above all it should be populated immediately, otherwise it could easily happen
......@@ -150,12 +152,12 @@ struct ThrVars {
static bool initialized;
};
template<typename T> std::set<ThrId> ThrVars<T>::regthreads;
template<typename T> flext::ThrMutex *ThrVars<T>::thrregmtx = NULL;
template<typename T> flext::ThrCond *ThrVars<T>::thrhelpcond = NULL;
template<typename T> bool ThrVars<T>::initialized = false;
FLEXT_TEMPIMPL(std::set<ThrId> ThrVars)::regthreads;
FLEXT_TEMPIMPL(flext::ThrMutex *ThrVars)::thrregmtx = NULL;
FLEXT_TEMPIMPL(flext::ThrCond *ThrVars)::thrhelpcond = NULL;
FLEXT_TEMPIMPL(bool ThrVars)::initialized = false;
static void LaunchHelper(thr_entry *e)
FLEXT_TEMPLATE void LaunchHelper(thr_entry *e)
{
e->thrid = flext::GetThreadId();
flext::RegisterThread(e->thrid);
......@@ -167,9 +169,9 @@ static void LaunchHelper(thr_entry *e)
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::StartHelper()
{
bool ok = false;
ThrVars<>::initialized = false;
FLEXT_TEMPINST(ThrVars)::initialized = false;
ThrVars<>::thrregmtx = new ThrMutex;
FLEXT_TEMPINST(ThrVars)::thrregmtx = new ThrMutex;
#if FLEXT_THREADS == FLEXT_THR_POSIX
pthread_attr_t attr;
......@@ -195,7 +197,7 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::StartHelper()
error("flext - Could not launch helper thread!");
else {
// now we have to wait for thread helper to initialize
while(!ThrVars<>::initialized) Sleep(0.001);
while(!FLEXT_TEMPINST(ThrVars)::initialized) Sleep(0.001);
// we are ready for threading now!
}
......@@ -222,37 +224,37 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::ThrHelper(void *)
// so thread construction won't disturb real-time audio
RelPriority(-1);
ThrVars<>::thrhelpcond = new ThrCond;
FLEXT_TEMPINST(ThrVars)::thrhelpcond = new ThrCond;
ThrVars<>::initialized = true;
FLEXT_TEMPINST(ThrVars)::initialized = true;
// helper loop
for(;;) {
ThrVars<>::thrhelpcond->Wait();
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Wait();
// start all inactive threads
thr_entry *ti;
while((ti = ThrRegistry<>::pending.Pop()) != NULL) {
while((ti = FLEXT_TEMPINST(ThrRegistry)::pending.Pop()) != NULL) {
bool ok;
#if FLEXT_THREADS == FLEXT_THR_POSIX
thrid_t dummy;
ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0;
ok = pthread_create (&dummy,&attr,(void *(*)(void *))FLEXT_TEMPINST(LaunchHelper),ti) == 0;
#elif FLEXT_THREADS == FLEXT_THR_MP
thrid_t dummy;
ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr;
ok = MPCreateTask((TaskProc)FLEXT_TEMPINST(LaunchHelper),ti,0,0,0,0,0,&dummy) == noErr;
#elif FLEXT_THREADS == FLEXT_THR_WIN32
ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0;
ok = _beginthread((void (*)(void *))FLEXT_TEMPINST(LaunchHelper),0,ti) >= 0;
#else
#error
#endif
if(!ok) {
error("flext - Could not launch thread!");
ThrRegistry<>::pending.Free(ti); ti = NULL;
FLEXT_TEMPINST(ThrRegistry)::pending.Free(ti); ti = NULL;
}
else
// insert into queue of active threads
ThrRegistry<>::active.Push(ti);
FLEXT_TEMPINST(ThrRegistry)::active.Push(ti);
}
}
......@@ -272,19 +274,19 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::ThrHelper(void *)
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::LaunchThread(void (*meth)(thr_params *p),thr_params *p)
{
FLEXT_ASSERT(ThrVars<>::thrhelpcond);
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrhelpcond);
// make an entry into thread list
thr_entry *e = ThrRegistry<>::pending.New();
thr_entry *e = FLEXT_TEMPINST(ThrRegistry)::pending.New();
e->Set(meth,p);
ThrRegistry<>::pending.Push(e);
FLEXT_TEMPINST(ThrRegistry)::pending.Push(e);
// signal thread helper
ThrVars<>::thrhelpcond->Signal();
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Signal();
return true;
}
template<typename=void> bool waitforstopped(TypedLifo<thr_entry> &qufnd,float wait = 0)
FLEXT_TEMPLATE bool waitforstopped(TypedLifo<thr_entry> &qufnd,float wait = 0)
{
TypedLifo<thr_entry> qutmp;
......@@ -297,9 +299,9 @@ template<typename=void> bool waitforstopped(TypedLifo<thr_entry> &qufnd,float wa
thr_entry *ti;
// search for entry
while((ti = ThrRegistry<>::stopped.Pop()) != NULL && ti != fnd) qutmp.Push(ti);
while((ti = FLEXT_TEMPINST(ThrRegistry)::stopped.Pop()) != NULL && ti != fnd) qutmp.Push(ti);
// put back entries
while((ti = qutmp.Pop()) != NULL) ThrRegistry<>::stopped.Push(ti);
while((ti = qutmp.Pop()) != NULL) FLEXT_TEMPINST(ThrRegistry)::stopped.Push(ti);
if(ti) {
// still in ThrRegistry::stopped queue
......@@ -317,7 +319,7 @@ template<typename=void> bool waitforstopped(TypedLifo<thr_entry> &qufnd,float wa
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::StopThread(void (*meth)(thr_params *p),thr_params *p,bool wait)
{
FLEXT_ASSERT(ThrVars<>::thrhelpcond);
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrhelpcond);
TypedLifo<thr_entry> qutmp;
thr_entry *ti;
......@@ -327,17 +329,17 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::StopThread(void (*meth)(thr_params *
{
bool found = false;
while((ti = ThrRegistry<>::pending.Pop()) != NULL)
while((ti = FLEXT_TEMPINST(ThrRegistry)::pending.Pop()) != NULL)
if(ti->meth == meth && ti->params == p) {
// found -> thread hasn't started -> just delete
ThrRegistry<>::pending.Free(ti);
FLEXT_TEMPINST(ThrRegistry)::pending.Free(ti);
found = true;
}
else
qutmp.Push(ti);
// put back into pending queue (order doesn't matter)
while((ti = qutmp.Pop()) != NULL) ThrRegistry<>::pending.Push(ti);
while((ti = qutmp.Pop()) != NULL) FLEXT_TEMPINST(ThrRegistry)::pending.Push(ti);
if(found) return true;
}
......@@ -347,31 +349,31 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::StopThread(void (*meth)(thr_params *
TypedLifo<thr_entry> qufnd;
while((ti = ThrRegistry<>::active.Pop()) != NULL)
while((ti = FLEXT_TEMPINST(ThrRegistry)::active.Pop()) != NULL)
if(ti->meth == meth && ti->params == p) {
ThrRegistry<>::stopped.Push(ti);
ThrVars<>::thrhelpcond->Signal();
FLEXT_TEMPINST(ThrRegistry)::stopped.Push(ti);
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Signal();
qufnd.Push(ti);
}
else
qutmp.Push(ti);
// put back into pending queue (order doesn't matter)
while((ti = qutmp.Pop()) != NULL) ThrRegistry<>::active.Push(ti);
while((ti = qutmp.Pop()) != NULL) FLEXT_TEMPINST(ThrRegistry)::active.Push(ti);
// wakeup helper thread
ThrVars<>::thrhelpcond->Signal();
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Signal();
// now wait for entries in qufnd to have vanished from ThrRegistry::stopped
if(wait)
return waitforstopped(qufnd);
return FLEXT_TEMPINST(waitforstopped)(qufnd);
else
return !qufnd.Avail();
}
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::ShouldExit()
{
return ThrRegistry<>::stopped.Find(GetThreadId()) != NULL;
return FLEXT_TEMPINST(ThrRegistry)::stopped.Find(GetThreadId()) != NULL;
}
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::PushThread()
......@@ -386,11 +388,11 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::PopThread()
{
thrid_t id = GetThreadId();
UnregisterThread(id);
thr_entry *fnd = ThrRegistry<>::stopped.Find(id,true);
if(!fnd) fnd = ThrRegistry<>::active.Find(id,true);
thr_entry *fnd = FLEXT_TEMPINST(ThrRegistry)::stopped.Find(id,true);
if(!fnd) fnd = FLEXT_TEMPINST(ThrRegistry)::active.Find(id,true);
if(fnd)
ThrRegistry<>::pending.Free(fnd);
FLEXT_TEMPINST(ThrRegistry)::pending.Free(fnd);
#ifdef FLEXT_DEBUG
else
post("flext - INTERNAL ERROR: Thread not found!");
......@@ -400,10 +402,10 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::PopThread()
FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::RegisterThread(thrid_t id)
{
#if 1
FLEXT_ASSERT(ThrVars<>::thrregmtx);
ThrVars<>::thrregmtx->Lock();
ThrVars<>::regthreads.insert(id);
ThrVars<>::thrregmtx->Unlock();
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrregmtx);
FLEXT_TEMPINST(ThrVars)::thrregmtx->Lock();
FLEXT_TEMPINST(ThrVars)::regthreads.insert(id);
FLEXT_TEMPINST(ThrVars)::thrregmtx->Unlock();
#else
regqueue.Push(new ThrIdCell(id));
#endif
......@@ -412,10 +414,10 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::RegisterThread(thrid_t id)
FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::UnregisterThread(thrid_t id)
{
#if 1
FLEXT_ASSERT(ThrVars<>::thrregmtx);
ThrVars<>::thrregmtx->Lock();
ThrVars<>::regthreads.erase(id);
ThrVars<>::thrregmtx->Unlock();
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrregmtx);
FLEXT_TEMPINST(ThrVars)::thrregmtx->Lock();
FLEXT_TEMPINST(ThrVars)::regthreads.erase(id);
FLEXT_TEMPINST(ThrVars)::thrregmtx->Unlock();
#else
unregqueue.Push(new ThrIdCell(id));
#endif
......@@ -432,17 +434,17 @@ FLEXT_TEMPIMPL(void FLEXT_CLASSDEF(flext))::ThreadRegistryWorker()
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::IsThreadRegistered()
{
FLEXT_ASSERT(ThrVars<>::thrregmtx);
ThrVars<>::thrregmtx->Lock();
bool fnd = ThrVars<>::regthreads.find(GetThreadId()) != ThrVars<>::regthreads.end();
ThrVars<>::thrregmtx->Unlock();
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrregmtx);
FLEXT_TEMPINST(ThrVars)::thrregmtx->Lock();
bool fnd = FLEXT_TEMPINST(ThrVars)::regthreads.find(GetThreadId()) != FLEXT_TEMPINST(ThrVars)::regthreads.end();
FLEXT_TEMPINST(ThrVars)::thrregmtx->Unlock();
return fnd;
}
//! Terminate all object threads
FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext_base))::StopThreads()
{
FLEXT_ASSERT(ThrVars<>::thrhelpcond);
FLEXT_ASSERT(FLEXT_TEMPINST(ThrVars)::thrhelpcond);
TypedLifo<thr_entry> qutmp;
thr_entry *ti;
......@@ -450,38 +452,38 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext_base))::StopThreads()
// first search pending queue
// --------------------------
while((ti = ThrRegistry<>::pending.Pop()) != NULL)
while((ti = FLEXT_TEMPINST(ThrRegistry)::pending.Pop()) != NULL)
if(ti->This() == this)
// found -> thread hasn't started -> just delete
ThrRegistry<>::pending.Free(ti);
FLEXT_TEMPINST(ThrRegistry)::pending.Free(ti);
else
qutmp.Push(ti);
// put back into pending queue (order doesn't matter)
while((ti = qutmp.Pop()) != NULL) ThrRegistry<>::pending.Push(ti);
while((ti = qutmp.Pop()) != NULL) FLEXT_TEMPINST(ThrRegistry)::pending.Push(ti);
// now search active queue
// -----------------------
TypedLifo<thr_entry> qufnd;
while((ti = ThrRegistry<>::active.Pop()) != NULL)
while((ti = FLEXT_TEMPINST(ThrRegistry)::active.Pop()) != NULL)
if(ti->This() == this) {
ThrRegistry<>::stopped.Push(ti);
ThrVars<>::thrhelpcond->Signal();
FLEXT_TEMPINST(ThrRegistry)::stopped.Push(ti);
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Signal();
qufnd.Push(ti);
}
else
qutmp.Push(ti);
// put back into pending queue (order doesn't matter)
while((ti = qutmp.Pop()) != NULL) ThrRegistry<>::active.Push(ti);
while((ti = qutmp.Pop()) != NULL) FLEXT_TEMPINST(ThrRegistry)::active.Push(ti);
// wakeup helper thread
ThrVars<>::thrhelpcond->Signal();
FLEXT_TEMPINST(ThrVars)::thrhelpcond->Signal();
// now wait for entries in qufnd to have vanished from ThrRegistry::stopped
if(!waitforstopped(qufnd,MAXIMUMWAIT*0.001f)) {
if(!FLEXT_TEMPINST(waitforstopped)(qufnd,MAXIMUMWAIT*0.001f)) {
#ifdef FLEXT_DEBUG
post("flext - doing hard thread termination");
#endif
......@@ -501,7 +503,7 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext_base))::StopThreads()
#else
#error Not implemented
#endif
ThrRegistry<>::pending.Free(ti);
FLEXT_TEMPINST(ThrRegistry)::pending.Free(ti);
}
return false;
}
......@@ -584,8 +586,8 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::RelPriority(int dp,thrid_t ref,thrid
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
thr_entry *ti = ThrRegistry<>::pending.Find(id);
if(!ti) ti = ThrRegistry<>::active.Find(id);
thr_entry *ti = FLEXT_TEMPINST(ThrRegistry)::pending.Find(id);
if(!ti) ti = FLEXT_TEMPINST(ThrRegistry)::active.Find(id);
if(ti) {
// thread found in list
int w = GetPriority(id);
......@@ -639,8 +641,8 @@ FLEXT_TEMPIMPL(int FLEXT_CLASSDEF(flext))::GetPriority(thrid_t id)
return pr;
#elif FLEXT_THREADS == FLEXT_THR_MP
thr_entry *ti = ThrRegistry<>::pending.Find(id);
if(!ti) ti = ThrRegistry<>::active.Find(id);
thr_entry *ti = FLEXT_TEMPINST(ThrRegistry)::pending.Find(id);
if(!ti) ti = FLEXT_TEMPINST(ThrRegistry)::active.Find(id);
return ti?ti->weight:-1;
#else
#error
......@@ -681,8 +683,8 @@ FLEXT_TEMPIMPL(bool FLEXT_CLASSDEF(flext))::SetPriority(int p,thrid_t id)
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
thr_entry *ti = ThrRegistry<>::pending.Find(id);
if(!ti) ti = ThrRegistry<>::active.Find(id);
thr_entry *ti = FLEXT_TEMPINST(ThrRegistry)::pending.Find(id);
if(!ti) ti = FLEXT_TEMPINST(ThrRegistry)::active.Find(id);
return ti && MPSetTaskWeight(id,ti->weight = p) == noErr;
#else
#error
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment