Source code for mmf.utils.script

"""Utilities for writing scripts.

Here is a sample script to get you started:

#!/usr/bin/python
# Note that the first line only helps on unix systems.  It specifies
# the interpreter to use to interpret this file.  On other systems you
# may have to create an alias with the proper python invocation.

import sys,os,re
import mmf.utils.script as script

# Parse arguments
parser = script.OptionParser()
parser.add_option("--home",
		  type="string",
		  dest="home_dir",
		  default="~",
		  help="use <home> rather than ~ for installation",
		  metavar="<home>")
parser.add_option("--src",
                  type="string",
                  dest="src_dir",
                  default="work/unix/configurations/linux",
                  help="use config files in <src> rather than " + 
                       "<home>/work/unix/configurations/linux",
                  metavar="<src>")
(options, args) = parser.parse_args()


############# Script Body
def main():
    # Setup directories
    home_dir = os.path.expanduser(options.home_dir)
    check_access(home_dir,os.F_OK | os.R_OK | os.W_OK | os.X_OK)
    if options.verbose: print "Using <home> = " + home_dir
    
    src_dir = os.path.expanduser(options.src_dir)
    if '.' == src_dir or '..' == src_dir:
	src_dir = os.path.abspath(src_dir)
    if not os.path.isabs(src_dir):
	src_dir = os.path.normpath(os.path.join(home_dir,src_dir))
    check_access(src_dir,os.F_OK | os.R_OK) 
    if options.verbose: print "Using <src> = " + src_dir

    # Get all files in src_dir directory
    for (root, dirs, files) in os.walk(src_dir):
	if 'CVS' in dirs:
	    dirs.remove('CVS')  # don't visit CVS directories
	def is_not_temp(f):
	    return ("#" not in f) and ("~" not in f)
	files = filter(is_not_temp,files)
	for f in files:
	    src = os.path.join(root,f)
	    fd = open(src,'r')

	    # Files are linked when the second line of the file
	    # looks like "# dest=~/.xsession" with optional whitespace
	    line1 = fd.readline()
	    line2 = fd.readline()
	    fd.close()

	    dest = re.match(r"\A\s*[#;\\]+\s*dest\s*=\s*(\S*)",line2)
	    if dest is not None:
		dest = os.path.expanduser(dest.group(1))
		
	    if dest is not None:
		if os.path.islink(dest):
		    mesg = "Symlink "+dest+" exists."
		    query = "Remove and replace with link to "+src+"?"
		    cmds = ["os.remove("+`dest`+")",
			    "os.symlink("+`src`+ "," +`dest`+")"]
		    do(cmds,mesg,query)
		elif os.path.isfile(dest):
		    mesg = "File "+dest+" exists."
		    query = "Backup and symlink "+src+" to "+dest+"?"
		    cmds=["backup("+`dest`+")",
			  "os.symlink("+`src`+","+`dest`+")"]
		    do(cmds,mesg,query)
		else:
		    query = "Link "+src+" to "+dest+"?"
		    cmds = ["os.symlink("+`src`+","+`dest`+")"]
		    do(cmds,query=query)
			    
if __name__=="__main__":
    main()
"""

import optparse
import os
import subprocess
import sys

# Single-slot holder for the most recently parsed options.
# OptionParser.parse_args() stores its result under key 0, and do(), do_f(),
# os_do() and backup() read option_box[0] when they are called without an
# explicit ``options`` argument.
option_box = {}

class OptionParser(optparse.OptionParser):
    """Standard optparse.OptionParser class with the following extra options:

    -v, --verbose      (dest ``verbose``, default False)
    -n, --no-action    (dest ``action``, default True; the flag stores False)
    -i, --interactive  (dest ``interactive``, default False)

    The parse_args() method is also overloaded to keep a module-level copy
    of the parsed options for use as a default argument elsewhere in this
    module.
    """

    def __init__(self, *argv, **kwargs):
        """Create the parser and register the three standard options.

        All positional and keyword arguments are forwarded unchanged to
        optparse.OptionParser.__init__.
        """
        optparse.OptionParser.__init__(self, *argv, **kwargs)
        self.add_option("-v", "--verbose",
                        action="store_true",
                        dest="verbose",
                        default=False,
                        help="print lots of information")
        # The two halves of this help text used to be concatenated without a
        # separating space ("anything:only").
        self.add_option("-n", "--no-action",
                        action="store_false",
                        dest="action",
                        default=True,
                        help="don't do anything: "
                             "only print commands that would be executed")
        self.add_option("-i", "--interactive",
                        action="store_true",
                        dest="interactive",
                        default=False,
                        help="prompt before taking action")

    def parse_args(self, *argv, **kwargs):
        """Parse arguments and remember the resulting options.

        The parsed options are stored in the module-level ``option_box``
        under key 0; this is used as a default argument in other functions
        of this module.

        Returns the same ``(options, args)`` pair as
        optparse.OptionParser.parse_args.
        """
        (options, args) = optparse.OptionParser.parse_args(
            self, *argv, **kwargs)
        option_box[0] = options
        return (options, args)
def yes_no(default=""):
    """Prompt the user for a yes or no answer and return it as a bool.

    ``default`` is the answer used when the user just presses return.  If
    it is 'y'/'yes' or 'n'/'no' (case-insensitive) the prompt capitalizes
    the corresponding choice; any other value (such as the empty string)
    means there is no usable default and the user is asked again.

    Exceptions (such as EOF) are not caught.

    The result is True for 'yes' and False for 'no'.
    """
    def validate(ans):
        """Return True/False for a yes/no answer, None if unrecognized."""
        ans = ans.lower()
        if ans in ('y', 'yes'):
            return True
        if ans in ('n', 'no'):
            return False
        return None

    # raw_input() only exists on Python 2; on Python 3 the equivalent
    # function is called input().
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    def ask(prompt):
        """Read one response, substituting the default for an empty one."""
        response = read_line(prompt)
        if not response:
            response = default
        return response

    prompts = {None: "yes/no? ",
               True: "Yes/no? ",
               False: "yes/No? "}
    prompt = prompts[validate(default)]

    ans = validate(ask(prompt))
    while ans is None:
        print('Please answer "yes" or "no".')
        ans = validate(ask(prompt))
    return ans
class AttributeError(Exception):
    """Error raised by check_access() when a path fails an access check.

    NOTE: within this module the name shadows the builtin AttributeError.
    """
def check_access(path, mode):
    """Check that path has proper access as specified by mode.

    ``mode`` is a bitwise OR of os.F_OK, os.R_OK, os.W_OK and os.X_OK, as
    accepted by os.access().

    Returns None when the check succeeds.  Raises AttributeError on
    failure, with a message listing every requirement that is not met.
    """
    if os.access(path, mode):
        return

    requirements = [(os.F_OK, "exist"),
                    (os.R_OK, "be readable"),
                    (os.W_OK, "be writable"),
                    (os.X_OK, "be executable")]
    err = "Path " + path + " has invalid permissions:"
    for (test_mode, msg) in requirements:
        # os.F_OK is 0, so a bit test against ``mode`` can never select it.
        # Existence is implied by every access check, so always test it;
        # otherwise a missing path would never be reported as such.
        requested = (test_mode == os.F_OK) or (mode & test_mode)
        if requested and not os.access(path, test_mode):
            err = err + "\n- Path must " + msg
    raise AttributeError(err)
def execute(command, cwd=None):
    """Execute the specified command at the os level.

    ``command`` is passed to subprocess.call() with shell=True, optionally
    running in the directory ``cwd``.  The outcome (exit status, terminating
    signal, or the OSError that prevented execution) is reported on
    sys.stderr.  Nothing is returned and OSError is not propagated.
    """
    try:
        # NOTE(review): shell=True hands the whole string to the shell, so
        # ``command`` must come from a trusted source.
        retcode = subprocess.call(command, shell=True, cwd=cwd)
    except OSError as e:
        sys.stderr.write("Execution failed: %s\n" % (e,))
        return

    if retcode < 0:
        # subprocess reports death-by-signal as the negated signal number.
        sys.stderr.write("Child was terminated by signal %s\n" % (-retcode,))
    else:
        sys.stderr.write("Child returned %s\n" % (retcode,))
def _describe_callable(f):
    """Return a printable label for f: its docstring, else its name."""
    return f.__doc__ or getattr(f, "__name__", repr(f))


def do_f(fcns, mesg=None, query=None, options=None):
    """Execute functions after first confirming with the user and
    presenting information (if verbose options are selected).

    ``fcns`` is a sequence of zero-argument callables; each one is
    described to the user by its docstring (or its name when it has none).
    ``mesg`` is printed first when options.verbose is set.  ``query`` is
    the question shown in interactive mode; when it is None the list of
    functions is shown instead.  ``options`` defaults to the options most
    recently parsed by OptionParser.parse_args().

    When options.action is false nothing is called; the descriptions are
    printed instead.

    Return False if there was an error.
    """
    if options is None:
        options = option_box[0]

    success = True
    if options.verbose and mesg is not None:
        print(mesg)

    if not options.action:
        # No-action mode: only show what would have been run.
        for f in fcns:
            print(_describe_callable(f))
        return success

    perform = True
    if options.interactive:
        if query is None:
            print("Perform the following commands?")
            for f in fcns:
                print(_describe_callable(f))
        else:
            print(query)
        perform = yes_no("yes")

    if perform:
        for f in fcns:
            label = _describe_callable(f)
            if options.verbose:
                print(label)
            try:
                f()
            except Exception as e:
                # A failure is reported but the remaining functions still
                # run.  Building the message from ``label`` rather than
                # f.__doc__ avoids a TypeError for undocumented callables.
                sys.stderr.write("Command " + label + " failed: %s\n" % (e,))
                success = False
    return success
def do(cmds, mesg=None, query=None, options=None):
    """Execute commands after first confirming with the user and
    presenting information (if verbose options are selected).

    ``cmds`` is a sequence of strings of Python source, each run with
    exec().  ``mesg`` is printed first when options.verbose is set.
    ``query`` is the question shown in interactive mode; when it is None
    the list of commands is shown instead.  ``options`` defaults to the
    options most recently parsed by OptionParser.parse_args().

    When options.action is false nothing is executed; the commands are
    printed instead.

    Return False if there was an error.
    """
    if options is None:
        options = option_box[0]

    success = True
    if options.verbose and mesg is not None:
        print(mesg)

    if not options.action:
        # No-action mode: only show what would have been run.
        for c in cmds:
            print(c)
        return success

    perform = True
    if options.interactive:
        if query is None:
            print("Perform the following commands?")
            for c in cmds:
                print(c)
        else:
            print(query)
        perform = yes_no("yes")

    if perform:
        for c in cmds:
            if options.verbose:
                print(c)
            try:
                # NOTE(review): exec runs arbitrary Python source; command
                # strings must only ever be built from trusted input.
                exec(c)
            except Exception as e:
                # A failure is reported but the remaining commands still run.
                sys.stderr.write("Command " + c + " failed: %s\n" % (e,))
                success = False
    return success
def os_do(action, options=None):
    """Run an os-level command via execute(), honouring the options.

    ``action`` is the command string.  ``options`` defaults to the options
    most recently parsed by OptionParser.parse_args().

    When options.action is false the command is only printed.  When
    options.interactive is set the user is asked for confirmation first.
    Nothing is returned.
    """
    if options is None:
        options = option_box[0]

    if not options.action:
        # No-action mode: only show what would have been run.
        print(action)
        return

    perform = True
    if options.interactive:
        print("Perform the following action?")
        print(action)
        perform = yes_no("yes")
    if perform:
        execute(action)
def backup(file, options=None):
    """Backup a file and return backup name.

    The backup is made by renaming ``file`` to ``file + ".bak"``; if that
    name is already taken, ".bak1", ".bak2", ... are tried in turn.  The
    rename is carried out through do(), so the verbose, no-action and
    interactive options apply.  ``options`` defaults to the options most
    recently parsed by OptionParser.parse_args().

    Note that do() also reports success when it only prints the command
    (no-action mode), in which case the name is returned although nothing
    was renamed.

    Raises Exception if do() reports a failure.
    """
    if options is None:
        options = option_box[0]

    # lexists() rather than isfile(): a directory or dangling symlink that
    # already has the candidate name must also count as "taken", otherwise
    # the rename could fail or replace it.
    bak = file + ".bak"
    if os.path.lexists(bak):
        n = 1
        while os.path.lexists(file + ".bak" + repr(n)):
            n = n + 1
        bak = file + ".bak" + repr(n)

    # repr() quotes and escapes the paths so they are valid string literals
    # inside the command that do() will exec.
    cmds = ["os.rename(" + repr(file) + "," + repr(bak) + ")"]
    if do(cmds, options=options):
        return bak
    raise Exception("Could not backup " + file + " to " + bak + ".")