VTK-m  2.0
AdvectAlgorithmThreaded.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
13 
19 
20 #include <thread>
21 
22 namespace vtkm
23 {
24 namespace filter
25 {
26 namespace flow
27 {
28 namespace internal
29 {
30 
31 template <typename DSIType, template <typename> class ResultType, typename ParticleType>
32 class AdvectAlgorithmThreaded : public AdvectAlgorithm<DSIType, ResultType, ParticleType>
33 {
34 public:
35  AdvectAlgorithmThreaded(const vtkm::filter::flow::internal::BoundsMap& bm,
36  std::vector<DSIType>& blocks)
37  : AdvectAlgorithm<DSIType, ResultType, ParticleType>(bm, blocks)
38  , Done(false)
39  , WorkerActivate(false)
40  {
41  //For threaded algorithm, the particles go out of scope in the Work method.
42  //When this happens, they are destructed by the time the Manage thread gets them.
43  //Set the copy flag so the std::vector is copied into the ArrayHandle
44  for (auto& block : this->Blocks)
45  block.SetCopySeedFlag(true);
46  }
47 
48  void Go() override
49  {
50  vtkm::Id nLocal = static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
51  this->ComputeTotalNumParticles(nLocal);
52 
53  std::vector<std::thread> workerThreads;
54  workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
55  this->Manage();
56 
57  //This will only work for 1 thread. For > 1, the Blocks will need a mutex.
58  VTKM_ASSERT(workerThreads.size() == 1);
59  for (auto& t : workerThreads)
60  t.join();
61  }
62 
63 protected:
64  bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
65  {
66  std::lock_guard<std::mutex> lock(this->Mutex);
67  bool val = this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetActiveParticles(
68  particles, blockId);
69  this->WorkerActivate = val;
70  return val;
71  }
72 
73  void UpdateActive(const std::vector<ParticleType>& particles,
74  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
75  {
76  if (!particles.empty())
77  {
78  std::lock_guard<std::mutex> lock(this->Mutex);
79  this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::UpdateActive(particles, idsMap);
80 
81  //Let workers know there is new work
82  this->WorkerActivateCondition.notify_all();
83  this->WorkerActivate = true;
84  }
85  }
86 
87  bool CheckDone()
88  {
89  std::lock_guard<std::mutex> lock(this->Mutex);
90  return this->Done;
91  }
92 
93  void SetDone()
94  {
95  std::lock_guard<std::mutex> lock(this->Mutex);
96  this->Done = true;
97  this->WorkerActivateCondition.notify_all();
98  }
99 
100  static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
101 
102  void WorkerWait()
103  {
104  std::unique_lock<std::mutex> lock(this->Mutex);
105  this->WorkerActivateCondition.wait(lock, [this] { return WorkerActivate || Done; });
106  }
107 
108  void UpdateWorkerResult(vtkm::Id blockId, DSIHelperInfoType& b)
109  {
110  std::lock_guard<std::mutex> lock(this->Mutex);
111  auto& it = this->WorkerResults[blockId];
112  it.emplace_back(b);
113  }
114 
115  void Work()
116  {
117  while (!this->CheckDone())
118  {
119  std::vector<ParticleType> v;
120  vtkm::Id blockId = -1;
121  if (this->GetActiveParticles(v, blockId))
122  {
123  auto& block = this->GetDataSet(blockId);
124  DSIHelperInfoType bb =
125  DSIHelperInfo<ParticleType>(v, this->BoundsMap, this->ParticleBlockIDsMap);
126  block.Advect(bb, this->StepSize, this->NumberOfSteps);
127  this->UpdateWorkerResult(blockId, bb);
128  }
129  else
130  this->WorkerWait();
131  }
132  }
133 
134  void Manage()
135  {
136  vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
137  this->Comm, this->BoundsMap, 1, 128);
138 
139  while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
140  {
141  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> workerResults;
142  this->GetWorkerResults(workerResults);
143 
144  vtkm::Id numTerm = 0;
145  for (auto& it : workerResults)
146  {
147  for (auto& r : it.second)
148  numTerm += this->UpdateResult(r.Get<DSIHelperInfo<ParticleType>>());
149  }
150 
151  vtkm::Id numTermMessages = 0;
152  this->Communicate(messenger, numTerm, numTermMessages);
153 
154  this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
155  if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
156  throw vtkm::cont::ErrorFilterExecution("Particle count error");
157  }
158 
159  //Let the workers know that we are done.
160  this->SetDone();
161  }
162 
163  bool GetBlockAndWait(const vtkm::Id& numLocalTerm) override
164  {
165  std::lock_guard<std::mutex> lock(this->Mutex);
166 
167  return (
168  this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetBlockAndWait(numLocalTerm) &&
169  !this->WorkerActivate && this->WorkerResults.empty());
170  }
171 
172  void GetWorkerResults(std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>>& results)
173  {
174  results.clear();
175 
176  std::lock_guard<std::mutex> lock(this->Mutex);
177  if (!this->WorkerResults.empty())
178  {
179  results = this->WorkerResults;
180  this->WorkerResults.clear();
181  }
182  }
183 
184  std::atomic<bool> Done;
185  std::mutex Mutex;
186  bool WorkerActivate;
187  std::condition_variable WorkerActivateCondition;
188  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> WorkerResults;
189 };
190 
191 }
192 }
193 }
194 } //vtkm::filter::flow::internal
195 
196 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
vtkm::cont::ErrorFilterExecution
This class is primarily intended for filters to throw in the control environment to indicate an execut...
Definition: ErrorFilterExecution.h:27
DataSetIntegrator.h
ParticleMessenger.h
vtkm::Id
vtkm::Int32 Id
Represents an ID (index into arrays).
Definition: Types.h:191
AdvectAlgorithm.h
PartitionedDataSet.h