From 6f48eca95b07ea4758a665e6567fd5cb17f1d4e3 Mon Sep 17 00:00:00 2001 From: HRN Date: Tue, 30 Apr 2024 00:28:29 +0330 Subject: [PATCH] The problem with memory leak in MPI data transfer fixed and tested. --- .../processorBoundaryContactSearch.cpp | 8 +- .../twoPartContactSearch.cpp | 2 +- .../processorBoundarySphereInteraction.cpp | 9 ++- .../pointField/processorBoundaryField.cpp | 10 +-- .../pointField/processorBoundaryField.hpp | 7 +- .../boundaries/boundaryProcessor.cpp | 15 ++-- .../boundaries/boundaryProcessor.hpp | 14 +--- .../boundaries/dataReciever.hpp | 66 +++++++--------- .../pointStructure/boundaries/dataSender.hpp | 76 +++++++++++-------- 9 files changed, 99 insertions(+), 108 deletions(-) diff --git a/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp b/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp index 9f9384e9..8ab8e61d 100644 --- a/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp +++ b/src/Interaction/contactSearch/boundaries/processorBoundaryContactSearch/processorBoundaryContactSearch.cpp @@ -85,7 +85,7 @@ bool pFlow::processorBoundaryContactSearch::broadSearch { if(masterSearch_) { - /*const auto thisPoints = boundary().thisPoints(); + const auto thisPoints = boundary().thisPoints(); const auto& neighborProcPoints = boundary().neighborProcPoints(); const auto& bDiams = diameter_.BoundaryField(thisBoundaryIndex()); const auto thisDiams = bDiams.thisField(); @@ -96,9 +96,9 @@ bool pFlow::processorBoundaryContactSearch::broadSearch thisPoints, thisDiams, neighborProcPoints, - neighborProcDiams); - - pOutput<<"ppPairs size in boundary"<< ppPairs.size()<::sphereSphereInter const auto & sphPar = this->sphParticles(); uint32 thisIndex = this->boundary().thisBoundaryIndex(); - const auto& a = sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll(); - - /*pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction( + pOutput<<"beofre sphereSphereInteraction"<ppPairs(), cfModel, @@ -67,7 +66,9 @@ bool pFlow::MPI::processorBoundarySphereInteraction::sphereSphereInter sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), sphPar.contactForce().BoundaryField(thisIndex).neighborProcField().deviceViewAll(), sphPar.contactTorque().BoundaryField(thisIndex).neighborProcField().deviceViewAll() - );*/ + ); + + pOutput<<"after sphereSphereInteraction"<::checkDataRecieved() const { if (!dataRecieved_) { - //uint32 nRecv = reciever_.waitComplete(); + uint32 nRecv = reciever_.waitBufferForUse(); dataRecieved_ = true; - /*if (nRecv != this->neighborProcSize()) + if (nRecv != this->neighborProcSize()) { fatalErrorInFunction; fatalExit; - }*/ + } } } @@ -41,7 +41,7 @@ pFlow::MPI::processorBoundaryField::updateBoundary( DataDirection direction ) { - /*if (step == 1) + if (step == 1) { // Isend if (direction == DataDirection::TwoWay || @@ -67,7 +67,7 @@ pFlow::MPI::processorBoundaryField::updateBoundary( { fatalErrorInFunction << "Invalid step number " << step << endl; return false; - }*/ + } return true; } diff --git a/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp b/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp index 5fb0780a..0a6bad28 100644 --- a/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp +++ b/src/phasicFlow/MPIParallelization/pointField/processorBoundaryField.hpp @@ -50,11 +50,11 @@ public: 
private: - dataSender sender_; + dataSender sender_; - mutable dataReciever reciever_; + dataReciever reciever_; - mutable bool dataRecieved_ = true; + mutable bool dataRecieved_ = true; void checkDataRecieved()const; @@ -82,7 +82,6 @@ public: ProcVectorType& neighborProcField() override; - const ProcVectorType& neighborProcField()const override; bool hearChanges diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp index 50098e0a..2648cc04 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.cpp @@ -25,11 +25,7 @@ Licence: void pFlow::MPI::boundaryProcessor::checkSize() const { - if (!sizeObtained_) - { - //MPI_Wait(&sizeRequest_, StatusIgnore); - sizeObtained_ = true; - } + } void @@ -37,13 +33,13 @@ pFlow::MPI::boundaryProcessor::checkDataRecieved() const { if (!dataRecieved_) { - //uint32 nRecv = reciever_.waitComplete(); + uint32 nRecv = reciever_.waitBufferForUse(); dataRecieved_ = true; - /*if (nRecv != neighborProcSize()) + if (nRecv != neighborProcSize()) { fatalErrorInFunction; fatalExit; - }*/ + } } } @@ -92,8 +88,7 @@ pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt) pFlowProcessors().localCommunicator(), MPI_STATUS_IGNORE ); - - sizeObtained_ = false; + MPI_Request_free(&req); return true; } diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp index cb278461..1f96263d 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/boundaryProcessor.hpp @@ -38,21 +38,13 @@ private: uint32 neighborProcNumPoints_ = 0; - uint32 thisNumPoints_; + uint32 thisNumPoints_ = 0; realx3Vector_D neighborProcPoints_; - mutable Request sizeRequest_; + dataSender sender_; - mutable Request sSizeRequest_; - - int req_=0; - - mutable bool sizeObtained_ = true; - - mutable dataSender sender_; - - mutable dataReciever reciever_; + dataReciever reciever_; mutable bool dataRecieved_ = true; diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp index 13069b2a..962146eb 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataReciever.hpp @@ -27,13 +27,11 @@ private: BufferVectorType buffer_; - std::vector buffer0_; - int fromProc_; int tag_; - Request recvRequest_; + mutable Request recvRequest_ = RequestNull; public: @@ -46,34 +44,40 @@ public: ~dataReciever()=default; + uint32 waitBufferForUse()const + { + if(recvRequest_ != RequestNull) + { + Status status; + MPI_Wait(&recvRequest_, &status); + int count; + CheckMPI(getCount(&status, count), true); + + return static_cast(count); + } + else + return buffer_.size(); + } + void recieveData( const localProcessors& processors, uint32 numToRecv ) { - - buffer0_.clear(); - buffer0_.resize(numToRecv); - MPI_Status status; + waitBufferForUse(); + buffer_.clear(); + buffer_.resize(numToRecv); - /*CheckMPI(recv( - buffer_.getSpan(), - fromProc_, - tag_, - processors.localCommunicator(), - &status), true);*/ - MPI_Recv( - buffer0_.data(), - 
buffer0_.size(), - realx3Type__, - fromProc_, - tag_, - processors.localCommunicator(), - &status + CheckMPI( + Irecv( + buffer_.getSpan(), + fromProc_, + tag_, + processors.localCommunicator(), + &recvRequest_ + ), + true ); - int c; - getCount(&status, c); - pOutput<<"Number of data recieved "<(&status, count), true); - - return static_cast(count);*/ - return buffer_.size(); - } - }; } diff --git a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp index 11c1782f..6342009b 100644 --- a/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp +++ b/src/phasicFlow/MPIParallelization/pointStructure/boundaries/dataSender.hpp @@ -26,15 +26,13 @@ public: private: - //BufferVectorType buffer_; - - std::vector buffer_; + BufferVectorType buffer_; int toProc_; int tag_; - Request sendRequest_ = RequestNull; + mutable Request sendRequest_ = RequestNull; public: @@ -44,7 +42,22 @@ public: tag_(tag) {} - ~dataSender()=default; + ~dataSender() + { + if(sendRequest_ != RequestNull) + { + MPI_Request_free(&sendRequest_); + } + } + + bool waitBufferForUse()const + { + if(sendRequest_ != RequestNull) + { + MPI_Wait(&sendRequest_, StatusesIgnore); + } + return true; + } void sendData( const localProcessors& processors, @@ -52,17 +65,21 @@ public: ) { using RPolicy = Kokkos::RangePolicy< - DefaultExecutionSpace, + execution_space, Kokkos::Schedule, Kokkos::IndexType>; uint32 n = scatterField.size(); + + // make sure the buffer is ready to be used and free + // the previous request (if any). + waitBufferForUse(); // clear the buffer to prevent data copy if capacity increases buffer_.clear(); buffer_.resize(n); - auto* buffView = buffer_.data(); + const auto& buffView = buffer_.deviceViewAll(); Kokkos::parallel_for( "dataSender::sendData", @@ -73,26 +90,20 @@ public: } ); Kokkos::fence(); - auto req = MPI_REQUEST_NULL; - - MPI_Isend( - buffer_.data(), - buffer_.size(), - realx3Type__, - toProc_, - tag_, - processors.localCommunicator(), - &req); - /*CheckMPI(send( - buffer_.getSpan(), - toProc_, - tag_, - processors.localCommunicator(), - MPI_STATUS_IGNORE), true);*/ + CheckMPI( + Isend(buffer_.getSpan(), + toProc_, + tag_, + processors.localCommunicator(), + &sendRequest_ + ), + true + ); + } - /*auto& buffer() + auto& buffer() { return buffer_; } @@ -100,17 +111,20 @@ public: const auto& buffer()const { return buffer_; - }*/ + } bool sendComplete() { - return true; - /*int test; - MPI_Test(&sendRequest_, &test, StatusIgnore); - if(test) - return true; + int test; + if(sendRequest_ != RequestNull) + { + MPI_Test(&sendRequest_, &test, StatusIgnore); + return test; + } else - return false;*/ + { + return true; + } } };
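
Note on the pattern this patch adopts: dataSender and dataReciever now keep the
MPI_Request returned by the non-blocking Isend/Irecv wrappers (sendRequest_,
recvRequest_) instead of discarding it; waitBufferForUse() completes the previous
request before the buffer is cleared and refilled, and the dataSender destructor
releases any still-pending request. Below is a minimal, self-contained sketch of
that request-handling idiom, using the plain MPI C API and std::vector in place of
the project's BufferVectorType, span-based Isend/Irecv wrappers and CheckMPI; the
class and member names are illustrative only and are not part of the patch.

#include <mpi.h>
#include <vector>

class nonBlockingSenderSketch
{
    std::vector<double> buffer_;                     // stand-in for BufferVectorType
    int                 toProc_;
    int                 tag_;
    MPI_Request         request_ = MPI_REQUEST_NULL; // handle of the outstanding Isend, if any

public:

    nonBlockingSenderSketch(int toProc, int tag)
    :
        toProc_(toProc),
        tag_(tag)
    {}

    // Complete the previous Isend (if any) so the buffer may be reused.
    // MPI_Wait releases the request object and resets request_ to
    // MPI_REQUEST_NULL, which is what prevents the request/buffer leak.
    void waitBufferForUse()
    {
        if (request_ != MPI_REQUEST_NULL)
        {
            MPI_Wait(&request_, MPI_STATUS_IGNORE);
        }
    }

    void sendData(MPI_Comm comm, const std::vector<double>& data)
    {
        waitBufferForUse();           // never overwrite a buffer MPI may still be reading
        buffer_ = data;               // refill the now-idle buffer
        MPI_Isend(
            buffer_.data(),
            static_cast<int>(buffer_.size()),
            MPI_DOUBLE,
            toProc_,
            tag_,
            comm,
            &request_);               // keep the handle so it can be completed later
    }

    ~nonBlockingSenderSketch()
    {
        waitBufferForUse();           // do not destroy the buffer while a send is in flight
    }
};

The receiving side follows the same idea with MPI_Irecv plus a count query after
the wait (as dataReciever::waitBufferForUse does with getCount) to learn how many
elements actually arrived. The sketch's destructor waits for completion rather
than calling MPI_Request_free as the patch does; waiting is the slightly more
conservative choice, since it guarantees MPI is finished with the buffer before
it is destroyed.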