Cut over UDP applications to use the new receive API
authorTom Henderson <tomh@tomh.org>
Fri, 25 Apr 2008 14:29:28 -0700
changeset 3098 d384d52f8f6e
parent 3097 1b1661bbfa33
child 3099 99ab4885e124
Cut over UDP applications to use the new receive API
src/applications/udp-echo/udp-echo-client.cc
src/applications/udp-echo/udp-echo-client.h
src/applications/udp-echo/udp-echo-server.cc
src/applications/udp-echo/udp-echo-server.h
src/common/tags.h
src/internet-node/tcp-socket.cc
src/internet-node/udp-socket.cc
src/node/packet-socket.cc
src/node/socket.cc
src/node/socket.h
src/routing/olsr/olsr-agent-impl.cc
src/routing/olsr/olsr-agent-impl.h
--- a/src/applications/udp-echo/udp-echo-client.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/applications/udp-echo/udp-echo-client.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -103,7 +103,7 @@
       m_socket->Connect (InetSocketAddress (m_peerAddress, m_peerPort));
     }
 
-  m_socket->SetRecvCallback(MakeCallback(&UdpEchoClient::Receive, this));
+  m_socket->SetRecv_Callback(MakeCallback(&UdpEchoClient::HandleRead, this));
 
   ScheduleTransmit (Seconds(0.));
 }
@@ -149,6 +149,29 @@
 }
 
 void
+UdpEchoClient::HandleRead (Ptr<Socket> socket)
+{
+  NS_LOG_FUNCTION (this << socket);
+  Ptr<Packet> packet;
+  uint32_t maxSize = std::numeric_limits<uint32_t>::max();
+  uint32_t flags = 0;  // no flags
+  while (packet = socket->Recv (maxSize, flags))
+    {
+      SocketRxAddressTag tag;
+      bool found = packet->PeekTag (tag);
+      NS_ASSERT (found);
+      Address from = tag.GetAddress ();
+      packet->RemoveTag (tag);
+      if (InetSocketAddress::IsMatchingType (from))
+        {
+          InetSocketAddress address = InetSocketAddress::ConvertFrom (from);
+          NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " <<
+            address.GetIpv4());
+        }
+    }
+}
+
+void
 UdpEchoClient::Receive(
   Ptr<Socket> socket, 
   Ptr<Packet> packet,
--- a/src/applications/udp-echo/udp-echo-client.h	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/applications/udp-echo/udp-echo-client.h	Fri Apr 25 14:29:28 2008 -0700
@@ -57,6 +57,7 @@
   void Send (void);
 
   void Receive(Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
+  void HandleRead (Ptr<Socket> socket);
 
   uint32_t m_count;
   Time m_interval;
--- a/src/applications/udp-echo/udp-echo-server.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/applications/udp-echo/udp-echo-server.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -79,7 +79,8 @@
       m_socket->Bind (local);
     }
 
-  m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this));
+  //m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this));
+  m_socket->SetRecv_Callback(MakeCallback(&UdpEchoServer::HandleRead, this));
 }
 
 void 
@@ -94,6 +95,31 @@
     }
 }
 
+void 
+UdpEchoServer::HandleRead (Ptr<Socket> socket)
+{
+  Ptr<Packet> packet;
+  uint32_t maxSize = std::numeric_limits<uint32_t>::max();
+  uint32_t flags = 0;  // no flags
+  while (packet = socket->Recv (maxSize, flags))
+    {
+      SocketRxAddressTag tag;
+      bool found = packet->PeekTag (tag); 
+      NS_ASSERT (found);
+      Address from = tag.GetAddress ();
+      packet->RemoveTag (tag);
+      if (InetSocketAddress::IsMatchingType (from))
+        {
+          InetSocketAddress address = InetSocketAddress::ConvertFrom (from);
+          NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " << 
+            address.GetIpv4());
+
+          NS_LOG_LOGIC ("Echoing packet");
+          socket->SendTo (from, packet);
+        }
+    }
+}
+
 void
 UdpEchoServer::Receive(
   Ptr<Socket> socket, 
--- a/src/applications/udp-echo/udp-echo-server.h	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/applications/udp-echo/udp-echo-server.h	Fri Apr 25 14:29:28 2008 -0700
@@ -50,6 +50,7 @@
   virtual void StopApplication (void);
 
   void Receive(Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
+  void HandleRead (Ptr<Socket> socket);
 
   uint16_t m_port;
   Ptr<Socket> m_socket;
--- a/src/common/tags.h	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/common/tags.h	Fri Apr 25 14:29:28 2008 -0700
@@ -33,7 +33,7 @@
  * The maximum size (in bytes) of a Tag is stored
  * in this constant.
  */
-#define TAGS_MAX_SIZE 16
+#define TAGS_MAX_SIZE 32
 
 class Tags {
 public:
--- a/src/internet-node/tcp-socket.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/internet-node/tcp-socket.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -445,8 +445,19 @@
 Ptr<Packet>
 TcpSocket::Recv (uint32_t maxSize, uint32_t flags)
 {
+  if (m_deliveryQueue.empty() )
+    {
+      return 0;
+    }
   Ptr<Packet> p = m_deliveryQueue.front ();
-  m_deliveryQueue.pop ();
+  if (p->GetSize() <= maxSize)
+    {
+      m_deliveryQueue.pop ();
+    }
+  else
+    {
+      p = 0;
+    }
   return p;
 }
 
--- a/src/internet-node/udp-socket.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/internet-node/udp-socket.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -325,6 +325,25 @@
   return DoSendTo (p, ipv4, port);
 }
 
+Ptr<Packet>
+UdpSocket::Recv (uint32_t maxSize, uint32_t flags)
+{
+  if (m_deliveryQueue.empty() )
+    {
+      return 0;
+    }
+  Ptr<Packet> p = m_deliveryQueue.front ();
+  if (p->GetSize() <= maxSize) 
+    {
+      m_deliveryQueue.pop ();
+    }
+  else
+    {
+      p = 0; 
+    }
+  return p;
+}
+
 void 
 UdpSocket::ForwardUp (Ptr<Packet> packet, Ipv4Address ipv4, uint16_t port)
 {
@@ -334,9 +353,12 @@
     {
       return;
     }
-  
   Address address = InetSocketAddress (ipv4, port);
-  NotifyDataReceived (packet, address);
+  SocketRxAddressTag tag;
+  tag.SetAddress (address);
+  packet->AddTag (tag);
+  m_deliveryQueue.push (packet);
+  NotifyDataRecv ();
 }
 
 } //namespace ns3
@@ -367,6 +389,8 @@
 
   void ReceivePacket (Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
   void ReceivePacket2 (Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
+  void ReceivePkt (Ptr<Socket> socket);
+  void ReceivePkt2 (Ptr<Socket> socket);
 };
 
 
@@ -384,6 +408,16 @@
   m_receivedPacket2 = packet;
 }
 
+void UdpSocketTest::ReceivePkt (Ptr<Socket> socket)
+{
+  m_receivedPacket = socket->Recv (std::numeric_limits<uint32_t>::max(), 0);
+}
+
+void UdpSocketTest::ReceivePkt2 (Ptr<Socket> socket)
+{
+  m_receivedPacket2 = socket->Recv (std::numeric_limits<uint32_t>::max(), 0);
+}
+
 bool
 UdpSocketTest::RunTests (void)
 {
@@ -457,10 +491,18 @@
   Ptr<SocketFactory> rxSocketFactory = rxNode->GetObject<Udp> ();
   Ptr<Socket> rxSocket = rxSocketFactory->CreateSocket ();
   NS_TEST_ASSERT_EQUAL (rxSocket->Bind (InetSocketAddress (Ipv4Address ("10.0.0.1"), 1234)), 0);
+#ifdef OLDSEMANTICS
   rxSocket->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket, this));
+#else
+  rxSocket->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt, this));
+#endif
 
   Ptr<Socket> rxSocket2 = rxSocketFactory->CreateSocket ();
+#ifdef OLDSEMANTICS
   rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this));
+#else
+  rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this));
+#endif
   NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("10.0.1.1"), 1234)), 0);
 
   Ptr<SocketFactory> txSocketFactory = txNode->GetObject<Udp> ();
@@ -477,6 +519,8 @@
   NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123);
   NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0); // second interface should receive it
 
+  m_receivedPacket->RemoveAllTags ();
+  m_receivedPacket2->RemoveAllTags ();
 
   // Simple broadcast test
 
@@ -489,6 +533,8 @@
   // second socket should not receive it (it is bound specifically to the second interface's address
   NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0);
 
+  m_receivedPacket->RemoveAllTags ();
+  m_receivedPacket2->RemoveAllTags ();
 
   // Broadcast test with multiple receiving sockets
 
@@ -497,7 +543,11 @@
   // the socket address matches.
   rxSocket2->Dispose ();
   rxSocket2 = rxSocketFactory->CreateSocket ();
+#ifdef OLDSEMANTICS
   rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this));
+#else
+  rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this));
+#endif
   NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("0.0.0.0"), 1234)), 0);
 
   m_receivedPacket = Create<Packet> ();
@@ -508,19 +558,14 @@
   NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123);
   NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 123);
 
+  m_receivedPacket->RemoveAllTags ();
+  m_receivedPacket2->RemoveAllTags ();
+
   Simulator::Destroy ();
 
   return result;
 }
 
-Ptr<Packet>
-UdpSocket::Recv (uint32_t maxSize, uint32_t flags)
-{
-  Ptr<Packet> p = m_deliveryQueue.front ();
-  m_deliveryQueue.pop ();
-  return p;
-}
-
 
 static UdpSocketTest gUdpSocketTest;
 
--- a/src/node/packet-socket.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/node/packet-socket.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -301,15 +301,30 @@
   address.SetSingleDevice (device->GetIfIndex ());
   address.SetProtocol (protocol);
 
+  SocketRxAddressTag tag;
+  tag.SetAddress (address);
+  packet->AddTag (tag);
+  m_deliveryQueue.push (packet);
   NS_LOG_LOGIC ("UID is " << packet->GetUid() << " PacketSocket " << this);
-  NotifyDataReceived (packet, address);
+  NotifyDataRecv ();
 }
 
 Ptr<Packet> 
 PacketSocket::Recv (uint32_t maxSize, uint32_t flags)
 {
+  if (m_deliveryQueue.empty() )
+    {
+      return 0;
+    }
   Ptr<Packet> p = m_deliveryQueue.front ();
-  m_deliveryQueue.pop ();
+  if (p->GetSize() <= maxSize)
+    {
+      m_deliveryQueue.pop ();
+    }
+  else
+    {
+      p = 0;
+    }
   return p;
 }
 
--- a/src/node/socket.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/node/socket.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -246,4 +246,53 @@
       m_receivedData_ (this);
     }
 }
+
+SocketRxAddressTag::SocketRxAddressTag ()  
+{
+}
+
+uint32_t 
+SocketRxAddressTag::GetUid (void)
+{
+  static uint32_t uid = ns3::Tag::AllocateUid<SocketRxAddressTag> ("SocketRxAddressTag.ns3");
+  return uid;
+}
+
+void
+SocketRxAddressTag::Print (std::ostream &os) const
+{
+  os << "address="<< m_address;
+}
+
+uint32_t 
+SocketRxAddressTag::GetSerializedSize (void) const
+{
+  return 0;
+}
+
+void 
+SocketRxAddressTag::Serialize (Buffer::Iterator i) const
+{
+  // for local use in stack only
+}
+
+uint32_t 
+SocketRxAddressTag::Deserialize (Buffer::Iterator i)
+{
+  // for local use in stack only
+  return 0;
+}
+
+void 
+SocketRxAddressTag::SetAddress (Address addr)
+{
+  m_address = addr;
+}
+
+Address 
+SocketRxAddressTag::GetAddress (void) const
+{
+  return m_address;
+}
+
 }//namespace ns3
--- a/src/node/socket.h	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/node/socket.h	Fri Apr 25 14:29:28 2008 -0700
@@ -25,12 +25,14 @@
 
 #include "ns3/callback.h"
 #include "ns3/ptr.h"
+#include "ns3/tag.h"
 #include "ns3/object.h"
 #include "address.h"
 #include <stdint.h>
 
 namespace ns3 {
 
+
 class Node;
 class Packet;
 
@@ -257,6 +259,14 @@
    */
   int SendTo (const Address &address, const uint8_t* buf, uint32_t size);
 
+  /**
+   * \brief Read a single packet from the socket
+   * \param maxSize reader will accept packet up to maxSize
+   * \param flags Socket recv flags
+   * \returns Ptr<Packet> of the next in-sequence packet.  Returns
+   * 0 if the socket cannot return a next in-sequence packet conforming
+   * to the maxSize and flags.
+   */
   virtual Ptr<Packet> Recv (uint32_t maxSize, uint32_t flags) = 0;
 
 protected:
@@ -285,6 +295,26 @@
   Callback<void, Ptr<Socket> > m_receivedData_;
 };
 
+/**
+ * \brief This class implements a tag that carries the source address
+ * of a packet across the receiving socket interface.
+ */
+class SocketRxAddressTag : public Tag
+{
+public:
+  SocketRxAddressTag ();
+  static uint32_t GetUid (void);
+  void Print (std::ostream &os) const;
+  uint32_t GetSerializedSize (void) const;
+  void Serialize (Buffer::Iterator i) const;
+  uint32_t Deserialize (Buffer::Iterator i);
+
+  void SetAddress (Address addr);
+  Address GetAddress (void) const;
+private:
+  Address m_address;
+};
+
 } //namespace ns3
 
 #endif /* SOCKET_H */
--- a/src/routing/olsr/olsr-agent-impl.cc	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/routing/olsr/olsr-agent-impl.cc	Fri Apr 25 14:29:28 2008 -0700
@@ -282,7 +282,7 @@
 
       // Create a socket to listen only on this interface
       Ptr<Socket> socket = socketFactory->CreateSocket ();
-      socket->SetRecvCallback (MakeCallback (&AgentImpl::RecvOlsr,  this));
+      socket->SetRecv_Callback (MakeCallback (&AgentImpl::RecvOlsr,  this));
       if (socket->Bind (InetSocketAddress (addr, OLSR_PORT_NUMBER)))
         {
           NS_FATAL_ERROR ("Failed to bind() OLSR receive socket");
@@ -307,10 +307,18 @@
 //
 // \brief Processes an incoming %OLSR packet following RFC 3626 specification.
 void
-AgentImpl::RecvOlsr (Ptr<Socket> socket,
-                     Ptr<Packet> receivedPacket,
-                     const Address &sourceAddress)
+AgentImpl::RecvOlsr (Ptr<Socket> socket)
 {
+  Ptr<Packet> receivedPacket;
+  uint32_t maxSize = std::numeric_limits<uint32_t>::max();
+  uint32_t flags = 0;  // no flags
+  receivedPacket = socket->Recv (maxSize, flags);
+
+  SocketRxAddressTag tag;
+  bool found = receivedPacket->PeekTag (tag);
+  NS_ASSERT (found);
+  Address sourceAddress = tag.GetAddress ();
+
   InetSocketAddress inetSourceAddr = InetSocketAddress::ConvertFrom (sourceAddress);
   Ipv4Address senderIfaceAddr = inetSourceAddr.GetIpv4 ();
   Ipv4Address receiverIfaceAddr = m_socketAddresses[socket];
--- a/src/routing/olsr/olsr-agent-impl.h	Thu Apr 24 08:18:01 2008 -0700
+++ b/src/routing/olsr/olsr-agent-impl.h	Fri Apr 25 14:29:28 2008 -0700
@@ -98,9 +98,7 @@
   /// Increments message sequence number and returns the new value.
   inline uint16_t GetMessageSequenceNumber ();
 	
-  void RecvOlsr (Ptr<Socket> socket,
-                 Ptr<Packet> receivedPacket,
-                 const Address &sourceAddress);
+  void RecvOlsr (Ptr<Socket> socket);
 
   void MprComputation ();
   void RoutingTableComputation ();