VTK-m  2.0
ParticleMessenger.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_ParticleMessenger_h
12 #define vtk_m_filter_flow_internal_ParticleMessenger_h
13 
14 #include <vtkm/Particle.h>
17 #include <vtkm/filter/flow/vtkm_filter_flow_export.h>
18 
19 #include <list>
20 #include <map>
21 #include <set>
22 #include <vector>
23 
24 namespace vtkm
25 {
26 namespace filter
27 {
28 namespace flow
29 {
30 namespace internal
31 {
32 
33 template <typename ParticleType>
34 class VTKM_FILTER_FLOW_EXPORT ParticleMessenger : public vtkm::filter::flow::internal::Messenger
35 {
36  //sendRank, message
37  using MsgCommType = std::pair<int, std::vector<int>>;
38 
39  //particle + blockIDs.
40  using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
41 
42  //sendRank, vector of ParticleCommType.
43  using ParticleRecvCommType = std::pair<int, std::vector<ParticleCommType>>;
44 
45 public:
46  VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
47  const vtkm::filter::flow::internal::BoundsMap& bm,
48  int msgSz = 1,
49  int numParticles = 128,
50  int numBlockIds = 2);
51  VTKM_CONT ~ParticleMessenger() {}
52 
53  VTKM_CONT void Exchange(const std::vector<ParticleType>& outData,
54  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
55  vtkm::Id numLocalTerm,
56  std::vector<ParticleType>& inData,
57  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
58  vtkm::Id& numTerminateMessages,
59  bool blockAndWait = false);
60 
61 protected:
62 #ifdef VTKM_ENABLE_MPI
63  static constexpr int MSG_TERMINATE = 1;
64 
65  enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
66 
67  VTKM_CONT void RegisterMessages(int msgSz, int nParticles, int numBlockIds);
68 
69  // Send/Recv particles
70  VTKM_CONT
71  template <typename P,
72  template <typename, typename>
73  class Container,
74  typename Allocator = std::allocator<P>>
75  inline void SendParticles(int dst, const Container<P, Allocator>& c);
76 
77  VTKM_CONT
78  template <typename P,
79  template <typename, typename>
80  class Container,
81  typename Allocator = std::allocator<P>>
82  inline void SendParticles(const std::unordered_map<int, Container<P, Allocator>>& m);
83 
84  // Send/Recv messages.
85  VTKM_CONT void SendMsg(int dst, const std::vector<int>& msg);
86  VTKM_CONT void SendAllMsg(const std::vector<int>& msg);
87  VTKM_CONT bool RecvMsg(std::vector<MsgCommType>& msgs) { return RecvAny(&msgs, NULL, false); }
88 
89  // Send/Recv datasets.
90  VTKM_CONT bool RecvAny(std::vector<MsgCommType>* msgs,
91  std::vector<ParticleRecvCommType>* recvParticles,
92  bool blockAndWait);
93  const vtkm::filter::flow::internal::BoundsMap& BoundsMap;
94 
95 #endif
96 
97  VTKM_CONT void SerialExchange(
98  const std::vector<ParticleType>& outData,
99  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
100  vtkm::Id numLocalTerm,
101  std::vector<ParticleType>& inData,
102  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
103  bool blockAndWait) const;
104 
105  static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
106 };
107 
108 //methods
109 
110 VTKM_CONT
111 template <typename ParticleType>
112 ParticleMessenger<ParticleType>::ParticleMessenger(
113  vtkmdiy::mpi::communicator& comm,
114  const vtkm::filter::flow::internal::BoundsMap& boundsMap,
115  int msgSz,
116  int numParticles,
117  int numBlockIds)
118  : Messenger(comm)
119 #ifdef VTKM_ENABLE_MPI
120  , BoundsMap(boundsMap)
121 #endif
122 {
123 #ifdef VTKM_ENABLE_MPI
124  this->RegisterMessages(msgSz, numParticles, numBlockIds);
125 #else
126  (void)(boundsMap);
127  (void)(msgSz);
128  (void)(numParticles);
129  (void)(numBlockIds);
130 #endif
131 }
132 
133 template <typename ParticleType>
134 std::size_t ParticleMessenger<ParticleType>::CalcParticleBufferSize(std::size_t nParticles,
135  std::size_t nBlockIds)
136 {
137  ParticleType pTmp;
138  std::size_t pSize = ParticleType::Sizeof();
139 
140 #ifndef NDEBUG
141  vtkmdiy::MemoryBuffer buff;
142  ParticleType p;
143  vtkmdiy::save(buff, p);
144 
145  //Make sure the buffer size is correct.
146  //If this fires, then the size of the class has changed.
147  VTKM_ASSERT(pSize == buff.size());
148 #endif
149 
150  return
151  // rank
152  sizeof(int)
153  //std::vector<ParticleType> p;
154  //p.size()
155  + sizeof(std::size_t)
156  //nParticles of ParticleType
157  + nParticles * pSize
158  // std::vector<vtkm::Id> blockIDs for each particle.
159  // blockIDs.size() for each particle
160  + nParticles * sizeof(std::size_t)
161  // nBlockIDs of vtkm::Id for each particle.
162  + nParticles * nBlockIds * sizeof(vtkm::Id);
163 }
164 
165 VTKM_CONT
166 template <typename ParticleType>
167 void ParticleMessenger<ParticleType>::SerialExchange(
168  const std::vector<ParticleType>& outData,
169  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
170  vtkm::Id vtkmNotUsed(numLocalTerm),
171  std::vector<ParticleType>& inData,
172  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
173  bool vtkmNotUsed(blockAndWait)) const
174 {
175  for (auto& p : outData)
176  {
177  const auto& bids = outBlockIDsMap.find(p.GetID())->second;
178  inData.emplace_back(p);
179  inDataBlockIDsMap[p.GetID()] = bids;
180  }
181 }
182 
183 VTKM_CONT
184 template <typename ParticleType>
185 void ParticleMessenger<ParticleType>::Exchange(
186  const std::vector<ParticleType>& outData,
187  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
188  vtkm::Id numLocalTerm,
189  std::vector<ParticleType>& inData,
190  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
191  vtkm::Id& numTerminateMessages,
192  bool blockAndWait)
193 {
194  numTerminateMessages = 0;
195  inDataBlockIDsMap.clear();
196 
197  if (this->GetNumRanks() == 1)
198  return this->SerialExchange(
199  outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
200 
201 #ifdef VTKM_ENABLE_MPI
202 
203  //dstRank, vector of (particles,blockIDs)
204  std::unordered_map<int, std::vector<ParticleCommType>> sendData;
205 
206  for (const auto& p : outData)
207  {
208  const auto& bids = outBlockIDsMap.find(p.GetID())->second;
209  int dstRank = this->BoundsMap.FindRank(bids[0]);
210  sendData[dstRank].emplace_back(std::make_pair(p, bids));
211  }
212 
213  //Do all the sends first.
214  if (numLocalTerm > 0)
215  this->SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
216  this->SendParticles(sendData);
217  this->CheckPendingSendRequests();
218 
219  //Check if we have anything coming in.
220  std::vector<ParticleRecvCommType> particleData;
221  std::vector<MsgCommType> msgData;
222  if (RecvAny(&msgData, &particleData, blockAndWait))
223  {
224  for (const auto& it : particleData)
225  for (const auto& v : it.second)
226  {
227  const auto& p = v.first;
228  const auto& bids = v.second;
229  inData.emplace_back(p);
230  inDataBlockIDsMap[p.GetID()] = bids;
231  }
232 
233  for (const auto& m : msgData)
234  {
235  if (m.second[0] == MSG_TERMINATE)
236  numTerminateMessages += static_cast<vtkm::Id>(m.second[1]);
237  }
238  }
239 #endif
240 }
241 
242 
243 #ifdef VTKM_ENABLE_MPI
244 
245 VTKM_CONT
246 template <typename ParticleType>
247 void ParticleMessenger<ParticleType>::RegisterMessages(int msgSz, int nParticles, int numBlockIds)
248 {
249  //Determine buffer size for msg and particle tags.
250  std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
251  std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
252 
253  int numRecvs = std::min(64, this->GetNumRanks() - 1);
254 
255  this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
256  this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
257 
258  this->InitializeBuffers();
259 }
260 
261 VTKM_CONT
262 template <typename ParticleType>
263 void ParticleMessenger<ParticleType>::SendMsg(int dst, const std::vector<int>& msg)
264 {
265  vtkmdiy::MemoryBuffer buff;
266 
267  //Write data.
268  vtkmdiy::save(buff, this->GetRank());
269  vtkmdiy::save(buff, msg);
270  this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
271 }
272 
273 VTKM_CONT
274 template <typename ParticleType>
275 void ParticleMessenger<ParticleType>::SendAllMsg(const std::vector<int>& msg)
276 {
277  for (int i = 0; i < this->GetNumRanks(); i++)
278  if (i != this->GetRank())
279  this->SendMsg(i, msg);
280 }
281 
282 VTKM_CONT
283 template <typename ParticleType>
284 bool ParticleMessenger<ParticleType>::RecvAny(std::vector<MsgCommType>* msgs,
285  std::vector<ParticleRecvCommType>* recvParticles,
286  bool blockAndWait)
287 {
288  std::set<int> tags;
289  if (msgs)
290  {
291  tags.insert(ParticleMessenger::MESSAGE_TAG);
292  msgs->resize(0);
293  }
294  if (recvParticles)
295  {
296  tags.insert(ParticleMessenger::PARTICLE_TAG);
297  recvParticles->resize(0);
298  }
299 
300  if (tags.empty())
301  return false;
302 
303  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> buffers;
304  if (!this->RecvData(tags, buffers, blockAndWait))
305  return false;
306 
307  for (auto& buff : buffers)
308  {
309  if (buff.first == ParticleMessenger::MESSAGE_TAG)
310  {
311  int sendRank;
312  std::vector<int> m;
313  vtkmdiy::load(buff.second, sendRank);
314  vtkmdiy::load(buff.second, m);
315  msgs->emplace_back(std::make_pair(sendRank, m));
316  }
317  else if (buff.first == ParticleMessenger::PARTICLE_TAG)
318  {
319  int sendRank;
320  std::vector<ParticleCommType> particles;
321 
322  vtkmdiy::load(buff.second, sendRank);
323  vtkmdiy::load(buff.second, particles);
324  recvParticles->emplace_back(std::make_pair(sendRank, particles));
325  }
326  }
327 
328  return true;
329 }
330 
331 VTKM_CONT
332 template <typename ParticleType>
333 template <typename P, template <typename, typename> class Container, typename Allocator>
334 inline void ParticleMessenger<ParticleType>::SendParticles(int dst,
335  const Container<P, Allocator>& c)
336 {
337  if (dst == this->GetRank())
338  {
339  VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
340  return;
341  }
342  if (c.empty())
343  return;
344 
345  vtkmdiy::MemoryBuffer bb;
346  vtkmdiy::save(bb, this->GetRank());
347  vtkmdiy::save(bb, c);
348  this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
349 }
350 
351 VTKM_CONT
352 template <typename ParticleType>
353 template <typename P, template <typename, typename> class Container, typename Allocator>
354 inline void ParticleMessenger<ParticleType>::SendParticles(
355  const std::unordered_map<int, Container<P, Allocator>>& m)
356 {
357  for (const auto& mit : m)
358  if (!mit.second.empty())
359  this->SendParticles(mit.first, mit.second);
360 }
361 #endif
362 
363 }
364 }
365 }
366 } // vtkm::filter::flow::internal
367 
368 #endif // vtk_m_filter_flow_internal_ParticleMessenger_h
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
vtkm::Id
vtkm::Int32 Id
Represents an ID (index into arrays).
Definition: Types.h:191
vtkm::exec::arg::load
VTKM_SUPPRESS_EXEC_WARNINGS VTKM_EXEC T load(const U &u, vtkm::Id v)
Definition: FetchTagArrayDirectIn.h:36
VTKM_CONT
#define VTKM_CONT
Definition: ExportMacros.h:57
vtkmNotUsed
#define vtkmNotUsed(parameter_name)
Simple macro to identify a parameter as unused.
Definition: ExportMacros.h:128
vtkm::cont::LogLevel::Error
@ Error
Important but non-fatal errors, such as device fail-over.
VTKM_LOG_S
#define VTKM_LOG_S(level,...)
Writes a message using stream syntax to the indicated log level.
Definition: Logging.h:261
Messenger.h
Particle.h