src/visualizer/visualizer/ipython_view.py
author Tom Henderson <tomh@tomh.org>
Wed, 10 May 2017 07:06:03 -0700
changeset 12863 6058f899dbd4
parent 12843 026489a1f662
child 13666 156a43a11e92
permissions -rw-r--r--
visualizer: Fix indentation problems from changeset 12843

"""
Backend to the console plugin.

@author: Eitan Isaacson
@organization: IBM Corporation
@copyright: Copyright (c) 2007 IBM Corporation
@license: BSD

All rights reserved. This program and the accompanying materials are made 
available under the terms of the BSD which accompanies this distribution, and 
is available at U{http://www.opensource.org/licenses/bsd-license.php}
"""
# this file is a modified version of source code from the Accerciser project
# http://live.gnome.org/accerciser

import gtk, gobject
import re
import sys
import os
import pango
from StringIO import StringIO
import IPython

from pkg_resources import parse_version

try:
  import IPython
except ImportError:
  ##@ var IPython
  # 
  IPython = None

## IterableIPShell class
class IterableIPShell:
  ## @var IP
  #  IP
  ## @var iter_more
  #  iterate more
  ## @var history_level
  #  history level
  ## @var complete_sep
  #  separators
  ## @var prompt
  #  prompt
  ## @var header
  #  header
  ## @var config
  #  config
  ## @var user_ns
  #  user_ns
  ## @var old_stdout
  #  saved stdout
  ## @var old_stderr
  #  saved stderr
  ## @var system
  #  system 
  ## @var cfg
  # configuration
  ## @var colors
  #  colors
  ## @var raw_input_original
  #  original raw input
  ## @var stdin
  #  cin
  ## @var stdout
  #  cout
  ## @var stderr
  #  cerr
  ## @var raw_input
  #  raw input
  ## @var excepthook
  #  exception hook 
  ## Constructor
  def __init__(self,argv=None,user_ns=None,user_global_ns=None,
               cin=None, cout=None,cerr=None, input_func=None):
    """! Initializer
    
    @param self: this object
    @param argv: Command line options for IPython
    @param user_ns: User namespace.
    @param user_global_ns: User global namespace.
    @param cin: Console standard input.
    @param cout: Console standard output.
    @param cerr: Console standard error.
    @param input_func: Replacement for builtin raw_input()
    @return none
    """
    io = IPython.utils.io
    if input_func:
      if parse_version(IPython.release.version) >= parse_version("1.2.1"):
        IPython.terminal.interactiveshell.raw_input_original = input_func
      else:
        IPython.frontend.terminal.interactiveshell.raw_input_original = input_func
    if cin:
      io.stdin = io.IOStream(cin)
    if cout:
      io.stdout = io.IOStream(cout)
    if cerr:
      io.stderr = io.IOStream(cerr)

    # This is to get rid of the blockage that occurs during 
    # IPython.Shell.InteractiveShell.user_setup()

    io.raw_input = lambda x: None

    os.environ['TERM'] = 'dumb'
    excepthook = sys.excepthook 

    from IPython.config.loader import Config
    cfg = Config()
    cfg.InteractiveShell.colors = "Linux"

    # InteractiveShell's __init__ overwrites io.stdout,io.stderr with
    # sys.stdout, sys.stderr, this makes sure they are right
    #
    old_stdout, old_stderr = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream

    # InteractiveShell inherits from SingletonConfigurable, so use instance()
    #
    if parse_version(IPython.release.version) >= parse_version("1.2.1"):
      self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(\
              config=cfg, user_ns=user_ns)
    else:
      self.IP = IPython.frontend.terminal.embed.InteractiveShellEmbed.instance(\
              config=cfg, user_ns=user_ns)

    sys.stdout, sys.stderr = old_stdout, old_stderr

    self.IP.system = lambda cmd: self.shell(self.IP.var_expand(cmd),
                                            header='IPython system call: ')
#                                            local_ns=user_ns)
                                            #global_ns=user_global_ns)
                                            #verbose=self.IP.rc.system_verbose)

    self.IP.raw_input = input_func
    sys.excepthook = excepthook
    self.iter_more = 0
    self.history_level = 0
    self.complete_sep =  re.compile('[\s\{\}\[\]\(\)]')
    self.updateNamespace({'exit':lambda:None})
    self.updateNamespace({'quit':lambda:None})
    self.IP.readline_startup_hook(self.IP.pre_readline)
    # Workaround for updating namespace with sys.modules
    #
    self.__update_namespace()

  def __update_namespace(self):
    """!
    Update self.IP namespace for autocompletion with sys.modules
    """
    for k, v in list(sys.modules.items()):
        if not '.' in k:
          self.IP.user_ns.update({k:v})

  def execute(self):
    """!
    Executes the current line provided by the shell object.
    """
    self.history_level = 0
    orig_stdout = sys.stdout
    sys.stdout = IPython.utils.io.stdout

    orig_stdin = sys.stdin
    sys.stdin = IPython.utils.io.stdin;
    self.prompt = self.generatePrompt(self.iter_more)

    self.IP.hooks.pre_prompt_hook()
    if self.iter_more:
        try:
            self.prompt = self.generatePrompt(True)
        except:
            self.IP.showtraceback()
        if self.IP.autoindent:
            self.IP.rl_do_indent = True

    try:
      line = self.IP.raw_input(self.prompt)
    except KeyboardInterrupt:
      self.IP.write('\nKeyboardInterrupt\n')
      self.IP.input_splitter.reset()
    except:
      self.IP.showtraceback()
    else:
      self.IP.input_splitter.push(line)
      self.iter_more = self.IP.input_splitter.push_accepts_more()
      self.prompt = self.generatePrompt(self.iter_more)
      if (self.IP.SyntaxTB.last_syntax_error and
          self.IP.autoedit_syntax):
          self.IP.edit_syntax_error()
      if not self.iter_more:
          if parse_version(IPython.release.version) >= parse_version("2.0.0-dev"):
            source_raw = self.IP.input_splitter.raw_reset()
          else:
            source_raw = self.IP.input_splitter.source_raw_reset()[1]
          self.IP.run_cell(source_raw, store_history=True)
          self.IP.rl_do_indent = False
      else:
          # TODO: Auto-indent
          #
          self.IP.rl_do_indent = True
          pass

    sys.stdout = orig_stdout
    sys.stdin = orig_stdin

  def generatePrompt(self, is_continuation):
    """!
    Generate prompt depending on is_continuation value

    @param is_continuation
    @return: The prompt string representation

    """

    # Backwards compatibility with ipyton-0.11
    #
    ver = IPython.__version__
    if '0.11' in ver:
        prompt = self.IP.hooks.generate_prompt(is_continuation)
    else:
        if is_continuation:
            prompt = self.IP.prompt_manager.render('in2')
        else:
            prompt = self.IP.prompt_manager.render('in')

    return prompt


  def historyBack(self):
    """!
    Provides one history command back.
    
    @param self this object
    @return: The command string.
    """
    self.history_level -= 1
    if not self._getHistory():
      self.history_level +=1
    return self._getHistory()

  def historyForward(self):
    """!
    Provides one history command forward.
    
    @param self this object
    @return: The command string.
    """
    if self.history_level < 0:
      self.history_level += 1
    return self._getHistory()

  def _getHistory(self):
    """!
    Get's the command string of the current history level.
    
    @param self this object
    @return: Historic command string.
    """
    try:
      rv = self.IP.user_ns['In'][self.history_level].strip('\n')
    except IndexError:
      rv = ''
    return rv

  def updateNamespace(self, ns_dict):
    """!
    Add the current dictionary to the shell namespace.
    
    @param ns_dict: A dictionary of symbol-values.
    @return none
    """
    self.IP.user_ns.update(ns_dict)

  def complete(self, line):
    """!
    Returns an auto completed line and/or posibilities for completion.
    
    @param line: Given line so far.
    @return: Line completed as for as possible, and possible further completions.
    """
    split_line = self.complete_sep.split(line)
    if split_line[-1]:
      possibilities = self.IP.complete(split_line[-1])
    else:
      completed = line
      possibilities = ['', []]
    if possibilities:
      def _commonPrefix(str1, str2):
        """!
        Reduction function. returns common prefix of two given strings.
        
        @param str1: First string.
        @param str2: Second string
        @return: Common prefix to both strings.
        """
        for i in range(len(str1)):
          if not str2.startswith(str1[:i+1]):
            return str1[:i]
        return str1
      if possibilities[1]:
        common_prefix = reduce(_commonPrefix, possibilities[1]) or line[-1]
        completed = line[:-len(split_line[-1])]+common_prefix
      else:
        completed = line
    else:
      completed = line
    return completed, possibilities[1]
  

  def shell(self, cmd,verbose=0,debug=0,header=''):
    """!
    Replacement method to allow shell commands without them blocking.
    
    @param cmd: Shell command to execute.
    @param verbose: Verbosity
    @param debug: Debug level
    @param header: Header to be printed before output
    @return none
    """
    stat = 0
    if verbose or debug: print header+cmd
    # flush stdout so we don't mangle python's buffering
    if not debug:
      input, output = os.popen4(cmd)
      print output.read()
      output.close()
      input.close()

## ConsoleView class
class ConsoleView(gtk.TextView):
  ## @var ANSI_COLORS
  #  color list
  ## @var text_buffer
  #  text buffer
  ## @var mark
  #  scroll mark
  ## @var color_pat
  #  color pattern
  ## @var line_start
  #  line start
  """
  Specialized text view for console-like workflow.

  @cvar ANSI_COLORS: Mapping of terminal colors to X11 names.
  @type ANSI_COLORS: dictionary

  @ivar text_buffer: Widget's text buffer.
  @type text_buffer: gtk.TextBuffer
  @ivar color_pat: Regex of terminal color pattern
  @type color_pat: _sre.SRE_Pattern
  @ivar mark: Scroll mark for automatic scrolling on input.
  @type mark: gtk.TextMark
  @ivar line_start: Start of command line mark.
  @type line_start: gtk.TextMark
  """
  ANSI_COLORS =  {'0;30': 'Black',     '0;31': 'Red',
                  '0;32': 'Green',     '0;33': 'Brown',
                  '0;34': 'Blue',      '0;35': 'Purple',
                  '0;36': 'Cyan',      '0;37': 'LightGray',
                  '1;30': 'DarkGray',  '1;31': 'DarkRed',
                  '1;32': 'SeaGreen',  '1;33': 'Yellow',
                  '1;34': 'LightBlue', '1;35': 'MediumPurple',
                  '1;36': 'LightCyan', '1;37': 'White'}

  def __init__(self):
    """
    Initialize console view.
    """
    gtk.TextView.__init__(self)
    self.modify_font(pango.FontDescription('Mono'))
    self.set_cursor_visible(True)
    self.text_buffer = self.get_buffer()
    self.mark = self.text_buffer.create_mark('scroll_mark',
                                             self.text_buffer.get_end_iter(),
                                             False)
    for code in self.ANSI_COLORS:
      self.text_buffer.create_tag(code, 
                                  foreground=self.ANSI_COLORS[code], 
                                  weight=700)
    self.text_buffer.create_tag('0')
    self.text_buffer.create_tag('notouch', editable=False)
    self.color_pat = re.compile('\x01?\x1b\[(.*?)m\x02?')
    self.line_start = \
        self.text_buffer.create_mark('line_start', 
                                     self.text_buffer.get_end_iter(), True)
    self.connect('key-press-event', self.onKeyPress)
    
  def write(self, text, editable=False):
    """!
    Write given text to buffer.
    
    @param text: Text to append.
    @param editable: If true, added text is editable.
    @return none
    """
    gobject.idle_add(self._write, text, editable)

  def _write(self, text, editable=False):
    """!
    Write given text to buffer.
    
    @param text: Text to append.
    @param editable: If true, added text is editable.
    @return none
    """
    segments = self.color_pat.split(text)
    segment = segments.pop(0)
    start_mark = self.text_buffer.create_mark(None,
                                              self.text_buffer.get_end_iter(),
                                              True)
    self.text_buffer.insert(self.text_buffer.get_end_iter(), segment)

    if segments:
      ansi_tags = self.color_pat.findall(text)
      for tag in ansi_tags:
        i = segments.index(tag)
        self.text_buffer.insert_with_tags_by_name(self.text_buffer.get_end_iter(),
                                                  segments[i+1], str(tag))
        segments.pop(i)
    if not editable:
      self.text_buffer.apply_tag_by_name('notouch',
                                         self.text_buffer.get_iter_at_mark(start_mark),
                                         self.text_buffer.get_end_iter())
    self.text_buffer.delete_mark(start_mark)
    self.scroll_mark_onscreen(self.mark)

  def showPrompt(self, prompt):
    """!
    Prints prompt at start of line.
    
    @param prompt: Prompt to print.
    @return none
    """
    gobject.idle_add(self._showPrompt, prompt)

  def _showPrompt(self, prompt):
    """!
    Prints prompt at start of line.
    
    @param prompt: Prompt to print.
    @return none
    """
    self._write(prompt)
    self.text_buffer.move_mark(self.line_start,
                               self.text_buffer.get_end_iter())

  def changeLine(self, text):
    """!
    Replace currently entered command line with given text.
    
    @param text: Text to use as replacement.
    @return none
    """
    gobject.idle_add(self._changeLine, text)

  def _changeLine(self, text):
    """!
    Replace currently entered command line with given text.
    
    @param text: Text to use as replacement.
    @return none
    """
    iter = self.text_buffer.get_iter_at_mark(self.line_start)
    iter.forward_to_line_end()
    self.text_buffer.delete(self.text_buffer.get_iter_at_mark(self.line_start), iter)
    self._write(text, True)

  def getCurrentLine(self):
    """!
    Get text in current command line.
    
    @return Text of current command line.
    """
    rv = self.text_buffer.get_slice(
      self.text_buffer.get_iter_at_mark(self.line_start),
      self.text_buffer.get_end_iter(), False)
    return rv

  def showReturned(self, text):
    """!
    Show returned text from last command and print new prompt.
    
    @param text: Text to show.
    @return none
    """
    gobject.idle_add(self._showReturned, text)

  def _showReturned(self, text):
    """!
    Show returned text from last command and print new prompt.
    
    @param text: Text to show.
    @return none
    """
    iter = self.text_buffer.get_iter_at_mark(self.line_start)
    iter.forward_to_line_end()
    self.text_buffer.apply_tag_by_name(
      'notouch', 
      self.text_buffer.get_iter_at_mark(self.line_start),
      iter)
    self._write('\n'+text)
    if text:
      self._write('\n')
    self._showPrompt(self.prompt)
    self.text_buffer.move_mark(self.line_start,self.text_buffer.get_end_iter())
    self.text_buffer.place_cursor(self.text_buffer.get_end_iter())

    if self.IP.rl_do_indent:
      indentation = self.IP.input_splitter.indent_spaces * ' '
      self.text_buffer.insert_at_cursor(indentation)

  def onKeyPress(self, widget, event):
    """!
    Key press callback used for correcting behavior for console-like 
    interfaces. For example 'home' should go to prompt, not to begining of
    line.
    
    @param widget: Widget that key press accored in.
    @param event: Event object
    @return Return True if event should not trickle.
    """
    insert_mark = self.text_buffer.get_insert()
    insert_iter = self.text_buffer.get_iter_at_mark(insert_mark)
    selection_mark = self.text_buffer.get_selection_bound()
    selection_iter = self.text_buffer.get_iter_at_mark(selection_mark)
    start_iter = self.text_buffer.get_iter_at_mark(self.line_start)
    if event.keyval == gtk.keysyms.Home:
      if event.state & gtk.gdk.CONTROL_MASK or event.state & gtk.gdk.MOD1_MASK:
        pass
      elif event.state & gtk.gdk.SHIFT_MASK:
        self.text_buffer.move_mark(insert_mark, start_iter)
        return True
      else:
        self.text_buffer.place_cursor(start_iter)
        return True
    elif event.keyval == gtk.keysyms.Left:
      insert_iter.backward_cursor_position()
      if not insert_iter.editable(True):
        return True
    elif not event.string:
      pass
    elif start_iter.compare(insert_iter) <= 0 and \
          start_iter.compare(selection_iter) <= 0:
      pass
    elif start_iter.compare(insert_iter) > 0 and \
          start_iter.compare(selection_iter) > 0:
      self.text_buffer.place_cursor(start_iter)
    elif insert_iter.compare(selection_iter) < 0:
      self.text_buffer.move_mark(insert_mark, start_iter)
    elif insert_iter.compare(selection_iter) > 0:
      self.text_buffer.move_mark(selection_mark, start_iter)             

    return self.onKeyPressExtend(event)

  def onKeyPressExtend(self, event):
    """!
    For some reason we can't extend onKeyPress directly (bug #500900).
    @param event key press 
    @return none
    """
    pass

## IPythonView class
class IPythonView(ConsoleView, IterableIPShell):
  ## @var cout
  #  cout
  ## @var interrupt
  #  interrupt
  ## @var execute
  #  execute
  ## @var prompt
  #  prompt
  ## @var showPrompt
  #  show prompt
  ## @var history_pos
  #  history list
  ## @var window
  #  GTK Window
  """
  Sub-class of both modified IPython shell and L{ConsoleView} this makes
  a GTK+ IPython console.
  """
  def __init__(self):
    """
    Initialize. Redirect I/O to console.
    """
    ConsoleView.__init__(self)
    self.cout = StringIO()
    IterableIPShell.__init__(self, cout=self.cout,cerr=self.cout,
                             input_func=self.raw_input)
    self.interrupt = False
    self.execute()
    self.prompt = self.generatePrompt(False)
    self.cout.truncate(0)
    self.showPrompt(self.prompt)

  def raw_input(self, prompt=''):
    """!
    Custom raw_input() replacement. Get's current line from console buffer.
    
    @param prompt: Prompt to print. Here for compatability as replacement.
    @return The current command line text.
    """
    if self.interrupt:
      self.interrupt = False
      raise KeyboardInterrupt
    return self.getCurrentLine()

  def onKeyPressExtend(self, event):
    """!
    Key press callback with plenty of shell goodness, like history, 
    autocompletions, etc.
    
    @param event: Event object.
    @return True if event should not trickle.
    """

    if event.state & gtk.gdk.CONTROL_MASK and event.keyval == 99:
      self.interrupt = True
      self._processLine()
      return True
    elif event.keyval == gtk.keysyms.Return:
      self._processLine()
      return True
    elif event.keyval == gtk.keysyms.Up:
      self.changeLine(self.historyBack())
      return True
    elif event.keyval == gtk.keysyms.Down:
      self.changeLine(self.historyForward())
      return True
    elif event.keyval == gtk.keysyms.Tab:
      if not self.getCurrentLine().strip():
        return False
      completed, possibilities = self.complete(self.getCurrentLine())
      if len(possibilities) > 1:
        slice = self.getCurrentLine()
        self.write('\n')
        for symbol in possibilities:
          self.write(symbol+'\n')
        self.showPrompt(self.prompt)
      self.changeLine(completed or slice)
      return True

  def _processLine(self):
    """!
    Process current command line.
    @return none
    """
    self.history_pos = 0
    self.execute()
    rv = self.cout.getvalue()
    if rv: rv = rv.strip('\n')
    self.showReturned(rv)
    self.cout.truncate(0)
    self.cout.seek(0)
 
if __name__ == "__main__":
  window = gtk.Window()
  window.set_default_size(640, 320)
  window.connect('delete-event', lambda x, y: gtk.main_quit())
  window.add(IPythonView())
  window.show_all()
  gtk.main()