--- a/src/simulator/multithreaded-simulator-impl.cc Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.cc Fri Aug 14 15:05:09 2009 +0200
@@ -278,80 +278,36 @@
return 0;
}
-/* Returns whether thread should continue */
-bool
-MultiThreadedSimulatorImpl::ProcessBarrier (struct ThreadContext *context)
-{
- /* Check termination conditions */
- if (AtomicGet (&m_nStoppedPartitions) == m_nPartitions + 1 ||
- AtomicGet (&m_nEvents) == 0)
- {
- return false;
- }
- context->runnable.clear ();
- context->maxTs = GetMaximumSimulationTs ();
- /* Process events of runnable partitions until maxTs */
- for (std::list<MultiThreadingPartition *>::iterator it = context->partitions.begin (); it != context->partitions.end (); ++it)
- {
- MultiThreadingPartition *partition = (*it);
- while (!partition->eventMessages.empty ())
- {
- struct MultiThreadingPartition::EventMessage message = partition->eventMessages.front ();
- partition->eventMessages.pop ();
- partition->nMessages--;
- NS_ASSERT (message.event != 0);
- Scheduler::Event ev;
- ev.impl = message.event;
- ev.key.m_ts = message.timestamp;
- ev.key.m_context = partition->id;
- ev.key.m_uid = partition->uid;
- partition->uid++;
- partition->events->Insert (ev);
- }
- if (!partition->events->IsEmpty ())
- {
- context->maxTs = std::min (context->maxTs, NextPartitionEventTs (partition) + partition->minDelay);
- }
- }
- return true;
-}
-
void
MultiThreadedSimulatorImpl::DoRunThread (struct ThreadContext *context)
{
NS_LOG_FUNCTION (this << context);
uint32_t nExecutedByThread = 0;
CLOCK_DEBUGGING_INIT ();
- while (true)
+ while (AtomicGet (&m_nStoppedPartitions) < m_nPartitions + 1 &&
+ AtomicGet (&m_nEvents) != 0)
{
CLOCK_DEBUGGING_NOW (&start);
/* Try to get the master lock */
if (context->id == 0)
{
- pthread_barrier_wait (&m_barrier);
- CLOCK_DEBUGGING_NOW (&start2);
- /* Process the first barrier (process messages, compute thread local maxTs) */
- if (!ProcessBarrier (context))
- {
- NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
- CLOCK_DEBUGGING_PRINT ();
- return;
- }
- CLOCK_DEBUGGING_NOW (&end2);
- CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
- pthread_barrier_wait (&m_barrier);
CLOCK_DEBUGGING_NOW (&start2);
/* Compute global maxTs */
m_maxTs = GetMaximumSimulationTs ();
- for (uint32_t i = 0; i < m_threadsCount; ++i)
+ for (MultiThreadingPartitions::iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
- m_maxTs = std::min (m_maxTs, m_threads[i].maxTs);
+ MultiThreadingPartition *partition = it->second;
+ m_maxTs = std::min (m_maxTs, partition->minMessageTs + partition->minDelay);
+ if (!partition->events->IsEmpty ())
+ {
+ m_maxTs = std::min (m_maxTs, NextPartitionEventTs (partition) + partition->minDelay);
+ }
}
uint32_t threadId = 0;
for (MultiThreadingPartitions::iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
MultiThreadingPartition *partition = it->second;
- if (!partition->events->IsEmpty () && NextPartitionEventTs (partition) < m_maxTs)
+ if (partition->minMessageTs < m_maxTs || (!partition->events->IsEmpty () && (NextPartitionEventTs (partition) < m_maxTs)))
{
m_threads[threadId].runnable.push_back (partition);
threadId = (threadId + 1) % m_threadsCount;
@@ -376,32 +332,35 @@
CLOCK_DEBUGGING_NOW (&end2);
CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
NS_LOG_INFO ("New iteration.");
- pthread_barrier_wait (&m_barrier);
}
- else
- {
- pthread_barrier_wait (&m_barrier);
- CLOCK_DEBUGGING_NOW (&start2);
- /* Process first barrier (see master thread case for details) */
- if (!ProcessBarrier (context))
- {
- NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
- CLOCK_DEBUGGING_PRINT ();
- return;
- }
- CLOCK_DEBUGGING_NOW (&end2);
- CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers_actual, &diff, &start2, &end2);
- pthread_barrier_wait (&m_barrier);
- pthread_barrier_wait (&m_barrier);
- }
+
+ pthread_barrier_wait (&m_barrier);
+
CLOCK_DEBUGGING_NOW (&end);
CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&barriers, &diff, &start, &end);
uint32_t nExecutedAtIteration = 0;
CLOCK_DEBUGGING_NOW (&start);
- /* Process events of runnable partitions until maxTs */
+ /* Process events and messages of runnable partitions until maxTs */
for (std::list<MultiThreadingPartition *>::iterator it = context->runnable.begin (); it != context->runnable.end (); ++it)
{
MultiThreadingPartition *partition = (*it);
+ pthread_mutex_lock (&partition->eventMessagesLock);
+ partition->minMessageTs = GetMaximumSimulationTs ();
+ while (!partition->eventMessages.empty ())
+ {
+ struct MultiThreadingPartition::EventMessage message = partition->eventMessages.front ();
+ partition->eventMessages.pop ();
+ partition->nMessages--;
+ NS_ASSERT (message.event != 0);
+ Scheduler::Event ev;
+ ev.impl = message.event;
+ ev.key.m_ts = message.timestamp;
+ ev.key.m_context = partition->id;
+ ev.key.m_uid = partition->uid;
+ partition->uid++;
+ partition->events->Insert (ev);
+ }
+ pthread_mutex_unlock (&partition->eventMessagesLock);
g_currentPartition = partition;
uint32_t nExecuted = RunPartitionUntil (partition, m_maxTs);
AtomicExchangeAndAdd (&m_nEvents, -nExecuted);
@@ -409,10 +368,15 @@
nExecutedAtIteration += nExecuted;
g_currentPartition = 0;
}
+ context->runnable.clear ();
CLOCK_DEBUGGING_NOW (&end);
CLOCK_DEBUGGING_DIFF_AND_ACCUMULATE(&events, &diff, &start, &end);
NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedAtIteration << " events at current iteration.");
+
+ pthread_barrier_wait (&m_barrier);
}
+ NS_LOG_INFO ("Thread " << context->id << " processed " << nExecutedByThread << " events.");
+ CLOCK_DEBUGGING_PRINT ();
}
void
@@ -525,6 +489,7 @@
struct MultiThreadingPartition::EventMessage message;
message.from = from;
message.timestamp = timestamp;
+ partition->minMessageTs = std::min (partition->minMessageTs, timestamp);
message.event = event;
partition->nMessages++;
partition->eventMessages.push (message);
--- a/src/simulator/multithreaded-simulator-impl.h Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.h Fri Aug 14 15:05:09 2009 +0200
@@ -81,7 +81,6 @@
void PushEventMessage (MultiThreadingPartition *partition, uint32_t from, uint64_t timestamp, EventImpl *event);
struct ThreadContext {
uint32_t id;
- uint64_t maxTs;
pthread_t thread;
MultiThreadedSimulatorImpl *simulator;
std::list<MultiThreadingPartition *> partitions;
@@ -97,7 +96,6 @@
typedef std::queue<struct InitialEvent> InitialEvents;
typedef std::map<int, MultiThreadingPartition *> MultiThreadingPartitions;
- bool ProcessBarrier (struct ThreadContext *context);
static void* RunThread (void *data);
uint64_t GetMaximumSimulationTs (void) const;
void DoRunThread (struct ThreadContext *context);
--- a/src/simulator/multithreading-partition.h Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreading-partition.h Fri Aug 14 15:05:09 2009 +0200
@@ -51,6 +51,7 @@
uint32_t id;
uint64_t minDelay;
+ uint64_t minMessageTs;
pthread_mutex_t eventMessagesLock;
EventMessages eventMessages;
uint32_t nMessages;