dispatcher.h
Go to the documentation of this file.
1 //
2 // Copyright 2016 Pixar
3 //
4 // Licensed under the Apache License, Version 2.0 (the "Apache License")
5 // with the following modification; you may not use this file except in
6 // compliance with the Apache License and the following modification to it:
7 // Section 6. Trademarks. is deleted and replaced with:
8 //
9 // 6. Trademarks. This License does not grant permission to use the trade
10 // names, trademarks, service marks, or product names of the Licensor
11 // and its affiliates, except as required to comply with Section 4(c) of
12 // the License and to reproduce the content of the NOTICE file.
13 //
14 // You may obtain a copy of the Apache License at
15 //
16 // http://www.apache.org/licenses/LICENSE-2.0
17 //
18 // Unless required by applicable law or agreed to in writing, software
19 // distributed under the Apache License with the above modification is
20 // distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21 // KIND, either express or implied. See the Apache License for the specific
22 // language governing permissions and limitations under the Apache License.
23 //
24 #ifndef PXR_BASE_WORK_DISPATCHER_H
25 #define PXR_BASE_WORK_DISPATCHER_H
26 
28 
29 #include "pxr/pxr.h"
31 #include "pxr/base/work/api.h"
32 
33 #include "pxr/base/tf/errorMark.h"
35 
36 #include <tbb/concurrent_vector.h>
37 #include <tbb/task.h>
38 
39 #include <functional>
40 #include <type_traits>
41 #include <utility>
42 
43 PXR_NAMESPACE_OPEN_SCOPE
44 
76 {
77 public:
79  WORK_API WorkDispatcher();
80 
82  WORK_API ~WorkDispatcher();
83 
84  WorkDispatcher(WorkDispatcher const &) = delete;
85  WorkDispatcher &operator=(WorkDispatcher const &) = delete;
86 
87 #ifdef doxygen
88 
99  template <class Callable, class A1, class A2, ... class AN>
100  void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);
101 
102 #else // doxygen
103 
104  template <class Callable>
105  inline void Run(Callable &&c) {
106  _rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c)));
107  }
108 
109  template <class Callable, class A0, class ... Args>
110  inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
111  Run(std::bind(std::forward<Callable>(c),
112  std::forward<A0>(a0),
113  std::forward<Args>(args)...));
114  }
115 
116 #endif // doxygen
117 
119  WORK_API void Wait();
120 
131  WORK_API void Cancel();
132 
133 private:
134  typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;
135 
136  // Function invoker helper that wraps the invocation with an ErrorMark so we
137  // can transmit errors that occur back to the thread that Wait() s for tasks
138  // to complete.
139  template <class Fn>
140  struct _InvokerTask : public tbb::task {
141  explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
142  : _fn(std::move(fn)), _errors(err) {}
143 
144  explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
145  : _fn(fn), _errors(err) {}
146 
147  virtual tbb::task* execute() {
148  TfErrorMark m;
149  _fn();
150  if (!m.IsClean())
151  WorkDispatcher::_TransportErrors(m, _errors);
152  return NULL;
153  }
154  private:
155  Fn _fn;
156  _ErrorTransports *_errors;
157  };
158 
159  // Make an _InvokerTask instance, letting the function template deduce Fn.
160  template <class Fn>
161  _InvokerTask<typename std::remove_reference<Fn>::type>&
162  _MakeInvokerTask(Fn &&fn) {
163  return *new( _rootTask->allocate_additional_child_of(*_rootTask) )
164  _InvokerTask<typename std::remove_reference<Fn>::type>(
165  std::forward<Fn>(fn), &_errors);
166  }
167 
168  // Helper function that removes errors from \p m and stores them in a new
169  // entry in \p errors.
170  WORK_API static void
171  _TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
172 
173  // Task group context and associated root task that allows us to cancel
174  // tasks invoked directly by this dispatcher.
175  tbb::task_group_context _context;
176  tbb::empty_task* _rootTask;
177 
178  // The error transports we use to transmit errors in other threads back to
179  // this thread.
180  _ErrorTransports _errors;
181 
182  // Concurrent calls to Wait() have to serialize certain cleanup operations.
183  std::atomic_flag _waitCleanupFlag;
184 };
185 
187 
188 PXR_NAMESPACE_CLOSE_SCOPE
189 
190 #endif // PXR_BASE_WORK_DISPATCHER_H
WORK_API void Cancel()
Cancel remaining work and return immediately.
A work dispatcher runs concurrent tasks.
Definition: dispatcher.h:75
Class used to record the end of the error-list.
Definition: errorMark.h:66
void Run(Callable &&c, A1 &&a1, A2 &&a2,... AN &&aN)
Add work for the dispatcher to run.
WORK_API WorkDispatcher()
Construct a new dispatcher.
WORK_API ~WorkDispatcher()
Wait() for any pending tasks to complete, then destroy the dispatcher.
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
Definition: errorMark.h:99
WORK_API void Wait()
Block until the work started by Run() completes.