Data transfer between processors

- Data transfer between processors is done (a minimal sketch of the exchange pattern is given below)
- Contact search at processor boundaries is done
- Contact force calculation at processor boundaries is done
- Tests have been done on a rotating drum case using serial and OpenMP
HRN 2024-05-12 19:06:53 +03:30
parent e756d471ba
commit 665879f8ca
14 changed files with 483 additions and 77 deletions
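
The diffs below implement the transfer with non-blocking MPI calls: each processor first tells its neighbor how many boundary points it holds (see the MPI_Isend of thisNumPoints_ in boundaryProcessor::beforeIteration further down), and the point and field data follow in later steps. The following is a minimal standalone sketch of that count exchange, not code from this commit; the ring-shaped neighbor choice, the placeholder point counts, and the tag value are assumptions made only for illustration.

// Sketch of the non-blocking count exchange at a processor boundary
// (illustrative only; not part of this commit).
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank = 0, nProcs = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

    // assumed ring topology, only for this example
    int rightNeighbor = (rank + 1) % nProcs;
    int leftNeighbor  = (rank - 1 + nProcs) % nProcs;

    unsigned int thisNumPoints     = 100u + rank; // placeholder boundary size
    unsigned int neighborNumPoints = 0u;

    MPI_Request sendReq = MPI_REQUEST_NULL;
    MPI_Request recvReq = MPI_REQUEST_NULL;

    // post the send and the matching receive without blocking
    MPI_Isend(&thisNumPoints, 1, MPI_UNSIGNED, rightNeighbor, 0,
              MPI_COMM_WORLD, &sendReq);
    MPI_Irecv(&neighborNumPoints, 1, MPI_UNSIGNED, leftNeighbor, 0,
              MPI_COMM_WORLD, &recvReq);

    // other work (e.g. the internal contact search) can overlap here

    MPI_Wait(&sendReq, MPI_STATUS_IGNORE);
    MPI_Wait(&recvReq, MPI_STATUS_IGNORE);

    std::printf("rank %d: neighbor holds %u boundary points\n",
                rank, neighborNumPoints);

    MPI_Finalize();
    return 0;
}

Posting both requests before waiting lets communication overlap with local work, which is the main reason the diffs below split sends and receives across separate steps.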

View File

@ -52,7 +52,8 @@ pFlow::processorBoundaryContactSearch::processorBoundaryContactSearch(
:
boundaryContactSearch(dict, boundary, cSearch),
diameter_(cSearch.Particles().boundingSphere()),
masterSearch_(this->isBoundaryMaster())
masterSearch_(this->isBoundaryMaster()),
sizeRatio_(dict.getVal<real>("sizeRatio"))
{
if(masterSearch_)
@ -65,7 +66,8 @@ pFlow::processorBoundaryContactSearch::processorBoundaryContactSearch(
ppContactSearch_ = makeUnique<twoPartContactSearch>(
searchBox_,
maxD);
maxD,
sizeRatio_);
}
else
{
@ -96,7 +98,8 @@ bool pFlow::processorBoundaryContactSearch::broadSearch
thisPoints,
thisDiams,
neighborProcPoints,
neighborProcDiams
neighborProcDiams,
name()
);
//pOutput<<"ppSize "<< ppPairs.size()<<endl;
return true;

View File

@ -39,6 +39,8 @@ private:
bool masterSearch_;
real sizeRatio_;
void setSearchBox();
public:

View File

@ -64,7 +64,9 @@ bool pFlow::twoPartContactSearch::broadSearchPP
const realx3& transferVec
)
{
if(points1.empty())return true;
if(points2.empty()) return true;
buildList(points1);
uint32 nNotInserted = 1;
@ -114,7 +116,8 @@ bool pFlow::twoPartContactSearch::broadSearchPP
const deviceScatteredFieldAccess<realx3> &points1,
const deviceScatteredFieldAccess<real> &diams1,
const realx3Vector_D& points2,
const realVector_D& diams2
const realVector_D& diams2,
const word& name
)
{
buildList(points1);
@ -148,9 +151,9 @@ bool pFlow::twoPartContactSearch::broadSearchPP
auto oldCap = ppPairs.capacity();
ppPairs.increaseCapacityBy(len);
INFORMATION<< "Particle-particle contact pair container capacity increased from "<<
oldCap << " to "<<ppPairs.capacity()<<" in peiodicBoundaryContactSearch."<<END_INFO;
oldCap << " to "<<ppPairs.capacity()<<" in boundary contact search in "<< name <<END_INFO;
}

View File

@ -85,7 +85,8 @@ public:
const deviceScatteredFieldAccess<realx3> &points1,
const deviceScatteredFieldAccess<real> &diams1,
const realx3Vector_D& points2,
const realVector_D& diams2);
const realVector_D& diams2,
const word& name);
const auto& searchCells()const
{

View File

@ -20,9 +20,7 @@ pFlow::twoPartContactSearchKernels::buildNextHead(
deviceViewType1D<uint32>& next
)
{
if (points.empty())
return;
uint32 n = points.size();
Kokkos::parallel_for(

View File

@ -32,42 +32,126 @@ pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::processorBoundarySpher
geomMotion
),
masterInteraction_(boundary.isBoundaryMaster())
{}
,
inter_("inter"),
send_("send"),
recv_("recv"),
add_("add")
{
if(masterInteraction_)
{
this->allocatePPPairs();
this->allocatePWPairs();
}
}
template <typename cFM, typename gMM>
bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInteraction
(
real dt,
const ContactForceModel &cfModel
const ContactForceModel &cfModel,
uint32 step
)
{
return true;
if(!masterInteraction_) return true;
const auto & sphPar = this->sphParticles();
uint32 thisIndex = this->boundary().thisBoundaryIndex();
pOutput<<"beofre sphereSphereInteraction"<<endl;
pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
dt,
this->ppPairs(),
cfModel,
this->boundary().thisPoints(),
sphPar.diameter().deviceViewAll(),
sphPar.propertyId().deviceViewAll(),
sphPar.velocity().deviceViewAll(),
sphPar.rVelocity().deviceViewAll(),
sphPar.contactForce().deviceViewAll(),
sphPar.contactTorque().deviceViewAll(),
this->boundary().neighborProcPoints().deviceViewAll(),
sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.propertyId().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.velocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.contactForce().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.contactTorque().BoundaryField(thisIndex).neighborProcField().deviceViewAll()
);
// The master processor calculates the contact force/torque and sends the data back to
// the neighbor (slave) processor.
// The slave processor receives the data and adds it to the internalField.
if(masterInteraction_)
{
if(step==1)return true;
pOutput<<"after sphereSphereInteraction"<<endl;
const auto & sphPar = this->sphParticles();
uint32 thisIndex = this->boundary().thisBoundaryIndex();
const auto& cfBndry = static_cast<const processorBoundaryField<realx3>&> (
sphPar.contactForce().BoundaryField(thisIndex));
return true;
const auto& ctBndry = static_cast<const processorBoundaryField<realx3>&> (
sphPar.contactTorque().BoundaryField(thisIndex));
if(step == 2 )
{
iter++;
inter_.start();
pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
dt,
this->ppPairs(),
cfModel,
this->boundary().thisPoints(),
sphPar.diameter().deviceViewAll(),
sphPar.propertyId().deviceViewAll(),
sphPar.velocity().deviceViewAll(),
sphPar.rVelocity().deviceViewAll(),
sphPar.contactForce().deviceViewAll(),
sphPar.contactTorque().deviceViewAll(),
this->boundary().neighborProcPoints().deviceViewAll(),
sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.propertyId().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.velocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
cfBndry.neighborProcField().deviceViewAll(),
ctBndry.neighborProcField().deviceViewAll()
);
inter_.end();
return true;
}
else if(step == 3 )
{
send_.start();
cfBndry.sendBackData();
ctBndry.sendBackData();
send_.end();
return true;
}
if(iter % 1000 == 0u)
{
pOutput<<"inter "<< inter_.totalTime()<<endl;
pOutput<<"send "<< send_.totalTime()<<endl<<endl;;
}
return false;
}
else
{
const auto & sphPar = this->sphParticles();
uint32 thisIndex = this->boundary().thisBoundaryIndex();
const auto& cfBndry = static_cast<const processorBoundaryField<realx3>&>(
sphPar.contactForce().BoundaryField(thisIndex));
const auto& ctBndry = static_cast<const processorBoundaryField<realx3>&> (
sphPar.contactTorque().BoundaryField(thisIndex));
if(step==1)
{
recv_.start();
cfBndry.recieveBackData();
ctBndry.recieveBackData();
recv_.end();
return false;
}
else if(step == 2)
{
iter++;
return true;
}
else if(step == 3)
{
add_.start();
cfBndry.addBufferToInternalField();
ctBndry.addBufferToInternalField();
add_.end();
return true;
}
if(iter % 1000 == 0u)
{
pOutput<<"recive "<< recv_.totalTime()<<endl;
pOutput<<"add "<< add_.totalTime()<<endl<<endl;
}
return false;
}
return false;
}
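
The comments in the hunk above describe how the work is split across the boundary: the master side computes the pair forces and torques (step 2) and sends the results back (step 3), while the slave side posts the receive early (step 1) and later adds the received buffer to its internal field (step 3). Below is a minimal standalone sketch of that send-back and accumulation, meant to be run with two MPI ranks; the buffer length, index map, and force values are invented for illustration, and a single double stands in for a realx3 force.

// Sketch of the master/slave force send-back described above
// (illustrative only; not code from this commit).
#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int nBoundary = 4;   // particles shared across the boundary (made up)
    const int tag = 7;

    if (rank == 0)
    {
        // master: forces it computed for the neighbor's boundary particles
        std::vector<double> boundaryForce = {0.1, -0.2, 0.05, 0.3};

        MPI_Request req = MPI_REQUEST_NULL;
        MPI_Isend(boundaryForce.data(), nBoundary, MPI_DOUBLE, 1, tag,
                  MPI_COMM_WORLD, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);   // buffer may be reused after this
    }
    else if (rank == 1)
    {
        // slave: receive the forces and add them to the internal field
        std::vector<double> recvBuffer(nBoundary, 0.0);
        std::vector<double> internalForce(10, 0.0);      // local particles
        std::vector<int>    boundaryIndex = {2, 5, 7, 9}; // buffer -> internal map

        MPI_Request req = MPI_REQUEST_NULL;
        MPI_Irecv(recvBuffer.data(), nBoundary, MPI_DOUBLE, 0, tag,
                  MPI_COMM_WORLD, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);

        for (int i = 0; i < nBoundary; ++i)
            internalForce[boundaryIndex[i]] += recvBuffer[i];

        std::printf("rank 1 added %d boundary force values\n", nBoundary);
    }

    MPI_Finalize();
    return 0;
}

In the actual code the same exchange is driven by the step argument of sphereSphereInteraction, so the send, receive, and accumulation can be interleaved with other boundary work.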

View File

@ -21,6 +21,7 @@ Licence:
#define __processorBoundarySphereInteraction_hpp__
#include "boundarySphereInteraction.hpp"
#include "processorBoundaryField.hpp"
namespace pFlow::MPI
{
@ -56,6 +57,12 @@ private:
bool masterInteraction_;
Timer inter_;
Timer send_;
Timer recv_;
Timer add_;
uint32 iter=0;
public:
TypeInfoTemplate22("boundarySphereInteraction", "processor",ContactForceModel, MotionModel);
@ -78,7 +85,8 @@ public:
bool sphereSphereInteraction(
real dt,
const ContactForceModel& cfModel)override;
const ContactForceModel& cfModel,
uint32 step)override;
};

View File

@ -41,6 +41,9 @@ bool pFlow::MPI::MPISimulationDomain::createBoundaryDicts()
auto& mpiBoundaries = this->subDict("MPIBoundaries");
real neighborLength = boundaries.getVal<real>("neighborLength");
auto boundaryExtntionLengthRatio =
boundaries.getValOrSet<real>("boundaryExtntionLengthRatio", 0.1);
auto updateIntercal = boundaries.getValOrSet<uint32>("updateInterval", 1u);
auto neighbors = findPlaneNeighbors();
@ -61,6 +64,15 @@ bool pFlow::MPI::MPISimulationDomain::createBoundaryDicts()
"in dictionary "<< boundaries.globalName()<<endl;
return false;
}
if(!bDict.addOrReplace("updateInterval", updateIntercal))
{
fatalErrorInFunction<<"error in adding updateIntercal to "<< bName <<
"in dictionary "<< boundaries.globalName()<<endl;
}
bDict.addOrReplace("boundaryExtntionLengthRatio", boundaryExtntionLengthRatio);
if( thisDomainActive_ )
{
if( neighbors[i] == -1 )

View File

@ -1,3 +1,4 @@
#include "processorBoundaryField.hpp"
/*------------------------------- phasicFlow ---------------------------------
O C enter of
O O E ngineering and
@ -90,8 +91,10 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::processorBoundaryField(
boundary.mirrorBoundaryIndex()
)
{
this->addEvent(message::BNDR_PROCTRANS1).
addEvent(message::BNDR_PROCTRANS2);
this->addEvent(message::BNDR_PROCTRANSFER_SEND).
addEvent(message::BNDR_PROCTRANSFER_RECIEVE).
addEvent(message::BNDR_PROCTRANSFER_WAITFILL).
addEvent(message::BNDR_PROC_SIZE_CHANGED);
}
template<class T, class MemorySpace>
@ -109,4 +112,123 @@ const typename pFlow::MPI::processorBoundaryField<T, MemorySpace>::
{
checkDataRecieved();
return reciever_.buffer();
}
template<class T, class MemorySpace>
bool pFlow::MPI::processorBoundaryField<T, MemorySpace>::hearChanges(
real t,
real dt,
uint32 iter,
const message& msg,
const anyList& varList
)
{
BoundaryFieldType::hearChanges(t,dt,iter, msg,varList);
if(msg.equivalentTo(message::BNDR_PROC_SIZE_CHANGED))
{
auto newProcSize = varList.getObject<uint32>("size");
reciever_.resize(newProcSize);
}
if(msg.equivalentTo(message::BNDR_PROCTRANSFER_SEND))
{
const auto& indices = varList.getObject<uint32Vector_D>(
message::eventName(message::BNDR_PROCTRANSFER_SEND)
);
FieldAccessType transferData(
indices.size(),
indices.deviceViewAll(),
this->internal().deviceViewAll()
);
sender_.sendData(pFlowProcessors(),transferData);
}
else if(msg.equivalentTo(message::BNDR_PROCTRANSFER_RECIEVE))
{
uint32 numRecieved = varList.getObject<uint32>(
message::eventName(message::BNDR_PROCTRANSFER_RECIEVE)
);
reciever_.recieveData(pFlowProcessors(), numRecieved);
}
else if(msg.equivalentTo(message::BNDR_PROCTRANSFER_WAITFILL))
{
uint32 numRecieved = reciever_.waitBufferForUse();
if(msg.equivalentTo(message::CAP_CHANGED))
{
auto newCap = varList.getObject<uint32>(
message::eventName(message::CAP_CHANGED));
this->internal().field().reserve(newCap);
}
if(msg.equivalentTo(message::SIZE_CHANGED))
{
auto newSize = varList.getObject<uint32>(
message::eventName(message::SIZE_CHANGED));
this->internal().field().resize(newSize);
}
const auto& indices = varList.getObject<uint32IndexContainer>(
message::eventName(message::ITEM_INSERT));
this->internal().field().insertSetElement(indices, reciever_.buffer().deviceView());
return true;
}
return true;
}
template <class T, class MemorySpace>
void pFlow::MPI::processorBoundaryField<T, MemorySpace>::sendBackData() const
{
reciever_.sendBackData(pFlowProcessors());
dataRecieved_ = false;
}
template <class T, class MemorySpace>
void pFlow::MPI::processorBoundaryField<T, MemorySpace>::recieveBackData() const
{
sender_.recieveBackData(pFlowProcessors(), this->size());
}
template <class T, class MemorySpace>
void pFlow::MPI::processorBoundaryField<T, MemorySpace>::addBufferToInternalField()const
{
using RPolicy = Kokkos::RangePolicy<
execution_space,
Kokkos::Schedule<Kokkos::Static>,
Kokkos::IndexType<pFlow::uint32>>;
sender_.waitBufferForUse();
const auto& buffView = sender_.buffer().deviceViewAll();
const auto& field = this->internal().deviceViewAll();
if constexpr( isDeviceAccessible<execution_space> )
{
const auto& indices = this->indexList().deviceViewAll();
Kokkos::parallel_for(
"dataSender::recieveBackData",
RPolicy(0,this->size()),
LAMBDA_HD(uint32 i)
{
field[indices[i]] += buffView[i];
}
);
Kokkos::fence();
}
else
{
const auto& indices = this->boundary().indexListHost().deviceViewAll();
Kokkos::parallel_for(
"dataSender::recieveBackData",
RPolicy(0,this->size()),
LAMBDA_HD(uint32 i)
{
field[indices[i]] += buffView[i];
}
);
Kokkos::fence();
}
}
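
addBufferToInternalField above adds the received buffer into the internal field through the boundary index list with a Kokkos parallel_for. The following standalone sketch shows the same scatter-add pattern; the field size, index list, and values are made up, and since each index appears only once a plain += is safe without atomics.

// Sketch of the buffer-to-internal-field scatter-add (illustrative only).
#include <Kokkos_Core.hpp>
#include <cstdio>

int main(int argc, char** argv)
{
    Kokkos::initialize(argc, argv);
    {
        const int nBoundary = 4;   // received values (made up)
        const int nInternal = 10;  // internal field size (made up)

        Kokkos::View<double*> field("field", nInternal);
        Kokkos::View<double*> buffer("buffer", nBoundary);
        Kokkos::View<int*>    indices("indices", nBoundary);

        // fill the buffer and the index list on the host, then copy to device
        auto hBuffer  = Kokkos::create_mirror_view(buffer);
        auto hIndices = Kokkos::create_mirror_view(indices);
        const int    idx[nBoundary] = {2, 5, 7, 9};
        const double val[nBoundary] = {0.1, -0.2, 0.05, 0.3};
        for (int i = 0; i < nBoundary; ++i)
        {
            hBuffer(i)  = val[i];
            hIndices(i) = idx[i];
        }
        Kokkos::deep_copy(buffer, hBuffer);
        Kokkos::deep_copy(indices, hIndices);

        // scatter-add: field[indices[i]] += buffer[i]
        Kokkos::parallel_for(
            "addBufferToInternalField",
            Kokkos::RangePolicy<>(0, nBoundary),
            KOKKOS_LAMBDA(const int i) {
                field(indices(i)) += buffer(i);
            });
        Kokkos::fence();

        std::printf("scatter-add of %d values completed\n", nBoundary);
    }
    Kokkos::finalize();
    return 0;
}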

View File

@ -83,25 +83,25 @@ public:
ProcVectorType& neighborProcField() override;
const ProcVectorType& neighborProcField()const override;
void fill(const T& val)override
{
reciever_.fill(val);
}
bool hearChanges
(
bool hearChanges(
real t,
real dt,
uint32 iter,
const message& msg,
const anyList& varList
) override
{
BoundaryFieldType::hearChanges(t,dt,iter, msg,varList);
if(msg.equivalentTo(message::BNDR_DELETE))
{
// do nothing;
}
return true;
}
) override;
void sendBackData()const;
void recieveBackData()const;
void addBufferToInternalField()const;
};

View File

@ -23,12 +23,9 @@ Licence:
#include "mpiCommunication.hpp"
#include "boundaryBaseKernels.hpp"
#include "internalPoints.hpp"
#include "Time.hpp"
#include "anyList.hpp"
void
pFlow::MPI::boundaryProcessor::checkSize() const
{
}
void
pFlow::MPI::boundaryProcessor::checkDataRecieved() const
@ -69,8 +66,11 @@ pFlow::MPI::boundaryProcessor::boundaryProcessor(
bool
pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt)
{
thisNumPoints_ = size();
uint32 oldNeighborProcNumPoints = neighborProcNumPoints_;
auto req = MPI_REQUEST_NULL;
MPI_Isend(
&thisNumPoints_,
@ -92,13 +92,24 @@ pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt)
);
MPI_Request_free(&req);
anyList varList;
message msg;
varList.emplaceBack(msg.addAndName(message::BNDR_PROC_SIZE_CHANGED), neighborProcNumPoints_);
if( !notify(iterNum, t, dt, msg, varList) )
{
fatalErrorInFunction;
return false;
}
return true;
}
pFlow::uint32
pFlow::MPI::boundaryProcessor::neighborProcSize() const
{
checkSize();
return neighborProcNumPoints_;
}
@ -117,7 +128,7 @@ pFlow::MPI::boundaryProcessor::neighborProcPoints() const
}
bool
pFlow::MPI::boundaryProcessor::updataBoundary(int step)
pFlow::MPI::boundaryProcessor::updataBoundaryData(int step)
{
if (step == 1)
{
@ -132,8 +143,10 @@ pFlow::MPI::boundaryProcessor::updataBoundary(int step)
return true;
}
bool pFlow::MPI::boundaryProcessor::transferData(int step)
bool pFlow::MPI::boundaryProcessor::transferData(uint32 iter, int step)
{
if(!boundaryListUpdate(iter))return false;
if(step==1)
{
uint32 s = size();
@ -206,24 +219,88 @@ bool pFlow::MPI::boundaryProcessor::transferData(int step)
}
else if(step ==2 )
{
if( transferIndices_.empty() )return true;
pointFieldAccessType transferPoints(
transferIndices_.size(),
transferIndices_.deviceViewAll(),
internal().pointPositionDevice());
transferIndices_.size(),
transferIndices_.deviceViewAll(),
internal().pointPositionDevice());
sender_.sendData(pFlowProcessors(), transferPoints);
message msg;
anyList varList;
varList.emplaceBack(
msg.addAndName(message::BNDR_PROCTRANSFER_SEND),
transferIndices_);
if(!notify(
internal().time().currentIter(),
internal().time().currentTime(),
internal().time().dt(),
msg,
varList))
{
fatalErrorInFunction;
return false;
}
return true;
}
else if(step == 3)
{
if(numToRecieve_ == 0u) return false;
reciever_.recieveData(pFlowProcessors(), numToRecieve_);
message msg;
anyList varList;
varList.emplaceBack(
msg.addAndName(message::BNDR_PROCTRANSFER_RECIEVE),
numToRecieve_);
if(!notify(
internal().time().currentIter(),
internal().time().currentTime(),
internal().time().dt(),
msg,
varList))
{
fatalErrorInFunction;
return false;
}
return true;
}
else if(step == 4)
{
if(numToRecieve_ == 0u) return false;
reciever_.waitBufferForUse();
//
// points should be inserted first
message msg(message::BNDR_PROCTRANSFER_WAITFILL);
anyList varList;
internal().insertPointsOnly(reciever_.buffer(), msg, varList);
const auto& indices = varList.getObject<uint32IndexContainer>(message::eventName(message::ITEM_INSERT));
auto indView = deviceViewType1D<uint32>(indices.deviceView().data(), indices.deviceView().size());
uint32Vector_D newIndices("newIndices", indView);
if(! appendNewIndices(newIndices))
{
fatalErrorInFunction;
return false;
}
if(!notify(
internal().time().currentIter(),
internal().time().currentTime(),
internal().time().dt(),
msg,
varList))
{
fatalErrorInFunction;
return false;
}
return false;
}

View File

@ -33,9 +33,11 @@ namespace pFlow::MPI
: public boundaryBase
{
public:
using pointFieldAccessType = typename boundaryBase::pointFieldAccessType;
private:
uint32 neighborProcNumPoints_ = 0;
uint32 thisNumPoints_ = 0;
@ -54,8 +56,6 @@ namespace pFlow::MPI
uint32Vector_D transferIndices_{"transferIndices"};
void checkSize() const;
void checkDataRecieved() const;
/// @brief Update processor boundary data for this processor
@ -66,9 +66,9 @@ namespace pFlow::MPI
/// allow processor boundaries to exchange data in two steps.
/// The first step is a buffered non-blocking send and the second
/// step is non-blocking recieve to get data.
bool updataBoundary(int step) override;
bool updataBoundaryData(int step) override;
bool transferData(int step) override;
bool transferData(uint32 iter, int step) override;
public:
TypeInfo("boundary<processor>");
@ -104,6 +104,18 @@ namespace pFlow::MPI
/// @brief Return a const reference to point positions in the
/// neighbor processor boundary.
const realx3Vector_D &neighborProcPoints() const override;
uint32 numToTransfer()const override
{
return numToTransfer_;
}
uint32 numToRecieve()const override
{
return numToRecieve_;
}
};
} // namespace pFlow::MPI
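
The doc comment in the hunk above describes the two-step exchange behind updataBoundaryData: step 1 issues a non-blocking send of this processor's data and step 2 issues the non-blocking receive and waits before the data is used. A minimal standalone sketch of that split, intended for exactly two MPI ranks, is given below; the function names, tag, and data values are hypothetical.

// Sketch of a two-step boundary update: step 1 posts a non-blocking send,
// step 2 posts the receive and waits (illustrative only; run with two ranks).
#include <mpi.h>
#include <vector>
#include <cstdio>

static MPI_Request sendReq = MPI_REQUEST_NULL;
static MPI_Request recvReq = MPI_REQUEST_NULL;

// step 1: send this processor's boundary data without blocking
void updateBoundaryStep1(std::vector<double>& sendBuf, int neighbor)
{
    MPI_Isend(sendBuf.data(), static_cast<int>(sendBuf.size()), MPI_DOUBLE,
              neighbor, 11, MPI_COMM_WORLD, &sendReq);
}

// step 2: receive the neighbor's data and wait until both transfers finish
void updateBoundaryStep2(std::vector<double>& recvBuf, int neighbor)
{
    MPI_Irecv(recvBuf.data(), static_cast<int>(recvBuf.size()), MPI_DOUBLE,
              neighbor, 11, MPI_COMM_WORLD, &recvReq);
    MPI_Wait(&sendReq, MPI_STATUS_IGNORE);
    MPI_Wait(&recvReq, MPI_STATUS_IGNORE);
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    int neighbor = (rank == 0) ? 1 : 0;   // two-rank example

    std::vector<double> sendBuf = {1.0 * rank, 2.0 * rank, 3.0 * rank};
    std::vector<double> recvBuf(3, 0.0);

    updateBoundaryStep1(sendBuf, neighbor);
    // other work could be done here between the two steps
    updateBoundaryStep2(recvBuf, neighbor);

    std::printf("rank %d received %zu values\n", rank, recvBuf.size());

    MPI_Finalize();
    return 0;
}

Splitting the update into explicit steps mirrors the updataBoundaryData(int step) and transferData(uint32 iter, int step) signatures above; presumably this lets the caller keep the communication of several boundaries in flight between steps.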

View File

@ -59,15 +59,28 @@ public:
return buffer_.size();
}
void sendBackData(
const localProcessors& processors)const
{
CheckMPI(
Isend(
buffer_.getSpan(),
fromProc_,
tag_,
processors.localCommunicator(),
&recvRequest_
),
true
);
}
void recieveData(
const localProcessors& processors,
uint32 numToRecv
)
{
waitBufferForUse();
buffer_.clear();
buffer_.resize(numToRecv);
resize(numToRecv);
CheckMPI(
Irecv(
buffer_.getSpan(),
@ -80,16 +93,39 @@ public:
);
}
inline
auto& buffer()
{
return buffer_;
}
inline
const auto& buffer()const
{
return buffer_;
}
inline
void fill(const T& val)
{
waitBufferForUse();
buffer_.fill(val);
}
inline
uint32 size()const
{
return buffer_.size();
}
inline
void resize(uint32 newSize)
{
waitBufferForUse();
buffer_.clear();
buffer_.resize(newSize);
}
};
}

View File

@ -26,7 +26,7 @@ public:
private:
BufferVectorType buffer_;
mutable BufferVectorType buffer_;
int toProc_;
@ -103,6 +103,34 @@ public:
}
bool recieveBackData(
const localProcessors& processors,
uint32 numToRecieve
)const
{
// make sure the buffer is ready to be used and free
// the previous request (if any).
waitBufferForUse();
// clear the buffer to prevent data copy if capacity increases
buffer_.clear();
buffer_.resize(numToRecieve);
Status status;
CheckMPI(
Irecv(
buffer_.getSpan(),
toProc_,
tag_,
processors.localCommunicator(),
&sendRequest_
),
true
);
return true;
}
auto& buffer()
{
return buffer_;
@ -113,6 +141,18 @@ public:
return buffer_;
}
inline
void fill(const T& val)
{
waitBufferForUse();
buffer_.fill(val);
}
uint32 size()const
{
return buffer_.size();
}
bool sendComplete()
{
int test;
@ -127,6 +167,14 @@ public:
}
}
inline
void resize(uint32 newSize)
{
waitBufferForUse();
buffer_.clear();
buffer_.resize(newSize);
}
};
}