A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
mpi-interface.cc
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  */
18 
19 // This object contains static methods that provide an easy interface
20 // to the necessary MPI information.
21 
22 #include <iostream>
23 #include <iomanip>
24 #include <list>
25 
26 #include "mpi-interface.h"
27 #include "mpi-receiver.h"
28 
29 #include "ns3/node.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/simulator.h"
33 #include "ns3/simulator-impl.h"
34 #include "ns3/nstime.h"
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 namespace ns3 {
41 
42 SentBuffer::SentBuffer ()
43 {
44  m_buffer = 0;
45  m_request = 0;
46 }
47 
48 SentBuffer::~SentBuffer ()
49 {
50  delete [] m_buffer;
51 }
52 
53 uint8_t*
55 {
56  return m_buffer;
57 }
58 
59 void
60 SentBuffer::SetBuffer (uint8_t* buffer)
61 {
62  m_buffer = buffer;
63 }
64 
65 #ifdef NS3_MPI
66 MPI_Request*
68 {
69  return &m_request;
70 }
71 #endif
72 
73 uint32_t MpiInterface::m_sid = 0;
74 uint32_t MpiInterface::m_size = 1;
75 bool MpiInterface::m_initialized = false;
76 bool MpiInterface::m_enabled = false;
77 uint32_t MpiInterface::m_rxCount = 0;
78 uint32_t MpiInterface::m_txCount = 0;
79 std::list<SentBuffer> MpiInterface::m_pendingTx;
80 
81 #ifdef NS3_MPI
82 MPI_Request* MpiInterface::m_requests;
83 char** MpiInterface::m_pRxBuffers;
84 #endif
85 
86 void
88 {
89 #ifdef NS3_MPI
90  for (uint32_t i = 0; i < GetSize (); ++i)
91  {
92  delete [] m_pRxBuffers[i];
93  }
94  delete [] m_pRxBuffers;
95  delete [] m_requests;
96 
97  m_pendingTx.clear ();
98 #endif
99 }
100 
101 uint32_t
103 {
104  return m_rxCount;
105 }
106 
107 uint32_t
109 {
110  return m_txCount;
111 }
112 
113 uint32_t
115 {
116  if (!m_initialized)
117  {
118  Simulator::GetImplementation ();
119  m_initialized = true;
120  }
121  return m_sid;
122 }
123 
124 uint32_t
126 {
127  if (!m_initialized)
128  {
129  Simulator::GetImplementation ();
130  m_initialized = true;
131  }
132  return m_size;
133 }
134 
135 bool
137 {
138  if (!m_initialized)
139  {
140  Simulator::GetImplementation ();
141  m_initialized = true;
142  }
143  return m_enabled;
144 }
145 
146 void
147 MpiInterface::Enable (int* pargc, char*** pargv)
148 {
149 #ifdef NS3_MPI
150  // Initialize the MPI interface
151  MPI_Init (pargc, pargv);
152  MPI_Barrier (MPI_COMM_WORLD);
153  MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid));
154  MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size));
155  m_enabled = true;
156  m_initialized = true;
157  // Post a non-blocking receive for all peers
158  m_pRxBuffers = new char*[m_size];
159  m_requests = new MPI_Request[m_size];
160  for (uint32_t i = 0; i < GetSize (); ++i)
161  {
162  m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
163  MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
164  MPI_COMM_WORLD, &m_requests[i]);
165  }
166 #else
167  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
168 #endif
169 }
170 
171 void
172 MpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
173 {
174 #ifdef NS3_MPI
175  SentBuffer sendBuf;
176  m_pendingTx.push_back (sendBuf);
177  std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element
178 
179  uint32_t serializedSize = p->GetSerializedSize ();
180  uint8_t* buffer = new uint8_t[serializedSize + 16];
181  i->SetBuffer (buffer);
182  // Add the time, dest node and dest device
183  uint64_t t = rxTime.GetNanoSeconds ();
184  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
185  *pTime++ = t;
186  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
187  *pData++ = node;
188  *pData++ = dev;
189  // Serialize the packet
190  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
191 
192  // Find the system id for the destination node
193  Ptr<Node> destNode = NodeList::GetNode (node);
194  uint32_t nodeSysId = destNode->GetSystemId ();
195 
196  MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
197  0, MPI_COMM_WORLD, (i->GetRequest ()));
198  m_txCount++;
199 #else
200  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
201 #endif
202 }
203 
204 void
206 { // Poll the non-block reads to see if data arrived
207 #ifdef NS3_MPI
208  while (true)
209  {
210  int flag = 0;
211  int index = 0;
212  MPI_Status status;
213 
214  MPI_Testany (GetSize (), m_requests, &index, &flag, &status);
215  if (!flag)
216  {
217  break; // No more messages
218  }
219  int count;
220  MPI_Get_count (&status, MPI_CHAR, &count);
221  m_rxCount++; // Count this receive
222 
223  // Get the meta data first
224  uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]);
225  uint64_t nanoSeconds = *pTime++;
226  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
227  uint32_t node = *pData++;
228  uint32_t dev = *pData++;
229 
230  Time rxTime = NanoSeconds (nanoSeconds);
231 
232  count -= sizeof (nanoSeconds) + sizeof (node) + sizeof (dev);
233 
234  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
235 
236  // Find the correct node/device to schedule receive event
237  Ptr<Node> pNode = NodeList::GetNode (node);
238  Ptr<MpiReceiver> pMpiRec = 0;
239  uint32_t nDevices = pNode->GetNDevices ();
240  for (uint32_t i = 0; i < nDevices; ++i)
241  {
242  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
243  if (pThisDev->GetIfIndex () == dev)
244  {
245  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
246  break;
247  }
248  }
249 
250  NS_ASSERT (pNode && pMpiRec);
251 
252  // Schedule the rx event
253  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
254  &MpiReceiver::Receive, pMpiRec, p);
255 
256  // Re-queue the next read
257  MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
258  MPI_COMM_WORLD, &m_requests[index]);
259  }
260 #else
261  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
262 #endif
263 }
264 
265 void
267 {
268 #ifdef NS3_MPI
269  std::list<SentBuffer>::iterator i = m_pendingTx.begin ();
270  while (i != m_pendingTx.end ())
271  {
272  MPI_Status status;
273  int flag = 0;
274  MPI_Test (i->GetRequest (), &flag, &status);
275  std::list<SentBuffer>::iterator current = i; // Save current for erasing
276  i++; // Advance to next
277  if (flag)
278  { // This message is complete
279  m_pendingTx.erase (current);
280  }
281  }
282 #else
283  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
284 #endif
285 }
286 
287 void
289 {
290 #ifdef NS3_MPI
291  int flag = 0;
292  MPI_Initialized (&flag);
293  if (flag)
294  {
295  MPI_Finalize ();
296  m_enabled = false;
297  m_initialized = false;
298  }
299  else
300  {
301  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
302  }
303 #else
304  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
305 #endif
306 }
307 
308 
309 } // namespace ns3
Time NanoSeconds(uint64_t ns)
create ns3::Time instances in units of nanoseconds.
Definition: nstime.h:629
keep track of time unit.
Definition: nstime.h:149
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:192
#define NS_ASSERT(condition)
Definition: assert.h:64
static void Disable()
uint32_t GetSystemId(void) const
Definition: node.cc:112
#define NS_FATAL_ERROR(msg)
fatal error handling
Definition: fatal-error.h:72
static uint32_t GetTxCount()
static void Destroy()
static void Enable(int *pargc, char ***pargv)
static bool IsEnabled()
void SetBuffer(uint8_t *buffer)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:43
static uint32_t GetRxCount()
Ptr< NetDevice > GetDevice(uint32_t index) const
Definition: node.cc:133
uint8_t * GetBuffer()
static void ScheduleWithContext(uint32_t context, Time const &time, MEM mem_ptr, OBJ obj)
Definition: simulator.h:900
static void ReceiveMessages()
static void TestSendComplete()
uint32_t GetNDevices(void) const
Definition: node.cc:141
static Time Now(void)
Definition: simulator.cc:179
MPI_Request * GetRequest()
int64_t GetNanoSeconds(void) const
Definition: nstime.h:287
static void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
uint32_t GetId(void) const
Definition: node.cc:105
static uint32_t GetSystemId()
uint32_t GetSerializedSize(void) const
Definition: packet.cc:588
const uint32_t MAX_MPI_MSG_SIZE
Definition: mpi-interface.h:51
static uint32_t GetSize()