// engine.hh
// Go to the documentation of this file.
00001 /* -*- mode: C++; c-basic-offset: 2; indent-tabs-mode: nil -*- */ 00002 /* 00003 * Main authors: 00004 * Christian Schulte <schulte@gecode.org> 00005 * 00006 * Copyright: 00007 * Christian Schulte, 2009 00008 * 00009 * Last modified: 00010 * $Date: 2011-02-01 14:11:54 +0100 (Tue, 01 Feb 2011) $ by $Author: schulte $ 00011 * $Revision: 11591 $ 00012 * 00013 * This file is part of Gecode, the generic constraint 00014 * development environment: 00015 * http://www.gecode.org 00016 * 00017 * Permission is hereby granted, free of charge, to any person obtaining 00018 * a copy of this software and associated documentation files (the 00019 * "Software"), to deal in the Software without restriction, including 00020 * without limitation the rights to use, copy, modify, merge, publish, 00021 * distribute, sublicense, and/or sell copies of the Software, and to 00022 * permit persons to whom the Software is furnished to do so, subject to 00023 * the following conditions: 00024 * 00025 * The above copyright notice and this permission notice shall be 00026 * included in all copies or substantial portions of the Software. 00027 * 00028 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 00029 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 00030 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 00031 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 00032 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 00033 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 00034 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
00035 * 00036 */ 00037 00038 #ifndef __GECODE_SEARCH_PARALLEL_ENGINE_HH__ 00039 #define __GECODE_SEARCH_PARALLEL_ENGINE_HH__ 00040 00041 #include <gecode/search.hh> 00042 #include <gecode/search/support.hh> 00043 #include <gecode/search/worker.hh> 00044 #include <gecode/search/parallel/path.hh> 00045 00046 namespace Gecode { namespace Search { namespace Parallel { 00047 00049 class Engine : public Search::Engine { 00050 protected: 00052 class Worker : public Search::Worker, public Support::Runnable { 00053 protected: 00055 Engine& _engine; 00057 Support::Mutex m; 00059 Path path; 00061 Space* cur; 00063 unsigned int d; 00065 bool idle; 00066 public: 00068 Worker(Space* s, size_t sz, Engine& e); 00070 Space* steal(unsigned long int& d); 00072 Statistics statistics(void); 00074 Engine& engine(void) const; 00076 virtual ~Worker(void); 00077 }; 00079 const Options _opt; 00080 public: 00082 const Options& opt(void) const; 00084 unsigned int workers(void) const; 00085 00087 00088 00089 enum Cmd { 00090 C_WORK, 00091 C_WAIT, 00092 C_RESET, 00093 C_TERMINATE 00094 }; 00095 protected: 00097 volatile Cmd _cmd; 00099 Support::Mutex _m_wait; 00100 public: 00102 Cmd cmd(void) const; 00104 void block(void); 00106 void release(Cmd c); 00108 void wait(void); 00110 00112 00113 protected: 00115 Support::Mutex _m_term; 00117 volatile unsigned int _n_term_not_ack; 00119 Support::Event _e_term_ack; 00121 Support::Mutex _m_wait_terminate; 00123 volatile unsigned int _n_not_terminated; 00125 Support::Event _e_terminate; 00126 public: 00128 void ack_terminate(void); 00130 void terminated(void); 00132 void wait_terminate(void); 00134 void terminate(void); 00136 00138 00139 protected: 00141 Support::Mutex _m_reset; 00143 volatile unsigned int _n_reset_not_ack; 00145 Support::Event e_reset_ack_start; 00147 Support::Event e_reset_ack_stop; 00149 Support::Mutex m_wait_reset; 00150 public: 00152 void ack_reset_start(void); 00154 void ack_reset_stop(void); 00156 void wait_reset(void); 00158 
00160 00161 protected: 00163 Support::Mutex m_search; 00165 Support::Event e_search; 00167 Support::DynamicQueue<Space*,Heap> solutions; 00169 volatile unsigned int n_busy; 00171 volatile bool has_stopped; 00173 bool signal(void) const; 00174 public: 00176 void idle(void); 00178 void busy(void); 00180 void stop(void); 00182 00184 00185 00186 Engine(const Options& o); 00188 virtual Space* next(void); 00190 virtual bool stopped(void) const; 00192 }; 00193 00194 00195 /* 00196 * Basic access routines 00197 */ 00198 forceinline Engine& 00199 Engine::Worker::engine(void) const { 00200 return _engine; 00201 } 00202 forceinline const Options& 00203 Engine::opt(void) const { 00204 return _opt; 00205 } 00206 forceinline unsigned int 00207 Engine::workers(void) const { 00208 return static_cast<unsigned int>(opt().threads); 00209 } 00210 00211 00212 /* 00213 * Engine: command and wait handling 00214 */ 00215 forceinline Engine::Cmd 00216 Engine::cmd(void) const { 00217 return _cmd; 00218 } 00219 forceinline void 00220 Engine::block(void) { 00221 _cmd = C_WAIT; 00222 _m_wait.acquire(); 00223 } 00224 forceinline void 00225 Engine::release(Cmd c) { 00226 _cmd = c; 00227 _m_wait.release(); 00228 } 00229 forceinline void 00230 Engine::wait(void) { 00231 _m_wait.acquire(); _m_wait.release(); 00232 } 00233 00234 00235 /* 00236 * Engine: initialization 00237 */ 00238 forceinline 00239 Engine::Worker::Worker(Space* s, size_t sz, Engine& e) 00240 : Search::Worker(sz), _engine(e), d(0), idle(false) { 00241 if (s != NULL) { 00242 cur = (s->status(*this) == SS_FAILED) ? 
00243 NULL : snapshot(s,engine().opt(),false); 00244 if (cur == NULL) 00245 fail++; 00246 } else { 00247 cur = NULL; 00248 } 00249 current(s); 00250 current(NULL); 00251 current(cur); 00252 } 00253 00254 forceinline 00255 Engine::Engine(const Options& o) 00256 : _opt(o), solutions(heap) { 00257 // Initialize termination information 00258 _n_term_not_ack = workers(); 00259 _n_not_terminated = workers(); 00260 // Initialize search information 00261 n_busy = workers(); 00262 has_stopped = false; 00263 // Initialize reset information 00264 _n_reset_not_ack = workers(); 00265 } 00266 00267 00268 /* 00269 * Statistics 00270 */ 00271 forceinline Statistics 00272 Engine::Worker::statistics(void) { 00273 m.acquire(); 00274 Statistics s = *this; 00275 s.memory += path.size(); 00276 m.release(); 00277 return s; 00278 } 00279 00280 00281 /* 00282 * Engine: search control 00283 */ 00284 forceinline bool 00285 Engine::signal(void) const { 00286 return solutions.empty() && (n_busy > 0) && !has_stopped; 00287 } 00288 forceinline void 00289 Engine::idle(void) { 00290 m_search.acquire(); 00291 bool bs = signal(); 00292 n_busy--; 00293 if (bs && (n_busy == 0)) 00294 e_search.signal(); 00295 m_search.release(); 00296 } 00297 forceinline void 00298 Engine::busy(void) { 00299 m_search.acquire(); 00300 assert(n_busy > 0); 00301 n_busy++; 00302 m_search.release(); 00303 } 00304 forceinline void 00305 Engine::stop(void) { 00306 m_search.acquire(); 00307 bool bs = signal(); 00308 has_stopped = true; 00309 if (bs) 00310 e_search.signal(); 00311 m_search.release(); 00312 } 00313 00314 00315 /* 00316 * Engine: termination control 00317 */ 00318 forceinline void 00319 Engine::terminated(void) { 00320 unsigned int n; 00321 _m_term.acquire(); 00322 n = --_n_not_terminated; 00323 _m_term.release(); 00324 // The signal must be outside of the look, otherwise a thread might be 00325 // terminated that still holds a mutex. 
00326 if (n == 0) 00327 _e_terminate.signal(); 00328 } 00329 00330 forceinline void 00331 Engine::ack_terminate(void) { 00332 _m_term.acquire(); 00333 if (--_n_term_not_ack == 0) 00334 _e_term_ack.signal(); 00335 _m_term.release(); 00336 } 00337 00338 forceinline void 00339 Engine::wait_terminate(void) { 00340 _m_wait_terminate.acquire(); 00341 _m_wait_terminate.release(); 00342 } 00343 00344 forceinline void 00345 Engine::terminate(void) { 00346 // Grab the wait mutex for termination 00347 _m_wait_terminate.acquire(); 00348 // Release all threads 00349 release(C_TERMINATE); 00350 // Wait until all threads have acknowledged termination request 00351 _e_term_ack.wait(); 00352 // Release waiting threads 00353 _m_wait_terminate.release(); 00354 // Wait until all threads have in fact terminated 00355 _e_terminate.wait(); 00356 // Now all threads are terminated! 00357 } 00358 00359 00360 00361 /* 00362 * Engine: reset control 00363 */ 00364 forceinline void 00365 Engine::ack_reset_start(void) { 00366 _m_reset.acquire(); 00367 if (--_n_reset_not_ack == 0) 00368 e_reset_ack_start.signal(); 00369 _m_reset.release(); 00370 } 00371 00372 forceinline void 00373 Engine::ack_reset_stop(void) { 00374 _m_reset.acquire(); 00375 if (++_n_reset_not_ack == workers()) 00376 e_reset_ack_stop.signal(); 00377 _m_reset.release(); 00378 } 00379 00380 forceinline void 00381 Engine::wait_reset(void) { 00382 m_wait_reset.acquire(); 00383 m_wait_reset.release(); 00384 } 00385 00386 00387 00388 /* 00389 * Worker: finding and stealing working 00390 */ 00391 forceinline Space* 00392 Engine::Worker::steal(unsigned long int& d) { 00393 /* 00394 * Make a quick check whether the worker might have work 00395 * 00396 * If that is not true any longer, the worker will be asked 00397 * again eventually. 
00398 */ 00399 if (!path.steal()) 00400 return NULL; 00401 m.acquire(); 00402 Space* s = path.steal(*this,d); 00403 m.release(); 00404 // Tell that there will be one more busy worker 00405 if (s != NULL) 00406 engine().busy(); 00407 return s; 00408 } 00409 00410 }}} 00411 00412 #endif 00413 00414 // STATISTICS: search-parallel