From 665879f8caebc2af6c2742ad1fd33a7d1b7f7a62 Mon Sep 17 00:00:00 2001 From: HRN Date: Sun, 12 May 2024 19:06:53 +0330 Subject: [PATCH] Data transfer between processors - Data transfer is done - contact search at processor boundary is done - contact force calculation at processor boundary is done - tests have been done on rotating drum using serial and openMP --- .../processorBoundaryContactSearch.cpp | 9 +- .../processorBoundaryContactSearch.hpp | 2 + .../twoPartContactSearch.cpp | 11 +- .../twoPartContactSearch.hpp | 3 +- .../twoPartContactSearchKernels.cpp | 4 +- .../processorBoundarySphereInteraction.cpp | 140 ++++++++++++++---- .../processorBoundarySphereInteraction.hpp | 10 +- .../domain/MPISimulationDomain.cpp | 12 ++ .../pointField/processorBoundaryField.cpp | 126 +++++++++++++++- .../pointField/processorBoundaryField.hpp | 26 ++-- .../boundaries/boundaryProcessor.cpp | 103 +++++++++++-- .../boundaries/boundaryProcessor.hpp | 20 ++- .../boundaries/dataReciever.hpp | 44 +++++- .../pointStructure/boundaries/dataSender.hpp | 50 ++++++- 14 files changed, 483 insertions(+), 77 deletions(-) diff --git a/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp b/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp index 8ab8e61d..8281c55c 100644 --- a/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp +++ b/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp @@ -52,7 +52,8 @@ pFlow::processorBoundaryContactSearch::processorBoundaryContactSearch( : boundaryContactSearch(dict, boundary, cSearch), diameter_(cSearch.Particles().boundingSphere()), - masterSearch_(this->isBoundaryMaster()) + masterSearch_(this->isBoundaryMaster()), + sizeRatio_(dict.getVal("sizeRatio")) { if(masterSearch_) @@ -65,7 +66,8 @@ pFlow::processorBoundaryContactSearch::processorBoundaryContactSearch( ppContactSearch_ = makeUnique( searchBox_, - maxD); + maxD, + sizeRatio_); } else { @@ -96,7 +98,8 @@ bool pFlow::processorBoundaryContactSearch::broadSearch thisPoints, thisDiams, neighborProcPoints, - neighborProcDiams + neighborProcDiams, + name() ); //pOutput<<"ppSize "<< ppPairs.size()< &points1, const deviceScatteredFieldAccess &diams1, const realx3Vector_D& points2, - const realVector_D& diams2 + const realVector_D& diams2, + const word& name ) { buildList(points1); @@ -148,9 +151,9 @@ bool pFlow::twoPartContactSearch::broadSearchPP auto oldCap = ppPairs.capacity(); ppPairs.increaseCapacityBy(len); - + INFORMATION<< "Particle-particle contact pair container capacity increased from "<< - oldCap << " to "< &points1, const deviceScatteredFieldAccess &diams1, const realx3Vector_D& points2, - const realVector_D& diams2); + const realVector_D& diams2, + const word& name); const auto& searchCells()const { diff --git a/src/Interaction/contactSearch/boundaries/twoPartContactSearch/twoPartContactSearchKernels.cpp b/src/Interaction/contactSearch/boundaries/twoPartContactSearch/twoPartContactSearchKernels.cpp index 56f1885d..515e5af1 100644 --- a/src/Interaction/contactSearch/boundaries/twoPartContactSearch/twoPartContactSearchKernels.cpp +++ b/src/Interaction/contactSearch/boundaries/twoPartContactSearch/twoPartContactSearchKernels.cpp @@ -20,9 +20,7 @@ pFlow::twoPartContactSearchKernels::buildNextHead( deviceViewType1D& next ) { - if (points.empty()) - return; - + uint32 n = points.size(); Kokkos::parallel_for( 
diff --git a/src/Interaction/sphereInteraction/boundaries/processorBoundarySphereInteraction/processorBoundarySphereInteraction.cpp b/src/Interaction/sphereInteraction/boundaries/processorBoundarySphereInteraction/processorBoundarySphereInteraction.cpp index 809b3c6f..f0e2a9a1 100644 --- a/src/Interaction/sphereInteraction/boundaries/processorBoundarySphereInteraction/processorBoundarySphereInteraction.cpp +++ b/src/Interaction/sphereInteraction/boundaries/processorBoundarySphereInteraction/processorBoundarySphereInteraction.cpp @@ -32,42 +32,126 @@ pFlow::MPI::processorBoundarySphereInteraction::processorBoundarySpher geomMotion ), masterInteraction_(boundary.isBoundaryMaster()) -{} + , + inter_("inter"), + send_("send"), + recv_("recv"), + add_("add") +{ + if(masterInteraction_) + { + this->allocatePPPairs(); + this->allocatePWPairs(); + } + +} template bool pFlow::MPI::processorBoundarySphereInteraction::sphereSphereInteraction ( real dt, - const ContactForceModel &cfModel + const ContactForceModel &cfModel, + uint32 step ) { - return true; - if(!masterInteraction_) return true; - const auto & sphPar = this->sphParticles(); - uint32 thisIndex = this->boundary().thisBoundaryIndex(); - pOutput<<"beofre sphereSphereInteraction"<ppPairs(), - cfModel, - this->boundary().thisPoints(), - sphPar.diameter().deviceViewAll(), - sphPar.propertyId().deviceViewAll(), - sphPar.velocity().deviceViewAll(), - sphPar.rVelocity().deviceViewAll(), - sphPar.contactForce().deviceViewAll(), - sphPar.contactTorque().deviceViewAll(), - this->boundary().neighborProcPoints().deviceViewAll(), - sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), - sphPar.propertyId().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), - sphPar.velocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), - sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), - sphPar.contactForce().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), - sphPar.contactTorque().BoundaryField(thisIndex).neighborProcField().deviceViewAll() - ); + // master processor calculates the contact force/torque and sends data back to the + // neighbor processor (slave processor). 
+ // slave processor recieves the data and adds the data to the internalField + if(masterInteraction_) + { + if(step==1)return true; - pOutput<<"after sphereSphereInteraction"<sphParticles(); + uint32 thisIndex = this->boundary().thisBoundaryIndex(); + + const auto& cfBndry = static_cast&> ( + sphPar.contactForce().BoundaryField(thisIndex)); - return true; + const auto& ctBndry = static_cast&> ( + sphPar.contactTorque().BoundaryField(thisIndex)); + + if(step == 2 ) + { + iter++; + inter_.start(); + pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction( + dt, + this->ppPairs(), + cfModel, + this->boundary().thisPoints(), + sphPar.diameter().deviceViewAll(), + sphPar.propertyId().deviceViewAll(), + sphPar.velocity().deviceViewAll(), + sphPar.rVelocity().deviceViewAll(), + sphPar.contactForce().deviceViewAll(), + sphPar.contactTorque().deviceViewAll(), + this->boundary().neighborProcPoints().deviceViewAll(), + sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), + sphPar.propertyId().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), + sphPar.velocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), + sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), + cfBndry.neighborProcField().deviceViewAll(), + ctBndry.neighborProcField().deviceViewAll() + ); + inter_.end(); + return true; + } + else if(step == 3 ) + { + send_.start(); + cfBndry.sendBackData(); + ctBndry.sendBackData(); + send_.end(); + return true; + } + + if(iter % 1000 == 0u) + { + pOutput<<"inter "<< inter_.totalTime()<sphParticles(); + uint32 thisIndex = this->boundary().thisBoundaryIndex(); + const auto& cfBndry = static_cast&>( + sphPar.contactForce().BoundaryField(thisIndex)); + const auto& ctBndry = static_cast&> ( + sphPar.contactTorque().BoundaryField(thisIndex)); + if(step==1) + { + recv_.start(); + cfBndry.recieveBackData(); + ctBndry.recieveBackData(); + recv_.end(); + return false; + } + else if(step == 2) + { + iter++; + return true; + } + else if(step == 3) + { + add_.start(); + cfBndry.addBufferToInternalField(); + ctBndry.addBufferToInternalField(); + add_.end(); + return true; + } + + if(iter % 1000 == 0u) + { + pOutput<<"recive "<< recv_.totalTime()<subDict("MPIBoundaries"); real neighborLength = boundaries.getVal("neighborLength"); + auto boundaryExtntionLengthRatio = + boundaries.getValOrSet("boundaryExtntionLengthRatio", 0.1); + auto updateIntercal = boundaries.getValOrSet("updateInterval", 1u); auto neighbors = findPlaneNeighbors(); @@ -61,6 +64,15 @@ bool pFlow::MPI::MPISimulationDomain::createBoundaryDicts() "in dictionary "<< boundaries.globalName()<::processorBoundaryField( boundary.mirrorBoundaryIndex() ) { - this->addEvent(message::BNDR_PROCTRANS1). - addEvent(message::BNDR_PROCTRANS2); + this->addEvent(message::BNDR_PROCTRANSFER_SEND). + addEvent(message::BNDR_PROCTRANSFER_RECIEVE). + addEvent(message::BNDR_PROCTRANSFER_WAITFILL). 
+ addEvent(message::BNDR_PROC_SIZE_CHANGED); } template @@ -109,4 +112,123 @@ const typename pFlow::MPI::processorBoundaryField:: { checkDataRecieved(); return reciever_.buffer(); +} + +template +bool pFlow::MPI::processorBoundaryField::hearChanges( + real t, + real dt, + uint32 iter, + const message& msg, + const anyList& varList +) +{ + BoundaryFieldType::hearChanges(t,dt,iter, msg,varList); + if(msg.equivalentTo(message::BNDR_PROC_SIZE_CHANGED)) + { + auto newProcSize = varList.getObject("size"); + reciever_.resize(newProcSize); + } + + if(msg.equivalentTo(message::BNDR_PROCTRANSFER_SEND)) + { + const auto& indices = varList.getObject( + message::eventName(message::BNDR_PROCTRANSFER_SEND) + ); + + FieldAccessType transferData( + indices.size(), + indices.deviceViewAll(), + this->internal().deviceViewAll() + ); + sender_.sendData(pFlowProcessors(),transferData); + } + else if(msg.equivalentTo(message::BNDR_PROCTRANSFER_RECIEVE)) + { + uint32 numRecieved = varList.getObject( + message::eventName(message::BNDR_PROCTRANSFER_RECIEVE) + ); + reciever_.recieveData(pFlowProcessors(), numRecieved); + } + else if(msg.equivalentTo(message::BNDR_PROCTRANSFER_WAITFILL)) + { + + uint32 numRecieved = reciever_.waitBufferForUse(); + + if(msg.equivalentTo(message::CAP_CHANGED)) + { + auto newCap = varList.getObject( + message::eventName(message::CAP_CHANGED)); + this->internal().field().reserve(newCap); + + } + if(msg.equivalentTo(message::SIZE_CHANGED)) + { + auto newSize = varList.getObject( + message::eventName(message::SIZE_CHANGED)); + this->internal().field().resize(newSize); + } + + const auto& indices = varList.getObject( + message::eventName(message::ITEM_INSERT)); + + this->internal().field().insertSetElement(indices, reciever_.buffer().deviceView()); + + return true; + } + + return true; +} +template +void pFlow::MPI::processorBoundaryField::sendBackData() const +{ + reciever_.sendBackData(pFlowProcessors()); + dataRecieved_ = false; +} + +template +void pFlow::MPI::processorBoundaryField::recieveBackData() const +{ + sender_.recieveBackData(pFlowProcessors(), this->size()); +} + +template +void pFlow::MPI::processorBoundaryField::addBufferToInternalField()const +{ + using RPolicy = Kokkos::RangePolicy< + execution_space, + Kokkos::Schedule, + Kokkos::IndexType>; + + sender_.waitBufferForUse(); + + const auto& buffView = sender_.buffer().deviceViewAll(); + const auto& field = this->internal().deviceViewAll(); + + if constexpr( isDeviceAccessible ) + { + const auto& indices = this->indexList().deviceViewAll(); + Kokkos::parallel_for( + "dataSender::recieveBackData", + RPolicy(0,this->size()), + LAMBDA_HD(uint32 i) + { + field[indices[i]] += buffView[i]; + } + ); + Kokkos::fence(); + } + else + { + const auto& indices = this->boundary().indexListHost().deviceViewAll(); + Kokkos::parallel_for( + "dataSender::recieveBackData", + RPolicy(0,this->size()), + LAMBDA_HD(uint32 i) + { + field[indices[i]] += buffView[i]; + } + ); + Kokkos::fence(); + } } \ No newline at end of file diff --git a/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp b/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp index 0a6bad28..fd2c72e0 100644 --- a/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp +++ b/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp @@ -83,25 +83,25 @@ public: ProcVectorType& neighborProcField() override; const ProcVectorType& neighborProcField()const override; + + void fill(const T& val)override + { + 
reciever_.fill(val); + } - bool hearChanges - ( + bool hearChanges( real t, real dt, uint32 iter, const message& msg, const anyList& varList - ) override - { - BoundaryFieldType::hearChanges(t,dt,iter, msg,varList); - - if(msg.equivalentTo(message::BNDR_DELETE)) - { - // do nothing; - } - - return true; - } + ) override; + + void sendBackData()const; + + void recieveBackData()const; + + void addBufferToInternalField()const; }; diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp index 246959b1..76be7508 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp @@ -23,12 +23,9 @@ Licence: #include "mpiCommunication.hpp" #include "boundaryBaseKernels.hpp" #include "internalPoints.hpp" +#include "Time.hpp" +#include "anyList.hpp" -void -pFlow::MPI::boundaryProcessor::checkSize() const -{ - -} void pFlow::MPI::boundaryProcessor::checkDataRecieved() const @@ -69,8 +66,11 @@ pFlow::MPI::boundaryProcessor::boundaryProcessor( bool pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt) { + thisNumPoints_ = size(); + uint32 oldNeighborProcNumPoints = neighborProcNumPoints_; + auto req = MPI_REQUEST_NULL; MPI_Isend( &thisNumPoints_, @@ -92,13 +92,24 @@ pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt) ); MPI_Request_free(&req); + anyList varList; + message msg; + + varList.emplaceBack(msg.addAndName(message::BNDR_PROC_SIZE_CHANGED), neighborProcNumPoints_); + + if( !notify(iterNum, t, dt, msg, varList) ) + { + fatalErrorInFunction; + return false; + } + + return true; } pFlow::uint32 pFlow::MPI::boundaryProcessor::neighborProcSize() const { - checkSize(); return neighborProcNumPoints_; } @@ -117,7 +128,7 @@ pFlow::MPI::boundaryProcessor::neighborProcPoints() const } bool -pFlow::MPI::boundaryProcessor::updataBoundary(int step) +pFlow::MPI::boundaryProcessor::updataBoundaryData(int step) { if (step == 1) { @@ -132,8 +143,10 @@ pFlow::MPI::boundaryProcessor::updataBoundary(int step) return true; } -bool pFlow::MPI::boundaryProcessor::transferData(int step) +bool pFlow::MPI::boundaryProcessor::transferData(uint32 iter, int step) { + if(!boundaryListUpdate(iter))return false; + if(step==1) { uint32 s = size(); @@ -206,24 +219,88 @@ bool pFlow::MPI::boundaryProcessor::transferData(int step) } else if(step ==2 ) { + if( transferIndices_.empty() )return true; + pointFieldAccessType transferPoints( - transferIndices_.size(), - transferIndices_.deviceViewAll(), - internal().pointPositionDevice()); + transferIndices_.size(), + transferIndices_.deviceViewAll(), + internal().pointPositionDevice()); sender_.sendData(pFlowProcessors(), transferPoints); + message msg; + anyList varList; + varList.emplaceBack( + msg.addAndName(message::BNDR_PROCTRANSFER_SEND), + transferIndices_); + + if(!notify( + internal().time().currentIter(), + internal().time().currentTime(), + internal().time().dt(), + msg, + varList)) + { + fatalErrorInFunction; + return false; + } + return true; } else if(step == 3) { - + if(numToRecieve_ == 0u) return false; reciever_.recieveData(pFlowProcessors(), numToRecieve_); + + message msg; + anyList varList; + varList.emplaceBack( + msg.addAndName(message::BNDR_PROCTRANSFER_RECIEVE), + numToRecieve_); + + if(!notify( + internal().time().currentIter(), + internal().time().currentTime(), + 
internal().time().dt(), + msg, + varList)) + { + fatalErrorInFunction; + return false; + } + return true; } else if(step == 4) { + if(numToRecieve_ == 0u) return false; reciever_.waitBufferForUse(); - // + + // points should be inserted first + message msg(message::BNDR_PROCTRANSFER_WAITFILL); + anyList varList; + + internal().insertPointsOnly(reciever_.buffer(), msg, varList); + const auto& indices = varList.getObject(message::eventName(message::ITEM_INSERT)); + auto indView = deviceViewType1D(indices.deviceView().data(), indices.deviceView().size()); + uint32Vector_D newIndices("newIndices", indView); + + if(! appendNewIndices(newIndices)) + { + fatalErrorInFunction; + return false; + } + + if(!notify( + internal().time().currentIter(), + internal().time().currentTime(), + internal().time().dt(), + msg, + varList)) + { + fatalErrorInFunction; + return false; + } + return false; } diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp index a222cabe..8771869e 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp @@ -33,9 +33,11 @@ namespace pFlow::MPI : public boundaryBase { public: + using pointFieldAccessType = typename boundaryBase::pointFieldAccessType; private: + uint32 neighborProcNumPoints_ = 0; uint32 thisNumPoints_ = 0; @@ -54,8 +56,6 @@ namespace pFlow::MPI uint32Vector_D transferIndices_{"transferIndices"}; - void checkSize() const; - void checkDataRecieved() const; /// @brief Update processor boundary data for this processor @@ -66,9 +66,9 @@ namespace pFlow::MPI /// allow processor boundaries to exchange data in two steps. /// The first step is a buffered non-blocking send and the second /// step is non-blocking recieve to get data. - bool updataBoundary(int step) override; + bool updataBoundaryData(int step) override; - bool transferData(int step) override; + bool transferData(uint32 iter, int step) override; public: TypeInfo("boundary"); @@ -104,6 +104,18 @@ namespace pFlow::MPI /// @brief Return a const reference to point positions in the /// neighbor processor boundary. 
const realx3Vector_D &neighborProcPoints() const override; + + + uint32 numToTransfer()const override + { + return numToTransfer_; + } + + + uint32 numToRecieve()const override + { + return numToRecieve_; + } }; } // namespace pFlow::MPI diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp index 962146eb..547e09f9 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp @@ -59,15 +59,28 @@ public: return buffer_.size(); } + void sendBackData( + const localProcessors& processors)const + { + CheckMPI( + Isend( + buffer_.getSpan(), + fromProc_, + tag_, + processors.localCommunicator(), + &recvRequest_ + ), + true + ); + } + void recieveData( const localProcessors& processors, uint32 numToRecv ) { - waitBufferForUse(); - buffer_.clear(); - buffer_.resize(numToRecv); - + resize(numToRecv); + CheckMPI( Irecv( buffer_.getSpan(), @@ -80,16 +93,39 @@ public: ); } + inline auto& buffer() { return buffer_; } + inline const auto& buffer()const { return buffer_; } + inline + void fill(const T& val) + { + waitBufferForUse(); + buffer_.fill(val); + } + + inline + uint32 size()const + { + return buffer_.size(); + } + + inline + void resize(uint32 newSize) + { + waitBufferForUse(); + buffer_.clear(); + buffer_.resize(newSize); + } + }; } diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp index 6342009b..18b907c8 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp @@ -26,7 +26,7 @@ public: private: - BufferVectorType buffer_; + mutable BufferVectorType buffer_; int toProc_; @@ -103,6 +103,34 @@ public: } + bool recieveBackData( + const localProcessors& processors, + uint32 numToRecieve + )const + { + // make sure the buffer is ready to be used and free + // the previous request (if any). + waitBufferForUse(); + + // clear the buffer to prevent data copy if capacity increases + buffer_.clear(); + buffer_.resize(numToRecieve); + + Status status; + CheckMPI( + Irecv( + buffer_.getSpan(), + toProc_, + tag_, + processors.localCommunicator(), + &sendRequest_ + ), + true + ); + + return true; + } + auto& buffer() { return buffer_; @@ -113,6 +141,18 @@ public: return buffer_; } + inline + void fill(const T& val) + { + waitBufferForUse(); + buffer_.fill(val); + } + + uint32 size()const + { + return buffer_.size(); + } + bool sendComplete() { int test; @@ -127,6 +167,14 @@ public: } } + inline + void resize(uint32 newSize) + { + waitBufferForUse(); + buffer_.clear(); + buffer_.resize(newSize); + } + }; }
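For readers unfamiliar with the broad-phase structure used by twoPartContactSearch, the kernel touched above (buildNextHead) fills the classic head/next cell linked list. Below is a minimal serial sketch of that structure; the kernel in the patch builds the same lists inside a Kokkos::parallel_for, where the update of head additionally has to be atomic. The function and variable names in the sketch are illustrative, not taken from the patch.

// Minimal serial sketch, not phasicFlow code: the head/next cell list that
// twoPartContactSearchKernels::buildNextHead constructs on the device.
//   head[c] = index of the last point inserted into cell c (-1 if empty)
//   next[i] = index of the point that was in the same cell before point i
#include <cstdio>
#include <vector>

void buildNextHead(
    const std::vector<int>& cellOfPoint, // cell index of each point
    int                     numCells,
    std::vector<int>&       head,
    std::vector<int>&       next)
{
    head.assign(numCells, -1);
    next.assign(cellOfPoint.size(), -1);

    for (int i = 0; i < static_cast<int>(cellOfPoint.size()); ++i)
    {
        const int c = cellOfPoint[i];
        next[i] = head[c]; // chain to the previous occupant of the cell
        head[c] = i;       // point i becomes the new head of cell c
    }
}

int main()
{
    std::vector<int> cells{0, 2, 0, 0}; // three points in cell 0, one in cell 2
    std::vector<int> head, next;
    buildNextHead(cells, 3, head, next);

    // traversing cell 0 visits the points in reverse insertion order: 3, 2, 0
    for (int i = head[0]; i != -1; i = next[i])
        std::printf("point %d is in cell 0\n", i);
    return 0;
}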
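The reworked sphereSphereInteraction splits the boundary interaction into numbered sub-steps so communication can overlap with computation: the master side of a processor boundary computes the contact force/torque for its own and the neighbor's (ghost) particles and then sends the ghost contributions back, while the slave side posts the receive early and finally accumulates the received values into its internal field. The following compact sketch mirrors only that step dispatch, with purely illustrative names; the real code drives MPI and Kokkos through cfBndry/ctBndry and times the phases with inter_, send_, recv_ and add_.

// Minimal sketch, not phasicFlow code: the stepped master/slave exchange
// of boundary contact forces. Every name here is illustrative.
#include <cstdio>

struct ProcessorBoundaryInteraction
{
    bool isMaster; // true on the side that owns the contact pair lists

    // Mirrors the control flow of sphereSphereInteraction(dt, cfModel, step).
    bool doStep(int step)
    {
        if (isMaster)
        {
            if (step == 1) return true;   // nothing to do yet
            if (step == 2) { computeContactForces(); return true; }
            if (step == 3) { sendGhostForcesBack();  return true; }
        }
        else
        {
            if (step == 1) { postReceiveForGhostForces(); return false; }
            if (step == 2) return true;   // master is still computing
            if (step == 3) { addReceivedForcesToInternalField(); return true; }
        }
        return true;
    }

    // placeholders for the MPI/Kokkos work done in the real class
    void computeContactForces()             { std::puts("compute pairs"); }
    void sendGhostForcesBack()              { std::puts("send ghost forces"); }
    void postReceiveForGhostForces()        { std::puts("post non-blocking recv"); }
    void addReceivedForcesToInternalField() { std::puts("accumulate into field"); }
};

int main()
{
    ProcessorBoundaryInteraction master{true}, slave{false};
    for (int step = 1; step <= 3; ++step)
    {
        master.doStep(step);
        slave.doStep(step);
    }
    return 0;
}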
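processorBoundaryField::addBufferToInternalField() adds the contributions received from the master side into the slave's internal field at the positions given by the boundary index list. Here is a self-contained sketch of that indexed accumulation using plain Kokkos views; the names are illustrative, and the indices are assumed to be unique so the plain += is race-free.

// Minimal sketch, not phasicFlow code: add a received buffer into an
// internal field at scattered positions given by an index list.
#include <Kokkos_Core.hpp>

void addBufferToField(
    const Kokkos::View<double*>&       field,   // internal field (device)
    const Kokkos::View<const double*>& buffer,  // received values (device)
    const Kokkos::View<const int*>&    indices) // target positions in field
{
    const int n = static_cast<int>(buffer.extent(0));
    Kokkos::parallel_for(
        "addBufferToField",
        Kokkos::RangePolicy<>(0, n),
        KOKKOS_LAMBDA(const int i) {
            // each buffer entry maps to one distinct field entry
            field(indices(i)) += buffer(i);
        });
    Kokkos::fence();
}

int main(int argc, char** argv)
{
    Kokkos::initialize(argc, argv);
    {
        Kokkos::View<double*> field("field", 10);
        Kokkos::View<double*> buffer("buffer", 3);
        Kokkos::View<int*>    indices("indices", 3);
        Kokkos::deep_copy(buffer, 1.0);

        // fill the index list on the host and copy it to the device
        auto hIdx = Kokkos::create_mirror_view(indices);
        hIdx(0) = 2; hIdx(1) = 5; hIdx(2) = 7;
        Kokkos::deep_copy(indices, hIdx);

        addBufferToField(field, buffer, indices);
    }
    Kokkos::finalize();
    return 0;
}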
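boundaryProcessor::beforeIteration now exchanges the number of boundary points with the neighbor processor, and dataSender/dataReciever reuse their buffers only after waitBufferForUse() confirms the pending request has completed. The sketch below shows the underlying non-blocking MPI pattern in isolation; whether the wait happens immediately (as here) or is deferred to a later synchronization point, the buffers must not be touched before the requests complete. Function names in the sketch are illustrative.

// Minimal sketch, not phasicFlow code: exchange the number of boundary
// points with a neighbor rank using non-blocking MPI calls, then wait
// before either buffer is reused.
#include <mpi.h>
#include <cstdio>

static unsigned int exchangeBoundarySizes(
    unsigned int thisNumPoints, // points on this side of the boundary
    int          neighborRank,  // rank owning the mirror boundary
    int          tag,
    MPI_Comm     comm)
{
    unsigned int neighborNumPoints = 0;
    MPI_Request  reqs[2];

    // post both operations first so neither side blocks the other
    MPI_Isend(&thisNumPoints, 1, MPI_UNSIGNED, neighborRank, tag, comm, &reqs[0]);
    MPI_Irecv(&neighborNumPoints, 1, MPI_UNSIGNED, neighborRank, tag, comm, &reqs[1]);

    // the buffers may not be reused until the requests complete
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);
    return neighborNumPoints;
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank = 0, size = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size == 2) // pairwise example: rank 0 <-> rank 1
    {
        unsigned int n =
            exchangeBoundarySizes(100u * (rank + 1), 1 - rank, 0, MPI_COMM_WORLD);
        std::printf("rank %d: neighbor has %u boundary points\n", rank, n);
    }
    MPI_Finalize();
    return 0;
}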