flatly compute maxTs
authorGuillaume Seguin <guillaume@segu.in>
Fri, 14 Aug 2009 15:05:09 +0200
changeset 4751 38d2ff762cf6
parent 4750 0a243100f50e
child 4752 d1fcd8777eac
flatly compute maxTs
src/simulator/multithreaded-simulator-impl.cc
src/simulator/multithreaded-simulator-impl.h
src/simulator/multithreading-partition.h
--- a/src/simulator/multithreaded-simulator-impl.cc	Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.cc	Fri Aug 14 15:05:09 2009 +0200
@@ -278,80 +278,36 @@
   return 0;
 }
 
-/* Returns whether thread should continue */
-bool
-MultiThreadedSimulatorImpl::ProcessBarrier (struct ThreadContext *context)
-{
-  /* Check termination conditions */
-  if (AtomicGet (&m_nStoppedPartitions) == m_nPartitions + 1 ||
-      AtomicGet (&m_nEvents) == 0)
-    {
-      return false;
-    }
-  context->runnable.clear ();
-  context->maxTs = GetMaximumSimulationTs ();
-  /* Process events of runnable partitions until maxTs */
-  for (std::list<MultiThreadingPartition *>::iterator it = context->partitions.begin (); it != context->partitions.end (); ++it)
-    {
-      MultiThreadingPartition *partition = (*it);
-      while (!partition->eventMessages.empty ())
-        {
-          struct MultiThreadingPartition::EventMessage message = partition->eventMessages.front ();
-          partition->eventMessages.pop ();
-          partition->nMessages--;
-          NS_ASSERT (message.event != 0);
-          Scheduler::Event ev;
-          ev.impl = message.event;
-          ev.key.m_ts = message.timestamp;
-          ev.key.m_context = partition->id;
-          ev.key.m_uid = partition->uid;
-          partition->uid++;
-          partition->events->Insert (ev);
-        }
-      if (!partition->events->IsEmpty ())
-        {
-          context->maxTs = std::min (context->maxTs, NextPartitionEventTs (partition) + partition->minDelay);
-        }
-    }
-  return true;
-}
-
 void
 MultiThreadedSimulatorImpl::DoRunThread (struct ThreadContext *context)
 {
   NS_LOG_FUNCTION (this << context);
   uint32_t nExecutedByThread = 0;
   CLOCK_DEBUGGING_INIT ();
-  while (true)
+  while (AtomicGet (&m_nStoppedPartitions) < m_nPartitions + 1 &&
+         AtomicGet (&m_nEvents) != 0)
     {
       CLOCK_DEBUGGING_NOW (&start);
       /* Try to get the master lock */
       if (context->id == 0)
         {
-          pthread_barrier_wait (&m_barrier);
-          CLOCK_DEBUGGING_NOW (&start2);
-          /* Process the first barrier (process messages, compute thread local maxTs) */
-          if (!ProcessBarrier (context))
-            {
-              NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
-              CLOCK_DEBUGGING_PRINT ();
-              return;
-            }
-          CLOCK_DEBUGGING_NOW (&end2);
-          CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
-          pthread_barrier_wait (&m_barrier);
           CLOCK_DEBUGGING_NOW (&start2);
           /* Compute global maxTs */
           m_maxTs = GetMaximumSimulationTs ();
-          for (uint32_t i = 0; i < m_threadsCount; ++i)
+          for (MultiThreadingPartitions::iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
             {
-              m_maxTs = std::min (m_maxTs, m_threads[i].maxTs);
+              MultiThreadingPartition *partition = it->second;
+              m_maxTs = std::min (m_maxTs, partition->minMessageTs + partition->minDelay);
+              if (!partition->events->IsEmpty ())
+                {
+                  m_maxTs = std::min (m_maxTs, NextPartitionEventTs (partition) + partition->minDelay);
+                }
             }
           uint32_t threadId = 0;
           for (MultiThreadingPartitions::iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
             {
               MultiThreadingPartition *partition = it->second;
-              if (!partition->events->IsEmpty () && NextPartitionEventTs (partition) < m_maxTs)
+              if (partition->minMessageTs < m_maxTs || (!partition->events->IsEmpty () && (NextPartitionEventTs (partition) < m_maxTs)))
                 {
                   m_threads[threadId].runnable.push_back (partition);
                   threadId = (threadId + 1) % m_threadsCount;
@@ -376,32 +332,35 @@
           CLOCK_DEBUGGING_NOW (&end2);
           CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
           NS_LOG_INFO ("New iteration.");
-          pthread_barrier_wait (&m_barrier);
         }
-      else
-        {
-          pthread_barrier_wait (&m_barrier);
-          CLOCK_DEBUGGING_NOW (&start2);
-          /* Process first barrier (see master thread case for details) */
-          if (!ProcessBarrier (context))
-            {
-              NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
-              CLOCK_DEBUGGING_PRINT ();
-              return;
-            }
-          CLOCK_DEBUGGING_NOW (&end2);
-          CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
-          pthread_barrier_wait (&m_barrier);
-          pthread_barrier_wait (&m_barrier);
-        }
+
+      pthread_barrier_wait (&m_barrier);
+
       CLOCK_DEBUGGING_NOW (&end);
       CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers, &diff, &start, &end);
       uint32_t nExecutedAtIteration = 0;
       CLOCK_DEBUGGING_NOW (&start);
-      /* Process events of runnable partitions until maxTs */
+      /* Process events and messages of runnable partitions until maxTs */
       for (std::list<MultiThreadingPartition *>::iterator it = context->runnable.begin (); it != context->runnable.end (); ++it)
         {
           MultiThreadingPartition *partition = (*it);
+          pthread_mutex_lock (&partition->eventMessagesLock);
+          partition->minMessageTs = GetMaximumSimulationTs ();
+          while (!partition->eventMessages.empty ())
+            {
+              struct MultiThreadingPartition::EventMessage message = partition->eventMessages.front ();
+              partition->eventMessages.pop ();
+              partition->nMessages--;
+              NS_ASSERT (message.event != 0);
+              Scheduler::Event ev;
+              ev.impl = message.event;
+              ev.key.m_ts = message.timestamp;
+              ev.key.m_context = partition->id;
+              ev.key.m_uid = partition->uid;
+              partition->uid++;
+              partition->events->Insert (ev);
+            }
+          pthread_mutex_unlock (&partition->eventMessagesLock);
           g_currentPartition = partition;
           uint32_t nExecuted = RunPartitionUntil (partition, m_maxTs);
           AtomicExchangeAndAdd (&m_nEvents, -nExecuted);
@@ -409,10 +368,15 @@
           nExecutedAtIteration += nExecuted;
           g_currentPartition = 0;
         }
+      context->runnable.clear ();
       CLOCK_DEBUGGING_NOW (&end);
       CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&events, &diff, &start, &end);
       NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedAtIteration << " events at current iteration.");
+
+      pthread_barrier_wait (&m_barrier);
     }
+  NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
+  CLOCK_DEBUGGING_PRINT ();
 }
 
 void
@@ -525,6 +489,7 @@
   struct MultiThreadingPartition::EventMessage message;
   message.from = from;
   message.timestamp = timestamp;
+  partition->minMessageTs = std::min (partition->minMessageTs, timestamp);
   message.event = event;
   partition->nMessages++;
   partition->eventMessages.push (message);
--- a/src/simulator/multithreaded-simulator-impl.h	Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.h	Fri Aug 14 15:05:09 2009 +0200
@@ -81,7 +81,6 @@
   void PushEventMessage (MultiThreadingPartition *partition, uint32_t from, uint64_t timestamp, EventImpl *event);
   struct ThreadContext {
     uint32_t id;
-    uint64_t maxTs;
     pthread_t thread;
     MultiThreadedSimulatorImpl *simulator;
     std::list<MultiThreadingPartition *> partitions;
@@ -97,7 +96,6 @@
   typedef std::queue<struct InitialEvent> InitialEvents;
   typedef std::map<int, MultiThreadingPartition *> MultiThreadingPartitions;
 
-  bool ProcessBarrier (struct ThreadContext *context);
   static void* RunThread (void *data);
   uint64_t GetMaximumSimulationTs (void) const;
   void DoRunThread (struct ThreadContext *context);
--- a/src/simulator/multithreading-partition.h	Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreading-partition.h	Fri Aug 14 15:05:09 2009 +0200
@@ -51,6 +51,7 @@
   uint32_t id;
   uint64_t minDelay;
 
+  uint64_t minMessageTs;
   pthread_mutex_t eventMessagesLock;
   EventMessages eventMessages;
   uint32_t nMessages;