Commit d8455f02 authored by thomas's avatar thomas
Browse files

static message queue


git-svn-id: https://svn.grrrr.org/ext/trunk@644 4d9ac71a-51e6-0310-8455-cad1006bcd31
parent 5627bcf2
......@@ -17,6 +17,7 @@ Version history:
- added some more SIMD functions
- fixed wrong returned result of flext::buffer::set function
- fix for linux static exported function name-clash (flext::Setup() single- vs. multi-threading)
- made message queue stuff global (static) for all flext objects
0.4.4:
- fixed deadly bug for Max/MSP method-to-symbol-binding proxies
......
This diff is collapsed.
......@@ -730,25 +730,34 @@ private:
// queue stuff
class qmsg;
static qmsg *qhead,*qtail;
// class qmsg;
// static qmsg *qhead,*qtail;
//! Flush messages in the queue
static void QFlush(flext_base *th = NULL);
//! Queue worker function
// static void QWork(bool qlock,bool syslock);
//! Start message queue
static void StartQueue();
#if FLEXT_SYS == FLEXT_SYS_JMAX
static void QTick(int winlet = 0, fts_symbol_t s = NULL, int ac = 0, const fts_atom_t *at = NULL);
// static void QTick(int winlet = 0, fts_symbol_t s = NULL, int ac = 0, const fts_atom_t *at = NULL);
#else // PD or Max
static void QTick();
// static void QTick();
#ifndef FLEXT_QTHR
static t_qelem *qclk;
// static t_qelem *qclk;
#else
//! Start message queue worker thread
static void StartQThr();
//! Queue worker thread function
// static void *QWorker(void *);
//! Queue worker thread conditional
static ThrCond qthrcond;
// static ThrCond qthrcond;
#endif
#endif
static void Queue(qmsg *m);
// static void Queue(qmsg *m);
#ifdef FLEXT_THREADS
static ThrMutex qmutex;
// static ThrMutex qmutex;
#endif
......
......@@ -159,9 +159,7 @@ void flext_base::Setup(t_classid id)
SetProxies(c);
#ifdef FLEXT_QTHR
StartQThr();
#endif
StartQueue();
}
#if FLEXT_SYS == FLEXT_SYS_JMAX
......
......@@ -19,7 +19,7 @@ WARRANTIES, see the file, "license.txt," in this distribution.
#include "flext.h"
#include "flinternal.h"
class flext_base::qmsg
class qmsg
{
public:
qmsg(flext_base *b): th(b),nxt(NULL),tp(tp_none) {}
......@@ -33,8 +33,8 @@ public:
void SetFloat(int o,float f) { Clear(); out = o; tp = tp_float; _float = f; }
void SetInt(int o,int i) { Clear(); out = o; tp = tp_int; _int = i; }
void SetSymbol(int o,const t_symbol *s) { Clear(); out = o; tp = tp_sym; _sym = s; }
void SetList(int o,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_list; _list.argc = argc,_list.argv = CopyList(argc,argv); }
void SetAny(int o,const t_symbol *s,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_any; _any.s = s,_any.argc = argc,_any.argv = CopyList(argc,argv); }
void SetList(int o,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_list; _list.argc = argc,_list.argv = flext::CopyList(argc,argv); }
void SetAny(int o,const t_symbol *s,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_any; _any.s = s,_any.argc = argc,_any.argv = flext::CopyList(argc,argv); }
flext_base *th;
int out;
......@@ -48,40 +48,40 @@ public:
};
};
flext_base::qmsg::~qmsg()
qmsg::~qmsg()
{
Clear();
if(nxt) delete nxt;
}
void flext_base::qmsg::Clear()
void qmsg::Clear()
{
if(tp == tp_list) { if(_list.argv) delete[] _list.argv; }
else if(tp == tp_any) { if(_any.argv) delete[] _any.argv; }
tp = tp_none;
}
#if FLEXT_SYS == FLEXT_SYS_JMAX
void flext_base::QTick(int winlet, fts_symbol_t s, int ac, const fts_atom_t *at)
{
static qmsg *qhead = NULL,*qtail = NULL;
#ifdef FLEXT_QTHR
static flext::ThrCond qthrcond;
#else
void flext_base::QTick()
{
static t_qelem *qclk = NULL;
#endif
// post("qtick");
#if defined(FLEXT_THREADS) && defined(FLEXT_DEBUG) && !defined(FLEXT_QTHR)
if(!IsSystemThread()) {
error("flext - Queue tick called by wrong thread!");
return;
}
#ifdef FLEXT_THREADS
static flext::ThrMutex qmutex;
#endif
static void QWork(bool qlock,bool syslock)
{
#ifdef FLEXT_THREADS
qmutex.Lock();
if(qlock) qmutex.Lock();
#endif
#ifdef FLEXT_QTHR
pd_lock();
if(syslock) pd_lock();
#endif
for(;;) {
qmsg *m = qhead;
if(!m) break;
......@@ -94,27 +94,27 @@ void flext_base::QTick()
switch(m->tp) {
case qmsg::tp_bang:
m->th->m_methodmain(n,sym_bang,0,&tmp);
m->th->m_methodmain(n,flext::sym_bang,0,&tmp);
break;
case qmsg::tp_float:
SetFloat(tmp,m->_float);
m->th->m_methodmain(n,sym_float,1,&tmp);
flext::SetFloat(tmp,m->_float);
m->th->m_methodmain(n,flext::sym_float,1,&tmp);
break;
case qmsg::tp_int:
SetInt(tmp,m->_int);
flext::SetInt(tmp,m->_int);
#if FLEXT_SYS == FLEXT_SYS_PD
m->th->m_methodmain(n,sym_float,1,&tmp);
m->th->m_methodmain(n,flext::sym_float,1,&tmp);
#elif FLEXT_SYS == FLEXT_SYS_MAX
m->th->m_methodmain(n,sym_int,1,&tmp);
m->th->m_methodmain(n,flext::sym_int,1,&tmp);
#else
#error Not implemented!
#endif
case qmsg::tp_sym:
SetSymbol(tmp,m->_sym);
m->th->m_methodmain(n,sym_symbol,1,&tmp);
flext::SetSymbol(tmp,m->_sym);
m->th->m_methodmain(n,flext::sym_symbol,1,&tmp);
break;
case qmsg::tp_list:
m->th->m_methodmain(n,sym_list,m->_list.argc,m->_list.argv);
m->th->m_methodmain(n,flext::sym_list,m->_list.argc,m->_list.argv);
break;
case qmsg::tp_any:
m->th->m_methodmain(n,m->_any.s,m->_any.argc,m->_any.argv);
......@@ -146,23 +146,46 @@ void flext_base::QTick()
delete m;
}
#ifdef FLEXT_QTHR
pd_unlock();
if(syslock) pd_unlock();
#endif
#ifdef FLEXT_THREADS
qmutex.Unlock();
if(qlock) qmutex.Unlock();
#endif
}
#if FLEXT_SYS == FLEXT_SYS_JMAX
static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_atom_t *at)
{
#else
static void QTick(flext_base *c)
{
#endif
// post("qtick");
#if defined(FLEXT_THREADS) && defined(FLEXT_DEBUG) && !defined(FLEXT_QTHR)
if(!flext::IsSystemThread()) {
error("flext - Queue tick called by wrong thread!");
return;
}
#endif
QWork(true,true);
}
/*
It would be sufficient to only send messages belonging to object th
But then the order is not as intended
It would be sufficient to only flush messages belonging to object th
But then the order of sent messages is not as intended
*/
void flext_base::QFlush(flext_base *th)
{
while(qhead) QTick();
#ifdef FLEXT_THREADS
if(!IsSystemThread()) {
error("flext - Queue flush called by wrong thread!");
return;
}
#endif
while(qhead) QWork(true,false);
}
void flext_base::Queue(qmsg *m)
static void Queue(qmsg *m)
{
// post("Queue");
......@@ -190,72 +213,74 @@ void flext_base::Queue(qmsg *m)
// this is dangerous because there may be other timers on this object!
fts_timebase_add_call(fts_get_timebase(), (fts_object_t *)thisHdr(), QTick, NULL, 0);
#else
#error
#error Not implemented
#endif
}
#ifdef FLEXT_QTHR
ThrCond flext_base::qthrcond;
#else
void QWorker(flext::thr_params *)
{
for(;;) {
qthrcond.Wait();
QWork(true,true);
}
}
#endif
void flext_base::StartQueue()
{
// message queue ticker
qhead = qtail = NULL;
#ifdef FLEXT_QTHR
for(;;) {
qthrcond.Wait();
QTick();
}
LaunchThread(QWorker,NULL);
#else
#if FLEXT_SYS == FLEXT_SYS_PD || FLEXT_SYS == FLEXT_SYS_MAX
qclk = (t_qelem *)(qelem_new(this,(t_method)QTick));
qclk = (t_qelem *)(qelem_new(NULL,(t_method)QTick));
#else
#error Not implemented!
#endif
#endif
}
#endif
void flext_base::ToQueueBang(int o) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetBang(o);
Queue(m);
}
void flext_base::ToQueueFloat(int o,float f) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetFloat(o,f);
Queue(m);
}
void flext_base::ToQueueInt(int o,int f) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetInt(o,f);
Queue(m);
}
void flext_base::ToQueueSymbol(int o,const t_symbol *s) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetSymbol(o,s);
Queue(m);
}
void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetList(o,argc,argv);
Queue(m);
}
void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const
{
qmsg *m = new qmsg(this);
qmsg *m = new qmsg(const_cast<flext_base *>(this));
m->SetAny(o,s,argc,argv);
Queue(m);
}
......
......@@ -283,8 +283,6 @@ bool flext_base::StopThreads()
#endif
// --- all object threads have terminated by now -------
qmutex.Lock(); // Lock message queue
tlmutex.Lock();
// timeout -> hard termination
......@@ -306,7 +304,6 @@ bool flext_base::StopThreads()
thrhead = NULL;
tlmutex.Unlock();
qmutex.Unlock();
}
// post("All threads have terminated");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment