VTK-m  2.0
Messenger.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_Messenger_h
12 #define vtk_m_filter_flow_internal_Messenger_h
13 
14 #include <vtkm/Types.h>
15 #include <vtkm/filter/flow/vtkm_filter_flow_export.h>
16 #include <vtkm/thirdparty/diy/diy.h>
17 
18 #include <list>
19 #include <map>
20 #include <set>
21 #include <vector>
22 
23 #ifdef VTKM_ENABLE_MPI
24 #include <mpi.h>
25 #endif
26 
27 namespace vtkm
28 {
29 namespace filter
30 {
31 namespace flow
32 {
33 namespace internal
34 {
35 
36 class VTKM_FILTER_FLOW_EXPORT Messenger
37 {
38 public:
39  VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm);
40  VTKM_CONT virtual ~Messenger()
41  {
42 #ifdef VTKM_ENABLE_MPI
43  this->CleanupRequests();
44 #endif
45  }
46 
47  int GetRank() const { return this->Rank; }
48  int GetNumRanks() const { return this->NumRanks; }
49 
50 #ifdef VTKM_ENABLE_MPI
51  VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
52 
53 protected:
54  static std::size_t CalcMessageBufferSize(std::size_t msgSz);
55 
56  void InitializeBuffers();
57  void CheckPendingSendRequests();
58  void CleanupRequests(int tag = TAG_ANY);
59  void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
60  bool RecvData(const std::set<int>& tags,
61  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
62  bool blockAndWait = false);
63 
64 private:
65  void PostRecv(int tag);
66  void PostRecv(int tag, std::size_t sz, int src = -1);
67 
68 
69  //Message headers.
70  typedef struct
71  {
72  int rank, tag;
73  std::size_t id, numPackets, packet, packetSz, dataSz;
74  } Header;
75 
76  bool RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, bool blockAndWait = false);
77 
78  void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
79  vtkm::Id GetMsgID() { return this->MsgID++; }
80  static bool PacketCompare(const char* a, const char* b);
81  void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
82  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers);
83 
84  // Send/Recv buffer management structures.
85  using RequestTagPair = std::pair<MPI_Request, int>;
86  using RankIdPair = std::pair<int, int>;
87 
88  //Member data
89  std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
90  MPI_Comm MPIComm;
91  std::size_t MsgID;
92  int NumRanks;
93  int Rank;
94  std::map<RequestTagPair, char*> RecvBuffers;
95  std::map<RankIdPair, std::list<char*>> RecvPackets;
96  std::map<RequestTagPair, char*> SendBuffers;
97  static constexpr int TAG_ANY = -1;
98 
99  void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
100  const std::set<int>& tags,
101  bool BlockAndWait,
102  std::vector<RequestTagPair>& reqTags);
103 #else
104 protected:
105  static constexpr int NumRanks = 1;
106  static constexpr int Rank = 0;
107 #endif
108 };
109 
110 }
111 }
112 }
113 } // vtkm::filter::flow::internal
114 
115 #endif // vtk_m_filter_flow_internal_Messenger_h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
Types.h
vtkm::Id
vtkm::Int32 Id
Represents an ID (index into arrays).
Definition: Types.h:191
VTKM_CONT
#define VTKM_CONT
Definition: ExportMacros.h:57