Source code for mmf.utils.debug
"""Debugging tools"""
import sys,inspect
import gc
import weakref
[docs]class Memory(object):
    """This class represents the current state of memory.
    
    Example:  (Don't run these with code coverage testing or else you
    may run out of memory!)
    >> m = Memory()              # doctest: +ELLIPSIS
    Checkpointed ... objects.
    >> m.new()                   # Now look at new objects
    """
[docs]    def __init__(self):
        self._all = {}
        self._ignore = set()
        self.ignore(self._all)
        self.ignore(self._ignore)
        
        # Ignore all memory management variables
        seen = set(self._ignore)
        self.ignore(seen)
        all = {}
        unreachable = gc.collect()
        self._getrec(gc.get_referents(self),all_=all,seen_=seen)
        self.ignore(all)
        for k in all:
            self.ignore(k)
        del all
        del seen
        self.checkpoint()
 
[docs]    def ignore(self,e):
        """Ignore the specified object."""
        self._ignore.add(id(e))
 
[docs]    def checkpoint(self):
        """Take a snapshot of the current state of the memory."""
        self._all.clear()
        seen = set(self._ignore)
        self.ignore(seen)
        unreachable = gc.collect()
        objects = gc.get_objects()
        self.ignore(objects)
        self._getrec(objects,all_=self._all,seen_=seen)
        print "Checkpointed %i objects."%len(self._all)
 
[docs]    def new(self):
        """Return a dictionary of all new objects."""
        gc.collect()
        objects = gc.get_objects()
        seen = set()
        all = {}
        seen.add(id(objects))
        seen.add(id(seen))
        seen.add(id(all))
        frame = sys._getframe()
        seen.add(id(frame))
        seen.add(id(frame.f_back))
        all.clear()
        self._getrec(objects,all,seen)
        return [(k,all[k]) for k in all if k not in self._all]
         
    def _getrec(self,slist,all_,seen_):
        """Recursively process all objects in slist and add all objects to
        the dictionary all_ except those with id's in the set seen_."""
        for e in slist:
            ide = id(e)
            if ide in seen_ or ide in self._ignore:
                continue
            seen_.add(ide)
            all_[ide] = e
            referents = gc.get_referents(e)
            seen_.add(id(referents))
            if referents:
                self._getrec(referents,all_,seen_)
            del referents
 
def _getr(slist, olist, seen):
    """Recursively expand slist's objects into olist, using seen to track
    already processed objects."""
    for e in slist:
        if id(e) in seen:
            continue
        seen[id(e)] = None
        olist.append(e)
        unreachable = gc.collect()
        tl = gc.get_referents(e)
        if tl:
            _getr(tl, olist, seen)
def _getr(slist,all_,seen_):
    """Recursively expand slist's object id's into id_set, using seen to track
    already processed objects."""
    for e in slist:
        eid = id(e)
        if eid in all_ or eid in seen_:
            continue
        seen_.add(eid)
        all_[eid] = e
        tl = gc.get_referents(e)
        seen_.add(id(eid))
        if tl:
            _getr(tl,all_,seen_)
# The public function.
[docs]def get_all_objects():
    """Return a list of all live Python
    objects, not including the list itself."""
    gcl = gc.get_objects()
    all = {}
    seen = set()
    # Just in case:
    seen.add(id(gcl))
    seen.add(id(all))
    seen.add(id(seen))
    # _getr does the real work.
    _getr(gcl, all, seen)
    return all
 
_checkpoint_ids = set()
[docs]def check_point():
    """Set a checkpoint consisting of the current objects in memory."""
    _checkpoint_ids.clear()
    all = get_all_objects()
    _checkpoint_ids.update(all)
    del all
 
[docs]def new():
    """Set a checkpoint consisting of the current objects in memory."""
    gc.collect()
    all = get_all_objects()
    ans = dict((k,all[k]) for k in all if k not in _checkpoint_ids)
    
 
[docs]def count_objects():
    """Return the number of objects alive."""
    gc.collect()
    n = len(get_all_objects())
    gc.collect()
    return n