[Tinyos-commits] CVS: tinyos-1.x/tos/lib/TinyDB QueryHandlerM.nc,
1.1.2.1, 1.1.2.2 QueryProcessor.nc, 1.5.8.1,
1.5.8.2 TupleRouterM.nc, 1.49.2.1, 1.49.2.2
Sam Madden
smadden at users.sourceforge.net
Sun Aug 14 12:04:26 PDT 2005
Update of /cvsroot/tinyos/tinyos-1.x/tos/lib/TinyDB
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv3731
Modified Files:
Tag: restructure
QueryHandlerM.nc QueryProcessor.nc TupleRouterM.nc
Log Message:
lots of comments about regarding this that are broken in this branch
Index: QueryHandlerM.nc
===================================================================
RCS file: /cvsroot/tinyos/tinyos-1.x/tos/lib/TinyDB/Attic/QueryHandlerM.nc,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -C2 -d -r1.1.2.1 -r1.1.2.2
*** QueryHandlerM.nc 13 Mar 2004 01:34:47 -0000 1.1.2.1
--- QueryHandlerM.nc 14 Aug 2005 19:04:20 -0000 1.1.2.2
***************
*** 82,86 ****
void continueQuery(Handle *memory);
bool addQueryField(QueryMessagePtr qmsg);
- bool allocPendingQuery(MemoryCallback callback, Query *q);
bool allocQuery(MemoryCallback callback, Query *q);
void parsedCallback(Handle *memory);
--- 82,85 ----
***************
*** 221,226 ****
-
-
/** Message indicating the arrival of (part of) a query */
event result_t Network.querySub(QueryMessagePtr qmsg) {
--- 220,223 ----
***************
*** 228,233 ****
QueryMessagePtr tmpmsg = call Network.getQueryPayLoad(&mQmsg);
*tmpmsg = *qmsg;
post querySubTask();
! }
return SUCCESS;
}
--- 225,233 ----
QueryMessagePtr tmpmsg = call Network.getQueryPayLoad(&mQmsg);
*tmpmsg = *qmsg;
+ // XXX should check/set IS_IN_QUERY_MESSAGE here, and remove
+ // refs later
+ // Note possible race condition on overwriting contents of mQmsg
post querySubTask();
! } //else drop this message
return SUCCESS;
}
***************
*** 244,251 ****
//call UartDebugger.writeLine("cleaning",8);
#ifdef kMATCHBOX
call DBBuffer.cleanupEEPROM();
#endif
if (qmsg->u.ttl-- > 0)
! forwardQuery(qmsg);
UNSET_IS_IN_QUERY_MSG();
return;
--- 244,253 ----
//call UartDebugger.writeLine("cleaning",8);
#ifdef kMATCHBOX
+ //BUG/INCOMPLETE FEATURE -- we delete all tables, not just the one the
+ // user asked about XXX
call DBBuffer.cleanupEEPROM();
#endif
if (qmsg->u.ttl-- > 0)
! forwardQuery(qmsg); // DISSEM - shouldn't do this here
UNSET_IS_IN_QUERY_MSG();
return;
***************
*** 263,269 ****
if (!getQuery(qmsg->qid, &q)) {
goto done_rate;
! }
if (qmsg->fwdNode == TOS_UART_ADDR) {
! forwardQuery(qmsg);
}
dbg(DBG_USR2, "changing rate to %d\n", qmsg->epochDuration);
--- 265,271 ----
if (!getQuery(qmsg->qid, &q)) {
goto done_rate;
! } //XXX should we forward this anyway?
if (qmsg->fwdNode == TOS_UART_ADDR) {
! forwardQuery(qmsg); //DISSEM -- shouldn't be here
}
dbg(DBG_USR2, "changing rate to %d\n", qmsg->epochDuration);
***************
*** 276,282 ****
//is a request to delete an existing query
ParsedQuery *pq;
! bool isKnown;
! call Leds.yellowToggle();
--- 278,286 ----
//is a request to delete an existing query
ParsedQuery *pq;
! bool isKnown;
!
! call Leds.yellowToggle(); // LEDS -- should have a layer of indirection on
! // events to leds mappings
***************
*** 291,296 ****
//call Leds.redOn();
! call Leds.greenOn();
! call Leds.yellowOn();
//can't force -- might be in flight
mQidToRemove = qmsg->qid;
--- 295,300 ----
//call Leds.redOn();
! call Leds.greenOn(); //LEDS
! call Leds.yellowOn(); //LEDS
//can't force -- might be in flight
mQidToRemove = qmsg->qid;
***************
*** 299,305 ****
//only forward if we know about the query, or if we're the root
! if (isKnown) {
forwardQuery(qmsg);
}
mStoppedQid = qmsg->qid;
--- 303,315 ----
//only forward if we know about the query, or if we're the root
! if (isKnown) { //DISSEM
forwardQuery(qmsg);
}
+
+ //keep track of the last query we stopped in mStoppedQid
+ // we will not restart queries with the same id, and will
+ // send cancel query messages to nodes that generate results
+ // with this query id
+ //NOTE: this only remembers one stopped query
mStoppedQid = qmsg->qid;
***************
*** 311,359 ****
if (!IS_IN_QUERY_MSG()) {
!
! if (!getQuery(qmsg->qid, &q)) { //ignore if we already know about this query
SET_IS_IN_QUERY_MSG();
! mStoppedQid = 0xFF; //a new query, not one we just stopped
//go ahead and time synchronize with the sender of this message
//XXX doTimeSync(qmsg->timeSyncData, qmsg->clockCount);
if (IS_READING_QUERY()) {
! if (qmsg->qid != (**mCurQuery).qid) {
if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) {
! //query is alloced, but heard about a new one
! //forget old one
if (IS_SPACE_ALLOCED()) {
UNSET_SPACE_ALLOCED();
dbg(DBG_USR3, "freeing query in READING_QUERY\n");
call MemAlloc.free((Handle)mCurQuery);
- //mCurQuery
}
! UNSET_READING_QUERY();
} else {
! mTriedAllocWaiting = TRUE;
! UNSET_IS_IN_QUERY_MSG();
return;
}
} else if (! IS_SPACE_ALLOCED()) {
! UNSET_IS_IN_QUERY_MSG();
return;
! } else {
mTriedAllocWaiting = FALSE;
! oldField = addQueryField(qmsg);
}
//go ahead and forward this guy on, as long as it's new, or we're the root
! if (!oldField || qmsg->fwdNode == TOS_UART_ADDR)
forwardQuery(qmsg);
! }
- //note that we can fall through from previous clause
- if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) {
-
Query pq;
QueryMessagePtr qmsgCopy = call Network.getQueryPayLoad(&mMsg);
!
!
SET_READING_QUERY();
UNSET_SPACE_ALLOCED();
--- 321,371 ----
if (!IS_IN_QUERY_MSG()) {
! //message for a new query or a query that hasn't been fully parsed
! if (!getQuery(qmsg->qid, &q)) {
SET_IS_IN_QUERY_MSG();
!
! if (mStoppedQid == qmsg->qid) // new query wants to reuse id of stopped query
! mStoppedQid = 0xFF;
!
//go ahead and time synchronize with the sender of this message
//XXX doTimeSync(qmsg->timeSyncData, qmsg->clockCount);
+ // check and see if we are dealing with some other query
if (IS_READING_QUERY()) {
! if (qmsg->qid != (**mCurQuery).qid) { //mCurQuery is old query
if (IS_SPACE_ALLOCED() || mTriedAllocWaiting) {
! //mCurQuery is alloced, but message we are handling is for a new one
! // so, drop mCurQuery
if (IS_SPACE_ALLOCED()) {
UNSET_SPACE_ALLOCED();
dbg(DBG_USR3, "freeing query in READING_QUERY\n");
call MemAlloc.free((Handle)mCurQuery);
}
! UNSET_READING_QUERY();
} else {
! mTriedAllocWaiting = TRUE;
! UNSET_IS_IN_QUERY_MSG(); // XXX should really do UNSET/return via goto
return;
}
} else if (! IS_SPACE_ALLOCED()) {
! UNSET_IS_IN_QUERY_MSG(); // can't add field to query that hasn't finished alloc'ing
!
return;
! } else { //message is for the current query, and current query is alloc'ed
mTriedAllocWaiting = FALSE;
! oldField = addQueryField(qmsg); //add the field to the query
}
//go ahead and forward this guy on, as long as it's new, or we're the root
! if (!oldField || qmsg->fwdNode == TOS_UART_ADDR) //DISSEM
forwardQuery(qmsg);
! } else { //this is a new query
! // used to be
! //if (!IS_READING_QUERY() /*&& !IS_SENDING_MESSAGE()*/) {
Query pq;
QueryMessagePtr qmsgCopy = call Network.getQueryPayLoad(&mMsg);
!
SET_READING_QUERY();
UNSET_SPACE_ALLOCED();
***************
*** 365,374 ****
*qmsgCopy = *qmsg; //save a copy
//allocate space for query
! if (!allocPendingQuery(&continueQuery, &pq)) {
UNSET_READING_QUERY(); //barf!
}
}
} else if (qmsg->fwdNode == TOS_UART_ADDR) //forward on if it comes from UART
! forwardQuery(qmsg);
UNSET_IS_IN_QUERY_MSG();
}
--- 377,389 ----
*qmsgCopy = *qmsg; //save a copy
//allocate space for query
! mAllocState = STATE_ALLOC_IN_FLIGHT_QUERY;
! mAllocCallback = &continueQuery;
!
! if (!call MemAlloc.allocate(&mTmpHandle, call QueryIntf.size(&pq))) {
UNSET_READING_QUERY(); //barf!
}
}
} else if (qmsg->fwdNode == TOS_UART_ADDR) //forward on if it comes from UART
! forwardQuery(qmsg); //DISSEM
UNSET_IS_IN_QUERY_MSG();
}
***************
*** 383,387 ****
@return err_MSF_ForwardKnownQuery if message send failed, err_MSG_ForwardKnownQueryBusy if radio was busy
*/
! TinyDBError forwardQuery(QueryMessagePtr qmsg) {
TinyDBError err = err_NoError;
--- 398,402 ----
@return err_MSF_ForwardKnownQuery if message send failed, err_MSG_ForwardKnownQueryBusy if radio was busy
*/
! TinyDBError forwardQuery(QueryMessagePtr qmsg) { //DISSEM
TinyDBError err = err_NoError;
***************
*** 402,407 ****
TDB_SIG_ERR(err);
return err;
-
}
/** Continuation after query is successfully alloc'ed
@param memory The newly allocated handle (must to be non-null)
--- 417,422 ----
TDB_SIG_ERR(err);
return err;
}
+
/** Continuation after query is successfully alloc'ed
@param memory The newly allocated handle (must to be non-null)
***************
*** 447,451 ****
//now forward the message on
! forwardQuery(qmsg);
}
--- 462,466 ----
//now forward the message on
! forwardQuery(qmsg); //DISSEM
}
***************
*** 598,602 ****
//set up the output buffer for this query
! switch (pq->bufferType) {
case kRADIO:
pq->bufferId = kRADIO_BUFFER;
--- 613,618 ----
//set up the output buffer for this query
! switch (pq->bufferType) { //XXX probably want to abstract out this buffer specific stuff
! // to some sort of buffer driver
case kRADIO:
pq->bufferId = kRADIO_BUFFER;
***************
*** 845,849 ****
strcpy(qmsg->u.field.name, attr->name);
! call Leds.greenToggle();
if (!IS_SENDING_MESSAGE()) {
--- 861,865 ----
strcpy(qmsg->u.field.name, attr->name);
! call Leds.greenToggle(); //LEDS
if (!IS_SENDING_MESSAGE()) {
***************
*** 855,859 ****
//mTimestampMsg = &mQmsg;
}
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
UNSET_SENDING_MESSAGE();
//XXX
--- 871,875 ----
//mTimestampMsg = &mQmsg;
}
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError) { //DISSEM
UNSET_SENDING_MESSAGE();
//XXX
***************
*** 895,899 ****
// mTimestampMsg = &mQmsg;
//}
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError) {
UNSET_SENDING_MESSAGE();
//XXX
--- 911,915 ----
// mTimestampMsg = &mQmsg;
//}
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError) { //DISSEM
UNSET_SENDING_MESSAGE();
//XXX
***************
*** 918,922 ****
SET_SENDING_MESSAGE();
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError)
UNSET_SENDING_MESSAGE();
UNSET_SENDING_QUERY();
--- 934,938 ----
SET_SENDING_MESSAGE();
! if (call Network.sendQueryMessage(&mQmsg) != err_NoError) //DISSEM
UNSET_SENDING_MESSAGE();
UNSET_SENDING_QUERY();
***************
*** 1043,1047 ****
#ifdef kQUERY_SHARING
! call Leds.greenToggle();
qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
--- 1059,1063 ----
#ifdef kQUERY_SHARING
! call Leds.greenToggle(); //LEDS
qid = call QueryResultIntf.queryIdFromMsg(qrMsg);
***************
*** 1065,1069 ****
qm->qid = qid;
qm->msgType = DEL_MSG;
! call Leds.yellowToggle();
post queryMsgTask();
--- 1081,1085 ----
qm->qid = qid;
qm->msgType = DEL_MSG;
! call Leds.yellowToggle(); //LEDS
post queryMsgTask();
***************
*** 1506,1519 ****
- /** Allocates space for a query, given a query with numExprs and numFields filled in
- @param q The query to allocate a query data structure for
- @param callback The callback to fire when the allocation is complete
- @return TRUE if the allocation was successfully initiated (e.g. a callback is expected)
- */
- bool allocPendingQuery(MemoryCallback callback, Query *q) {
- mAllocState = STATE_ALLOC_IN_FLIGHT_QUERY;
- mAllocCallback = callback;
- return call MemAlloc.allocate(&mTmpHandle, call QueryIntf.size(q));
- }
/** Allocate space for a parsed query
--- 1522,1525 ----
Index: QueryProcessor.nc
===================================================================
RCS file: /cvsroot/tinyos/tinyos-1.x/tos/lib/TinyDB/QueryProcessor.nc,v
retrieving revision 1.5.8.1
retrieving revision 1.5.8.2
diff -C2 -d -r1.5.8.1 -r1.5.8.2
*** QueryProcessor.nc 13 Mar 2004 01:34:47 -0000 1.5.8.1
--- QueryProcessor.nc 14 Aug 2005 19:04:20 -0000 1.5.8.2
***************
*** 55,62 ****
--- 55,66 ----
@return A pointer to the query data structure, or NULL if no such query exists.
*/
+
command ParsedQueryPtr getQueryCmd(uint8_t qid);
+
+ // XXX should remove this version -- these do the same thing
command bool getQuery(uint8_t qid, ParsedQueryPtr *pq);
+
/** Given a processor message return the owner (origninating node) of the query, or
-1 if the query is unknown or the message is a query processor message.
***************
*** 66,70 ****
--- 70,77 ----
command short msgToQueryRoot(TOS_Msg *msg);
+ /** Return the number of queries that are currently in the system */
command short numQueries();
+
+ /** Return the query with id i*/
command ParsedQueryPtr getQueryIdx(short i);
Index: TupleRouterM.nc
===================================================================
RCS file: /cvsroot/tinyos/tinyos-1.x/tos/lib/TinyDB/TupleRouterM.nc,v
retrieving revision 1.49.2.1
retrieving revision 1.49.2.2
diff -C2 -d -r1.49.2.1 -r1.49.2.2
*** TupleRouterM.nc 13 Mar 2004 01:34:47 -0000 1.49.2.1
--- TupleRouterM.nc 14 Aug 2005 19:04:20 -0000 1.49.2.2
***************
*** 785,788 ****
--- 785,790 ----
gotAgg = q->hasAgg;
+ //XXX NetworkMultiHop should only call us if this is an agg result, so
+ // this is probably dead
if (!gotAgg) { //didn't give to an aggregate, so just pass it on...
TinyDBError err;
***************
*** 1052,1060 ****
}
if(IS_SENDING_MESSAGE()) {
call NetworkMonitor.updateContention(TRUE, SEND_BUSY_FAILURE);
mNumBlocked++;
! if (mNumBlocked > 32){
mNumBlocked = 0;
outputDone(&mMsg);
--- 1054,1065 ----
}
+ //VOODOO -- if we always seem to be sending a message when
+ // we enter this code, the radio stack is probably hosed -- this
+ // code tries to reset it.
if(IS_SENDING_MESSAGE()) {
call NetworkMonitor.updateContention(TRUE, SEND_BUSY_FAILURE);
mNumBlocked++;
! if (mNumBlocked > 32){ //been block 32 times in a row
mNumBlocked = 0;
outputDone(&mMsg);
***************
*** 1068,1072 ****
mNumBlocked = 0;
!
if (mDeliverWait) {
mDeliverWait--;
--- 1073,1077 ----
mNumBlocked = 0;
! //XXX move this to some other routine?
if (mDeliverWait) {
mDeliverWait--;
***************
*** 1077,1081 ****
if (TOS_LOCAL_ADDRESS == 0) dbg(DBG_USR1, "deliver wait is 0\n");
! if (isRoot() && mWaitIsDummy) {
if (call QueryProcessor.getQueryList() != NULL) sendDummyQueryResult((**call QueryProcessor.getQueryList()).q.qid, (**call QueryProcessor.getQueryList()).q.numFields, (**call QueryProcessor.getQueryList()).q.currentEpoch);
} else {
--- 1082,1090 ----
if (TOS_LOCAL_ADDRESS == 0) dbg(DBG_USR1, "deliver wait is 0\n");
!
! //Root periodically needs to send "dummy messages" that let other nodes
! // learn about queries it is running. The mWaitIsDummy flag tells us if
! // we are supposed to send such a message.
! if (mWaitIsDummy) {
if (call QueryProcessor.getQueryList() != NULL) sendDummyQueryResult((**call QueryProcessor.getQueryList()).q.qid, (**call QueryProcessor.getQueryList()).q.numFields, (**call QueryProcessor.getQueryList()).q.currentEpoch);
} else {
***************
*** 1090,1093 ****
--- 1099,1103 ----
TDB_SIG_ERR(err);
}
+ // XXX should probably do deliverTuplesTask with enqueue
if (!pending) post deliverTuplesTask();
}
***************
*** 1165,1171 ****
/* --------------------------------- Tuple Output Routines ---------------------------------*/
! /** Walk through queries, finding ones that have gone off (timer reached 0), and
! where the tuples are complete. Output said tuples to the appropriate
! output buffers.
<p>
mCurRouteQuery contains the last query routed, or NULL if starting at
--- 1175,1184 ----
/* --------------------------------- Tuple Output Routines ---------------------------------*/
! /** Walk through queries, finding queries that have a tuple that is
! ready to be delivered. When such tuples are found, call
! sendTuple.
!
! If no tuples are ready to be delivered, call startFetchingTuples().
!
<p>
mCurRouteQuery contains the last query routed, or NULL if starting at
***************
*** 1173,1176 ****
--- 1186,1192 ----
as successive tuples are delivered)
*/
+
+ // XXX assert that none of FETCHING/STARTING attribute are set, etc.
+
task void deliverTuplesTask() {
bool success;
***************
*** 1182,1188 ****
SET_DELIVERING_TUPLES();
if (mCurRouteQuery != NULL) {
dbg(DBG_USR2, "end of epoch\n");
! if (!pending) resetTupleState(&(**mCurRouteQuery).q); //done with this tuple...
}
--- 1198,1206 ----
SET_DELIVERING_TUPLES();
+ // the query in mCurRouteQuery has acquired and delivered it's results,
+ // so we can reset its state
if (mCurRouteQuery != NULL) {
dbg(DBG_USR2, "end of epoch\n");
! resetTupleState(&(**mCurRouteQuery).q); //done with this query...
}
***************
*** 1202,1206 ****
success = TRUE;
#endif
! pq->needsData = FALSE;
call QueryResultIntf.initQueryResult(&qr);
// stamp current epoch number
--- 1220,1224 ----
success = TRUE;
#endif
! pq->needsData = FALSE; //not sure what this is
call QueryResultIntf.initQueryResult(&qr);
// stamp current epoch number
***************
*** 1214,1220 ****
//otherwise, just output the tuple associated with the query
while (e != NULL) {
! if (e->opType != kSEL) {
//add all of the aggregate results to the query result data structure
!
err = call addResults(&qr, pq, e);
didAgg = TRUE;
--- 1232,1238 ----
//otherwise, just output the tuple associated with the query
while (e != NULL) {
! if (e->opType != kSEL) { //AGGFIX
//add all of the aggregate results to the query result data structure
! // for group by queries, this can be one result per group per aggregate expression
err = call addResults(&qr, pq, e);
didAgg = TRUE;
***************
*** 1228,1232 ****
//then output the query result
//call Leds.redToggle();
! if (didAgg && err == err_NoError && call QueryResultIntf.numRecords(&qr,pq) > 0) {
//enqueue all the results from this aggregate
--- 1246,1250 ----
//then output the query result
//call Leds.redToggle();
! if (didAgg && err == err_NoError && call QueryResultIntf.numRecords(&qr,pq) > 0) { //AGGFIX
//enqueue all the results from this aggregate
***************
*** 1244,1248 ****
mWaitIsDummy = FALSE;
! call Leds.redToggle();
err = sendTuple(pq, &mEnqResult, &pending);
--- 1262,1266 ----
mWaitIsDummy = FALSE;
! call Leds.redToggle(); //LEDS
err = sendTuple(pq, &mEnqResult, &pending);
***************
*** 1250,1255 ****
! //one shot queries may have finished scanning their buffer
! if (pq->fromBuffer != kNO_QUERY && pq->epochDuration == kONE_SHOT)
{
uint8_t fromBuf;
--- 1268,1274 ----
! //queries on static buffers (these are "one shot" queries) may have finished scanning their buffer
! //XXX pull this out into a special case routine
! if (pq->fromBuffer != kNO_QUERY && pq->epochDuration == kONE_SHOT) // XXX rename kONE_SHOT?
{
uint8_t fromBuf;
***************
*** 1257,1261 ****
//stop queries that have scanned the entire buffer
err = call DBBuffer.size(fromBuf, &size);
! if (err != err_NoError || pq->curResult++ >= size)
{
//if this is an event based query, stop the query running but
--- 1276,1280 ----
//stop queries that have scanned the entire buffer
err = call DBBuffer.size(fromBuf, &size);
! if (err != err_NoError || pq->curResult++ >= size) // XXX move this cursor logic to storage manager
{
//if this is an event based query, stop the query running but
***************
*** 1326,1329 ****
--- 1345,1349 ----
TinyDBError err = err_NoError;
+ //is the epoch long enough to warrant using random backoff?
if (pq->clocksPerSample > kMIN_SLEEP_CLOCKS_PER_SAMPLE) {
if (TOS_LOCAL_ADDRESS == 0)
***************
*** 1396,1400 ****
bool mustSample = FALSE;
bool didReset = FALSE;
! bool needsNoData = TRUE;
ParsedQueryPtr firstQ = NULL;
--- 1416,1420 ----
bool mustSample = FALSE;
bool didReset = FALSE;
! bool rootNeedsNoData = isRoot();
ParsedQueryPtr firstQ = NULL;
***************
*** 1402,1405 ****
--- 1422,1428 ----
while (qlh != NULL) {
//reset queries that just restarted
+ // XXX clocksPerSample is overloaded to tell us if the query is running
+ // test for beginning of epoch -- XXX this should be a function
+ // seems like this test could be wrong if this queries clockCount went negative
if ((**qlh).q.clocksPerSample > 0 && !(**qlh).q.needsData &&
#ifdef qSNOOZE
***************
*** 1412,1442 ****
)
{
-
-
didReset = TRUE;
!
if (!isRoot() || (**qlh).q.hasAgg) {
! (**qlh).q.needsData = TRUE;
}
//else (**qlh).q.clockCount = 0;
}
! if ((**qlh).q.hasAgg) {
! needsNoData = FALSE;
}
! if ((**qlh).q.running && (**qlh).q.clocksPerSample > 0) {
!
if (didReset) {
didReset = FALSE;
mLastHeard ++; //increment count since we last heard something
! call Leds.yellowToggle();
! //call Leds.redToggle();
! //call UartDebugger.writeLine("epoch");
! #ifdef HAS_ROUTECONTROL
! // force a route update every epoch if we are not sleeping
! // if ((**qlh).q.clocksPerSample <= WAKING_CLOCKS)
! // call RouteControl.manualUpdate();
! #endif
if ((**qlh).q.markedForDeletion > 0) {
if (--(**qlh).q.markedForDeletion == 0) { //delete the query
--- 1435,1463 ----
)
{
didReset = TRUE;
! //common case -- all nodes except the root acquire data
! //special case for root: shouldn't acquire data, except if running an agg query,
! // since we need to make sure we deliver merged aggregate results
if (!isRoot() || (**qlh).q.hasAgg) {
! (**qlh).q.needsData = TRUE; //XXX needsData is being used to control
! // both acquistion and delivery of results
! // delivery can only happen if none of the control
! // flags are set
}
//else (**qlh).q.clockCount = 0;
}
! if (isRoot() && (**qlh).q.hasAgg) {
! rootNeedsNoData = FALSE;
}
!
!
! if ((**qlh).q.running && (**qlh).q.clocksPerSample > 0) { //XXX confusing -- running flag vs clocksPerSample
! //initialize epoch XXX should be a routine of its own
if (didReset) {
didReset = FALSE;
mLastHeard ++; //increment count since we last heard something
! call Leds.yellowToggle(); //LEDS
if ((**qlh).q.markedForDeletion > 0) {
if (--(**qlh).q.markedForDeletion == 0) { //delete the query
***************
*** 1444,1450 ****
--- 1465,1473 ----
call QueryProcessor.removeQuery((**qlh).q.qid, TRUE);
} else {
+ // XXX += seems like it could make us not detect the start of the epoch
(**qlh).q.clockCount += (**qlh).q.clocksPerSample; // just reschedule this query...
}
} else {
+ //not deleting, handle the common case
//only actually process local tuples if we're not the root.
#ifndef ROOT_SAMPLE
***************
*** 1456,1460 ****
firstQ = &(**qlh).q;
}
! (**qlh).q.currentEpoch++;
}
}
--- 1479,1483 ----
firstQ = &(**qlh).q;
}
! (**qlh).q.currentEpoch++;
}
}
***************
*** 1464,1467 ****
--- 1487,1492 ----
if (!call QueryProcessor.allQueriesSameRate() || (**qlh).q.clocksPerSample <= WAKING_CLOCKS) {
#endif
+ // XXX fix this with beginning of epoch
+ // this caused us to lose some time -- we've drifted relative to other nodes
if ((**qlh).q.clockCount <= 0) {
(**qlh).q.clockCount = (**qlh).q.clocksPerSample;
***************
*** 1476,1482 ****
mClockCount = 0;
! if (call QueryProcessor.getQueryList() != NULL) {
! qlh = call QueryProcessor.getQueryList();
! if (isRoot() && needsNoData && (**qlh).q.clocksPerSample > kMIN_SLEEP_CLOCKS_PER_SAMPLE) {
mWaitIsDummy = TRUE;
dbg(DBG_USR1,"DUMMY WAIT\n");
--- 1501,1509 ----
mClockCount = 0;
!
! //schedule a dummy wait for the root -- move to separate routine
! if (rootNeedsNoData && call QueryProcessor.getQueryList() != NULL) {
! qlh = call QueryProcessor.getQueryList(); //get the first query
! if ((**qlh).q.clocksPerSample > kMIN_SLEEP_CLOCKS_PER_SAMPLE) {
mWaitIsDummy = TRUE;
dbg(DBG_USR1,"DUMMY WAIT\n");
***************
*** 1498,1502 ****
mNumAttrs = firstQ->numFields;
SET_STARTING_ATTRIBUTE();
! startQueryAttrs(firstQ);
}
// fetchNextAttr();
--- 1525,1529 ----
mNumAttrs = firstQ->numFields;
SET_STARTING_ATTRIBUTE();
! startQueryAttrs(firstQ); ///XXX BUG this will only work for one query
}
// fetchNextAttr();
***************
*** 1860,1863 ****
--- 1887,1893 ----
Expr *e = nextExpr(q);
+ //ASSERT -- the parser needs to have set the order of expressions appropriately
+ // e.g., a safe order is WHERE (kSEL), SELECT (kAGG)), HAVING
+
if (e != NULL) { //assume expressions are listed in the order
e->success = TRUE; //they should be executed! (e.g. selections before aggs)
***************
*** 1994,2000 ****
--- 2024,2032 ----
#ifndef PLATFORM_PC
+ #ifdef USE_WATCHDOG
if (mSendFailed > MAX_FAILURES)
wdt_enable(WDTO_15MS); //watchdog reset
#endif
+ #endif
*pending = FALSE;
More information about the Tinyos-commits
mailing list