26 #include "mpi-interface.h"
27 #include "mpi-receiver.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/simulator.h"
33 #include "ns3/simulator-impl.h"
34 #include "ns3/nstime.h"
42 SentBuffer::SentBuffer ()
48 SentBuffer::~SentBuffer ()
// Definitions of the MpiInterface static data members and their values
// before MPI has been brought up.

// Identifier of this process; overwritten with the MPI_COMM_WORLD rank by
// the MPI_Comm_rank call during start-up.
73 uint32_t MpiInterface::m_sid = 0;
// Number of processes; defaults to 1 and is overwritten with the
// MPI_COMM_WORLD size by the MPI_Comm_size call during start-up.
74 uint32_t MpiInterface::m_size = 1;
// Set to true by the start-up code paths and reset to false on shutdown.
75 bool MpiInterface::m_initialized =
false;
// Starts out false. NOTE(review): no write to this flag is visible in this
// excerpt -- confirm where it is set.
76 bool MpiInterface::m_enabled =
false;
// Message counters, both starting at zero. NOTE(review): presumably the
// values reported by GetRxCount() / GetTxCount() -- confirm.
77 uint32_t MpiInterface::m_rxCount = 0;
78 uint32_t MpiInterface::m_txCount = 0;
// Outstanding sends: an entry is appended per packet sent, its buffer and
// request are handed to MPI_Isend, and it is erased after being polled with
// MPI_Test.
79 std::list<SentBuffer> MpiInterface::m_pendingTx;
// Array of m_size receive requests, posted with MPI_Irecv and polled with
// MPI_Testany.
82 MPI_Request* MpiInterface::m_requests;
// Array of m_size receive buffers, one per entry of m_requests; each buffer
// and the array itself are released with delete [].
83 char** MpiInterface::m_pRxBuffers;
90 for (uint32_t i = 0; i <
GetSize (); ++i)
92 delete [] m_pRxBuffers[i];
94 delete [] m_pRxBuffers;
118 Simulator::GetImplementation ();
119 m_initialized =
true;
129 Simulator::GetImplementation ();
130 m_initialized =
true;
140 Simulator::GetImplementation ();
141 m_initialized =
true;
151 MPI_Init (pargc, pargv);
152 MPI_Barrier (MPI_COMM_WORLD);
153 MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid));
154 MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size));
156 m_initialized =
true;
158 m_pRxBuffers =
new char*[m_size];
159 m_requests =
new MPI_Request[m_size];
160 for (uint32_t i = 0; i <
GetSize (); ++i)
164 MPI_COMM_WORLD, &m_requests[i]);
167 NS_FATAL_ERROR (
"Can't use distributed simulator without MPI compiled in");
176 m_pendingTx.push_back (sendBuf);
177 std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin ();
180 uint8_t* buffer =
new uint8_t[serializedSize + 16];
181 i->SetBuffer (buffer);
184 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
186 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
190 p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
196 MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
197 0, MPI_COMM_WORLD, (i->GetRequest ()));
200 NS_FATAL_ERROR (
"Can't use distributed simulator without MPI compiled in");
214 MPI_Testany (
GetSize (), m_requests, &index, &flag, &status);
220 MPI_Get_count (&status, MPI_CHAR, &count);
224 uint64_t* pTime =
reinterpret_cast<uint64_t *
> (m_pRxBuffers[index]);
225 uint64_t nanoSeconds = *pTime++;
226 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
227 uint32_t node = *pData++;
228 uint32_t dev = *pData++;
232 count -=
sizeof (nanoSeconds) +
sizeof (node) +
sizeof (dev);
234 Ptr<Packet> p = Create<Packet> (
reinterpret_cast<uint8_t *
> (pData), count,
true);
240 for (uint32_t i = 0; i < nDevices; ++i)
243 if (pThisDev->GetIfIndex () == dev)
257 MPI_Irecv (m_pRxBuffers[index],
MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
258 MPI_COMM_WORLD, &m_requests[index]);
261 NS_FATAL_ERROR (
"Can't use distributed simulator without MPI compiled in");
269 std::list<SentBuffer>::iterator i = m_pendingTx.begin ();
270 while (i != m_pendingTx.end ())
274 MPI_Test (i->GetRequest (), &flag, &status);
275 std::list<SentBuffer>::iterator current = i;
279 m_pendingTx.erase (current);
283 NS_FATAL_ERROR (
"Can't use distributed simulator without MPI compiled in");
292 MPI_Initialized (&flag);
297 m_initialized =
false;
301 NS_FATAL_ERROR (
"Cannot disable MPI environment without Initializing it first");
304 NS_FATAL_ERROR (
"Can't use distributed simulator without MPI compiled in");
Time NanoSeconds(uint64_t ns)
Create ns3::Time instances in units of nanoseconds.
static Ptr< Node > GetNode(uint32_t n)
#define NS_ASSERT(condition)
uint32_t GetSystemId(void) const
#define NS_FATAL_ERROR(msg)
fatal error handling
static uint32_t GetTxCount()
static void Enable(int *pargc, char ***pargv)
void SetBuffer(uint8_t *buffer)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static uint32_t GetRxCount()
Ptr< NetDevice > GetDevice(uint32_t index) const
static void ScheduleWithContext(uint32_t context, Time const &time, MEM mem_ptr, OBJ obj)
static void ReceiveMessages()
static void TestSendComplete()
uint32_t GetNDevices(void) const
MPI_Request * GetRequest()
int64_t GetNanoSeconds(void) const
static void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
uint32_t GetId(void) const
static uint32_t GetSystemId()
uint32_t GetSerializedSize(void) const
const uint32_t MAX_MPI_MSG_SIZE
static uint32_t GetSize()