--- a/src/helper/multithreading-helper.cc Fri Aug 14 15:05:09 2009 +0200
+++ b/src/helper/multithreading-helper.cc Fri Aug 14 15:05:09 2009 +0200
@@ -21,9 +21,12 @@
#include "multithreading-helper.h"
#include "ns3/multithreaded-simulator-impl.h"
-#include "ns3/multithreading-node-partition.h"
#include "ns3/simulator.h"
#include "ns3/node-list.h"
+#include "ns3/net-device.h"
+#include "ns3/channel.h"
+
+#include <algorithm>
namespace ns3 {
@@ -45,11 +48,47 @@
{
if (m_enableMultiThreading)
{
+ pthread_mutexattr_t lock_attr;
+ pthread_mutexattr_init (&lock_attr);
for (NodeList::Iterator i = NodeList::Begin (); i != NodeList::End (); ++i)
{
- Create<MultiThreadingNodePartition> (m_simulator, *i);
+ Ptr<Node> node = (*i);
+ MultiThreadingPartition *partition = new MultiThreadingPartition;
+ partition->id = node->GetId ();
+ partition->nMessages = 0;
+ partition->events = 0;
+ partition->currentUid = 0;
+ // uids are allocated from 4.
+ // uid 0 is "invalid" events
+ // uid 1 is "now" events
+ // uid 2 is "destroy" events
+ partition->uid = 4;
+ partition->currentTs = 0;
+ partition->minDelay = GetMinDelay (node);
+ pthread_mutex_init (&partition->eventMessagesLock, &lock_attr);
+ m_simulator->AddPartition (partition->id, partition);
}
}
}
+uint64_t
+MultiThreadingHelper::GetMinDelay (Ptr<Node> node) const
+{
+ int srcDevCount = node->GetNDevices ();
+ uint64_t minDelay = Simulator::GetMaximumSimulationTime ().GetTimeStep ();
+ for (int srcDevIndex = 0; srcDevIndex < srcDevCount; srcDevIndex++)
+ {
+ Ptr<NetDevice> srcDevice = node->GetDevice (srcDevIndex);
+ Ptr<Channel> channel = srcDevice->GetChannel ();
+ if (channel == 0)
+ {
+ continue;
+ }
+ uint64_t delay = channel->GetMinDelay (srcDevice).GetTimeStep ();
+ NS_ASSERT (delay != 0);
+ minDelay = std::min (minDelay, delay);
+ }
+ return minDelay;
+}
+
} // namespace ns3
--- a/src/helper/multithreading-helper.h Fri Aug 14 15:05:09 2009 +0200
+++ b/src/helper/multithreading-helper.h Fri Aug 14 15:05:09 2009 +0200
@@ -21,6 +21,7 @@
#ifndef MULTI_THREADING_HELPER_H
#define MULTI_THREADING_HELPER_H
+#include "ns3/node.h"
#include "ns3/ptr.h"
#include <string>
@@ -48,6 +49,7 @@
void Install (void) const;
private:
+ uint64_t GetMinDelay (Ptr<Node> node) const;
bool m_enableMultiThreading;
Ptr<MultiThreadedSimulatorImpl> m_simulator;
};
--- a/src/simulator/multithreaded-simulator-impl.cc Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.cc Fri Aug 14 15:05:09 2009 +0200
@@ -94,6 +94,15 @@
NS_OBJECT_ENSURE_REGISTERED (MultiThreadedSimulatorImpl);
+/*
+ * Helper function macro-like which peeks the next event timestamp of a
+ * given partition
+ */
+inline uint64_t
+NextPartitionEventTs (MultiThreadingPartition *partition)
+{
+ return partition->events->PeekNext ().key.m_ts;
+}
static __thread MultiThreadingPartition *g_currentPartition = 0;
@@ -156,18 +165,36 @@
}
for (MultiThreadingPartitions::const_iterator i = m_partitions.begin (); i != m_partitions.end (); ++i)
{
- (*i).second->Unref ();
+ MultiThreadingPartition *partition = i->second;
+ delete partition;
}
m_partitions.clear ();
}
void
+MultiThreadedSimulatorImpl::SetPartitionScheduler (MultiThreadingPartition *partition, ObjectFactory schedulerFactory)
+{
+ Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
+
+ if (partition->events != 0)
+ {
+ while (!partition->events->IsEmpty ())
+ {
+ Scheduler::Event next = partition->events->RemoveNext ();
+ scheduler->Insert (next);
+ }
+ partition->events->Unref ();
+ }
+ partition->events = GetPointer (scheduler);
+}
+
+void
MultiThreadedSimulatorImpl::SetScheduler (ObjectFactory schedulerFactory)
{
m_schedulerFactory = schedulerFactory;
- for (MultiThreadingPartitions::const_iterator it = m_partitions.begin(); it != m_partitions.end(); ++it)
+ for (MultiThreadingPartitions::const_iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
- it->second->SetScheduler (schedulerFactory);
+ SetPartitionScheduler (it->second, schedulerFactory);
}
Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
if (m_globalEvents != 0)
@@ -187,7 +214,8 @@
{
for (MultiThreadingPartitions::const_iterator it = m_partitions.begin(); it != m_partitions.end(); ++it)
{
- if (!it->second->IsFinished ())
+ MultiThreadingPartition *partition = it->second;
+ if (!partition->events->IsEmpty () || !partition->eventMessages.empty ())
{
return false;
}
@@ -201,14 +229,45 @@
uint64_t nextTs = GetMaximumSimulationTs ();
for (MultiThreadingPartitions::const_iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
- if (!it->second->IsFinished ())
+ MultiThreadingPartition *partition = it->second;
+ if (!partition->events->IsEmpty ())
{
- nextTs = std::min (nextTs, it->second->NextTs ());
+ nextTs = std::min (nextTs, NextPartitionEventTs (partition));
}
}
return TimeStep (nextTs);
}
+inline uint32_t
+MultiThreadedSimulatorImpl::RunPartitionUntil (MultiThreadingPartition *partition, uint64_t maxTs)
+{
+ NS_LOG_DEBUG ("Processing events at partition " << partition->id << " going from " << partition->currentTs << " until " << maxTs);
+ uint32_t nExecuted = 0;
+ while (!partition->events->IsEmpty () && NextPartitionEventTs (partition) < maxTs)
+ {
+ ProcessPartitionEvent (partition);
+ nExecuted++;
+ }
+ return nExecuted;
+}
+
+inline void
+MultiThreadedSimulatorImpl::ProcessPartitionEvent (MultiThreadingPartition *partition)
+{
+ Scheduler::Event next = partition->events->RemoveNext ();
+
+ NS_LOG_DEBUG ("Process event at partition " << partition->id << " at " << partition->currentTs);
+
+ NS_ASSERT (next.key.m_ts >= partition->currentTs);
+ NS_ASSERT (next.key.m_context != 0xffffffff);
+
+ NS_LOG_LOGIC ("handle " << next.key.m_ts);
+ NS_LOG_DEBUG ("Clock " << partition->id << " going from " << partition->currentTs << " to " << next.key.m_ts << " (during event processing)");
+ partition->currentTs = next.key.m_ts;
+ partition->currentUid = next.key.m_uid;
+ next.impl->Invoke ();
+ next.impl->Unref ();
+}
void*
MultiThreadedSimulatorImpl::RunThread (void *data)
@@ -235,10 +294,23 @@
for (std::list<MultiThreadingPartition *>::iterator it = context->partitions.begin (); it != context->partitions.end (); ++it)
{
MultiThreadingPartition *partition = (*it);
- partition->ProcessMessages ();
- if (!partition->IsSchedulerEmpty ())
+ while (!partition->eventMessages.empty ())
{
- context->maxTs = std::min (context->maxTs, partition->NextTs () + partition->GetMinDelay ());
+ struct MultiThreadingPartition::EventMessage message = partition->eventMessages.front ();
+ partition->eventMessages.pop ();
+ partition->nMessages--;
+ NS_ASSERT (message.event != 0);
+ Scheduler::Event ev;
+ ev.impl = message.event;
+ ev.key.m_ts = message.timestamp;
+ ev.key.m_context = partition->id;
+ ev.key.m_uid = partition->uid;
+ partition->uid++;
+ partition->events->Insert (ev);
+ }
+ if (!partition->events->IsEmpty ())
+ {
+ context->maxTs = std::min (context->maxTs, NextPartitionEventTs (partition) + partition->minDelay);
}
}
return true;
@@ -279,7 +351,7 @@
for (MultiThreadingPartitions::iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
MultiThreadingPartition *partition = it->second;
- if (!partition->IsSchedulerEmpty () && partition->NextTs () < m_maxTs)
+ if (!partition->events->IsEmpty () && NextPartitionEventTs (partition) < m_maxTs)
{
m_threads[threadId].runnable.push_back (partition);
threadId = (threadId + 1) % m_threadsCount;
@@ -331,7 +403,7 @@
{
MultiThreadingPartition *partition = (*it);
g_currentPartition = partition;
- uint32_t nExecuted = partition->RunUntil (m_maxTs);
+ uint32_t nExecuted = RunPartitionUntil (partition, m_maxTs);
AtomicExchangeAndAdd (&m_nEvents, -nExecuted);
nExecutedByThread += nExecuted;
nExecutedAtIteration += nExecuted;
@@ -351,7 +423,14 @@
{
struct InitialEvent event = m_initialEvents.front ();
m_initialEvents.pop ();
- m_partitions[event.context]->ScheduleAbs (event.timestamp, event.event);
+ MultiThreadingPartition *partition = m_partitions[event.context];
+ Scheduler::Event ev;
+ ev.impl = event.event;
+ ev.key.m_ts = event.timestamp;
+ ev.key.m_context = partition->id;
+ ev.key.m_uid = partition->uid;
+ partition->uid++;
+ partition->events->Insert (ev);
}
m_nStoppedPartitions = 0;
m_nPartitions = (int) m_partitions.size ();
@@ -400,15 +479,16 @@
uint32_t nextPartition = -1;
for (MultiThreadingPartitions::const_iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
- if (!it->second->IsFinished () && it->second->NextTs () < nextTs)
+ MultiThreadingPartition *partition = it->second;
+ if (!partition->events->IsEmpty () && NextPartitionEventTs (partition) < nextTs)
{
- nextTs = it->second->NextTs ();
- nextPartition = it->second->GetPartitionId ();
+ nextTs = NextPartitionEventTs (partition);
+ nextPartition = partition->id;
}
}
if (nextPartition != (uint32_t) -1)
{
- m_partitions[nextPartition]->RunOneEvent ();
+ ProcessPartitionEvent (m_partitions[nextPartition]);
}
}
@@ -425,7 +505,8 @@
Simulator::Schedule (time, &MultiThreadedSimulatorImpl::StopOnePartition, this);
for (MultiThreadingPartitions::iterator it = m_partitions.begin(); it != m_partitions.end(); ++it)
{
- Simulator::ScheduleWithContext (it->second->GetPartitionId (), time,
+ MultiThreadingPartition *partition = it->second;
+ Simulator::ScheduleWithContext (partition->id, time,
&MultiThreadedSimulatorImpl::StopOnePartition, this);
}
}
@@ -436,6 +517,20 @@
AtomicExchangeAndAdd (&m_nStoppedPartitions, 1);
}
+
+void
+MultiThreadedSimulatorImpl::PushEventMessage (MultiThreadingPartition *partition, uint32_t from, uint64_t timestamp, EventImpl *event)
+{
+ pthread_mutex_lock (&partition->eventMessagesLock);
+ struct MultiThreadingPartition::EventMessage message;
+ message.from = from;
+ message.timestamp = timestamp;
+ message.event = event;
+ partition->nMessages++;
+ partition->eventMessages.push (message);
+ pthread_mutex_unlock (&partition->eventMessagesLock);
+}
+
//
// Schedule an event for a _relative_ time in the future.
//
@@ -460,7 +555,14 @@
}
else
{
- return g_currentPartition->ScheduleRel (time.GetTimeStep (), event);
+ Scheduler::Event ev;
+ ev.impl = event;
+ ev.key.m_ts = g_currentPartition->currentTs + time.GetTimeStep ();
+ ev.key.m_context = g_currentPartition->id;
+ ev.key.m_uid = g_currentPartition->uid;
+ g_currentPartition->uid++;
+ g_currentPartition->events->Insert (ev);
+ return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
}
}
@@ -493,7 +595,7 @@
{
MultiThreadingPartition *partition = it->second;
uint64_t timestamp = m_globalTs + (uint64_t) time.GetTimeStep ();
- partition->PushEventMessage (0xffffffff, timestamp, event);
+ PushEventMessage (partition, 0xffffffff, timestamp, event);
NS_LOG_INFO ("Scheduling event at " << timestamp << " with context " << context << " (from global event)");
}
/* Known partition and we have a current partition :
@@ -501,8 +603,8 @@
else
{
MultiThreadingPartition *partition = it->second;
- uint64_t timestamp = g_currentPartition->GetCurrentTs () + time.GetTimeStep ();
- partition->PushEventMessage (g_currentPartition->GetPartitionId (), timestamp, event);
+ uint64_t timestamp = g_currentPartition->currentTs + time.GetTimeStep ();
+ PushEventMessage (partition, g_currentPartition->id, timestamp, event);
NS_LOG_INFO ("Scheduling event at " << timestamp << " with context " << context);
}
}
@@ -528,14 +630,15 @@
{
if (g_currentPartition != 0)
{
- return g_currentPartition->Now ();
+ return TimeStep (g_currentPartition->currentTs);
}
else
{
uint64_t minTs = GetMaximumSimulationTs ();
for (MultiThreadingPartitions::const_iterator it = m_partitions.begin (); it != m_partitions.end (); ++it)
{
- minTs = std::min (minTs, it->second->GetCurrentTs ());
+ MultiThreadingPartition *partition = it->second;
+ minTs = std::min (minTs, partition->currentTs);
}
return TimeStep (minTs);
}
@@ -551,8 +654,8 @@
else
{
MultiThreadingPartition *partition = g_currentPartition;
- NS_ASSERT (partition->GetPartitionId () == id.GetContext ());
- return TimeStep (id.GetTs () - partition->GetCurrentTs ());
+ NS_ASSERT (partition->id == id.GetContext ());
+ return TimeStep (id.GetTs () - partition->currentTs);
}
}
@@ -577,8 +680,19 @@
else
{
MultiThreadingPartition *partition = g_currentPartition;
- NS_ASSERT (partition->GetPartitionId () == id.GetContext ());
- partition->Remove (id);
+ NS_ASSERT (partition->id == id.GetContext ());
+ if (IsExpired (id))
+ {
+ return;
+ }
+ Scheduler::Event event;
+ event.impl = id.PeekEventImpl ();
+ event.key.m_ts = id.GetTs ();
+ event.key.m_uid = id.GetUid ();
+ partition->events->Remove (event);
+ event.impl->Cancel ();
+ // whenever we remove an event from the event list, we have to unref it.
+ event.impl->Unref ();
}
}
@@ -628,7 +742,19 @@
{
MultiThreadingPartitions::const_iterator it = m_partitions.find (ev.GetContext ());
NS_ASSERT (it != m_partitions.end ());
- return it->second->IsExpired (ev);
+ MultiThreadingPartition *partition = it->second;
+ if (ev.PeekEventImpl () == 0 ||
+ ev.GetTs () < partition->currentTs ||
+ (ev.GetTs () == partition->currentTs &&
+ ev.GetUid () <= partition->currentUid) ||
+ ev.PeekEventImpl ()->IsCancelled ())
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
}
@@ -643,7 +769,7 @@
{
if (g_currentPartition != 0)
{
- return g_currentPartition->GetPartitionId ();
+ return g_currentPartition->id;
}
else
{
@@ -660,24 +786,10 @@
}
void
-MultiThreadedSimulatorImpl::AddPartition (uint32_t context, Ptr<MultiThreadingPartition> partition)
-{
- m_partitions[context] = GetPointer (partition);
- partition->SetScheduler (m_schedulerFactory);
-}
-
-MultiThreadingPartition *
-MultiThreadedSimulatorImpl::PeekPartition (uint32_t context) const
+MultiThreadedSimulatorImpl::AddPartition (uint32_t context, MultiThreadingPartition *partition)
{
- MultiThreadingPartitions::const_iterator it = m_partitions.find (context);
- if (it == m_partitions.end ())
- {
- return 0;
- }
- else
- {
- return it->second;
- }
+ m_partitions[context] = partition;
+ SetPartitionScheduler (partition, m_schedulerFactory);
}
} // namespace ns3
--- a/src/simulator/multithreaded-simulator-impl.h Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreaded-simulator-impl.h Fri Aug 14 15:05:09 2009 +0200
@@ -37,6 +37,8 @@
namespace ns3 {
+inline uint64_t NextPartitionEventTs (MultiThreadingPartition *partition);
+
class MultiThreadedSimulatorImpl : public SimulatorImpl
{
public:
@@ -64,19 +66,19 @@
Time GetMaximumSimulationTime (void) const;
uint32_t GetContext (void) const;
void SetScheduler (ObjectFactory schedulerFactory);
+ void SetPartitionScheduler (MultiThreadingPartition *partition, ObjectFactory schedulerFactory);
/**
* Registers a partition to simulator, associated with the given context
*
* \param context Context of the given partition
* \param partition The partition to add
*/
- void AddPartition (uint32_t context, Ptr<MultiThreadingPartition> partition);
- /**
- * Gets the partition associated with the given context
- */
- MultiThreadingPartition *PeekPartition (uint32_t context) const;
+ void AddPartition (uint32_t context, MultiThreadingPartition *partition);
private:
+ inline void ProcessPartitionEvent (MultiThreadingPartition *partition);
+ inline uint32_t RunPartitionUntil (MultiThreadingPartition *partition, uint64_t maxTs);
+ void PushEventMessage (MultiThreadingPartition *partition, uint32_t from, uint64_t timestamp, EventImpl *event);
struct ThreadContext {
uint32_t id;
uint64_t maxTs;
--- a/src/simulator/multithreading-node-partition.cc Fri Aug 14 15:05:09 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,76 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2009 INRIA
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Guillaume Seguin <guillaume@segu.in>
- */
-
-#include "multithreading-node-partition.h"
-#include "multithreaded-simulator-impl.h"
-
-#include "ns3/simulator.h"
-#include "ns3/net-device.h"
-#include "ns3/channel.h"
-
-#include <algorithm>
-
-NS_LOG_COMPONENT_DEFINE ("MultiThreadingNodePartition");
-
-namespace ns3 {
-
-NS_OBJECT_ENSURE_REGISTERED (MultiThreadingNodePartition);
-
-TypeId
-MultiThreadingNodePartition::GetTypeId (void)
-{
- static TypeId tid = TypeId ("ns3::MultiThreadingNodePartition")
- .SetParent<MultiThreadingPartition> ()
- ;
- return tid;
-}
-
-MultiThreadingNodePartition::MultiThreadingNodePartition (Ptr<MultiThreadedSimulatorImpl> simulator, Ptr<Node> node)
- : MultiThreadingPartition (simulator, node->GetId ())
-{
- m_minDelay = (uint64_t) Simulator::GetMaximumSimulationTime ().GetTimeStep ();
- DiscoverNeighborhood (node);
-}
-
-void
-MultiThreadingNodePartition::DiscoverNeighborhood (Ptr<Node> node)
-{
- int srcDevCount = node->GetNDevices ();
- for (int srcDevIndex = 0; srcDevIndex < srcDevCount; srcDevIndex++)
- {
- Ptr<NetDevice> srcDevice = node->GetDevice (srcDevIndex);
- Ptr<Channel> channel = srcDevice->GetChannel ();
- if (channel == 0)
- {
- continue;
- }
- uint64_t delay = channel->GetMinDelay (srcDevice).GetHighPrecision ().GetInteger ();
- NS_ASSERT (delay != 0);
- m_minDelay = std::min (m_minDelay, delay);
- }
-}
-
-uint64_t
-MultiThreadingNodePartition::GetMinDelay (void)
-{
- return m_minDelay;
-}
-
-} // namespace ns3
--- a/src/simulator/multithreading-node-partition.h Fri Aug 14 15:05:09 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2009 INRIA
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Guillaume Seguin <guillaume@segu.in>
- */
-
-#ifndef MULTI_THREADING_NODE_PARTITION_H
-#define MULTI_THREADING_NODE_PARTITION_H
-
-#include "multithreading-partition.h"
-
-#include "ns3/node.h"
-#include "ns3/assert.h"
-#include "ns3/log.h"
-
-namespace ns3 {
-
-class MultiThreadedSimulatorImpl;
-
-/**
- * \brief Node-based multithreading partitions implementation
- *
- * This class implements a MultiThreadingPartition based on nodes.
- * Upon creation, it browses the neighboring network to find out neighbor
- * partitions.
- */
-class MultiThreadingNodePartition : public MultiThreadingPartition
-{
-public:
- static TypeId GetTypeId (void);
-
- MultiThreadingNodePartition (Ptr<MultiThreadedSimulatorImpl> simulator, Ptr<Node> node);
-
- /**
- * Inherited from MultiThreadingPartition
- */
- uint64_t GetMinDelay (void);
-
-private:
- /**
- * Browses neighboring partitions
- */
- void DiscoverNeighborhood (Ptr<Node> node);
-
- uint64_t m_minDelay;
-};
-
-} // namespace ns3
-
-#endif /* MULTI_THREADING_NODE_PARTITION_H */
--- a/src/simulator/multithreading-partition.cc Fri Aug 14 15:05:09 2009 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,265 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2009 INRIA
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Guillaume Seguin <guillaume@segu.in>
- */
-
-#include "multithreading-partition.h"
-#include "multithreaded-simulator-impl.h"
-
-#include "ns3/boolean.h"
-#include <algorithm>
-
-NS_LOG_COMPONENT_DEFINE ("MultiThreadingPartition");
-
-namespace ns3 {
-
-NS_OBJECT_ENSURE_REGISTERED (MultiThreadingPartition);
-
-TypeId
-MultiThreadingPartition::GetTypeId (void)
-{
- static TypeId tid = TypeId ("ns3::MultiThreadingPartition")
- ;
- return tid;
-}
-
-MultiThreadingPartition::MultiThreadingPartition (Ptr<MultiThreadedSimulatorImpl> simulator, uint32_t partitionId)
- : m_partitionId (partitionId),
- m_nMessages (0),
- m_simulator (PeekPointer (simulator)),
- m_events (0),
- m_currentUid (0),
- // uids are allocated from 4.
- // uid 0 is "invalid" events
- // uid 1 is "now" events
- // uid 2 is "destroy" events
- m_uid (4),
- m_currentTs (0)
-{
- m_simulator->AddPartition (partitionId, this);
- pthread_mutexattr_t lock_attr;
- pthread_mutexattr_init (&lock_attr);
- pthread_mutex_init (&m_eventMessagesLock, &lock_attr);
- pthread_mutex_init (&m_partitionLock, &lock_attr);
-}
-
-MultiThreadingPartition::~MultiThreadingPartition ()
-{
- pthread_mutex_destroy (&m_eventMessagesLock);
- pthread_mutex_destroy (&m_partitionLock);
- while (!m_events->IsEmpty ())
- {
- Scheduler::Event next = m_events->RemoveNext ();
- next.impl->Unref ();
- }
- m_events->Unref ();
- m_events = 0;
- m_simulator = 0;
-}
-
-MultiThreadingPartition *
-MultiThreadingPartition::PeekPartition (uint32_t partitionId)
-{
- return m_simulator->PeekPartition (partitionId);
-}
-
-void
-MultiThreadingPartition::SetScheduler (ObjectFactory schedulerFactory)
-{
- Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
-
- if (m_events != 0)
- {
- while (!m_events->IsEmpty ())
- {
- Scheduler::Event next = m_events->RemoveNext ();
- scheduler->Insert (next);
- }
- m_events->Unref ();
- }
- m_events = GetPointer (scheduler);
-}
-
-void
-MultiThreadingPartition::PushEventMessage (uint32_t from, uint64_t timestamp, EventImpl *event)
-{
- pthread_mutex_lock (&m_eventMessagesLock);
- struct EventMessage message;
- message.from = from;
- message.timestamp = timestamp;
- message.event = event;
- m_nMessages++;
- m_eventMessages.push (message);
- pthread_mutex_unlock (&m_eventMessagesLock);
-}
-
-void
-MultiThreadingPartition::Remove (const EventId &id)
-{
- if (IsExpired (id))
- {
- return;
- }
- Scheduler::Event event;
- event.impl = id.PeekEventImpl ();
- event.key.m_ts = id.GetTs ();
- event.key.m_uid = id.GetUid ();
- m_events->Remove (event);
- event.impl->Cancel ();
- // whenever we remove an event from the event list, we have to unref it.
- event.impl->Unref ();
-}
-
-
-void
-MultiThreadingPartition::ProcessOneEvent (void)
-{
- Scheduler::Event next = m_events->RemoveNext ();
-
- NS_LOG_DEBUG ("Process event at partition " << GetPartitionId () << " at " << m_currentTs);
-
- NS_ASSERT (next.key.m_ts >= m_currentTs);
- NS_ASSERT (next.key.m_context != 0xffffffff);
-
- NS_LOG_LOGIC ("handle " << next.key.m_ts);
- NS_LOG_DEBUG ("Clock " << GetPartitionId () << " going from " << m_currentTs << " to " << next.key.m_ts << " (during event processing)");
- m_currentTs = next.key.m_ts;
- m_currentUid = next.key.m_uid;
- next.impl->Invoke ();
- next.impl->Unref ();
-}
-
-void
-MultiThreadingPartition::ProcessMessages (void)
-{
- while (!m_eventMessages.empty ())
- {
- struct EventMessage message = m_eventMessages.front ();
- m_eventMessages.pop ();
- m_nMessages--;
- NS_ASSERT (message.event != 0);
- Scheduler::Event ev;
- ev.impl = message.event;
- ev.key.m_ts = message.timestamp;
- ev.key.m_context = m_partitionId;
- ev.key.m_uid = m_uid;
- m_uid++;
- m_events->Insert (ev);
- }
-}
-
-uint32_t
-MultiThreadingPartition::RunUntil (uint64_t maxTs)
-{
- NS_LOG_DEBUG ("Processing events at partition " << GetPartitionId () << " going from " << m_currentTs << " until " << maxTs);
- uint32_t nExecuted = 0;
- while (!m_events->IsEmpty () && NextTs () < maxTs)
- {
- ProcessOneEvent ();
- nExecuted++;
- }
- return nExecuted;
-}
-
-void
-MultiThreadingPartition::RunOneEvent (void)
-{
- ProcessOneEvent ();
-}
-
-Time
-MultiThreadingPartition::Now (void) const
-{
- return TimeStep (m_currentTs);
-}
-
-bool
-MultiThreadingPartition::IsFinished (void) const
-{
- return (m_events->IsEmpty () && m_eventMessages.empty ());
-}
-
-bool
-MultiThreadingPartition::IsSchedulerEmpty (void) const
-{
- return m_events->IsEmpty ();
-}
-
-uint64_t
-MultiThreadingPartition::NextTs (void) const
-{
- NS_ASSERT (!m_events->IsEmpty ());
- Scheduler::Event ev = m_events->PeekNext ();
- return ev.key.m_ts;
-}
-
-bool
-MultiThreadingPartition::IsExpired (const EventId &ev) const
-{
- if (ev.PeekEventImpl () == 0 ||
- ev.GetTs () < m_currentTs ||
- (ev.GetTs () == m_currentTs &&
- ev.GetUid () <= m_currentUid) ||
- ev.PeekEventImpl ()->IsCancelled ())
- {
- return true;
- }
- else
- {
- return false;
- }
-}
-
-uint32_t
-MultiThreadingPartition::GetPartitionId (void) const
-{
- return m_partitionId;
-}
-
-uint64_t
-MultiThreadingPartition::GetCurrentTs (void) const
-{
- return m_currentTs;
-}
-
-EventId
-MultiThreadingPartition::ScheduleRel (uint64_t timestamp, EventImpl *event)
-{
- Scheduler::Event ev;
- ev.impl = event;
- ev.key.m_ts = m_currentTs + timestamp;
- ev.key.m_context = m_partitionId;
- ev.key.m_uid = m_uid;
- m_uid++;
- m_events->Insert (ev);
- return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
-}
-
-void
-MultiThreadingPartition::ScheduleAbs (uint64_t timestamp, EventImpl *event)
-{
- Scheduler::Event ev;
- ev.impl = event;
- ev.key.m_ts = timestamp;
- ev.key.m_context = m_partitionId;
- ev.key.m_uid = m_uid;
- m_uid++;
- m_events->Insert (ev);
-}
-
-} // namespace ns3
--- a/src/simulator/multithreading-partition.h Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/multithreading-partition.h Fri Aug 14 15:05:09 2009 +0200
@@ -39,94 +39,8 @@
namespace ns3 {
-class MultiThreadedSimulatorImpl;
-
-/**
- * \bried Multithreading partition base class
- *
- * This class implements the interface of multithreading partitions.
- * A partition is a subset of the global network and is processed as is
- * in multithreaded simulations (that is, with a common list of events).
- */
-class MultiThreadingPartition : public RefCountBase<MultiThreadingPartition>
+struct MultiThreadingPartition
{
-public:
- static TypeId GetTypeId (void);
-
- MultiThreadingPartition (Ptr<MultiThreadedSimulatorImpl> simulator,
- uint32_t partitionId);
- virtual ~MultiThreadingPartition ();
-
- /**
- * Returns whether this partition has no more events or messages
- */
- bool IsFinished (void) const;
-
- /**
- * Returns the timestamp of next event
- * This function assumes (and asserts) there is such an event
- */
- uint64_t NextTs (void) const;
-
- /**
- * Pushes a messages to this partition :
- *
- * \param from Source partition, used for null messages
- * \param timestamp Source partition timestamp, used for null messages
- * \param event Event implementation, if this is an event message
- */
- void PushEventMessage (uint32_t from, uint64_t timestamp, EventImpl *event);
- void Remove (const EventId &ev);
- bool IsExpired (const EventId &ev) const;
-
- /**
- * Processes partition messages
- */
- void ProcessMessages (void);
-
- /**
- * Processes partition events until the given date.
- * This function does not processes messages.
- *
- * \returns The number of events processed
- */
- uint32_t RunUntil (uint64_t maxTs);
-
- void RunOneEvent (void);
-
- Time Now (void) const;
-
- void SetScheduler (ObjectFactory schedulerFactory);
-
- /**
- * \returns Returs partition unique id
- */
- uint32_t GetPartitionId (void) const;
-
- /**
- * \returns Returs partition current timestamp
- */
- uint64_t GetCurrentTs (void) const;
-
- /**
- * \returns Whether the scheduler is empty
- */
- bool IsSchedulerEmpty (void) const;
- EventId ScheduleRel (uint64_t timestamp, EventImpl *event);
- void ScheduleAbs (uint64_t timestamp, EventImpl *event);
-
- /**
- * Gets the minimum delay from other partitions to this one.
- *
- * This method must be implemented by subclasses.
- */
- virtual uint64_t GetMinDelay (void) = 0;
-protected:
- MultiThreadingPartition *PeekPartition (uint32_t partitionId);
-private:
-
- void ProcessOneEvent (void);
-
struct EventMessage {
uint32_t from;
uint64_t timestamp;
@@ -134,18 +48,17 @@
};
typedef std::queue<EventMessage> EventMessages;
- pthread_mutex_t m_partitionLock;
- uint32_t m_partitionId;
+ uint32_t id;
+ uint64_t minDelay;
- pthread_mutex_t m_eventMessagesLock;
- EventMessages m_eventMessages;
- uint32_t m_nMessages;
+ pthread_mutex_t eventMessagesLock;
+ EventMessages eventMessages;
+ uint32_t nMessages;
- MultiThreadedSimulatorImpl *m_simulator;
- Scheduler *m_events;
- uint32_t m_currentUid;
- uint32_t m_uid;
- uint64_t m_currentTs;
+ Scheduler *events;
+ uint32_t currentUid;
+ uint32_t uid;
+ uint64_t currentTs;
};
} // namespace ns3
--- a/src/simulator/wscript Fri Aug 14 15:05:09 2009 +0200
+++ b/src/simulator/wscript Fri Aug 14 15:05:09 2009 +0200
@@ -121,10 +121,7 @@
headers.source.extend([
'multithreading-partition.h',
'multithreaded-simulator-impl.h',
- 'multithreading-node-partition.h',
])
sim.source.extend([
- 'multithreading-partition.cc',
'multithreaded-simulator-impl.cc',
- 'multithreading-node-partition.cc',
])