Merge pull request #1 from hamidrezanorouzi/MPIdev

MPIdev
Hamidreza Norouzi 2024-05-05 23:02:16 +03:30 committed by GitHub
commit 30d5349fcf
16 changed files with 362 additions and 176 deletions


@@ -3,10 +3,18 @@
</div>

**PhasicFlow** is a parallel C++ code for performing DEM simulations. It can run on shared-memory multi-core computational units such as multi-core CPUs or GPUs (for now it works on CUDA-enabled GPUs). The parallelization method mainly relies on loop-level parallelization on a shared-memory computational unit. You can build and run PhasicFlow in serial mode on regular PCs, in parallel mode for multi-core CPUs, or build it for a GPU device to off-load computations to a GPU. In its current status you can simulate millions of particles (up to 80M particles tested) on a single desktop computer. You can see the [performance tests of PhasicFlow](https://github.com/PhasicFlow/phasicFlow/wiki/Performance-of-phasicFlow) in the wiki page.
+**MPI** parallelization with dynamic load balancing is under development. With this level of parallelization, PhasicFlow can leverage the computational power of **multi-GPU** workstations or clusters with distributed-memory CPUs.
+In summary, PhasicFlow has six execution modes:
+1. Serial on a single CPU core,
+2. Parallel on a multi-core computer/node (using OpenMP),
+3. Parallel on an NVIDIA GPU (using CUDA),
+4. Parallel on distributed-memory workstations (using MPI),
+5. Parallel on distributed-memory workstations with multi-core nodes (using MPI + OpenMP),
+6. Parallel on workstations with multiple GPUs (using MPI + CUDA).
## How to build?
-You can build PhasicFlow for CPU and GPU executions. [Here is a complete step-by-step procedure](https://github.com/PhasicFlow/phasicFlow/wiki/How-to-Build-PhasicFlow).
+You can build PhasicFlow for CPU and GPU executions. The latest release of PhasicFlow is v-0.1. [Here is a complete step-by-step procedure for building phasicFlow-v-0.1](https://github.com/PhasicFlow/phasicFlow/wiki/How-to-Build-PhasicFlow).
## Online code documentation
You can find a full documentation of the code, its features, and other related materials on [online documentation of the code](https://phasicflow.github.io/phasicFlow/)


@@ -130,10 +130,10 @@ public:
        csPairContainerType& pwPairs,
        bool force = false) override
    {
-       ppTimer().start();
+       Particles().boundingSphere().updateBoundaries(DataDirection::SlaveToMaster);
+       ppTimer().start();
        const auto& position = Particles().pointPosition().deviceViewAll();
        const auto& flags = Particles().dynPointStruct().activePointsMaskDevice();
        const auto& diam = Particles().boundingSphere().deviceViewAll();
@@ -167,6 +167,7 @@ public:
        csPairContainerType& pwPairs,
        bool force = false)override
    {
+       Particles().boundingSphere().updateBoundaries(DataDirection::SlaveToMaster);
        return csBoundaries_[i].broadSearch(
            iter,
            t,
@@ -176,7 +177,6 @@ public:
            force);
    }

    bool enterBroadSearch(uint32 iter, real t, real dt)const override
    {
        if(ppwContactSearch_)


@@ -85,7 +85,7 @@ bool pFlow::processorBoundaryContactSearch::broadSearch
{
    if(masterSearch_)
    {
-       /*const auto thisPoints = boundary().thisPoints();
+       const auto thisPoints = boundary().thisPoints();
        const auto& neighborProcPoints = boundary().neighborProcPoints();
        const auto& bDiams = diameter_.BoundaryField(thisBoundaryIndex());
        const auto thisDiams = bDiams.thisField();
@@ -96,9 +96,9 @@ bool pFlow::processorBoundaryContactSearch::broadSearch
            thisPoints,
            thisDiams,
            neighborProcPoints,
-           neighborProcDiams);
-       pOutput<<"ppPairs size in boundary"<< ppPairs.size()<<endl; */
+           neighborProcDiams
+       );
+       //pOutput<<"ppSize "<< ppPairs.size()<<endl;
        return true;
    }else


@@ -99,7 +99,7 @@ bool pFlow::twoPartContactSearch::broadSearchPP
        ppPairs.increaseCapacityBy(len);
        INFORMATION<< "Particle-particle contact pair container capacity increased from "<<
-       oldCap << " to "<<ppPairs.capacity()<<" in peiodicBoundaryContactSearch."<<END_INFO;
+       oldCap << " to "<<ppPairs.capacity()<<" in contact search in boundary region."<<END_INFO;
    }


@@ -32,9 +32,7 @@ pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::processorBoundarySphereInteraction
        geomMotion
    ),
    masterInteraction_(boundary.isBoundaryMaster())
-{
-   pOutput<<"Processor boundayrCondition for "<< boundary.name()<<endl;
-}
+{}

template <typename cFM, typename gMM>
bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInteraction
@@ -43,13 +41,13 @@ bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInteraction
    const ContactForceModel &cfModel
)
{
-   return true;
    if(!masterInteraction_) return true;

    const auto & sphPar = this->sphParticles();
    uint32 thisIndex = this->boundary().thisBoundaryIndex();
-   const auto& a = sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll();
-   /*pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
+   pOutput<<"beofre sphereSphereInteraction"<<endl;
+   pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
        dt,
        this->ppPairs(),
        cfModel,
@@ -67,7 +65,9 @@ bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInteraction
        sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
        sphPar.contactForce().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
        sphPar.contactTorque().BoundaryField(thisIndex).neighborProcField().deviceViewAll()
-   );*/
+   );
+   pOutput<<"after sphereSphereInteraction"<<endl;

    return true;
}


@@ -166,12 +166,12 @@ bool pFlow::sphereInteraction<cFM,gMM, cLT>::iterate()
    bool broadSearch = contactSearch_().enterBroadSearch(iter, t, dt);

-   /*sphParticles_.diameter().updateBoundaries(DataDirection::SlaveToMaster);
+   sphParticles_.diameter().updateBoundaries(DataDirection::SlaveToMaster);
    sphParticles_.velocity().updateBoundaries(DataDirection::SlaveToMaster);
    sphParticles_.rVelocity().updateBoundaries(DataDirection::SlaveToMaster);
    sphParticles_.mass().updateBoundaries(DataDirection::SlaveToMaster);
    sphParticles_.I().updateBoundaries(DataDirection::SlaveToMaster);
-   sphParticles_.propertyId().updateBoundaries(DataDirection::SlaveToMaster);*/
+   sphParticles_.propertyId().updateBoundaries(DataDirection::SlaveToMaster);

    if(broadSearch)


@@ -238,6 +238,18 @@ inline auto send(span<T> data, int dest, int tag, Comm comm)
        comm);
}

+template<typename T>
+inline auto send(const T& data, int dest, int tag, Comm comm)
+{
+   return MPI_Send(
+       &data,
+       sFactor<T>(),
+       Type<T>(),
+       dest,
+       tag,
+       comm);
+}
+
template<typename T>
inline auto Isend(span<T> data, int dest, int tag, Comm comm, Request* req)
{
@@ -277,6 +289,19 @@ inline auto recv(span<T> data, int source, int tag, Comm comm, Status *status)
        status);
}

+template<typename T>
+inline auto recv(T& data, int source, int tag, Comm comm, Status *status)
+{
+   return MPI_Recv(
+       &data,
+       sFactor<T>(),
+       Type<T>(),
+       source,
+       tag,
+       comm,
+       status);
+}
+
template<typename T>
inline auto Irecv(T& data, int source, int tag, Comm comm, Request* req)
{
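The new scalar overloads mirror the existing span-based wrappers: a single value (for example, a point count) is passed by address with `sFactor<T>()` elements of `Type<T>()`. Below is a minimal plain-MPI sketch of the same scalar exchange pattern; names such as `numToSend` are illustrative and not part of phasicFlow.

```cpp
// Plain-MPI sketch of the scalar send/recv pattern the new overloads wrap.
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        unsigned int numToSend = 42u;
        // one element of MPI_UNSIGNED, analogous to sFactor<T>() and Type<T>()
        MPI_Send(&numToSend, 1, MPI_UNSIGNED, 1, 0, MPI_COMM_WORLD);
    }
    else if (rank == 1)
    {
        unsigned int numToRecv = 0u;
        MPI_Status status;
        MPI_Recv(&numToRecv, 1, MPI_UNSIGNED, 0, 0, MPI_COMM_WORLD, &status);
        std::printf("received %u\n", numToRecv);
    }

    MPI_Finalize();
    return 0;
}
```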


@@ -24,13 +24,13 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::checkDataRecieved() const
{
    if (!dataRecieved_)
    {
-       //uint32 nRecv = reciever_.waitComplete();
+       uint32 nRecv = reciever_.waitBufferForUse();
        dataRecieved_ = true;
-       /*if (nRecv != this->neighborProcSize())
+       if (nRecv != this->neighborProcSize())
        {
            fatalErrorInFunction;
            fatalExit;
-       }*/
+       }
    }
}
@@ -41,7 +41,7 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::updateBoundary(
    DataDirection direction
)
{
-   /*if (step == 1)
+   if (step == 1)
    {
        // Isend
        if (direction == DataDirection::TwoWay ||
@@ -67,7 +67,7 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::updateBoundary(
    {
        fatalErrorInFunction << "Invalid step number " << step << endl;
        return false;
-   }*/
+   }
    return true;
}
@@ -90,6 +90,8 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::processorBoundaryField(
        boundary.mirrorBoundaryIndex()
    )
{
+   this->addEvent(message::BNDR_PROCTRANS1).
+       addEvent(message::BNDR_PROCTRANS2);
}

template<class T, class MemorySpace>


@@ -50,11 +50,11 @@ public:
private:

    dataSender<T, MemorySpace> sender_;

-   mutable dataReciever<T, MemorySpace> reciever_;
+   dataReciever<T, MemorySpace> reciever_;

    mutable bool dataRecieved_ = true;

    void checkDataRecieved()const;
@@ -82,7 +82,6 @@ public:
    ProcVectorType& neighborProcField() override;

    const ProcVectorType& neighborProcField()const override;

    bool hearChanges


@@ -21,15 +21,13 @@ Licence:
#include "boundaryProcessor.hpp"
#include "dictionary.hpp"
#include "mpiCommunication.hpp"
+#include "boundaryBaseKernels.hpp"
+#include "internalPoints.hpp"

void
pFlow::MPI::boundaryProcessor::checkSize() const
{
-   if (!sizeObtained_)
-   {
-       //MPI_Wait(&sizeRequest_, StatusIgnore);
-       sizeObtained_ = true;
-   }
}

void
@@ -37,13 +35,13 @@ pFlow::MPI::boundaryProcessor::checkDataRecieved() const
{
    if (!dataRecieved_)
    {
-       //uint32 nRecv = reciever_.waitComplete();
+       uint32 nRecv = reciever_.waitBufferForUse();
        dataRecieved_ = true;
-       /*if (nRecv != neighborProcSize())
+       if (nRecv != neighborProcSize())
        {
            fatalErrorInFunction;
            fatalExit;
-       }*/
+       }
    }
}
@@ -92,8 +90,7 @@ pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt)
        pFlowProcessors().localCommunicator(),
        MPI_STATUS_IGNORE
    );
-   sizeObtained_ = false;
+   MPI_Request_free(&req);

    return true;
}
@@ -135,6 +132,105 @@ pFlow::MPI::boundaryProcessor::updataBoundary(int step)
    return true;
}

+bool pFlow::MPI::boundaryProcessor::transferData(int step)
+{
+   if(step == 1)
+   {
+       uint32 s = size();
+       uint32Vector_D transferFlags("transferFlags", s+1, s+1, RESERVE());
+       transferFlags.fill(0u);
+
+       const auto& transferD = transferFlags.deviceViewAll();
+       auto points = thisPoints();
+       auto p = boundaryPlane().infPlane();
+
+       numToTransfer_ = 0;
+
+       Kokkos::parallel_reduce
+       (
+           "boundaryProcessor::afterIteration",
+           deviceRPolicyStatic(0, s),
+           LAMBDA_HD(uint32 i, uint32& transferToUpdate)
+           {
+               if(p.pointInNegativeSide(points(i)))
+               {
+                   transferD(i) = 1;
+                   transferToUpdate++;
+               }
+           },
+           numToTransfer_
+       );
+
+       uint32Vector_D keepIndices("keepIndices");
+
+       if(numToTransfer_ != 0u)
+       {
+           pFlow::boundaryBaseKernels::createRemoveKeepIndices
+           (
+               indexList(),
+               numToTransfer_,
+               transferFlags,
+               transferIndices_,
+               keepIndices,
+               false
+           );
+
+           // delete transfer point from this processor
+           if( !setRemoveKeepIndices(transferIndices_, keepIndices))
+           {
+               fatalErrorInFunction<<
+                   "error in setting transfer and keep points in boundary "<< name()<<endl;
+               return false;
+           }
+       }
+       else
+       {
+           transferIndices_.clear();
+       }
+
+       auto req = RequestNull;
+       CheckMPI( Isend(
+           numToTransfer_,
+           neighborProcessorNo(),
+           thisBoundaryIndex(),
+           pFlowProcessors().localCommunicator(),
+           &req), true );
+
+       CheckMPI(recv(
+           numToRecieve_,
+           neighborProcessorNo(),
+           mirrorBoundaryIndex(),
+           pFlowProcessors().localCommunicator(),
+           StatusesIgnore), true);
+
+       MPI_Request_free(&req);
+       return true;
+   }
+   else if(step == 2)
+   {
+       pointFieldAccessType transferPoints(
+           transferIndices_.size(),
+           transferIndices_.deviceViewAll(),
+           internal().pointPositionDevice());
+
+       sender_.sendData(pFlowProcessors(), transferPoints);
+       return true;
+   }
+   else if(step == 3)
+   {
+       reciever_.recieveData(pFlowProcessors(), numToRecieve_);
+       return true;
+   }
+   else if(step == 4)
+   {
+       reciever_.waitBufferForUse();
+       //
+       return false;
+   }
+
+   return false;
+}
+
bool
pFlow::MPI::boundaryProcessor::iterate(uint32 iterNum, real t, real dt)
{
@@ -144,5 +240,54 @@ pFlow::MPI::boundaryProcessor::iterate(uint32 iterNum, real t, real dt)
bool
pFlow::MPI::boundaryProcessor::afterIteration(uint32 iterNum, real t, real dt)
{
+   uint32 s = size();
+   pOutput<<"size of boundary is "<< s <<endl;
+   uint32Vector_D transferFlags("transferFlags", s+1, s+1, RESERVE());
+   transferFlags.fill(0u);
+
+   const auto& transferD = transferFlags.deviceViewAll();
+   auto points = thisPoints();
+   auto p = boundaryPlane().infPlane();
+
+   uint32 numTransfer = 0;
+
+   Kokkos::parallel_reduce
+   (
+       "boundaryProcessor::afterIteration",
+       deviceRPolicyStatic(0, s),
+       LAMBDA_HD(uint32 i, uint32& transferToUpdate)
+       {
+           if(p.pointInNegativeSide(points(i)))
+           {
+               transferD(i) = 1;
+               transferToUpdate++;
+           }
+       },
+       numTransfer
+   );
+
+   pOutput<<"Numebr to be transfered "<< numTransfer<<endl;
+
+   uint32Vector_D transferIndices("transferIndices");
+   uint32Vector_D keepIndices("keepIndices");
+
+   pFlow::boundaryBaseKernels::createRemoveKeepIndices
+   (
+       indexList(),
+       numTransfer,
+       transferFlags,
+       transferIndices,
+       keepIndices
+   );
+
+   // delete transfer point from this processor
+   if( !setRemoveKeepIndices(transferIndices, keepIndices))
+   {
+       fatalErrorInFunction<<
+           "error in setting transfer and keep points in boundary "<< name()<<endl;
+       return false;
+   }
+
    return true;
}
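Both `transferData` (step 1) and `afterIteration` above flag the points that have crossed to the negative side of the boundary plane and count them in a single reduction. The following is a standalone Kokkos sketch of that flag-and-count pattern; it uses plain `Kokkos::View` and a simplified plane test in place of phasicFlow's `uint32Vector_D` and `infPlane`, so the names here are illustrative.

```cpp
// Standalone sketch: mark points on the negative side of a plane and count
// them in one parallel_reduce, mirroring step 1 of transferData.
#include <Kokkos_Core.hpp>

struct SimplePlane
{
    double nx, ny, nz, d; // assumed convention: n.x + d < 0 => negative side
    KOKKOS_INLINE_FUNCTION
    bool pointInNegativeSide(double x, double y, double z) const
    {
        return nx*x + ny*y + nz*z + d < 0.0;
    }
};

int main(int argc, char** argv)
{
    Kokkos::initialize(argc, argv);
    {
        const unsigned int s = 1000;
        Kokkos::View<double*[3]>    points("points", s);              // zero-initialized
        Kokkos::View<unsigned int*> transferFlags("transferFlags", s + 1);
        SimplePlane p{1.0, 0.0, 0.0, -0.5};

        unsigned int numToTransfer = 0;
        Kokkos::parallel_reduce(
            "flagPointsToTransfer",
            Kokkos::RangePolicy<Kokkos::IndexType<unsigned int>>(0, s),
            KOKKOS_LAMBDA(const unsigned int i, unsigned int& toUpdate)
            {
                if (p.pointInNegativeSide(points(i,0), points(i,1), points(i,2)))
                {
                    transferFlags(i) = 1; // mark for transfer
                    toUpdate++;           // count in the same pass
                }
            },
            numToTransfer);
        // numToTransfer would then be exchanged with the neighbor processor
        // before the point data itself is sent (steps 2-4).
    }
    Kokkos::finalize();
    return 0;
}
```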


@@ -1,18 +1,18 @@
/*------------------------------- phasicFlow ---------------------------------
      O        C enter of
    O O        E ngineering and
   O   O       M ultiscale modeling of
    OOOOOOO     F luid flow
------------------------------------------------------------------------------
  Copyright (C): www.cemf.ir
  email: hamid.r.norouzi AT gmail.com
------------------------------------------------------------------------------
Licence:
  This file is part of phasicFlow code. It is a free software for simulating
  granular and multiphase flows. You can redistribute it and/or modify it under
  the terms of GNU General Public License v3 or any other later versions.

  phasicFlow is distributed to help others in their research in the field of
  granular and multiphase flows, but WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
@@ -21,7 +21,6 @@ Licence:
#ifndef __boundaryProcessor_hpp__
#define __boundaryProcessor_hpp__

#include "boundaryBase.hpp"
#include "mpiTypes.hpp"
#include "dataSender.hpp"
@@ -30,86 +29,82 @@ Licence:
namespace pFlow::MPI
{

class boundaryProcessor
:
    public boundaryBase
{
+public:
+
+   using pointFieldAccessType = typename boundaryBase::pointFieldAccessType;
+
private:

    uint32 neighborProcNumPoints_ = 0;

-   uint32 thisNumPoints_;
+   uint32 thisNumPoints_ = 0;

    realx3Vector_D neighborProcPoints_;

-   mutable Request sizeRequest_;
-
-   mutable Request sSizeRequest_;
-
-   int req_ = 0;
-
-   mutable bool sizeObtained_ = true;
-
-   mutable dataSender<realx3> sender_;
-
-   mutable dataReciever<realx3> reciever_;
+   dataSender<realx3> sender_;
+
+   dataReciever<realx3> reciever_;

    mutable bool dataRecieved_ = true;

+   uint32 numToTransfer_ = 0;
+
+   uint32 numToRecieve_ = 0;
+
+   uint32Vector_D transferIndices_{"transferIndices"};
+
    void checkSize() const;

    void checkDataRecieved() const;

    /// @brief Update processor boundary data for this processor
    /// @param step It is either 1 or 2 in the input to indicate
    /// the update step
    /// @return true if successful
    /// @details This method is called by boundaryList two times to
    /// allow processor boundaries to exchange data in two steps.
    /// The first step is a buffered non-blocking send and the second
    /// step is a non-blocking receive to get data.
    bool updataBoundary(int step) override;

+   bool transferData(int step) override;
+
public:

    TypeInfo("boundary<processor>");

    boundaryProcessor(
        const dictionary& dict,
        const plane& bplane,
        internalPoints& internal,
        boundaryList& bndrs,
        uint32 thisIndex
    );

    ~boundaryProcessor() override = default;

    add_vCtor
    (
        boundaryBase,
        boundaryProcessor,
        dictionary
    );

    bool beforeIteration(uint32 iterNum, real t, real dt) override;

    bool iterate(uint32 iterNum, real t, real dt) override;

    bool afterIteration(uint32 iterNum, real t, real dt) override;

    /// @brief Return number of points in the neighbor processor boundary.
    /// This is overridden from boundaryBase.
    uint32 neighborProcSize() const override;

    /// @brief Return a reference to point positions in the neighbor
    /// processor boundary.
    realx3Vector_D& neighborProcPoints() override;

    /// @brief Return a const reference to point positions in the
    /// neighbor processor boundary.
    const realx3Vector_D& neighborProcPoints() const override;

};

} // namespace pFlow::MPI
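The doc comment above describes a stepwise protocol: the caller invokes `updataBoundary` (and now `transferData`) once per step on every processor boundary, so the non-blocking sends are posted everywhere before the matching receives are completed. The sketch below shows how such a driver loop might look; the actual boundaryList implementation is not part of this diff, and the interface here is purely illustrative.

```cpp
// Hedged sketch of a two-step driver over processor boundaries: step 1 posts
// the sends on all boundaries, step 2 completes the receives, so the
// communication of all boundaries overlaps.
#include <vector>

struct boundaryLike
{
    virtual bool updataBoundary(int step) = 0; // same spelling as in the code above
    virtual ~boundaryLike() = default;
};

inline bool updateAllBoundaries(std::vector<boundaryLike*>& boundaries)
{
    for (int step = 1; step <= 2; ++step) // step 1: sends, step 2: receives
    {
        for (auto* b : boundaries)
        {
            if (!b->updataBoundary(step))
            {
                return false;
            }
        }
    }
    return true;
}
```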


@@ -27,13 +27,11 @@ private:
    BufferVectorType buffer_;

-   std::vector<T> buffer0_;
-
    int fromProc_;

    int tag_;

-   Request recvRequest_;
+   mutable Request recvRequest_ = RequestNull;

public:
@@ -46,34 +44,40 @@ public:
    ~dataReciever()=default;

+   uint32 waitBufferForUse()const
+   {
+       if(recvRequest_ != RequestNull)
+       {
+           Status status;
+           MPI_Wait(&recvRequest_, &status);
+           int count;
+           CheckMPI(getCount<T>(&status, count), true);
+           return static_cast<uint32>(count);
+       }
+       else
+           return buffer_.size();
+   }
+
    void recieveData(
        const localProcessors& processors,
        uint32 numToRecv
    )
    {
-       buffer0_.clear();
-       buffer0_.resize(numToRecv);
-       MPI_Status status;
-       /*CheckMPI(recv(
-           buffer_.getSpan(),
-           fromProc_,
-           tag_,
-           processors.localCommunicator(),
-           &status), true);*/
-       MPI_Recv(
-           buffer0_.data(),
-           buffer0_.size(),
-           realx3Type__,
-           fromProc_,
-           tag_,
-           processors.localCommunicator(),
-           &status
-       );
-       int c;
-       getCount<realx3>(&status, c);
-       pOutput<<"Number of data recieved "<<c<<endl;
+       waitBufferForUse();
+       buffer_.clear();
+       buffer_.resize(numToRecv);
+       CheckMPI(
+           Irecv(
+               buffer_.getSpan(),
+               fromProc_,
+               tag_,
+               processors.localCommunicator(),
+               &recvRequest_
+           ),
+           true
+       );
    }

    auto& buffer()
@@ -86,20 +90,6 @@ public:
        return buffer_;
    }

-   uint32 waitComplete()
-   {
-       /*Status status;
-       CheckMPI(MPI_Wait(&recvRequest_, &status), true);
-       int count;
-       CheckMPI(getCount<T>(&status, count), true);
-       return static_cast<uint32>(count);*/
-       return buffer_.size();
-   }
-
};

}
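The reworked receiver posts a non-blocking `Irecv` in `recieveData` and defers the wait to `waitBufferForUse()`, which also reports how many elements actually arrived. The following plain-MPI sketch shows that post-now/wait-later pattern; the element type, buffer handling, and peer rank are illustrative, not the phasicFlow types.

```cpp
// Plain-MPI sketch of a deferred receive: post the Irecv now, wait and count
// only when the buffer is actually needed.
#include <mpi.h>
#include <vector>

struct deferredReceiver
{
    std::vector<double> buffer;
    MPI_Request request = MPI_REQUEST_NULL;

    void post(int fromProc, int tag, int maxCount, MPI_Comm comm)
    {
        buffer.resize(maxCount);
        MPI_Irecv(buffer.data(), maxCount, MPI_DOUBLE, fromProc, tag, comm, &request);
    }

    // Blocks until the data is in the buffer and returns the received count,
    // mirroring waitBufferForUse().
    int waitBufferForUse()
    {
        if (request == MPI_REQUEST_NULL)
            return static_cast<int>(buffer.size());
        MPI_Status status;
        MPI_Wait(&request, &status);
        int count = 0;
        MPI_Get_count(&status, MPI_DOUBLE, &count);
        return count;
    }
};
```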


@@ -26,15 +26,13 @@ public:
private:

-   //BufferVectorType buffer_;
-
-   std::vector<T> buffer_;
+   BufferVectorType buffer_;

    int toProc_;

    int tag_;

-   Request sendRequest_ = RequestNull;
+   mutable Request sendRequest_ = RequestNull;

public:
@@ -44,7 +42,22 @@ public:
        tag_(tag)
    {}

-   ~dataSender()=default;
+   ~dataSender()
+   {
+       if(sendRequest_ != RequestNull)
+       {
+           MPI_Request_free(&sendRequest_);
+       }
+   }
+
+   bool waitBufferForUse()const
+   {
+       if(sendRequest_ != RequestNull)
+       {
+           MPI_Wait(&sendRequest_, StatusesIgnore);
+       }
+       return true;
+   }

    void sendData(
        const localProcessors& processors,
@@ -52,17 +65,21 @@ public:
    )
    {
        using RPolicy = Kokkos::RangePolicy<
-           DefaultExecutionSpace,
+           execution_space,
            Kokkos::Schedule<Kokkos::Static>,
            Kokkos::IndexType<pFlow::uint32>>;

        uint32 n = scatterField.size();

+       // make sure the buffer is ready to be used and free
+       // the previous request (if any).
+       waitBufferForUse();
+
        // clear the buffer to prevent data copy if capacity increases
        buffer_.clear();
        buffer_.resize(n);

-       auto* buffView = buffer_.data();
+       const auto& buffView = buffer_.deviceViewAll();

        Kokkos::parallel_for(
            "dataSender::sendData",
@@ -73,26 +90,20 @@ public:
            }
        );
        Kokkos::fence();

-       auto req = MPI_REQUEST_NULL;
-
-       MPI_Isend(
-           buffer_.data(),
-           buffer_.size(),
-           realx3Type__,
-           toProc_,
-           tag_,
-           processors.localCommunicator(),
-           &req);
-
-       /*CheckMPI(send(
-           buffer_.getSpan(),
-           toProc_,
-           tag_,
-           processors.localCommunicator(),
-           MPI_STATUS_IGNORE), true);*/
+       CheckMPI(
+           Isend(buffer_.getSpan(),
+               toProc_,
+               tag_,
+               processors.localCommunicator(),
+               &sendRequest_
+           ),
+           true
+       );
    }

-   /*auto& buffer()
+   auto& buffer()
    {
        return buffer_;
    }
@@ -100,17 +111,20 @@ public:
    const auto& buffer()const
    {
        return buffer_;
-   }*/
+   }

    bool sendComplete()
    {
-       return true;
-       /*int test;
-       MPI_Test(&sendRequest_, &test, StatusIgnore);
-       if(test)
-           return true;
+       int test;
+       if(sendRequest_ != RequestNull)
+       {
+           MPI_Test(&sendRequest_, &test, StatusIgnore);
+           return test;
+       }
        else
-           return false;*/
+       {
+           return true;
+       }
    }

};
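The updated sender keeps its request alive across calls: it waits for the previous `Isend` before reusing the buffer, answers `sendComplete()` with `MPI_Test`, and frees any outstanding request in the destructor. Below is a plain-MPI sketch of that buffer-reuse discipline; the element type and buffer filling are illustrative stand-ins for the phasicFlow device buffer.

```cpp
// Plain-MPI sketch of reusing one send buffer safely across non-blocking sends.
#include <mpi.h>
#include <vector>

struct reusableSender
{
    std::vector<double> buffer;
    MPI_Request request = MPI_REQUEST_NULL;

    ~reusableSender()
    {
        if (request != MPI_REQUEST_NULL)
            MPI_Request_free(&request); // the pending send still completes
    }

    void send(const std::vector<double>& data, int toProc, int tag, MPI_Comm comm)
    {
        if (request != MPI_REQUEST_NULL)
            MPI_Wait(&request, MPI_STATUS_IGNORE); // buffer is now free to reuse
        buffer = data;                              // fill the send buffer
        MPI_Isend(buffer.data(), static_cast<int>(buffer.size()), MPI_DOUBLE,
                  toProc, tag, comm, &request);
    }

    bool sendComplete()
    {
        if (request == MPI_REQUEST_NULL)
            return true;
        int done = 0;
        MPI_Test(&request, &done, MPI_STATUS_IGNORE);
        return done != 0;
    }
};
```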


@@ -156,3 +156,9 @@ void pFlow::processorOstream::indent()
    checkForPrefix();
    Ostream::indent();
}
+
+pFlow::processorOstream &pFlow::processorOstream::setColor(const char *colorCode)
+{
+   Ostream::write(colorCode);
+   return *this;
+}


@@ -139,6 +139,8 @@ public:
    /// Add indentation characters
    void indent() override;

+   processorOstream& setColor(const char* colorCode);
+
}; // processorOstream


@@ -33,7 +33,7 @@ namespace pFlow
}

-#define INFORMATION pFlow::pOutput<<boldChar<<magentaColor<<"> INFO: "<<defaultColor<<magentaColor
+#define INFORMATION pFlow::pOutput.setColor(boldChar).setColor(magentaColor)<<"> INFO: "<<defaultColor<<magentaColor
#define END_INFO defaultColor<<pFlow::endl

#define REPORT(n) pFlow::mOutput.space(2*n)
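The INFORMATION macro now routes color codes through `processorOstream::setColor`, which writes the code and returns `*this` so several codes can be chained before the message is streamed. The sketch below shows that chaining pattern with a stand-in stream class and illustrative color constants.

```cpp
// Minimal sketch of a chainable setColor: write the escape code, return *this.
#include <iostream>

class tinyStream
{
public:
    tinyStream& setColor(const char* colorCode)
    {
        std::cout << colorCode; // write the ANSI escape code
        return *this;           // returning *this enables chaining
    }

    template<typename T>
    tinyStream& operator<<(const T& value)
    {
        std::cout << value;
        return *this;
    }
};

int main()
{
    const char* bold    = "\033[1m";
    const char* magenta = "\033[35m";
    const char* reset   = "\033[0m";

    tinyStream out;
    out.setColor(bold).setColor(magenta) << "> INFO: " << "message" << reset << "\n";
    return 0;
}
```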