The memory leak in MPI data transfer is fixed and tested.

HRN 2024-04-30 00:28:29 +03:30
parent b5a81bc0fc
commit 6f48eca95b
9 changed files with 99 additions and 108 deletions
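
For context, this is the usual non-blocking MPI leak: every MPI_Isend/MPI_Irecv allocates a request object that is only released once the request is completed (MPI_Wait/MPI_Test) or explicitly freed (MPI_Request_free). A minimal sketch of the anti-pattern and of the fix, with illustrative code that is not taken from the changed files:

#include <mpi.h>

// Leaks one request per call: the handle goes out of scope while the
// operation is still active, so MPI never releases its internal object.
void sendLeaky(const double* buf, int n, int toProc, int tag, MPI_Comm comm)
{
    MPI_Request req = MPI_REQUEST_NULL;
    MPI_Isend(buf, n, MPI_DOUBLE, toProc, tag, comm, &req);
}

// No leak: the request is completed before the handle is discarded
// (MPI_Wait also resets it to MPI_REQUEST_NULL).
void sendClean(const double* buf, int n, int toProc, int tag, MPI_Comm comm)
{
    MPI_Request req = MPI_REQUEST_NULL;
    MPI_Isend(buf, n, MPI_DOUBLE, toProc, tag, comm, &req);
    MPI_Wait(&req, MPI_STATUS_IGNORE);
}

The changes below apply the same idea to the persistent sender/reciever objects: keep one request per object, wait on it before the buffer is reused, and free it when it is no longer needed.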

View File

@ -85,7 +85,7 @@ bool pFlow::processorBoundaryContactSearch::broadSearch
{
if(masterSearch_)
{
/*const auto thisPoints = boundary().thisPoints();
const auto thisPoints = boundary().thisPoints();
const auto& neighborProcPoints = boundary().neighborProcPoints();
const auto& bDiams = diameter_.BoundaryField(thisBoundaryIndex());
const auto thisDiams = bDiams.thisField();
@ -96,9 +96,9 @@ bool pFlow::processorBoundaryContactSearch::broadSearch
thisPoints,
thisDiams,
neighborProcPoints,
neighborProcDiams);
pOutput<<"ppPairs size in boundary"<< ppPairs.size()<<endl; */
neighborProcDiams
);
//pOutput<<"ppSize "<< ppPairs.size()<<endl;
return true;
}else

View File

@ -99,7 +99,7 @@ bool pFlow::twoPartContactSearch::broadSearchPP
ppPairs.increaseCapacityBy(len);
INFORMATION<< "Particle-particle contact pair container capacity increased from "<<
oldCap << " to "<<ppPairs.capacity()<<" in peiodicBoundaryContactSearch."<<END_INFO;
oldCap << " to "<<ppPairs.capacity()<<" in contact search in boundary region."<<END_INFO;
}

View File

@ -47,9 +47,8 @@ bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInter
const auto & sphPar = this->sphParticles();
uint32 thisIndex = this->boundary().thisBoundaryIndex();
const auto& a = sphPar.diameter().BoundaryField(thisIndex).neighborProcField().deviceViewAll();
/*pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
pOutput<<"beofre sphereSphereInteraction"<<endl;
pFlow::MPI::processorBoundarySIKernels::sphereSphereInteraction(
dt,
this->ppPairs(),
cfModel,
@ -67,7 +66,9 @@ bool pFlow::MPI::processorBoundarySphereInteraction<cFM, gMM>::sphereSphereInter
sphPar.rVelocity().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.contactForce().BoundaryField(thisIndex).neighborProcField().deviceViewAll(),
sphPar.contactTorque().BoundaryField(thisIndex).neighborProcField().deviceViewAll()
);*/
);
pOutput<<"after sphereSphereInteraction"<<endl;
return true;
}

View File

@ -24,13 +24,13 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::checkDataRecieved() const
{
if (!dataRecieved_)
{
//uint32 nRecv = reciever_.waitComplete();
uint32 nRecv = reciever_.waitBufferForUse();
dataRecieved_ = true;
/*if (nRecv != this->neighborProcSize())
if (nRecv != this->neighborProcSize())
{
fatalErrorInFunction;
fatalExit;
}*/
}
}
}
@ -41,7 +41,7 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::updateBoundary(
DataDirection direction
)
{
/*if (step == 1)
if (step == 1)
{
// Isend
if (direction == DataDirection::TwoWay ||
@ -67,7 +67,7 @@ pFlow::MPI::processorBoundaryField<T, MemorySpace>::updateBoundary(
{
fatalErrorInFunction << "Invalid step number " << step << endl;
return false;
}*/
}
return true;
}

View File

@ -50,11 +50,11 @@ public:
private:
dataSender<T, MemorySpace> sender_;
dataSender<T, MemorySpace> sender_;
mutable dataReciever<T, MemorySpace> reciever_;
dataReciever<T, MemorySpace> reciever_;
mutable bool dataRecieved_ = true;
mutable bool dataRecieved_ = true;
void checkDataRecieved()const;
@ -82,7 +82,6 @@ public:
ProcVectorType& neighborProcField() override;
const ProcVectorType& neighborProcField()const override;
bool hearChanges

View File

@ -25,11 +25,7 @@ Licence:
void
pFlow::MPI::boundaryProcessor::checkSize() const
{
if (!sizeObtained_)
{
//MPI_Wait(&sizeRequest_, StatusIgnore);
sizeObtained_ = true;
}
}
void
@ -37,13 +33,13 @@ pFlow::MPI::boundaryProcessor::checkDataRecieved() const
{
if (!dataRecieved_)
{
//uint32 nRecv = reciever_.waitComplete();
uint32 nRecv = reciever_.waitBufferForUse();
dataRecieved_ = true;
/*if (nRecv != neighborProcSize())
if (nRecv != neighborProcSize())
{
fatalErrorInFunction;
fatalExit;
}*/
}
}
}
@ -92,8 +88,7 @@ pFlow::MPI::boundaryProcessor::beforeIteration(uint32 iterNum, real t, real dt)
pFlowProcessors().localCommunicator(),
MPI_STATUS_IGNORE
);
sizeObtained_ = false;
MPI_Request_free(&req);
return true;
}
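
Here the request used for the size exchange is released with MPI_Request_free instead of being kept in the now-removed sizeRequest_/sizeObtained_ members. A minimal plain-MPI sketch of that idiom, with illustrative names standing in for the class members and for pFlowProcessors().localCommunicator():

#include <mpi.h>

// Exchange point counts with the neighbor processor and release the send
// request immediately. The send buffer is assumed to be a long-lived member
// in the real class, so it stays valid until the transfer completes even
// though nobody waits on the request.
void exchangeNumPoints(unsigned int& thisNumPoints,
                       unsigned int& neighborNumPoints,
                       int neighborRank, int tag, MPI_Comm comm)
{
    MPI_Request req = MPI_REQUEST_NULL;
    MPI_Isend(&thisNumPoints, 1, MPI_UNSIGNED, neighborRank, tag, comm, &req);
    MPI_Recv(&neighborNumPoints, 1, MPI_UNSIGNED, neighborRank, tag, comm,
             MPI_STATUS_IGNORE);
    MPI_Request_free(&req);   // release the handle; the send still completes
}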

View File

@ -38,21 +38,13 @@ private:
uint32 neighborProcNumPoints_ = 0;
uint32 thisNumPoints_;
uint32 thisNumPoints_ = 0;
realx3Vector_D neighborProcPoints_;
mutable Request sizeRequest_;
dataSender<realx3> sender_;
mutable Request sSizeRequest_;
int req_=0;
mutable bool sizeObtained_ = true;
mutable dataSender<realx3> sender_;
mutable dataReciever<realx3> reciever_;
dataReciever<realx3> reciever_;
mutable bool dataRecieved_ = true;

View File

@ -27,13 +27,11 @@ private:
BufferVectorType buffer_;
std::vector<T> buffer0_;
int fromProc_;
int tag_;
Request recvRequest_;
mutable Request recvRequest_ = RequestNull;
public:
@ -46,34 +44,40 @@ public:
~dataReciever()=default;
uint32 waitBufferForUse()const
{
if(recvRequest_ != RequestNull)
{
Status status;
MPI_Wait(&recvRequest_, &status);
int count;
CheckMPI(getCount<T>(&status, count), true);
return static_cast<uint32>(count);
}
else
return buffer_.size();
}
void recieveData(
const localProcessors& processors,
uint32 numToRecv
)
{
buffer0_.clear();
buffer0_.resize(numToRecv);
MPI_Status status;
waitBufferForUse();
buffer_.clear();
buffer_.resize(numToRecv);
/*CheckMPI(recv(
buffer_.getSpan(),
fromProc_,
tag_,
processors.localCommunicator(),
&status), true);*/
MPI_Recv(
buffer0_.data(),
buffer0_.size(),
realx3Type__,
fromProc_,
tag_,
processors.localCommunicator(),
&status
CheckMPI(
Irecv(
buffer_.getSpan(),
fromProc_,
tag_,
processors.localCommunicator(),
&recvRequest_
),
true
);
int c;
getCount<realx3>(&status, c);
pOutput<<"Number of data recieved "<<c<<endl;
}
auto& buffer()
@ -86,20 +90,6 @@ public:
return buffer_;
}
uint32 waitComplete()
{
/*Status status;
CheckMPI(MPI_Wait(&recvRequest_, &status), true);
int count;
CheckMPI(getCount<T>(&status, count), true);
return static_cast<uint32>(count);*/
return buffer_.size();
}
};
}
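
The reworked dataReciever keeps a single persistent recvRequest_: waitBufferForUse() completes any pending receive (and reports the element count through the status) before the buffer is touched, and recieveData() then posts a fresh Irecv into the reused buffer. A plain-MPI sketch of the same wait-then-repost pattern, assuming a std::vector<double> in place of the device buffer and MPI_DOUBLE in place of realx3Type__:

#include <mpi.h>
#include <vector>

class receiverSketch
{
    std::vector<double> buffer_;
    MPI_Request         recvRequest_ = MPI_REQUEST_NULL;
    int                 fromProc_;
    int                 tag_;
    MPI_Comm            comm_;

public:

    receiverSketch(int fromProc, int tag, MPI_Comm comm)
    : fromProc_(fromProc), tag_(tag), comm_(comm)
    {}

    // Complete the pending receive (if any) and return the received count.
    unsigned int waitBufferForUse()
    {
        if (recvRequest_ != MPI_REQUEST_NULL)
        {
            MPI_Status status;
            MPI_Wait(&recvRequest_, &status); // resets the request to MPI_REQUEST_NULL
            int count = 0;
            MPI_Get_count(&status, MPI_DOUBLE, &count);
            return static_cast<unsigned int>(count);
        }
        return static_cast<unsigned int>(buffer_.size());
    }

    // Post a new non-blocking receive into the reused buffer.
    void recieveData(unsigned int numToRecv)
    {
        waitBufferForUse(); // never overwrite a buffer that has a pending Irecv on it
        buffer_.resize(numToRecv);
        MPI_Irecv(buffer_.data(), static_cast<int>(numToRecv), MPI_DOUBLE,
                  fromProc_, tag_, comm_, &recvRequest_);
    }
};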

View File

@ -26,15 +26,13 @@ public:
private:
//BufferVectorType buffer_;
std::vector<T> buffer_;
BufferVectorType buffer_;
int toProc_;
int tag_;
Request sendRequest_ = RequestNull;
mutable Request sendRequest_ = RequestNull;
public:
@ -44,7 +42,22 @@ public:
tag_(tag)
{}
~dataSender()=default;
~dataSender()
{
if(sendRequest_ != RequestNull)
{
MPI_Request_free(&sendRequest_);
}
}
bool waitBufferForUse()const
{
if(sendRequest_ != RequestNull)
{
MPI_Wait(&sendRequest_, StatusesIgnore);
}
return true;
}
void sendData(
const localProcessors& processors,
@ -52,17 +65,21 @@ public:
)
{
using RPolicy = Kokkos::RangePolicy<
DefaultExecutionSpace,
execution_space,
Kokkos::Schedule<Kokkos::Static>,
Kokkos::IndexType<pFlow::uint32>>;
uint32 n = scatterField.size();
// make sure the buffer is ready to be used and free
// the previous request (if any).
waitBufferForUse();
// clear the buffer to prevent data copy if capacity increases
buffer_.clear();
buffer_.resize(n);
auto* buffView = buffer_.data();
const auto& buffView = buffer_.deviceViewAll();
Kokkos::parallel_for(
"dataSender::sendData",
@ -73,26 +90,20 @@ public:
}
);
Kokkos::fence();
auto req = MPI_REQUEST_NULL;
MPI_Isend(
buffer_.data(),
buffer_.size(),
realx3Type__,
toProc_,
tag_,
processors.localCommunicator(),
&req);
/*CheckMPI(send(
buffer_.getSpan(),
toProc_,
tag_,
processors.localCommunicator(),
MPI_STATUS_IGNORE), true);*/
CheckMPI(
Isend(buffer_.getSpan(),
toProc_,
tag_,
processors.localCommunicator(),
&sendRequest_
),
true
);
}
/*auto& buffer()
auto& buffer()
{
return buffer_;
}
@ -100,17 +111,20 @@ public:
const auto& buffer()const
{
return buffer_;
}*/
}
bool sendComplete()
{
return true;
/*int test;
MPI_Test(&sendRequest_, &test, StatusIgnore);
if(test)
return true;
int test;
if(sendRequest_ != RequestNull)
{
MPI_Test(&sendRequest_, &test, StatusIgnore);
return test;
}
else
return false;*/
{
return true;
}
}
};
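
The dataSender side mirrors the receiver: waitBufferForUse() blocks until the previous Isend no longer needs the buffer, sendData() refills the buffer only after that, sendComplete() polls with MPI_Test, and the destructor frees a still-pending request so the last message of a run cannot leak a request object. A plain-MPI sketch under the same assumptions as the receiver sketch above:

#include <mpi.h>
#include <vector>

class senderSketch
{
    std::vector<double> buffer_;
    MPI_Request         sendRequest_ = MPI_REQUEST_NULL;
    int                 toProc_;
    int                 tag_;
    MPI_Comm            comm_;

public:

    senderSketch(int toProc, int tag, MPI_Comm comm)
    : toProc_(toProc), tag_(tag), comm_(comm)
    {}

    ~senderSketch()
    {
        // Release a still-pending request so its handle cannot leak; in
        // practice the object is destroyed only after communication is done.
        if (sendRequest_ != MPI_REQUEST_NULL)
        {
            MPI_Request_free(&sendRequest_);
        }
    }

    // Block until the previous send no longer uses buffer_.
    void waitBufferForUse()
    {
        if (sendRequest_ != MPI_REQUEST_NULL)
        {
            MPI_Wait(&sendRequest_, MPI_STATUS_IGNORE);
        }
    }

    void sendData(const std::vector<double>& data)
    {
        waitBufferForUse();     // safe to touch buffer_ only after this
        buffer_ = data;         // stands in for the Kokkos gather in the real code
        MPI_Isend(buffer_.data(), static_cast<int>(buffer_.size()), MPI_DOUBLE,
                  toProc_, tag_, comm_, &sendRequest_);
    }

    bool sendComplete()
    {
        if (sendRequest_ == MPI_REQUEST_NULL)
        {
            return true;
        }
        int done = 0;
        MPI_Test(&sendRequest_, &done, MPI_STATUS_IGNORE);
        return done != 0;
    }
};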