Package mbuild :: Module work_queue
[frames] | no frames]

Source Code for Module mbuild.work_queue

   1  # -*- python -*- 
   2  # Mark Charney  
   3  #BEGIN_LEGAL 
   4  # 
   5  #Copyright (c) 2016 Intel Corporation 
   6  # 
   7  #  Licensed under the Apache License, Version 2.0 (the "License"); 
   8  #  you may not use this file except in compliance with the License. 
   9  #  You may obtain a copy of the License at 
  10  # 
  11  #      http://www.apache.org/licenses/LICENSE-2.0 
  12  # 
  13  #  Unless required by applicable law or agreed to in writing, software 
  14  #  distributed under the License is distributed on an "AS IS" BASIS, 
  15  #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  16  #  See the License for the specific language governing permissions and 
  17  #  limitations under the License. 
  18  #   
  19  #END_LEGAL 
  20   
  21  """Command objects and parallel work queue""" 
  22   
  23  import os 
  24  import sys 
  25  import types 
  26  import Queue 
  27  from threading import Thread 
  28  from collections import deque 
  29   
  30  from base import * 
  31  from util import * 
  32  from dag import * 
  33   
  34   
  35  ############################################################################ 
36 -class dir_cmd_t(object):
37 """For holding a directory and a command. When you call 38 execute(), it changes to the directory an executes the command""" 39
40 - def __init__(self, dir, command, output_file=None):
41 self.dir= dir 42 self.command= command 43 self.output_file = output_file
44 - def __str__(self):
45 return "DIR: %s\nCOMMAND: %s" % (self.dir, self.command)
46
47 - def execute(self,args=None, env=None):
48 """Change to the specified directory and execute the command, 49 unbufferred""" 50 orig = os.getcwd() 51 try: 52 msgb("CHDIR TO", self.dir) 53 os.chdir(self.dir) 54 except: 55 return (-1, ["no such dir: " + self.dir]) 56 msgb("EXECUTING", self.command) 57 if self.output_file: 58 (retcode, out, err) = \ 59 run_command_output_file(self.command, self.output_file) 60 msgb("WROTE", self.output_file) 61 else: 62 (retcode, out, err) = run_command_unbufferred(self.command) 63 os.chdir(orig) 64 if not err: 65 err = [] 66 if not out: 67 out = [] 68 if err: 69 return (retcode, out+err) 70 else: 71 return (retcode, out)
72
73 -class command_t(object):
74 """The primary data structure used to track jobs in this script. It 75 is created when you add L{plan_t} objects to the DAG 76 L{dag_t}.""" 77 78 _ids = 0 79
80 - def __init__(self, 81 command=None, 82 args=None, 83 xenv=None, 84 unbufferred=False, 85 output_file_name=None, 86 shell_executable=None, 87 directory=None, 88 name=None, 89 show_output=True, 90 osenv=None, 91 seconds=0, 92 input_file_name=None):
93 """ 94 This is the unit of work for the L{work_queue_t}. These are 95 typically created by the L{dag_t} but they can also be created 96 by hand and added to the L{work_queue_t} to execute arbitrary 97 commands. 98 99 @type command: string or python function, or a list of both 100 @param command: command line string to execute or a python function 101 102 @type args: anything 103 @param args: (optional) typically a list of arguments for the python function. 104 105 @type xenv: L{env_t} 106 @param xenv: (optional) environment for used by the python 107 function. Passed as the second argument to the python function. 108 109 @type osenv: dictionary 110 @param osenv: (optional) the environment that will be set in the new subprocess. 111 112 @type unbufferred: L{bool} 113 @param unbufferred: (optional) true if the output should be unbufferred. 114 115 @type output_file_name: string 116 @param output_file_name: (optional) file name for stderr/stdout 117 118 @type show_output: L{bool} 119 @param show_output: (optional) show output, default True 120 121 @type input_file_name: string 122 @param input_file_name: (optional) file name for stdin 123 124 """ 125 self.id = command_t._ids 126 command_t._ids += 1 127 # store the command as a list 128 if isinstance(command,types.ListType): 129 self.command = command 130 else: 131 self.command = [ command ] 132 self.name = name 133 self.shell_executable = shell_executable 134 self.args = args 135 self.xenv = xenv 136 self.osenv = osenv 137 self.exit_status = 0 138 self.output = [] 139 self.stderr = [] 140 self.unbufferred = unbufferred 141 self.input_file_name = input_file_name 142 self.output_file_name = output_file_name 143 self.start_time = 0 144 self.end_time = 0 145 self.directory = directory 146 self.show_output = show_output 147 self.input_file_name = input_file_name 148 149 # Has this command be submitted to the work queue? 150 self.submitted = False 151 152 # executed is set to True when this command tries to execute. 153 self.executed = False 154 155 # all prerequisite commands are ready 156 self.ready = False 157 158 # completed is set to True when this command exits successfully. 159 self.completed = False 160 161 # things that depend on this command completing sucessfully 162 self.after_me = [] 163 164 # things that must complete before this command can run 165 self.before_me = [] 166 167 # from the file DAG. A list of inputs upon which this command depends 168 self.inputs = [] 169 # from the file DAG. A list of things generated by this command 170 self.targets = [] 171 172 # used for special signals to the worker threads to tell them to 173 # shut down. 174 self.terminator = False 175 self.timeout = seconds
176
177 - def failed(self):
178 """ 179 Return the exit status. 180 @rtype: bool 181 @return: True if the command failed (exit status != 0) 182 """ 183 if self.exit_status != 0: 184 return True 185 return False
186
187 - def _complete(self):
188 self.completed = True
189
190 - def _ready(self):
191 """Return true if all things that must execute before this node 192 have completed and false otherwise. Updates self.ready.""" 193 if self.ready: 194 return True 195 196 for n in self.before_me: 197 if not n.completed: 198 return False 199 200 self.ready=True 201 return True
202
203 - def is_python_command(self, i=0):
204 """Return true if the command list element is a python function 205 @rtype: bool 206 """ 207 if isinstance(self.command[i],types.FunctionType): 208 return True 209 return False
210
211 - def is_dir_cmd(self, i=0):
212 """Return true if the command list element is a python dir_cmd_t object 213 @rtype: bool 214 """ 215 if isinstance(self.command[i],dir_cmd_t): 216 return True 217 return False
218
219 - def has_python_subcommand(self):
220 """Return true if the command list has a python function 221 @rtype: bool 222 """ 223 for c in self.command: 224 if isinstance(c,types.FunctionType): 225 return True 226 return False
227
228 - def is_command_line(self, i=0):
229 """Return true if the command list element is normal string command 230 line. 231 @rtype: bool 232 """ 233 if not isinstance(self.command[i],types.FunctionType) and \ 234 not isinstance(self.command[i],dir_cmd_t): 235 return True 236 return False
237
238 - def dagkey(self):
239 s = [] 240 for i in self.command: 241 if not isinstance(i,types.FunctionType): 242 s.append(i) 243 t = "MBUILD_COMMAND_KEY " + (" - ".join(s)) 244 return t
245
246 - def hash(self):
247 s = [] 248 for i in self.command: 249 if not isinstance(i,types.FunctionType): 250 s.append(i) 251 t = " - ".join(s) 252 h = hash_string(t) 253 return h
254
255 - def add_before_me(self,n):
256 """Make the current command execute after command n 257 @type n: L{command_t} 258 @param n: another (earlier) command 259 """ 260 if isinstance(n,types.ListType): 261 for x in n: 262 self.before_me.append(x) 263 x.after_me.append(self) 264 else: 265 self.before_me.append(n) 266 n.after_me.append(self)
267
268 - def add_after_me(self,n):
269 """Make the current command execute before command n. 270 @type n: L{command_t} 271 @param n: another (later) command 272 """ 273 if isinstance(n, types.ListType): 274 for x in n: 275 self.after_me.append(x) 276 x.before_me.append(self) 277 else: 278 self.after_me.append(n) 279 n.before_me.append(self)
280
281 - def _check_afters(self):
282 """Return a list of after nodes that are as-yet not submitted 283 but now ready""" 284 ready = [] 285 for x in self.after_me: 286 if not x.submitted and x._ready(): 287 ready.append(x) 288 return ready
289
290 - def elapsed_time(self):
291 """Return the elapsed time as an number of seconds""" 292 if self.end_time == None: 293 self.end_time = get_time() 294 return self.end_time - self.start_time
295
296 - def elapsed(self):
297 """Return the elapsed time. 298 @rtype: string 299 @returns: the elapsed wall clock time of execution. 300 """ 301 if self.end_time == None: 302 self.end_time = get_time() 303 elapsed = get_elapsed_time(self.start_time, self.end_time) 304 return elapsed
305
306 - def dump_cmd(self):
307 return self._pretty_cmd_str()
308
309 - def stderr_exists(self):
310 if self.stderr and len(self.stderr) > 0: 311 if len(self.stderr) == 1 and len(self.stderr[0]) == 0: 312 return False 313 return True 314 return False
315
316 - def stdout_exists(self):
317 if self.output and len(self.output) > 0: 318 if len(self.output) == 1 and len(self.output[0]) == 0: 319 return False 320 return True 321 return False
322
323 - def _pretty_cmd_str(self):
324 s = [] 325 for cmd in self.command: 326 if isinstance(cmd,types.FunctionType): 327 s.append("PYTHON FN: " + cmd.__name__) 328 elif isinstance(cmd,types.StringType): 329 s.append(cmd) 330 else: 331 s.append(str(cmd)) 332 return " ;;;; ".join(s)
333 334
335 - def dump(self, tab_output=False, show_output=True):
336 s = [] 337 nl = '\n' 338 #s.append( bracket('ID ', str(self.id))) 339 #s.append(nl) 340 341 if verbose(1): 342 pass 343 elif self.failed(): 344 pass 345 elif self.targets: 346 s.append(bracket('TARGET ', " ".join(self.targets))) 347 s.append(nl) 348 if self.name: 349 s.append(bracket('NAME ', self.name)) 350 s.append(nl) 351 if self.command: 352 s.append(bracket('COMMAND ', self._pretty_cmd_str())) 353 s.append(nl) 354 else: 355 s.append( bracket('COMMAND ', 'none') ) 356 s.append(nl) 357 if self.args: 358 args_string = str(self.args) 359 print_limit = 400 360 if len(args_string) > print_limit: 361 args_string = args_string[:print_limit] 362 s.append(bracket('ARGS ', args_string)) 363 s.append(nl) 364 if self.xenv: 365 s.append(bracket('ENV ', 'some env')) 366 s.append(nl) 367 #if self.submitted: 368 # s.append(bracket('START_TIME ', self.start_time)) 369 # s.append(nl) 370 if self.input_file_name: 371 s.append(bracket('INPUT_FILE ', self.input_file_name)) 372 s.append(nl) 373 374 if self.completed or self.failed(): 375 if self.exit_status != 0: 376 s.append(bracket('EXIT_STATUS ', str(self.exit_status))) 377 s.append(nl) 378 if self.elapsed_time() > 1: 379 s.append(bracket('ELAPSED_TIME', self.elapsed())) 380 s.append(nl) 381 if self.input_file_name: 382 s.append(bracket('INPUT FILE', self.input_file_name)) 383 s.append(nl) 384 if self.output_file_name: 385 s.append(bracket('OUTPUT FILE', self.output_file_name)) 386 s.append(nl) 387 388 if self.unbufferred == False and self.output_file_name==None: 389 if show_output and self.show_output and self.stdout_exists(): 390 s.append(bracket('OUTPUT')) 391 s.append(nl) 392 for line in self.output: 393 if tab_output: 394 s.append('\t') 395 s.append(line) 396 if show_output and self.show_output and self.stderr_exists(): 397 s.append(bracket('STDERR')) 398 s.append(nl) 399 for line in self.stderr: 400 if tab_output: 401 s.append('\t') 402 s.append(line) 403 return "".join(s)
404
405 - def __str__(self):
406 return self.dump()
407
408 - def _extend_output(self,output):
409 if output: 410 if isinstance(output,types.ListType): 411 self.output.extend(output) 412 else: 413 self.output.append(output)
414
415 - def _extend_output_stderr(self,output, stderr):
416 self._extend_output(output) 417 if stderr: 418 if isinstance(stderr,types.ListType): 419 self.stderr.extend(stderr) 420 else: 421 self.stderr.append(stderr)
422 423
424 - def execute(self):
425 """Execute the command whether it be a python function or a 426 command string. This is executed by worker threads but is made 427 available here for potential debugging. Record execution exit/return 428 status and output. 429 430 Sets the exit_status, output and stderr error fields of the 431 432 command object. 433 """ 434 self.executed = True 435 self.start_time = get_time() 436 self.output = [] 437 self.stderr = [] 438 for cmd in self.command: 439 try: 440 if isinstance(cmd, dir_cmd_t): 441 # execute dir_cmd_t objects 442 (self.exit_status, output) = cmd.execute( self.args, self.xenv ) 443 self._extend_output(output) 444 445 elif isinstance(cmd,types.FunctionType): 446 # execute python functions 447 (self.exit_status, output) = cmd( self.args, self.xenv ) 448 self._extend_output(output) 449 450 elif isinstance(cmd,types.StringType): 451 # execute command strings 452 if self.output_file_name: 453 (self.exit_status, output, stderr) = \ 454 run_command_output_file(cmd, 455 self.output_file_name, 456 shell_executable=self.shell_executable, 457 directory=self.directory, 458 osenv=self.osenv, 459 input_file_name=self.input_file_name) 460 self._extend_output_stderr(output,stderr) 461 462 elif self.unbufferred: 463 (self.exit_status, output, stderr) = \ 464 run_command_unbufferred(cmd, 465 shell_executable= 466 self.shell_executable, 467 directory = self.directory, 468 osenv = self.osenv, 469 input_file_name=self.input_file_name) 470 self._extend_output_stderr(output, stderr) 471 else: 472 # execute timed_cmd_t objects 473 (self.exit_status, output, stderr) = \ 474 run_command_timed(cmd, 475 shell_executable=self.shell_executable, 476 directory = self.directory, 477 osenv = self.osenv, 478 seconds=self.timeout, 479 input_file_name = self.input_file_name) 480 self._extend_output_stderr(output, stderr) 481 482 else: 483 self.exit_status = 1 484 self.extend_output("Unhandled command object: " + self.dump()) 485 486 # stop if something failed 487 if self.exit_status != 0: 488 break; 489 except Exception, e: 490 self.exit_status = 1 491 self.stderr.append("Execution error for: %s\n%s" % (str(e), self.dump())) 492 break 493 494 self.end_time = get_time()
495 496 497
498 -def _worker_one_task(incoming,outgoing):
499 """A thread. Takes stuff from the incoming queue and puts stuff on 500 the outgoing queue. calls execute for each command it takes off the 501 in queue. Return False when we receive a terminator command""" 502 #msgb("WORKER WAITING") 503 item = incoming.get() 504 #msgb("WORKER GOT A TASK") 505 if item.terminator: 506 outgoing.put(item) 507 return False 508 item.execute() 509 #incoming.task_done() # PYTHON2.5 ONLY 510 outgoing.put(item) 511 return True
512
513 -def _worker(incoming,outgoing):
514 """A thread. Takes stuff from the incoming queue and puts stuff on 515 the outgoing queue. calls execute for each command it takes off the 516 in queue. Return when we get a terminator command""" 517 keep_going = True 518 while keep_going: 519 keep_going = _worker_one_task(incoming, outgoing)
520
521 -class work_queue_t(object):
522 """This stores the threads and controls their execution"""
523 - def __init__(self, max_parallelism=4):
524 """ 525 @type max_parallelism: int 526 @param max_parallelism: the number of worker threads to start 527 """ 528 max_parallelism = int(max_parallelism) 529 if max_parallelism <= 0: 530 die("Bad value for --jobs option: " + str(max_parallelism)) 531 self.max_parallelism = max_parallelism 532 self.use_threads = True 533 self.threads = [] 534 535 # worker threads can add stuff to the new_queue so we 536 # use an MT-safe queue. 537 self.new_queue = Queue.Queue(0) 538 self.out_queue = Queue.Queue(0) 539 self.back_queue = Queue.Queue(0) 540 self.pending_commands = deque() 541 542 self.message_delay = 10 543 self.min_message_delay = 10 544 self.message_delay_delta = 10 545 546 self.job_num = 0 547 self.pending = 0 548 self._clean_slate() 549 550 if self.use_threads: 551 if len(self.threads) == 0: 552 self._start_daemons()
553
554 - def _empty_queue(self, q):
555 while not q.empty(): 556 item = q.get_nowait()
557
558 - def _cleanup(self):
559 """After a failed build we want to clean up our any in-progress state 560 so we can re-use the work queue object""" 561 562 # the new_queue, job_num and pending get updated by add() before we build. 563 # so we must clean them up after every build. Also good hygene to clean out 564 # the task queues that we use to talk to the workers. 565 self.pending_commands = deque() 566 self._empty_queue(self.new_queue) 567 self._empty_queue(self.out_queue) 568 self._empty_queue(self.back_queue) 569 self.job_num = 0 570 self.pending = 0
571
572 - def _clean_slate(self):
573 self.running_commands = [] 574 self.all_commands = [] 575 self.running = 0 576 self.sent = 0 577 self.finished = 0 578 self.errors = 0 579 self.dag = None 580 581 # for message limiting in _status() 582 self.last_time = 0 583 self.last_pending = 0 584 self.last_finished = 0 585 self.last_running = 0 586 587 self.start_time = get_time() 588 self.end_time = None 589 590 # we set dying to to True when we are trying to stop because of an error 591 self.dying = False 592 593 self._empty_queue(self.out_queue) 594 self._empty_queue(self.back_queue)
595 596
597 - def clear_commands(self):
598 """Remove any previously remembered commands""" 599 self.all_commands = []
600 - def commands(self):
601 """Return list of all commands involved in last build""" 602 return self.all_commands
603
604 - def elapsed_time(self):
605 """Return the elapsed time as an a number""" 606 if self.end_time == None: 607 self.end_time = get_time() 608 return self.end_time - self.start_time
609
610 - def elapsed(self):
611 """Return the elapsed time as a pretty string 612 @rtype: string 613 @returns: the elapsed wall clock time of execution. 614 """ 615 if self.end_time == None: 616 self.end_time = get_time() 617 elapsed = get_elapsed_time(self.start_time, self.end_time) 618 return elapsed
619
620 - def __del__(self):
621 if verbose(3): 622 msgb("DEL WORK QUEUE") 623 self._terminate()
624
625 - def _terminate(self):
626 """Shut everything down. Kill the worker threads if any were 627 being used. This is called when the work_queue_t is garbage 628 collected, but can be called directly.""" 629 self.dying = True 630 if self.use_threads: 631 self._stop_daemons() 632 self._join_threads()
633
634 - def _start_daemons(self):
635 """Start up a bunch of daemon worker threads to process jobs from 636 the queue.""" 637 for i in range(self.max_parallelism): 638 t = Thread(target=_worker, args=(self.out_queue, self.back_queue)) 639 t.setDaemon(True) 640 t.start() 641 self.threads.append(t)
642
643 - def _stop_daemons(self):
644 """Send terminator objects to all the workers""" 645 for i in range(self.max_parallelism): 646 t = command_t() 647 t.terminator = True 648 if verbose(3): 649 msgb("SENT TERMINATOR", str(i)) 650 self._start_a_job(t) 651 self.threads = []
652
653 - def _join_threads(self):
654 """Use this when not running threads in daemon-mode""" 655 for t in self.threads: 656 t.join() 657 if verbose(3): 658 msgb("WORKER THREAD TERMINATED")
659
660 - def _add_one(self,command):
661 """Add a single command of type L{command_t} to the list 662 of jobs to run.""" 663 # FIXME: make this take a string and build a command_t 664 665 if command.completed: 666 if verbose(5): 667 msgb("SKIPPING COMPLETED CMD", str(command.command)) 668 msgb("SKIPPING COMPLETED CMD", str(command.command)) 669 self.add(command._check_afters()) 670 return 671 if command.submitted: 672 if verbose(5): 673 msgb("SKIPPING SUBMITTED CMD", str(command.command)) 674 msgb("SKIPPING SUBMITTED CMD", str(command.command)) 675 return 676 command.submitted = True 677 if verbose(6): 678 msgb("WQ ADDING", str(command.command)) 679 self.job_num += 1 680 self.new_queue.put( command ) 681 self.pending += 1
682
683 - def add_sequential(self,command_strings, unbufferred=False):
684 """ 685 Add a list of command strings as sequential tasks to the work queue. 686 687 @type command_strings: list of strings 688 @param command_strings: command strings to add to the L{work_queue_t} 689 690 @rtype: list of L{command_t} 691 @return: the commands created 692 """ 693 last_cmd = None 694 cmds = [] 695 for c in command_strings: 696 co = command_t(c, unbufferred=unbufferred) 697 cmds.append(co) 698 self.add(co) 699 if last_cmd: 700 last_cmd.add_after_me(co) 701 last_cmd = co 702 return cmds
703
704 - def add(self,command):
705 """Add a command or list of commands of type L{command_t} 706 to the list of jobs to run. 707 708 @type command: L{command_t} 709 @param command: the command to run 710 """ 711 if verbose(5): 712 msgb("ADD CMD", str(type(command))) 713 714 if command: 715 if isinstance(command,types.ListType): 716 for c in command: 717 if verbose(5): 718 msgb("ADD CMD", str(type(c))) 719 self._add_one(c) 720 else: 721 self._add_one(command)
722
723 - def _done(self):
724 if self.running > 0: 725 return False 726 if not self.dying and self.pending > 0: 727 return False 728 return True
729
730 - def _status(self):
731 if self.show_progress or verbose(2): 732 s = ( 'RUNNING: %d PENDING: %d COMPLETED: %d ' + 733 'ERRORS: %d ELAPSED: %s %s' ) 734 cur_time = get_time() 735 736 changed = False 737 if (self.running != self.last_running or 738 self.pending != self.last_pending or 739 self.finished != self.last_finished): 740 changed = True 741 742 if (changed or 743 # have we waited sufficiently long? 744 cur_time >= self.last_time + self.message_delay): 745 746 # speed back up when anything finishes 747 if self.finished != self.last_finished: 748 self.message_delay = self.min_message_delay 749 elif self.last_time != 0: 750 # only printing because of timeout delay, so 751 # we increase the time a little bit. 752 self.message_delay += self.min_message_delay 753 754 # store the other limiters for next time 755 self.last_time = cur_time 756 self.last_pending = self.pending 757 self.last_finished = self.finished 758 self.last_running = self.running 759 760 msgb('STATUS', 761 s % (self.running, 762 self.pending, 763 self.finished, 764 self.errors, 765 get_elapsed_time(self.start_time, get_time()), 766 self._command_names()))
767
768 - def _start_more_jobs(self):
769 """If there are jobs to start and we didn't hit our parallelism 770 limit, start more jobs""" 771 772 # copy from new_queue to pending_commands to avoid data 773 # race on iterating over pending commands. 774 started = False 775 while not self.new_queue.empty(): 776 self.pending_commands.append( self.new_queue.get() ) 777 778 ready = deque() 779 for cmd in self.pending_commands: 780 if cmd._ready(): 781 ready.append(cmd) 782 783 while self.running < self.max_parallelism and ready: 784 cmd = ready.popleft() 785 # FIXME: small concern that this could be slow 786 self.pending_commands.remove(cmd) 787 if verbose(2): 788 msgb("LAUNCHING", cmd.dump_cmd()) 789 self._start_a_job(cmd) 790 self.pending -= 1 791 started = True 792 return started
793
794 - def _start_a_job(self,cmd):
795 """Private function to kick off a command""" 796 self.out_queue.put(cmd) 797 self.running_commands.append(cmd) 798 if not cmd.terminator: 799 self.all_commands.append(cmd) 800 self.sent += 1 801 self.running += 1
802
803 - def _command_names(self):
804 s = [] 805 anonymous_jobs = 0 806 for r in self.running_commands: 807 if hasattr(r,'name') and r.name: 808 s.append(r.name) 809 else: 810 anonymous_jobs += 1 811 if s: 812 if anonymous_jobs: 813 s.append('%d-anonymous' % (anonymous_jobs)) 814 return '[' + ' '.join(s) + ']' 815 else: 816 return ''
817
818 - def _wait_for_jobs(self):
819 """Return one command object when it finishes, or None on timeout (or 820 other non-keyboard-interrupt exceptions).""" 821 if self.running > 0: 822 try: 823 cmd = self.back_queue.get(block=True, timeout=self.join_timeout) 824 self.running -= 1 825 self.finished += 1 826 self.running_commands.remove(cmd) 827 return cmd 828 except Queue.Empty: 829 return None 830 except KeyboardInterrupt: 831 msgb('INTERRUPT') 832 self._terminate() 833 self.dying = True 834 sys.exit(1) 835 return None # NOT REACHED 836 except: 837 return None 838 return None
839
840 - def build(self, 841 dag=None, 842 targets=None, 843 die_on_errors=True, 844 show_output=True, 845 error_limit=0, 846 show_progress=False, 847 show_errors_only=False, 848 join_timeout=10.0):
849 """ 850 This makes the work queue start building stuff. If no targets 851 are specified then all the targets are considered and built if 852 necessary. All commands that get run or generated are stored in 853 the all_commands attribute. That attribute gets re-initialized 854 on each call to build. 855 856 @type dag: L{dag_t} 857 @param dag: the dependence tree object 858 859 @type targets: list 860 @param targets: specific targets to build 861 862 @type die_on_errors: bool 863 @param die_on_errors: keep going or die on errors 864 865 @type show_output: bool 866 @param show_output: show stdout/stderr (or just buffer it in 867 memory for later processing). Setting this to False is good for 868 avoiding voluminous screen output. The default is True. 869 870 @type show_progress: bool 871 @param show_progress: show the running/pending/completed/errors msgs 872 873 @type show_errors_only: bool 874 @param show_errors_only: normally print the commands as they complete. 875 If True, only show the commands that fail. 876 877 @type join_timeout: float 878 @param join_timeout: how long to wait for thread to terminate. default 10s 879 """ 880 self._clean_slate() 881 882 self.show_progress = show_progress 883 self.join_timeout = join_timeout 884 self.errors = 0 885 self.show_errors_only = show_errors_only 886 self.message_delay = self.min_message_delay 887 self.last_time = 0 888 self.clear_commands() 889 self.dag = dag 890 if self.dag: 891 for x in self.dag._leaves_with_changes(targets): 892 self.add(x.creator) 893 okay = self._build_blind(die_on_errors, show_output, error_limit) 894 if okay and self.dag: 895 did_not_build = self.dag.check_for_skipped() 896 if len(did_not_build) > 0: 897 # some stuff did not build, force an error status return 898 msgb("ERROR: DID NOT BUILD SOME STUFF", "\n\t".join(did_not_build)) 899 if self.dag: 900 print self.dag.dump() 901 self.end_time = get_time() 902 self._cleanup() 903 return False 904 # normal exit path 905 self.end_time = get_time() 906 if self.dag: 907 self.dag.dag_write_signatures() 908 self._cleanup() 909 return okay
910
911 - def _build_blind(self, die_on_errors=True, show_output=True, error_limit=0):
912 """Start running the commands that are pending and kick off 913 dependent jobs as those complete. If die_on_errors is True, the 914 default, we stop running new jobs after one job returns a nonzero 915 status. Returns True if no errors""" 916 if self.use_threads: 917 return self._build_blind_threads(die_on_errors, 918 show_output, 919 error_limit) 920 else: 921 return self._build_blind_no_threads(die_on_errors, 922 show_output, 923 error_limit)
924
925 - def _build_blind_threads(self, 926 die_on_errors=True, 927 show_output=True, 928 error_limit=0):
929 """Start running the commands that are pending and kick off 930 dependent jobs as those complete. If die_on_errors is True, the 931 default, we stop running new jobs after one job returns a nonzero 932 status. Returns True if no errors""" 933 okay = True 934 started = False 935 while 1: 936 c = None 937 if started: 938 c = self._wait_for_jobs() 939 if c: 940 if verbose(3): 941 msgb("JOB COMPLETED") 942 if c.failed(): 943 self.errors += 1 944 okay = False 945 if die_on_errors or (error_limit != 0 and 946 self.errors > error_limit): 947 warn("Command execution failed. " + 948 "Waiting for remaining jobs and exiting.") 949 self.dying = True 950 951 if not self.dying: 952 started |= self._start_more_jobs() 953 self._status() 954 955 if c and not self.dying: 956 c._complete() 957 # Command objects can depend on each other 958 # directly. Enable execution of dependent commands. 959 if verbose(3): 960 msgb("ADD CMD-AFTERS") 961 self.add(c._check_afters()) 962 # Or we might find new commands from the file DAG. 963 if self.dag: 964 for x in self.dag._enable_successors(c): 965 self.add(x.creator) 966 if c and (self.show_errors_only==False or c.failed()): 967 print c.dump(show_output=show_output) 968 if self._done(): 969 break; 970 return okay
971
972 - def _build_blind_no_threads(self, die_on_errors=True, 973 show_output=True, error_limit=0):
974 """Start running the commands that are pending and kick off 975 dependent jobs as those complete. If die_on_errors is True, the 976 default, we stop running new jobs after one job returns a nonzero 977 status. Returns True if no errors""" 978 okay = True 979 while 1: 980 started = False 981 if not self.dying: 982 started = self._start_more_jobs() 983 if started: 984 self._status() 985 986 # EXECUTE THE TASK OURSELVES 987 if self.running > 0: 988 _worker_one_task(self.out_queue, self.back_queue) 989 c = self._wait_for_jobs() 990 if c: 991 if verbose(3): 992 msgb("JOB COMPLETED") 993 if c.failed(): 994 okay = False 995 self.errors += 1 996 if die_on_errors or (error_limit !=0 and 997 self.errors > error_limit): 998 warn("Command execution failed. " + 999 "Waiting for remaining jobs and exiting.") 1000 self.dying = True 1001 if not self.dying: 1002 c._complete() 1003 # Command objects can depende on each other 1004 # directly. Enable execution of dependent commands. 1005 if verbose(3): 1006 msgb("ADD CMD-AFTERS") 1007 self.add(c._check_afters()) 1008 # Or we might find new commands from the file DAG. 1009 if self.dag: 1010 for x in self.dag._enable_successors(c): 1011 self.add(x.creator) 1012 if self.show_errors_only==False or c.failed(): 1013 print c.dump(show_output=show_output) 1014 self._status() 1015 if self._done(): 1016 break; 1017 return okay
1018