2996 lines
		
	
	
	
		
			98 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			2996 lines
		
	
	
	
		
			98 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| """
 | |
| Decompiler for Python3.7.
 | |
| Decompile a module or a function using the decompile() function
 | |
| 
 | |
| >>> from unpyc3 import decompile
 | |
| >>> def foo(x, y, z=3, *args):
 | |
| ...    global g
 | |
| ...    for i, j in zip(x, y):
 | |
| ...        if z == i + j or args[i] == j:
 | |
| ...            g = i, j
 | |
| ...            return
 | |
| ...    
 | |
| >>> print(decompile(foo))
 | |
| 
 | |
| def foo(x, y, z=3, *args):
 | |
|     global g
 | |
|     for i, j in zip(x, y):
 | |
|         if z == i + j or args[i] == j:
 | |
|             g = i, j
 | |
|             return
 | |
| >>>
 | |
| """
 | |
| from __future__ import annotations
 | |
| 
 | |
| from typing import Union, Iterable, Any, List
 | |
| 
 | |
| __all__ = ['decompile']
 | |
| 
 | |
| 
 | |
| def set_trace(trace_function):
 | |
|     global current_trace
 | |
|     current_trace = trace_function if trace_function else _trace
 | |
| 
 | |
| 
 | |
| def get_trace():
 | |
|     global current_trace
 | |
|     return None if current_trace == _trace else current_trace
 | |
| 
 | |
| 
 | |
| def trace(*args):
 | |
|     global current_trace
 | |
|     if current_trace:
 | |
|         current_trace(*args)
 | |
| 
 | |
| 
 | |
| def _trace(*args):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| current_trace = _trace
 | |
| 
 | |
| # TODO:
 | |
| # - Support for keyword-only arguments
 | |
| # - Handle assert statements better
 | |
| # - (Partly done) Nice spacing between function/class declarations
 | |
| 
 | |
| import dis
 | |
| from array import array
 | |
| from opcode import opname, opmap, HAVE_ARGUMENT, cmp_op
 | |
| import inspect
 | |
| 
 | |
| import struct
 | |
| import sys
 | |
| 
 | |
| # Masks for code object's co_flag attribute
 | |
| VARARGS = 4
 | |
| VARKEYWORDS = 8
 | |
| 
 | |
| # Put opcode names in the global namespace
 | |
| for name, val in opmap.items():
 | |
|     globals()[name] = val
 | |
| PRINT_EXPR = 70
 | |
| 
 | |
| # These opcodes will generate a statement. This is used in the first
 | |
| # pass (in Code.find_else) to find which POP_JUMP_IF_* instructions
 | |
| # are jumps to the else clause of an if statement
 | |
| stmt_opcodes = {
 | |
|     SETUP_LOOP, BREAK_LOOP, CONTINUE_LOOP,
 | |
|     SETUP_FINALLY, END_FINALLY,
 | |
|     SETUP_EXCEPT, POP_EXCEPT,
 | |
|     SETUP_WITH,
 | |
|     POP_BLOCK,
 | |
|     STORE_FAST, DELETE_FAST,
 | |
|     STORE_DEREF, DELETE_DEREF,
 | |
|     STORE_GLOBAL, DELETE_GLOBAL,
 | |
|     STORE_NAME, DELETE_NAME,
 | |
|     STORE_ATTR, DELETE_ATTR,
 | |
|     IMPORT_NAME, IMPORT_FROM,
 | |
|     RETURN_VALUE, YIELD_VALUE,
 | |
|     RAISE_VARARGS,
 | |
|     POP_TOP,
 | |
| }
 | |
| 
 | |
| # Conditional branching opcode that make up if statements and and/or
 | |
| # expressions
 | |
| pop_jump_if_opcodes = (POP_JUMP_IF_TRUE, POP_JUMP_IF_FALSE)
 | |
| 
 | |
| # These opcodes indicate that a pop_jump_if_x to the address just
 | |
| # after them is an else-jump
 | |
| else_jump_opcodes = (
 | |
|     JUMP_FORWARD, RETURN_VALUE, JUMP_ABSOLUTE,
 | |
|     SETUP_LOOP, RAISE_VARARGS, POP_TOP
 | |
| )
 | |
| 
 | |
| # These opcodes indicate for loop rather than while loop
 | |
| for_jump_opcodes = (
 | |
|     GET_ITER, FOR_ITER, GET_ANEXT
 | |
| )
 | |
| 
 | |
| unpack_stmt_opcodes = {STORE_NAME, STORE_FAST, STORE_SUBSCR, STORE_GLOBAL, STORE_DEREF, STORE_ATTR}
 | |
| unpack_terminators = stmt_opcodes - unpack_stmt_opcodes
 | |
| 
 | |
| def read_code(stream):
 | |
|     # This helper is needed in order for the PEP 302 emulation to 
 | |
|     # correctly handle compiled files
 | |
|     # Note: stream must be opened in "rb" mode
 | |
|     import marshal
 | |
| 
 | |
|     if sys.version_info < (3, 4):
 | |
|         import imp
 | |
|         runtime_magic = imp.get_magic()
 | |
|     else:
 | |
|         import importlib.util
 | |
|         runtime_magic = importlib.util.MAGIC_NUMBER
 | |
| 
 | |
|     magic = stream.read(4)
 | |
|     if magic != runtime_magic:
 | |
|         print("*** Warning: file has wrong magic number ***")
 | |
| 
 | |
|     flags = 0
 | |
|     if sys.version_info >= (3, 7):
 | |
|         flags = struct.unpack('i', stream.read(4))[0]
 | |
| 
 | |
|     if flags & 1:
 | |
|         stream.read(4)
 | |
|         stream.read(4)
 | |
|     else:
 | |
|         stream.read(4)  # Skip timestamp
 | |
|         if sys.version_info >= (3, 3):
 | |
|             stream.read(4)  # Skip rawsize
 | |
|             return marshal.load(stream)
 | |
| 
 | |
| 
 | |
| def dec_module(path) -> Suite:
 | |
|     if path.endswith(".py"):
 | |
|         if sys.version_info < (3, 6):
 | |
|             import imp
 | |
|             path = imp.cache_from_source(path)
 | |
|         else:
 | |
|             import importlib.util
 | |
|             path = importlib.util.cache_from_source(path)
 | |
|     elif not path.endswith(".pyc") and not path.endswith(".pyo"):
 | |
|         raise ValueError("path must point to a .py or .pyc file")
 | |
|     with open(path, "rb") as stream:
 | |
|         code_obj = read_code(stream)
 | |
|         code = Code(code_obj)
 | |
|         return code.get_suite(include_declarations=False, look_for_docstring=True)
 | |
| 
 | |
| 
 | |
| def decompile(obj) -> Union[Suite, PyStatement]:
 | |
|     """
 | |
|     Decompile obj if it is a module object, a function or a
 | |
|     code object. If obj is a string, it is assumed to be the path
 | |
|     to a python module.
 | |
|     """
 | |
|     if isinstance(obj, str):
 | |
|         return dec_module(obj)
 | |
|     if inspect.iscode(obj):
 | |
|         code = Code(obj)
 | |
|         return code.get_suite()
 | |
|     if inspect.isfunction(obj):
 | |
|         code = Code(obj.__code__)
 | |
|         defaults = obj.__defaults__
 | |
|         kwdefaults = obj.__kwdefaults__
 | |
|         return DefStatement(code, defaults, kwdefaults, obj.__closure__)
 | |
|     elif inspect.ismodule(obj):
 | |
|         return dec_module(obj.__file__)
 | |
|     else:
 | |
|         msg = "Object must be string, module, function or code object"
 | |
|         raise TypeError(msg)
 | |
| 
 | |
| 
 | |
| class Indent:
 | |
|     def __init__(self, indent_level=0, indent_step=4):
 | |
|         self.level = indent_level
 | |
|         self.step = indent_step
 | |
| 
 | |
|     def write(self, pattern, *args, **kwargs):
 | |
|         if args or kwargs:
 | |
|             pattern = pattern.format(*args, **kwargs)
 | |
|         return self.indent(pattern)
 | |
| 
 | |
|     def __add__(self, indent_increase):
 | |
|         return type(self)(self.level + indent_increase, self.step)
 | |
| 
 | |
| 
 | |
| class IndentPrint(Indent):
 | |
|     def indent(self, string):
 | |
|         print(" " * self.step * self.level + string)
 | |
| 
 | |
| 
 | |
| class IndentString(Indent):
 | |
|     def __init__(self, indent_level=0, indent_step=4, lines=None):
 | |
|         Indent.__init__(self, indent_level, indent_step)
 | |
|         if lines is None:
 | |
|             self.lines = []
 | |
|         else:
 | |
|             self.lines = lines
 | |
| 
 | |
|     def __add__(self, indent_increase):
 | |
|         return type(self)(self.level + indent_increase, self.step, self.lines)
 | |
| 
 | |
|     def sep(self):
 | |
|         if not self.lines or self.lines[-1]:
 | |
|             self.lines.append("")
 | |
| 
 | |
|     def indent(self, string):
 | |
|         self.lines.append(" " * self.step * self.level + string)
 | |
| 
 | |
|     def __str__(self):
 | |
|         return "\n".join(self.lines)
 | |
| 
 | |
| 
 | |
| class Stack:
 | |
|     def __init__(self):
 | |
|         self._stack = []
 | |
|         self._counts = {}
 | |
| 
 | |
|     def __bool__(self):
 | |
|         return bool(self._stack)
 | |
| 
 | |
|     def __len__(self):
 | |
|         return len(self._stack)
 | |
| 
 | |
|     def __contains__(self, val):
 | |
|         return self.get_count(val) > 0
 | |
| 
 | |
|     def get_count(self, obj):
 | |
|         return self._counts.get(id(obj), 0)
 | |
| 
 | |
|     def set_count(self, obj, val):
 | |
|         if val:
 | |
|             self._counts[id(obj)] = val
 | |
|         else:
 | |
|             del self._counts[id(obj)]
 | |
| 
 | |
|     def pop1(self):
 | |
|         val = None
 | |
|         if self._stack:
 | |
|             val = self._stack.pop()
 | |
|         else:
 | |
|             raise Exception('Empty stack popped!')
 | |
|         self.set_count(val, self.get_count(val) - 1)
 | |
|         return val
 | |
| 
 | |
|     def pop(self, count=None):
 | |
|         if count is None:
 | |
|             val = self.pop1()
 | |
|             return val
 | |
|         else:
 | |
|             vals = [self.pop1() for i in range(count)]
 | |
|             vals.reverse()
 | |
|             return vals
 | |
| 
 | |
|     def push(self, *args):
 | |
|         for val in args:
 | |
|             self.set_count(val, self.get_count(val) + 1)
 | |
|             self._stack.append(val)
 | |
| 
 | |
|     def peek(self, count=None):
 | |
|         if count is None:
 | |
|             return self._stack[-1]
 | |
|         else:
 | |
|             return self._stack[-count:]
 | |
| 
 | |
| 
 | |
| def code_walker(code):
 | |
|     l = len(code)
 | |
|     code = array('B', code)
 | |
|     oparg = 0
 | |
|     i = 0
 | |
|     extended_arg = 0
 | |
| 
 | |
|     while i < l:
 | |
|         op = code[i]
 | |
|         offset = 1
 | |
|         if sys.version_info >= (3, 6):
 | |
|             oparg = code[i + offset]
 | |
|             offset += 1
 | |
|         elif op >= HAVE_ARGUMENT:
 | |
|             oparg = code[i + offset] + code[i + offset + 1] * 256 + extended_arg
 | |
|             extended_arg = 0
 | |
|             offset += 2
 | |
|         if op == EXTENDED_ARG:
 | |
|             if sys.version_info >= (3, 6):
 | |
|                 op = code[i + offset]
 | |
|                 offset += 1
 | |
|                 oparg <<= 8
 | |
|                 oparg |= code[i + offset]
 | |
|                 offset += 1
 | |
|             else:
 | |
|                 extended_arg = oparg * 65536
 | |
|         yield i, (op, oparg)
 | |
|         i += offset
 | |
| 
 | |
| 
 | |
| class CodeFlags(object):
 | |
|     def __init__(self, cf):
 | |
|         self.flags = cf
 | |
| 
 | |
|     @property
 | |
|     def optimized(self):
 | |
|         return self.flags & 0x1
 | |
| 
 | |
|     @property
 | |
|     def new_local(self):
 | |
|         return self.flags & 0x2
 | |
| 
 | |
|     @property
 | |
|     def varargs(self):
 | |
|         return self.flags & 0x4
 | |
| 
 | |
|     @property
 | |
|     def varkwargs(self):
 | |
|         return self.flags & 0x8
 | |
|     @property
 | |
|     def nested(self):
 | |
|         return self.flags & 0x10
 | |
| 
 | |
|     @property
 | |
|     def generator(self):
 | |
|         return self.flags & 0x20
 | |
| 
 | |
|     @property
 | |
|     def no_free(self):
 | |
|         return self.flags & 0x40
 | |
| 
 | |
|     @property
 | |
|     def coroutine(self):
 | |
|         return self.flags & 0x80
 | |
| 
 | |
|     @property
 | |
|     def iterable_coroutine(self):
 | |
|         return self.flags & 0x100
 | |
| 
 | |
|     @property
 | |
|     def async_generator(self):
 | |
|         return self.flags & 0x200
 | |
| 
 | |
| 
 | |
| class Code:
 | |
|     def __init__(self, code_obj, parent=None):
 | |
|         self.code_obj = code_obj
 | |
|         self.parent = parent
 | |
|         self.derefnames = [PyName(v)
 | |
|                            for v in code_obj.co_cellvars + code_obj.co_freevars]
 | |
|         self.consts = list(map(PyConst, code_obj.co_consts))
 | |
|         self.names = list(map(PyName, code_obj.co_names))
 | |
|         self.varnames = list(map(PyName, code_obj.co_varnames))
 | |
|         self.instr_seq = list(code_walker(code_obj.co_code))
 | |
|         self.instr_map = {addr: i for i, (addr, _) in enumerate(self.instr_seq)}
 | |
|         self.name = code_obj.co_name
 | |
|         self.globals = []
 | |
|         self.nonlocals = []
 | |
|         self.jump_targets = []
 | |
|         self.find_else()
 | |
|         self.find_jumps()
 | |
|         trace('================================================')
 | |
|         trace(self.code_obj)
 | |
|         trace('================================================')
 | |
|         for addr in self:
 | |
|             trace(str(addr))
 | |
|             if addr.opcode in stmt_opcodes or addr.opcode in pop_jump_if_opcodes:
 | |
|                 trace(' ')
 | |
|         trace('================================================')
 | |
|         self.flags: CodeFlags = CodeFlags(code_obj.co_flags)
 | |
| 
 | |
|     def __getitem__(self, instr_index):
 | |
|         if 0 <= instr_index < len(self.instr_seq):
 | |
|             return Address(self, instr_index)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         for i in range(len(self.instr_seq)):
 | |
|             yield Address(self, i)
 | |
| 
 | |
|     def show(self):
 | |
|         for addr in self:
 | |
|             print(addr)
 | |
| 
 | |
|     def address(self, addr):
 | |
|         return self[self.instr_map[addr]]
 | |
| 
 | |
|     def iscellvar(self, i):
 | |
|         return i < len(self.code_obj.co_cellvars)
 | |
| 
 | |
|     def find_jumps(self):
 | |
|         for addr in self:
 | |
|             opcode, arg = addr
 | |
|             jt = addr.jump()
 | |
|             if jt:
 | |
|                 self.jump_targets.append(jt)
 | |
| 
 | |
|     def find_else(self):
 | |
|         jumps = {}
 | |
|         last_jump = None
 | |
|         for addr in self:
 | |
|             opcode, arg = addr
 | |
|             if opcode in pop_jump_if_opcodes:
 | |
|                 jump_addr = self.address(arg)
 | |
|                 if (jump_addr[-1].opcode in else_jump_opcodes
 | |
|                         or jump_addr.opcode == FOR_ITER):
 | |
|                     last_jump = addr
 | |
|                     jumps[jump_addr] = addr
 | |
|             elif opcode == JUMP_ABSOLUTE:
 | |
|                 # This case is to deal with some nested ifs such as:
 | |
|                 # if a:
 | |
|                 # if b:
 | |
|                 #         f()
 | |
|                 #     elif c:
 | |
|                 #         g()
 | |
|                 jump_addr = self.address(arg)
 | |
|                 if jump_addr in jumps:
 | |
|                     jumps[addr] = jumps[jump_addr]
 | |
|             elif opcode == JUMP_FORWARD:
 | |
|                 jump_addr = addr[1] + arg
 | |
|                 if jump_addr in jumps:
 | |
|                     jumps[addr] = jumps[jump_addr]
 | |
|             elif opcode in stmt_opcodes and last_jump is not None:
 | |
|                 # This opcode will generate a statement, so it means
 | |
|                 # that the last POP_JUMP_IF_x was an else-jump
 | |
|                 jumps[addr] = last_jump
 | |
|         self.else_jumps = set(jumps.values())
 | |
| 
 | |
|     def get_suite(self, include_declarations=True, look_for_docstring=False) -> Suite:
 | |
|         dec = SuiteDecompiler(self[0])
 | |
|         dec.run()
 | |
|         first_stmt = dec.suite and dec.suite[0]
 | |
|         # Change __doc__ = "docstring" to "docstring"
 | |
|         if look_for_docstring and isinstance(first_stmt, AssignStatement):
 | |
|             chain = first_stmt.chain
 | |
|             if len(chain) == 2 and str(chain[0]) == "__doc__":
 | |
|                 dec.suite[0] = DocString(first_stmt.chain[1].val)
 | |
|         if include_declarations and (self.globals or self.nonlocals):
 | |
|             suite = Suite()
 | |
|             if self.globals:
 | |
|                 stmt = "global " + ", ".join(map(str, self.globals))
 | |
|                 suite.add_statement(SimpleStatement(stmt))
 | |
|             if self.nonlocals:
 | |
|                 stmt = "nonlocal " + ", ".join(map(str, self.nonlocals))
 | |
|                 suite.add_statement(SimpleStatement(stmt))
 | |
|             for stmt in dec.suite:
 | |
|                 suite.add_statement(stmt)
 | |
|             return suite
 | |
|         else:
 | |
|             return dec.suite
 | |
| 
 | |
|     def declare_global(self, name):
 | |
|         """
 | |
|         Declare name as a global.  Called by STORE_GLOBAL and
 | |
|         DELETE_GLOBAL
 | |
|         """
 | |
|         if name not in self.globals:
 | |
|             self.globals.append(name)
 | |
| 
 | |
|     def ensure_global(self, name):
 | |
|         """
 | |
|         Declare name as global only if it is also a local variable
 | |
|         name in one of the surrounding code objects.  This is called
 | |
|         by LOAD_GLOBAL
 | |
|         """
 | |
|         parent = self.parent
 | |
|         while parent:
 | |
|             if name in parent.varnames:
 | |
|                 return self.declare_global(name)
 | |
|             parent = parent.parent
 | |
| 
 | |
|     def declare_nonlocal(self, name):
 | |
|         """
 | |
|         Declare name as nonlocal.  Called by STORE_DEREF and
 | |
|         DELETE_DEREF (but only when the name denotes a free variable,
 | |
|         not a cell one).
 | |
|         """
 | |
|         if name not in self.nonlocals:
 | |
|             self.nonlocals.append(name)
 | |
| 
 | |
| 
 | |
| 
 | |
| class Address:
 | |
|     def __init__(self, code, instr_index):
 | |
|         self.code = code
 | |
|         self.index = instr_index
 | |
|         self.addr, (self.opcode, self.arg) = code.instr_seq[instr_index]
 | |
| 
 | |
|     def __le__(self, other):
 | |
|         return isinstance(other, type(self)) and self.index <= other.index
 | |
| 
 | |
|     def __ge__(self, other):
 | |
|         return isinstance(other, type(self)) and self.index >= other.index
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return (isinstance(other, type(self))
 | |
|                 and self.code == other.code and self.index == other.index)
 | |
| 
 | |
|     def __lt__(self, other):
 | |
|         return other is None or (isinstance(other, type(self))
 | |
|                                  and self.code == other.code and self.index < other.index)
 | |
| 
 | |
|     def __str__(self):
 | |
|         mark = "* " if self in self.code.else_jumps else "  "
 | |
|         jump = self.jump()
 | |
|         jt = '>>' if self.is_jump_target else '  '
 | |
|         arg = self.arg or "  "
 | |
|         jdest = '\t(to {})'.format(jump.addr) if jump and jump.addr != self.arg else ''
 | |
|         val = ''
 | |
|         op = opname[self.opcode].ljust(18, ' ')
 | |
|         try:
 | |
| 
 | |
|             val = len(self.code.globals) and self.code.globals[self.arg] and self.arg + 1 < len(self.code.globals) if 'GLOBAL' in op else \
 | |
|                 self.code.names[self.arg] if 'ATTR' in op else \
 | |
|                     self.code.names[self.arg] if 'NAME' in op else \
 | |
|                         self.code.names[self.arg] if 'LOAD_METHOD' in op else \
 | |
|                             self.code.consts[self.arg] if 'CONST' in op else \
 | |
|                                 self.code.varnames[self.arg] if 'FAST' in op else \
 | |
|                                     self.code.derefnames[self.arg] if 'DEREF' in op else \
 | |
|                                         cmp_op[self.arg] if 'COMPARE' in op else ''
 | |
|             if val != '':
 | |
|                 val = '\t({})'.format(val)
 | |
|         except:
 | |
|             pass
 | |
| 
 | |
|         return "{}{}\t{}\t{}\t{}{}{}".format(
 | |
|             jt,
 | |
|             mark,
 | |
|             self.addr,
 | |
|             op,
 | |
|             arg,
 | |
|             jdest,
 | |
|             val
 | |
|         )
 | |
| 
 | |
|     def __add__(self, delta):
 | |
|         return self.code.address(self.addr + delta)
 | |
| 
 | |
|     def __getitem__(self, index) -> Address:
 | |
|         return self.code[self.index + index]
 | |
| 
 | |
|     def __iter__(self):
 | |
|         yield self.opcode
 | |
|         yield self.arg
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self.code, self.index))
 | |
| 
 | |
|     @property
 | |
|     def is_else_jump(self):
 | |
|         return self in self.code.else_jumps
 | |
| 
 | |
|     @property
 | |
|     def is_jump_target(self):
 | |
|         return self in self.code.jump_targets
 | |
| 
 | |
|     def change_instr(self, opcode, arg=None):
 | |
|         self.code.instr_seq[self.index] = (self.addr, (opcode, arg))
 | |
| 
 | |
|     def jump(self) -> Address:
 | |
|         opcode = self.opcode
 | |
|         if opcode in dis.hasjrel:
 | |
|             return self[1] + self.arg
 | |
|         elif opcode in dis.hasjabs:
 | |
|             return self.code.address(self.arg)
 | |
| 
 | |
|     def seek(self, opcode: Iterable, increment: int, end: Address = None) -> Address:
 | |
|         if not isinstance(opcode, Iterable):
 | |
|             opcode = (opcode,)
 | |
|         a = self[increment]
 | |
|         while a and a != end:
 | |
|             if a.opcode in opcode:
 | |
|                 return a
 | |
|             a = a[increment]
 | |
| 
 | |
|     def seek_back(self, opcode: Union[Iterable, int], end: Address = None) -> Address:
 | |
|         return self.seek(opcode, -1, end)
 | |
| 
 | |
|     def seek_forward(self, opcode: Union[Iterable, int], end: Address = None) -> Address:
 | |
|         return self.seek(opcode, 1, end)
 | |
| 
 | |
| 
 | |
|     def seek_back_statement(self, opcode: Union[Iterable, int]) -> Address:
 | |
|         last_statement = self.seek_back(stmt_opcodes)
 | |
|         return self.seek(opcode, -1, last_statement)
 | |
| 
 | |
|     def seek_forward_statement(self, opcode: Union[Iterable, int]) -> Address:
 | |
|         next_statement = self.seek_forward(stmt_opcodes)
 | |
|         return self.seek(opcode, 1, next_statement)
 | |
| 
 | |
| 
 | |
| class AsyncMixin:
 | |
|     def __init__(self):
 | |
|         self.is_async = False
 | |
| 
 | |
|     @property
 | |
|     def async_prefix(self):
 | |
|         return 'async ' if self.is_async else ''
 | |
| 
 | |
| 
 | |
| class AwaitableMixin:
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.is_awaited = False
 | |
| 
 | |
|     @property
 | |
|     def await_prefix(self):
 | |
|         return 'await ' if self.is_awaited else ''
 | |
| 
 | |
| 
 | |
| class PyExpr:
 | |
|     def wrap(self, condition=True):
 | |
|         if condition:
 | |
|             return "({})".format(self)
 | |
|         else:
 | |
|             return str(self)
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         chain = dec.assignment_chain
 | |
|         chain.append(dest)
 | |
|         if self not in dec.stack:
 | |
|             chain.append(self)
 | |
|             dec.suite.add_statement(AssignStatement(chain))
 | |
|             dec.assignment_chain = []
 | |
| 
 | |
|     def on_pop(self, dec : SuiteDecompiler):
 | |
|         dec.write(str(self))
 | |
| 
 | |
| 
 | |
| class PyConst(PyExpr):
 | |
| 
 | |
|     def __init__(self, val):
 | |
|         self.val = val
 | |
|         if isinstance(val, int):
 | |
|             self.precedence=14
 | |
|         else:
 | |
|             self.precedence = 100
 | |
| 
 | |
|     def __str__(self):
 | |
|         if self.val == 1e10000:
 | |
|             return '1e10000'
 | |
|         elif isinstance(self.val, frozenset):
 | |
|             l = list(self.val)
 | |
|             l.sort()
 | |
|             vals = ', '.join(map(repr,l))
 | |
|             return f'{{{vals}}}'
 | |
|         elif isinstance(self.val, str) and len(self.val) > 20 and '\0' not in self.val and '\x01' not in self.val:
 | |
|             splt = self.val.split('\n')
 | |
|             if len(splt) > 1:
 | |
|                 return '\"\"\"' + '\n'.join(map(lambda s: s.replace('\\', '\\\\').replace('"', '\\"'), splt)) \
 | |
|                        + '\"\"\"'
 | |
|         return repr(self.val)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.val)
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return isinstance(other, PyConst) and self.val == other.val
 | |
| 
 | |
| 
 | |
| class PyFormatValue(PyConst):
 | |
|     def __init__(self, val):
 | |
|         super().__init__(val)
 | |
|         self.formatter = ''
 | |
| 
 | |
|     @staticmethod
 | |
|     def fmt(string):
 | |
|         return f'f\'{string}\''
 | |
| 
 | |
|     def base(self):
 | |
|         return f'{{{self.val}{self.formatter}}}'
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.fmt(self.base())
 | |
| 
 | |
| class PyFormatString(PyExpr):
 | |
|     precedence = 100
 | |
| 
 | |
|     def __init__(self, params):
 | |
|         super().__init__()
 | |
|         self.params = params
 | |
| 
 | |
|     def __str__(self):
 | |
|         return "f'{}'".format(''.join([
 | |
|             p.base().replace('\'', '\"') if isinstance(p, PyFormatValue) else
 | |
|             p.name if isinstance(p, PyName) else
 | |
|             str(p.val.encode('utf-8'))[1:].replace('\'', '').replace('{','{{').replace('}','}}')
 | |
|             for p in self.params])
 | |
|         )
 | |
| 
 | |
| 
 | |
| class PyTuple(PyExpr):
 | |
|     precedence = 0
 | |
| 
 | |
|     def __init__(self, values):
 | |
|         self.values = values
 | |
| 
 | |
|     def __str__(self):
 | |
|         if not self.values:
 | |
|             return "()"
 | |
|         valstr = [val.wrap(val.precedence <= self.precedence)
 | |
|                   for val in self.values]
 | |
|         if len(valstr) == 1:
 | |
|             return '(' + valstr[0] + "," + ')'
 | |
|         else:
 | |
|             return '(' + ", ".join(valstr) + ')'
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.values)
 | |
| 
 | |
|     def wrap(self, condition=True):
 | |
|         return str(self)
 | |
| 
 | |
| 
 | |
| class PyList(PyExpr):
 | |
|     precedence = 16
 | |
| 
 | |
|     def __init__(self, values):
 | |
|         self.values = values
 | |
| 
 | |
|     def __str__(self):
 | |
|         valstr = ", ".join(val.wrap(val.precedence <= 0)
 | |
|                            for val in self.values)
 | |
|         return "[{}]".format(valstr)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.values)
 | |
| 
 | |
| 
 | |
| class PySet(PyExpr):
 | |
|     precedence = 16
 | |
| 
 | |
|     def __init__(self, values):
 | |
|         self.values = values
 | |
| 
 | |
|     def __str__(self):
 | |
|         valstr = ", ".join(val.wrap(val.precedence <= 0)
 | |
|                            for val in self.values)
 | |
|         return "{{{}}}".format(valstr)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.values)
 | |
| 
 | |
| 
 | |
| class PyDict(PyExpr):
 | |
|     precedence = 16
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.items = []
 | |
| 
 | |
|     def set_item(self, key, val):
 | |
|         self.items.append((key, val))
 | |
| 
 | |
|     def __str__(self):
 | |
|         itemstr = ", ".join(f"{kv[0]}: {kv[1]}" if len(kv) == 2 else str(kv[0]) for kv in self.items)
 | |
|         return f"{{{itemstr}}}"
 | |
| 
 | |
| 
 | |
| class PyName(PyExpr,AwaitableMixin):
 | |
|     precedence = 100
 | |
| 
 | |
|     def __init__(self, name):
 | |
|         AwaitableMixin.__init__(self)
 | |
|         self.name = name
 | |
| 
 | |
|     def __str__(self):
 | |
|         return f'{self.await_prefix}{self.name}'
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return isinstance(other, type(self)) and self.name == other.name
 | |
| 
 | |
| 
 | |
| class PyUnaryOp(PyExpr):
 | |
|     def __init__(self, operand):
 | |
|         self.operand = operand
 | |
| 
 | |
|     def __str__(self):
 | |
|         opstr = self.operand.wrap(self.operand.precedence < self.precedence)
 | |
|         return self.pattern.format(opstr)
 | |
| 
 | |
|     @classmethod
 | |
|     def instr(cls, stack):
 | |
|         stack.push(cls(stack.pop()))
 | |
| 
 | |
| 
 | |
| class PyBinaryOp(PyExpr):
 | |
|     def __init__(self, left, right):
 | |
|         self.left = left
 | |
|         self.right = right
 | |
| 
 | |
|     def wrap_left(self):
 | |
|         return self.left.wrap(self.left.precedence < self.precedence)
 | |
| 
 | |
|     def wrap_right(self):
 | |
|         return self.right.wrap(self.right.precedence <= self.precedence)
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.pattern.format(self.wrap_left(), self.wrap_right())
 | |
| 
 | |
|     @classmethod
 | |
|     def instr(cls, stack):
 | |
|         right = stack.pop()
 | |
|         left = stack.pop()
 | |
|         stack.push(cls(left, right))
 | |
| 
 | |
| 
 | |
| class PySubscript(PyBinaryOp):
 | |
|     precedence = 15
 | |
|     pattern = "{}[{}]"
 | |
| 
 | |
|     def wrap_right(self):
 | |
|         return str(self.right)
 | |
| 
 | |
| 
 | |
| class PySlice(PyExpr):
 | |
|     precedence = 1
 | |
| 
 | |
|     def __init__(self, args):
 | |
|         assert len(args) in (2, 3)
 | |
|         if len(args) == 2:
 | |
|             self.start, self.stop = args
 | |
|             self.step = None
 | |
|         else:
 | |
|             self.start, self.stop, self.step = args
 | |
|         if self.start == PyConst(None):
 | |
|             self.start = ""
 | |
|         if self.stop == PyConst(None):
 | |
|             self.stop = ""
 | |
| 
 | |
|     def __str__(self):
 | |
|         if self.step is None:
 | |
|             return "{}:{}".format(self.start, self.stop)
 | |
|         else:
 | |
|             return "{}:{}:{}".format(self.start, self.stop, self.step)
 | |
| 
 | |
| 
 | |
| class PyCompare(PyExpr):
 | |
|     precedence = 6
 | |
| 
 | |
|     def __init__(self, complist):
 | |
|         self.complist = complist
 | |
| 
 | |
|     def __str__(self):
 | |
|         return " ".join(x if i % 2 else x.wrap(x.precedence <= 6)
 | |
|                         for i, x in enumerate(self.complist))
 | |
| 
 | |
|     def extends(self, other):
 | |
|         if not isinstance(other, PyCompare):
 | |
|             return False
 | |
|         else:
 | |
|             return self.complist[0] == other.complist[-1]
 | |
| 
 | |
|     def chain(self, other):
 | |
|         return PyCompare(self.complist + other.complist[1:])
 | |
| 
 | |
| 
 | |
| class PyBooleanAnd(PyBinaryOp):
 | |
|     precedence = 4
 | |
|     pattern = "{} and {}"
 | |
| 
 | |
| 
 | |
| class PyBooleanOr(PyBinaryOp):
 | |
|     precedence = 3
 | |
|     pattern = "{} or {}"
 | |
| 
 | |
| 
 | |
| class PyIfElse(PyExpr):
 | |
|     precedence = 2
 | |
| 
 | |
|     def __init__(self, cond, true_expr, false_expr):
 | |
|         self.cond = cond
 | |
|         self.true_expr = true_expr
 | |
|         self.false_expr = false_expr
 | |
| 
 | |
|     def __str__(self):
 | |
|         p = self.precedence
 | |
|         cond_str = self.cond.wrap(self.cond.precedence <= p)
 | |
|         true_str = self.true_expr.wrap(self.cond.precedence <= p)
 | |
|         false_str = self.false_expr.wrap(self.cond.precedence < p)
 | |
|         return "{} if {} else {}".format(true_str, cond_str, false_str)
 | |
| 
 | |
| 
 | |
| class PyAttribute(PyExpr):
 | |
|     precedence = 15
 | |
| 
 | |
|     def __init__(self, expr, attrname):
 | |
|         self.expr = expr
 | |
|         self.attrname = attrname
 | |
| 
 | |
|     def __str__(self):
 | |
|         expr_str = self.expr.wrap(self.expr.precedence < self.precedence)
 | |
|         attrname = self.attrname
 | |
| 
 | |
|         if isinstance(self.expr, PyName) and self.expr.name == 'self':
 | |
|             __ = attrname.name.find('__')
 | |
|             if __ > 0:
 | |
|                 attrname = PyName(self.attrname.name[__:])
 | |
|         return "{}.{}".format(expr_str, attrname)
 | |
| 
 | |
| 
 | |
| class PyCallFunction(PyExpr, AwaitableMixin):
 | |
|     precedence = 15
 | |
| 
 | |
|     def __init__(self, func: PyAttribute, args: list, kwargs: list, varargs=None, varkw=None):
 | |
|         AwaitableMixin.__init__(self)
 | |
|         self.func = func
 | |
|         self.args = args
 | |
|         self.kwargs = kwargs
 | |
|         self.varargs = varargs if not varargs or isinstance(varargs,Iterable) else {varargs}
 | |
|         self.varkw = varkw if not varkw or isinstance(varkw,Iterable) else {varkw}
 | |
| 
 | |
|     def __str__(self):
 | |
|         funcstr = self.func.wrap(self.func.precedence < self.precedence)
 | |
|         if hasattr(self.args, '__iter__') and len(self.args) == 1 and not (self.kwargs or self.varargs
 | |
|                                                                            or self.varkw):
 | |
|             arg = self.args[0]
 | |
|             if isinstance(arg, PyGenExpr):
 | |
|                 # Only one pair of brackets arount a single arg genexpr
 | |
|                 return "{}{}".format(funcstr, arg)
 | |
|         args = [x.wrap(x.precedence <= 0) for x in self.args]
 | |
|         if self.varargs is not None:
 | |
|             for varargs in self.varargs:
 | |
|                 args.append("*{}".format(varargs))
 | |
|         args.extend("{}={}".format(str(k).replace('\'', ''), v.wrap(v.precedence <= 0))
 | |
|                     for k, v in self.kwargs)
 | |
|         if self.varkw is not None:
 | |
|             for varkw in self.varkw:
 | |
|                 args.append("**{}".format(varkw))
 | |
|         return "{}{}({})".format(self.await_prefix, funcstr, ", ".join(args))
 | |
| 
 | |
| 
 | |
| class FunctionDefinition:
 | |
|     def __init__(self, code: Code, defaults, kwdefaults, closure, paramobjs=None, annotations=None):
 | |
|         self.code = code
 | |
|         self.defaults = defaults
 | |
|         self.kwdefaults = kwdefaults
 | |
|         self.closure = closure
 | |
|         self.paramobjs = paramobjs if paramobjs else {}
 | |
|         self.annotations = annotations if annotations else []
 | |
| 
 | |
|     def is_coroutine(self):
 | |
|         return self.code.code_obj.co_flags & 0x100
 | |
| 
 | |
|     def getparams(self):
 | |
|         code_obj = self.code.code_obj
 | |
|         l = code_obj.co_argcount
 | |
|         params = []
 | |
|         for name in code_obj.co_varnames[:l]:
 | |
|             if name in self.paramobjs:
 | |
|                 params.append('{}:{}'.format(name, str(self.paramobjs[name])))
 | |
|             else:
 | |
|                 params.append(name)
 | |
|         if self.defaults:
 | |
|             for i, arg in enumerate(reversed(self.defaults)):
 | |
|                 name = params[-i - 1]
 | |
|                 if name in self.paramobjs:
 | |
|                     params[-i - 1] = "{}:{}={}".format(name, str(self.paramobjs[name]), arg)
 | |
|                 else:
 | |
|                     params[-i - 1] = "{}={}".format(name, arg)
 | |
|         kwcount = code_obj.co_kwonlyargcount
 | |
|         kwparams = []
 | |
|         if kwcount:
 | |
|             for i in range(kwcount):
 | |
|                 name = code_obj.co_varnames[l + i]
 | |
|                 if name in self.kwdefaults and name in self.paramobjs:
 | |
|                     kwparams.append("{}:{}={}".format(name, self.paramobjs[name], self.kwdefaults[name]))
 | |
|                 elif name in self.kwdefaults:
 | |
|                     kwparams.append("{}={}".format(name, self.kwdefaults[name]))
 | |
|                 else:
 | |
|                     kwparams.append(name)
 | |
|             l += kwcount
 | |
|         if code_obj.co_flags & VARARGS:
 | |
|             name = code_obj.co_varnames[l]
 | |
|             if name in self.paramobjs:
 | |
|                 params.append(f'*{name}:{str(self.paramobjs[name])}')
 | |
|             else:
 | |
|                 params.append(f'*{name}')
 | |
|             l += 1
 | |
|         elif kwparams:
 | |
|             params.append("*")
 | |
|         params.extend(kwparams)
 | |
|         if code_obj.co_flags & VARKEYWORDS:
 | |
|             name = code_obj.co_varnames[l]
 | |
|             if name in self.paramobjs:
 | |
|                 params.append(f'**{name}:{str(self.paramobjs[name])}')
 | |
|             else:
 | |
|                 params.append(f'**{name}')
 | |
| 
 | |
|         return params
 | |
| 
 | |
|     def getreturn(self):
 | |
|         if self.paramobjs and 'return' in self.paramobjs:
 | |
|             return self.paramobjs['return']
 | |
|         return None
 | |
| 
 | |
| 
 | |
| class PyLambda(PyExpr, FunctionDefinition):
 | |
|     precedence = 1
 | |
| 
 | |
|     def __str__(self):
 | |
|         suite = self.code.get_suite()
 | |
|         params = ", ".join(self.getparams())
 | |
|         if len(suite.statements) > 0:
 | |
|             def strip_return(val):
 | |
|                 return val[len("return "):] if val.startswith('return') else val
 | |
| 
 | |
|             def strip_yield_none(val):
 | |
|                 return '(yield)' if val == 'yield None' else val
 | |
| 
 | |
|             if isinstance(suite[0], IfStatement):
 | |
|                 end = suite[1] if len(suite) > 1 else PyConst(None)
 | |
|                 expr = "{} if {} else {}".format(
 | |
|                     strip_return(str(suite[0].true_suite)),
 | |
|                     str(suite[0].cond),
 | |
|                     strip_return(str(end))
 | |
|                 )
 | |
|             else:
 | |
|                 expr = strip_return(str(suite[0]))
 | |
|                 expr = strip_yield_none(expr)
 | |
|         else:
 | |
|             expr = "None"
 | |
|         return "lambda {}: {}".format(params, expr)
 | |
| 
 | |
| 
 | |
| class PyComp(PyExpr):
 | |
|     """
 | |
|     Abstraction for list, set, dict comprehensions and generator expressions
 | |
|     """
 | |
|     precedence = 16
 | |
| 
 | |
|     def __init__(self, code, defaults, kwdefaults, closure, paramobjs={}, annotations=[]):
 | |
|         assert not defaults and not kwdefaults
 | |
|         self.code = code
 | |
|         code[0].change_instr(NOP)
 | |
|         last_i = len(code.instr_seq) - 1
 | |
|         code[last_i].change_instr(NOP)
 | |
|         self.annotations = annotations
 | |
| 
 | |
|     def set_iterable(self, iterable):
 | |
|         self.code.varnames[0] = iterable
 | |
| 
 | |
|     def __str__(self):
 | |
|         suite = self.code.get_suite()
 | |
|         return self.pattern.format(suite.gen_display())
 | |
| 
 | |
| 
 | |
| class PyListComp(PyComp):
 | |
|     pattern = "[{}]"
 | |
| 
 | |
| 
 | |
| class PySetComp(PyComp):
 | |
|     pattern = "{{{}}}"
 | |
| 
 | |
| 
 | |
| class PyKeyValue(PyBinaryOp):
 | |
|     """This is only to create dict comprehensions"""
 | |
|     precedence = 1
 | |
|     pattern = "{}: {}"
 | |
| 
 | |
| 
 | |
| class PyDictComp(PyComp):
 | |
|     pattern = "{{{}}}"
 | |
| 
 | |
| 
 | |
| class PyGenExpr(PyComp):
 | |
|     precedence = 16
 | |
|     pattern = "({})"
 | |
| 
 | |
|     def __init__(self, code, defaults, kwdefaults, closure, paramobjs={}, annotations=[]):
 | |
|         self.code = code
 | |
| 
 | |
| 
 | |
| class PyYield(PyExpr):
 | |
|     precedence = 0
 | |
|     pattern = "yield {}"
 | |
| 
 | |
|     def __init__(self, value):
 | |
|         self.value = value
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.pattern.format(self.value)
 | |
| 
 | |
| class PyYieldFrom(PyExpr):
 | |
|     precedence = 0
 | |
|     pattern = "yield from {}"
 | |
| 
 | |
|     def __init__(self, value):
 | |
|         self.value = value
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.pattern.format(self.value)
 | |
| 
 | |
| 
 | |
| class PyStarred(PyExpr):
 | |
|     """Used in unpacking assigments"""
 | |
|     precedence = 15
 | |
| 
 | |
|     def __init__(self, expr):
 | |
|         self.expr = expr
 | |
| 
 | |
|     def __str__(self):
 | |
|         es = self.expr.wrap(self.expr.precedence < self.precedence)
 | |
|         return "*{}".format(es)
 | |
| 
 | |
| 
 | |
| code_map = {
 | |
|     '<lambda>': PyLambda,
 | |
|     '<listcomp>': PyListComp,
 | |
|     '<setcomp>': PySetComp,
 | |
|     '<dictcomp>': PyDictComp,
 | |
|     '<genexpr>': PyGenExpr,
 | |
| }
 | |
| 
 | |
| unary_ops = [
 | |
|     ('UNARY_POSITIVE', 'Positive', '+{}', 13),
 | |
|     ('UNARY_NEGATIVE', 'Negative', '-{}', 13),
 | |
|     ('UNARY_NOT', 'Not', 'not {}', 5),
 | |
|     ('UNARY_INVERT', 'Invert', '~{}', 13),
 | |
| ]
 | |
| 
 | |
| binary_ops = [
 | |
|     ('POWER', 'Power', '{}**{}', 14, '{} **= {}'),
 | |
|     ('MULTIPLY', 'Multiply', '{}*{}', 12, '{} *= {}'),
 | |
|     ('FLOOR_DIVIDE', 'FloorDivide', '{}//{}', 12, '{} //= {}'),
 | |
|     ('TRUE_DIVIDE', 'TrueDivide', '{}/{}', 12, '{} /= {}'),
 | |
|     ('MODULO', 'Modulo', '{} % {}', 12, '{} %= {}'),
 | |
|     ('ADD', 'Add', '{} + {}', 11, '{} += {}'),
 | |
|     ('SUBTRACT', 'Subtract', '{} - {}', 11, '{} -= {}'),
 | |
|     ('SUBSCR', 'Subscript', '{}[{}]', 15, None),
 | |
|     ('LSHIFT', 'LeftShift', '{} << {}', 10, '{} <<= {}'),
 | |
|     ('RSHIFT', 'RightShift', '{} >> {}', 10, '{} >>= {}'),
 | |
|     ('AND', 'And', '{} & {}', 9, '{} &= {}'),
 | |
|     ('XOR', 'Xor', '{} ^ {}', 8, '{} ^= {}'),
 | |
|     ('OR', 'Or', '{} | {}', 7, '{} |= {}'),
 | |
|     ('MATRIX_MULTIPLY', 'MatrixMultiply', '{} @ {}', 12, '{} @= {}'),
 | |
| ]
 | |
| 
 | |
| 
 | |
| class PyStatement(object):
 | |
|     def __str__(self):
 | |
|         istr = IndentString()
 | |
|         self.display(istr)
 | |
|         return str(istr)
 | |
| 
 | |
|     def wrap(self, condition=True):
 | |
|         if condition:
 | |
|             assert not condition
 | |
|             return "({})".format(self)
 | |
|         else:
 | |
|             return str(self)
 | |
| 
 | |
|     def on_pop(self, dec):
 | |
|         # dec.write("#ERROR: Unexpected context 'on_pop': pop on statement:  ")
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class DocString(PyStatement):
 | |
|     def __init__(self, string):
 | |
|         self.string = string
 | |
| 
 | |
|     def display(self, indent):
 | |
|         if '\n' not in self.string:
 | |
|             indent.write(repr(self.string))
 | |
|         else:
 | |
|             if "'''" not in self.string:
 | |
|                 fence = "'''"
 | |
|             else:
 | |
|                 fence = '"""'
 | |
|             lines = self.string.split('\n')
 | |
|             text = '\n'.join(l.encode('unicode_escape').decode().replace(fence,'\\'+fence)
 | |
|                              for l in lines)
 | |
|             docstring = "{0}{1}{0}".format(fence, text)
 | |
|             indent.write(docstring)
 | |
| 
 | |
| 
 | |
| class AssignStatement(PyStatement):
 | |
|     def __init__(self, chain):
 | |
|         self.chain = chain
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write(" = ".join(map(str, self.chain)))
 | |
| 
 | |
| 
 | |
| class InPlaceOp(PyStatement):
 | |
|     def __init__(self, left, right):
 | |
|         self.right = right
 | |
|         self.left = left
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         # assert dest is self.left
 | |
|         dec.suite.add_statement(self)
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write(self.pattern, self.left, self.right)
 | |
| 
 | |
|     @classmethod
 | |
|     def instr(cls, stack):
 | |
|         right = stack.pop()
 | |
|         left = stack.pop()
 | |
|         stack.push(cls(left, right))
 | |
| 
 | |
| 
 | |
| class Unpack:
 | |
|     precedence = 50
 | |
| 
 | |
|     def __init__(self, val, length, star_index=None):
 | |
|         self.val = val
 | |
|         self.length = length
 | |
|         self.star_index = star_index
 | |
|         self.dests = []
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         if len(self.dests) == self.star_index:
 | |
|             dest = PyStarred(dest)
 | |
|         self.dests.append(dest)
 | |
|         if len(self.dests) == self.length:
 | |
|             dec.stack.push(self.val)
 | |
|             dec.store(PyTuple(self.dests))
 | |
| 
 | |
| 
 | |
| class ImportStatement(PyStatement):
 | |
|     alias = ""
 | |
|     precedence = 100
 | |
| 
 | |
|     def __init__(self, name, level, fromlist):
 | |
|         self.name = name
 | |
|         self.alias = name
 | |
|         self.level = level
 | |
|         self.fromlist = fromlist
 | |
|         self.aslist = []
 | |
| 
 | |
|     def store(self, dec: SuiteDecompiler, dest):
 | |
|         self.alias = dest
 | |
|         dec.suite.add_statement(self)
 | |
| 
 | |
|     def on_pop(self, dec):
 | |
|         dec.suite.add_statement(self)
 | |
| 
 | |
|     def display(self, indent):
 | |
|         if self.fromlist == PyConst(None):
 | |
|             name = self.name.name
 | |
|             alias = self.alias.name
 | |
|             if name == alias or name.startswith(alias + "."):
 | |
|                 indent.write("import {}", name)
 | |
|             else:
 | |
|                 indent.write("import {} as {}", name, alias)
 | |
|         elif self.fromlist == PyConst(('*',)):
 | |
|             indent.write("from {} import *", self.name.name)
 | |
|         else:
 | |
|             names = []
 | |
|             for name, alias in zip(self.fromlist, self.aslist):
 | |
|                 if name == alias:
 | |
|                     names.append(name)
 | |
|                 else:
 | |
|                     names.append("{} as {}".format(name, alias))
 | |
|             indent.write("from {}{} import {}", ''.join(['.' for i in range(self.level.val)]), self.name,
 | |
|                          ", ".join(names))
 | |
| 
 | |
| 
 | |
| class ImportFrom:
 | |
|     def __init__(self, name):
 | |
|         self.name = name
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         imp = dec.stack.peek()
 | |
|         assert isinstance(imp, ImportStatement)
 | |
| 
 | |
|         if imp.fromlist != PyConst(None):
 | |
| 
 | |
|             imp.aslist.append(dest.name)
 | |
|         else:
 | |
|             imp.alias = dest
 | |
| 
 | |
| 
 | |
| class SimpleStatement(PyStatement):
 | |
|     def __init__(self, val):
 | |
|         assert val is not None
 | |
|         self.val = val
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write(self.val)
 | |
| 
 | |
|     def gen_display(self, seq=()):
 | |
|         return " ".join((self.val,) + seq)
 | |
| 
 | |
| 
 | |
| class IfStatement(PyStatement):
 | |
|     def __init__(self, cond, true_suite, false_suite):
 | |
|         self.cond = cond
 | |
|         self.true_suite = true_suite
 | |
|         self.false_suite = false_suite
 | |
| 
 | |
|     def display(self, indent, is_elif=False):
 | |
|         ptn = "elif {}:" if is_elif else "if {}:"
 | |
|         indent.write(ptn, self.cond)
 | |
|         self.true_suite.display(indent + 1)
 | |
|         if not self.false_suite:
 | |
|             return
 | |
|         if len(self.false_suite) == 1:
 | |
|             stmt = self.false_suite[0]
 | |
|             if isinstance(stmt, IfStatement):
 | |
|                 stmt.display(indent, is_elif=True)
 | |
|                 return
 | |
|         indent.write("else:")
 | |
|         self.false_suite.display(indent + 1)
 | |
| 
 | |
|     def gen_display(self, seq=()):
 | |
|         assert not self.false_suite
 | |
|         s = "if {}".format(self.cond)
 | |
|         return self.true_suite.gen_display(seq + (s,))
 | |
| 
 | |
| 
 | |
| class ForStatement(PyStatement, AsyncMixin):
 | |
|     def __init__(self, iterable):
 | |
|         AsyncMixin.__init__(self)
 | |
|         self.iterable = iterable
 | |
|         self.else_body: Suite = None
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         self.dest = dest
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write("{}for {} in {}:", self.async_prefix, self.dest, self.iterable)
 | |
|         self.body.display(indent + 1)
 | |
|         if self.else_body:
 | |
|             indent.write('else:')
 | |
|             self.else_body.display(indent + 1)
 | |
| 
 | |
|     def gen_display(self, seq=()):
 | |
|         s = "{}for {} in {}".format(self.async_prefix, self.dest, self.iterable.wrap() if isinstance(self.iterable, PyIfElse) else self.iterable)
 | |
|         return self.body.gen_display(seq + (s,))
 | |
| 
 | |
| 
 | |
| class WhileStatement(PyStatement):
 | |
|     def __init__(self, cond, body):
 | |
|         self.cond = cond
 | |
|         self.body = body
 | |
|         self.else_body: Suite = None
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write("while {}:", self.cond)
 | |
|         self.body.display(indent + 1)
 | |
|         if self.else_body:
 | |
|             indent.write('else:')
 | |
|             self.else_body.display(indent + 1)
 | |
| 
 | |
| 
 | |
| class DecorableStatement(PyStatement):
 | |
|     def __init__(self):
 | |
|         self.decorators = []
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.sep()
 | |
|         for f in reversed(self.decorators):
 | |
|             indent.write("@{}", f)
 | |
|         self.display_undecorated(indent)
 | |
|         indent.sep()
 | |
| 
 | |
|     def decorate(self, f):
 | |
|         self.decorators.append(f)
 | |
| 
 | |
| 
 | |
| class DefStatement(FunctionDefinition, DecorableStatement, AsyncMixin):
 | |
|     def __init__(self, code: Code, defaults, kwdefaults, closure, paramobjs=None, annotations=None):
 | |
|         FunctionDefinition.__init__(self, code, defaults, kwdefaults, closure, paramobjs, annotations)
 | |
|         DecorableStatement.__init__(self)
 | |
|         AsyncMixin.__init__(self)
 | |
|         self.is_async = code.flags.coroutine or code.flags.async_generator
 | |
| 
 | |
|     def display_undecorated(self, indent):
 | |
|         paramlist = ", ".join(self.getparams())
 | |
|         result = self.getreturn()
 | |
|         if result:
 | |
|             indent.write("{}def {}({}) -> {}:", self.async_prefix, self.code.name, paramlist, result)
 | |
|         else:
 | |
|             indent.write("{}def {}({}):", self.async_prefix, self.code.name, paramlist)
 | |
|         # Assume that co_consts starts with None unless the function
 | |
|         # has a docstring, in which case it starts with the docstring
 | |
|         if self.code.consts[0] != PyConst(None):
 | |
|             docstring = self.code.consts[0].val
 | |
|             DocString(docstring).display(indent + 1)
 | |
|         self.code.get_suite().display(indent + 1)
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         self.name = dest
 | |
|         dec.suite.add_statement(self)
 | |
| 
 | |
| 
 | |
| class TryStatement(PyStatement):
 | |
|     def __init__(self, try_suite):
 | |
|         self.try_suite: Suite = try_suite
 | |
|         self.except_clauses: List[Any, str, Suite] = []
 | |
|         self.else_suite: Suite = None
 | |
| 
 | |
|     def add_except_clause(self, exception_type, suite):
 | |
|         self.except_clauses.append([exception_type, None, suite])
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         self.except_clauses[-1][1] = dest
 | |
| 
 | |
|     def display(self, indent):
 | |
|         indent.write("try:")
 | |
|         self.try_suite.display(indent + 1)
 | |
|         for type, name, suite in self.except_clauses:
 | |
|             if type is None:
 | |
|                 indent.write("except:")
 | |
|             elif name is None:
 | |
|                 indent.write("except {}:", type)
 | |
|             else:
 | |
|                 indent.write("except {} as {}:", type, name)
 | |
|             suite.display(indent + 1)
 | |
|         if self.else_suite:
 | |
|             indent.write('else:')
 | |
|             self.else_suite.display(indent + 1)
 | |
| 
 | |
| 
 | |
| class FinallyStatement(PyStatement):
 | |
|     def __init__(self, try_suite, finally_suite):
 | |
|         self.try_suite = try_suite
 | |
|         self.finally_suite = finally_suite
 | |
| 
 | |
|     def display(self, indent):
 | |
|         # Wrap the try suite in a TryStatement if necessary
 | |
|         try_stmt = None
 | |
|         if len(self.try_suite) == 1:
 | |
|             try_stmt = self.try_suite[0]
 | |
|             if not isinstance(try_stmt, TryStatement):
 | |
|                 try_stmt = None
 | |
|         if try_stmt is None:
 | |
|             try_stmt = TryStatement(self.try_suite)
 | |
|         try_stmt.display(indent)
 | |
|         indent.write("finally:")
 | |
|         self.finally_suite.display(indent + 1)
 | |
| 
 | |
| 
 | |
| class WithStatement(PyStatement):
 | |
|     def __init__(self, with_expr):
 | |
|         self.with_expr = with_expr
 | |
|         self.with_name = None
 | |
|         self.is_async = False
 | |
| 
 | |
|     @property
 | |
|     def async_prefix(self):
 | |
|         return 'async ' if self.is_async else ''
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         self.with_name = dest
 | |
| 
 | |
|     def display(self, indent, args=None):
 | |
|         # args to take care of nested withs:
 | |
|         # with x as t:
 | |
|         # with y as u:
 | |
|         #         <suite>
 | |
|         # --->
 | |
|         # with x as t, y as u:
 | |
|         #     <suite>
 | |
|         if args is None:
 | |
|             args = []
 | |
|         if self.with_name is None:
 | |
|             args.append(str(self.with_expr))
 | |
|         else:
 | |
|             args.append("{} as {}".format(self.with_expr, self.with_name))
 | |
|         if len(self.suite) == 1 and isinstance(self.suite[0], WithStatement):
 | |
|             self.suite[0].display(indent, args)
 | |
|         else:
 | |
|             indent.write(self.async_prefix + "with {}:", ", ".join(args))
 | |
|             self.suite.display(indent + 1)
 | |
| 
 | |
| 
 | |
| class ClassStatement(DecorableStatement):
 | |
|     def __init__(self, func, name, parents, kwargs):
 | |
|         DecorableStatement.__init__(self)
 | |
|         self.func = func
 | |
|         self.parents = parents
 | |
|         self.kwargs = kwargs
 | |
| 
 | |
|     def store(self, dec, dest):
 | |
|         self.name = dest
 | |
|         dec.suite.add_statement(self)
 | |
| 
 | |
|     def display_undecorated(self, indent):
 | |
|         if self.parents or self.kwargs:
 | |
|             args = [str(x) for x in self.parents]
 | |
|             kwargs = ["{}={}".format(str(k).replace('\'', ''), v) for k, v in self.kwargs]
 | |
|             all_args = ", ".join(args + kwargs)
 | |
|             indent.write("class {}({}):", self.name, all_args)
 | |
|         else:
 | |
|             indent.write("class {}:", self.name)
 | |
|         suite = self.func.code.get_suite(look_for_docstring=True)
 | |
|         if suite:
 | |
|             # TODO: find out why sometimes the class suite ends with
 | |
|             # "return __class__"
 | |
|             last_stmt = suite[-1]
 | |
|             if isinstance(last_stmt, SimpleStatement):
 | |
|                 if last_stmt.val.startswith("return "):
 | |
|                     suite.statements.pop()
 | |
|             clean_vars = ['__module__', '__qualname__']
 | |
|             for clean_var in clean_vars:
 | |
|                 for i in range(len(suite.statements)):
 | |
|                     stmt = suite.statements[i]
 | |
|                     if isinstance(stmt, AssignStatement) and str(stmt).startswith(clean_var):
 | |
|                         suite.statements.pop(i)
 | |
|                         break
 | |
| 
 | |
|         suite.display(indent + 1)
 | |
| 
 | |
| 
 | |
| class Suite:
 | |
|     def __init__(self):
 | |
|         self.statements = []
 | |
| 
 | |
|     def __bool__(self) -> bool:
 | |
|         return bool(self.statements)
 | |
| 
 | |
|     def __len__(self) -> int:
 | |
|         return len(self.statements)
 | |
| 
 | |
|     def __getitem__(self, i) -> PyStatement:
 | |
|         return self.statements[i]
 | |
| 
 | |
|     def __setitem__(self, i, val: PyStatement):
 | |
|         self.statements[i] = val
 | |
| 
 | |
|     def __str__(self):
 | |
|         istr = IndentString()
 | |
|         self.display(istr)
 | |
|         return str(istr)
 | |
| 
 | |
|     def display(self, indent):
 | |
|         if self.statements:
 | |
|             for stmt in self.statements:
 | |
|                 stmt.display(indent)
 | |
|         else:
 | |
|             indent.write("pass")
 | |
| 
 | |
|     def gen_display(self, seq=()):
 | |
|         if len(self) != 1:
 | |
|             raise Exception('There should only be one statement in a generator.')
 | |
|         return self[0].gen_display(seq)
 | |
| 
 | |
|     def add_statement(self, stmt):
 | |
|         self.statements.append(stmt)
 | |
| 
 | |
| 
 | |
| class SuiteDecompiler:
 | |
|     # An instruction handler can return this to indicate to the run()
 | |
|     # function that it should return immediately
 | |
|     END_NOW = object()
 | |
| 
 | |
|     # This is put on the stack by LOAD_BUILD_CLASS
 | |
|     BUILD_CLASS = object()
 | |
| 
 | |
|     def __init__(self, start_addr: Address, end_addr: Address=None, stack=None):
 | |
|         self.start_addr = start_addr
 | |
|         self.end_addr = end_addr
 | |
|         self.code: Code = start_addr.code
 | |
|         self.stack = Stack() if stack is None else stack
 | |
|         self.suite: Suite = Suite()
 | |
|         self.assignment_chain = []
 | |
|         self.popjump_stack = []
 | |
|         self.last_addr: Address = None
 | |
| 
 | |
|     def push_popjump(self, jtruthiness, jaddr, jcond, original_jaddr: Address):
 | |
|         stack = self.popjump_stack
 | |
|         if jaddr and jaddr[-1].is_else_jump:
 | |
|             if jtruthiness or jaddr[-1].jump() <= original_jaddr.jump():
 | |
|                 # Increase jaddr to the 'else' address if it jumps to the 'then'
 | |
|                 jaddr = jaddr[-1].jump()
 | |
|         while stack:
 | |
|             truthiness, addr, cond, original_addr = stack[-1]
 | |
|             # if jaddr == None:
 | |
|             #     raise Exception("#ERROR: jaddr is None")
 | |
|             # jaddr == None
 | |
|             if jaddr:
 | |
|                 if jaddr < addr:
 | |
|                     break
 | |
|                 if jaddr == addr and (truthiness or jtruthiness):
 | |
|                     break
 | |
|                 # if jaddr == addr and not (truthiness or jtruthiness):
 | |
|                 #     break
 | |
|             stack.pop()
 | |
|             obj_maker = PyBooleanOr if truthiness else PyBooleanAnd
 | |
|             if truthiness and jtruthiness:
 | |
|                 if original_jaddr.arg == original_addr.arg:
 | |
|                     if original_jaddr[2]  and original_jaddr[2].opcode == RAISE_VARARGS:
 | |
|                         obj_maker = PyBooleanOr
 | |
|                         cond = cond
 | |
|                         jcond = jcond
 | |
|                     else:
 | |
|                         obj_maker = PyBooleanAnd
 | |
|                         cond = PyNot(cond)
 | |
|                         jcond = PyNot(jcond)
 | |
|                 elif original_jaddr.arg > original_addr.arg:
 | |
|                     obj_maker = PyBooleanOr
 | |
|                     jcond = PyNot(jcond)
 | |
|             if not truthiness and not jtruthiness:
 | |
|                 if original_jaddr.arg < original_addr.arg:
 | |
|                     obj_maker = PyBooleanOr
 | |
|                     cond = PyNot(cond)
 | |
|                 elif original_jaddr.arg > original_addr.arg:
 | |
|                     obj_maker = PyBooleanOr
 | |
|                     cond = PyNot(cond)
 | |
|             if truthiness and not jtruthiness:
 | |
|                 if original_jaddr.arg == original_addr.arg:
 | |
|                     obj_maker = PyBooleanAnd
 | |
|                     if original_jaddr.opcode != original_addr.opcode:
 | |
|                         cond = PyNot(cond)
 | |
|             if not truthiness and  jtruthiness:
 | |
|                 if original_jaddr.arg == original_addr.arg:
 | |
|                     jcond = PyNot(jcond)
 | |
|                     # cond = PyNot(cond)
 | |
|             last_true = original_addr.seek_back(POP_JUMP_IF_TRUE)
 | |
|             if isinstance(cond, PyBooleanOr)and obj_maker == PyBooleanAnd and (not last_true or last_true.jump() > original_jaddr):
 | |
|                 jcond = PyBooleanOr(cond.left, obj_maker(cond.right, jcond))
 | |
|             elif isinstance(jcond, obj_maker):
 | |
|                 # Use associativity of 'and' and 'or' to minimise the
 | |
|                 # number of parentheses
 | |
|                 jcond = obj_maker(obj_maker(cond, jcond.left), jcond.right)
 | |
|             else:
 | |
|                 jcond = obj_maker(cond, jcond)
 | |
|         stack.append((jtruthiness, jaddr, jcond, original_jaddr))
 | |
| 
 | |
|     def pop_popjump(self):
 | |
|         if not self.popjump_stack:
 | |
|             raise Exception('Attempted to pop an empty popjump stack.')
 | |
|         truthiness, addr, cond, original_addr = self.popjump_stack.pop()
 | |
|         return cond
 | |
| 
 | |
|     def run(self):
 | |
|         addr, end_addr = self.start_addr, self.end_addr
 | |
|         while addr and addr < end_addr:
 | |
|             opcode, arg = addr
 | |
|             args = (addr,) if opcode < HAVE_ARGUMENT else (addr, arg)
 | |
|             method = getattr(self, opname[opcode])
 | |
|             self.last_addr = addr
 | |
|             new_addr = method(*args)
 | |
|             if new_addr is self.END_NOW:
 | |
|                 break
 | |
|             elif new_addr is None:
 | |
|                 new_addr = addr[1]
 | |
|             addr = new_addr
 | |
|         return addr
 | |
| 
 | |
|     def write(self, template, *args):
 | |
|         def fmt(x):
 | |
|             if isinstance(x, int):
 | |
|                 return self.stack.getval(x)
 | |
|             else:
 | |
|                 return x
 | |
| 
 | |
|         if args:
 | |
|             line = template.format(*map(fmt, args))
 | |
|         else:
 | |
|             line = template
 | |
|         self.suite.add_statement(SimpleStatement(line))
 | |
| 
 | |
|     def store(self, dest):
 | |
|         val = self.stack.pop()
 | |
|         val.store(self, dest)
 | |
| 
 | |
|     def is_for_loop(self, addr, end_addr):
 | |
|         i = 0
 | |
|         while 1:
 | |
|             cur_addr = addr[i]
 | |
|             if cur_addr == end_addr:
 | |
|                 break
 | |
|             elif cur_addr.opcode in else_jump_opcodes:
 | |
|                 cur_addr = cur_addr.jump()
 | |
|                 if cur_addr and cur_addr.opcode in for_jump_opcodes:
 | |
|                     return True
 | |
|                 break
 | |
|             elif cur_addr.opcode in for_jump_opcodes:
 | |
|                 return True
 | |
|             i = i + 1
 | |
|         return False
 | |
| 
 | |
|     def scan_to_first_jump_if(self, addr: Address, end_addr: Address) -> Union[Address, None]:
 | |
|         i = 0
 | |
|         while 1:
 | |
|             cur_addr = addr[i]
 | |
|             if cur_addr == end_addr:
 | |
|                 break
 | |
|             elif cur_addr.opcode in pop_jump_if_opcodes:
 | |
|                 return cur_addr
 | |
|             elif cur_addr.opcode in else_jump_opcodes:
 | |
|                 break
 | |
|             elif cur_addr.opcode in for_jump_opcodes:
 | |
|                 break
 | |
|             i = i + 1
 | |
|         return None
 | |
| 
 | |
|     def scan_for_final_jump(self, start_addr, end_addr):
 | |
|         i = 0
 | |
|         end = None
 | |
|         while 1:
 | |
|             cur_addr = end_addr[i]
 | |
|             if cur_addr == start_addr:
 | |
|                 break
 | |
|             elif cur_addr.opcode == JUMP_ABSOLUTE:
 | |
|                 end = cur_addr
 | |
|                 return end
 | |
|             elif cur_addr.opcode in else_jump_opcodes:
 | |
|                 break
 | |
|             elif cur_addr.opcode in pop_jump_if_opcodes:
 | |
|                 break
 | |
|             i = i - 1
 | |
|         return end
 | |
| 
 | |
|     #
 | |
|     # All opcode methods in CAPS below.
 | |
|     #
 | |
| 
 | |
|     def SETUP_LOOP(self, addr: Address, delta):
 | |
|         jump_addr = addr.jump()
 | |
|         end_addr = jump_addr[-1]
 | |
|         if self.is_for_loop(addr[1], end_addr):
 | |
|             return
 | |
| 
 | |
|         end_cond = addr.seek_forward(pop_jump_if_opcodes)
 | |
|         while end_cond and (end_cond.jump() != end_addr and end_cond.jump().opcode != POP_BLOCK):
 | |
|             end_cond = end_cond.seek_forward(pop_jump_if_opcodes)
 | |
|         if end_cond:
 | |
|             end_cond_j = end_cond.jump()
 | |
|             d_body = SuiteDecompiler(addr[1], end_cond.jump())
 | |
|             d_body.run()
 | |
|             result = d_body.suite.statements.pop()
 | |
|             if isinstance(result, IfStatement):
 | |
|                 while_stmt = WhileStatement(result.cond, result.true_suite)
 | |
|                 if(end_cond_j.opcode == POP_BLOCK):
 | |
|                     d_else = SuiteDecompiler(end_cond_j[1],jump_addr)
 | |
|                     d_else.run()
 | |
|                     while_stmt.else_body = d_else.suite
 | |
|                 self.suite.add_statement(while_stmt)
 | |
|             elif isinstance(result, WhileStatement):
 | |
|                 self.suite.add_statement(result)
 | |
|             return jump_addr
 | |
| 
 | |
|         else:
 | |
|             d_body = SuiteDecompiler(addr[1], end_addr)
 | |
|             while_stmt = WhileStatement(PyConst(True), d_body.suite)
 | |
|             d_body.stack.push(while_stmt)
 | |
|             d_body.run()
 | |
|             while_stmt.body = d_body.suite
 | |
|             self.suite.add_statement(while_stmt)
 | |
|             return jump_addr
 | |
|         return None
 | |
| 
 | |
|     def BREAK_LOOP(self, addr):
 | |
|         self.write("break")
 | |
| 
 | |
|     def CONTINUE_LOOP(self, addr, *argv):
 | |
|         self.write("continue")
 | |
| 
 | |
|     def SETUP_FINALLY(self, addr, delta):
 | |
|         start_finally: Address = addr.jump()
 | |
|         d_try = SuiteDecompiler(addr[1], start_finally)
 | |
|         d_try.run()
 | |
|         d_finally = SuiteDecompiler(start_finally)
 | |
|         end_finally = d_finally.run()
 | |
|         self.suite.add_statement(FinallyStatement(d_try.suite, d_finally.suite))
 | |
|         if end_finally:
 | |
|             return end_finally[1]
 | |
|         else:
 | |
|             return self.END_NOW
 | |
| 
 | |
|     def END_FINALLY(self, addr):
 | |
|         return self.END_NOW
 | |
| 
 | |
|     def SETUP_EXCEPT(self, addr, delta):
 | |
|         end_addr = addr
 | |
|         start_except = addr.jump()
 | |
|         start_try = addr[1]
 | |
|         end_try = start_except
 | |
|         if sys.version_info < (3, 7):
 | |
|             if end_try.opcode == JUMP_FORWARD:
 | |
|                 end_try = end_try[1] + end_try.arg
 | |
|             elif end_try.opcode == JUMP_ABSOLUTE:
 | |
|                 end_try = end_try[-1]
 | |
|             else:
 | |
|                 end_try = end_try[1]
 | |
|         d_try = SuiteDecompiler(start_try, end_try)
 | |
|         d_try.run()
 | |
| 
 | |
|         stmt = TryStatement(d_try.suite)
 | |
|         j_except: Address = None
 | |
|         while start_except.opcode != END_FINALLY:
 | |
|             if start_except.opcode == DUP_TOP:
 | |
|                 # There's a new except clause
 | |
|                 d_except = SuiteDecompiler(start_except[1])
 | |
|                 d_except.stack.push(stmt)
 | |
|                 d_except.run()
 | |
|                 start_except = stmt.next_start_except
 | |
|                 j_except = start_except[-1]
 | |
|                 end_addr = start_except[1]
 | |
|             elif start_except.opcode == POP_TOP:
 | |
|                 # It's a bare except clause - it starts:
 | |
|                 # POP_TOP
 | |
|                 # POP_TOP
 | |
|                 # POP_TOP
 | |
|                 # <except stuff>
 | |
|                 # POP_EXCEPT
 | |
|                 start_except = start_except[3]
 | |
|                 end_except = start_except
 | |
| 
 | |
|                 nested_try: int = 0
 | |
|                 while end_except and end_except[-1].opcode != RETURN_VALUE:
 | |
|                     if end_except.opcode == SETUP_EXCEPT:
 | |
|                         nested_try += 1
 | |
|                     if end_except.opcode == POP_EXCEPT:
 | |
|                         if nested_try == 0:
 | |
|                             break
 | |
|                         nested_try -= 1
 | |
|                     end_except = end_except[1]
 | |
|                 # Handle edge case where there is a return in the except
 | |
|                 if end_except[-1].opcode == RETURN_VALUE:
 | |
|                     d_except = SuiteDecompiler(start_except, end_except)
 | |
|                     end_except = d_except.run()
 | |
|                     stmt.add_except_clause(None, d_except.suite)
 | |
|                     self.suite.add_statement(stmt)
 | |
|                     return end_except
 | |
| 
 | |
|                 d_except = SuiteDecompiler(start_except, end_except)
 | |
|                 end_except = d_except.run()
 | |
|                 stmt.add_except_clause(None, d_except.suite)
 | |
|                 start_except = end_except[2]
 | |
|                 assert start_except.opcode == END_FINALLY
 | |
| 
 | |
|                 end_addr = start_except[1]
 | |
|                 j_except: Address = end_except[1]
 | |
|         self.suite.add_statement(stmt)
 | |
|         last_loop = addr.seek_back(SETUP_LOOP)
 | |
|         if last_loop and last_loop.jump() < addr:
 | |
|             last_loop = None
 | |
|         has_normal_else_clause = j_except and j_except.opcode == JUMP_FORWARD and j_except[2] != j_except.jump()
 | |
|         has_end_of_loop_else_clause = j_except.opcode == JUMP_ABSOLUTE and last_loop
 | |
|         has_return_else_clause = j_except.opcode == RETURN_VALUE
 | |
|         if has_normal_else_clause or has_end_of_loop_else_clause or has_return_else_clause:
 | |
|             assert j_except[1].opcode == END_FINALLY
 | |
|             start_else = j_except[2]
 | |
|             if has_return_else_clause and start_else.opcode == JUMP_ABSOLUTE and start_else[1].opcode == POP_BLOCK:
 | |
|                 start_else = start_else[-1]
 | |
|             end_else: Address = None
 | |
|             if has_normal_else_clause:
 | |
|                 end_else = j_except.jump()
 | |
|             elif has_end_of_loop_else_clause:
 | |
|                 end_else = last_loop.jump().seek_back(JUMP_ABSOLUTE)
 | |
|             elif has_return_else_clause:
 | |
|                 end_else = j_except[1].seek_forward(RETURN_VALUE)[1]
 | |
|             if has_return_else_clause and not end_else:
 | |
|                 return end_addr
 | |
|             d_else = SuiteDecompiler(start_else, end_else)
 | |
|             end_addr = d_else.run()
 | |
|             if not end_addr:
 | |
|                 end_addr = self.END_NOW
 | |
|             stmt.else_suite = d_else.suite
 | |
|         return end_addr
 | |
| 
 | |
|     def SETUP_WITH(self, addr, delta):
 | |
|         end_with = addr.jump()
 | |
|         with_stmt = WithStatement(self.stack.pop())
 | |
|         d_with = SuiteDecompiler(addr[1], end_with)
 | |
|         d_with.stack.push(with_stmt)
 | |
|         d_with.run()
 | |
|         with_stmt.suite = d_with.suite
 | |
|         self.suite.add_statement(with_stmt)
 | |
|         if sys.version_info <= (3, 4):
 | |
|             assert end_with.opcode == WITH_CLEANUP
 | |
|             assert end_with[1].opcode == END_FINALLY
 | |
|             return end_with[2]
 | |
|         else:
 | |
|             assert end_with.opcode == WITH_CLEANUP_START
 | |
|             assert end_with[1].opcode == WITH_CLEANUP_FINISH
 | |
|             return end_with[3]
 | |
| 
 | |
|     def POP_BLOCK(self, addr):
 | |
|         pass
 | |
| 
 | |
|     def POP_EXCEPT(self, addr):
 | |
|         return self.END_NOW
 | |
| 
 | |
|     def NOP(self, addr):
 | |
|         return
 | |
| 
 | |
|     def SETUP_ANNOTATIONS(self, addr):
 | |
|         return
 | |
| 
 | |
|     def COMPARE_OP(self, addr, compare_opname):
 | |
|         left, right = self.stack.pop(2)
 | |
|         if compare_opname != 10:  # 10 is exception match
 | |
|             self.stack.push(PyCompare([left, cmp_op[compare_opname], right]))
 | |
|         else:
 | |
|             # It's an exception match
 | |
|             # left is a TryStatement
 | |
|             # right is the exception type to be matched
 | |
|             # It goes:
 | |
|             # COMPARE_OP 10
 | |
|             # POP_JUMP_IF_FALSE <next except>
 | |
|             # POP_TOP
 | |
|             # POP_TOP or STORE_FAST (if the match is named)
 | |
|             # POP_TOP
 | |
|             # SETUP_FINALLY if the match was named
 | |
|             assert addr[1].opcode == POP_JUMP_IF_FALSE
 | |
|             left.next_start_except = addr[1].jump()
 | |
|             assert addr[2].opcode == POP_TOP
 | |
|             assert addr[4].opcode == POP_TOP
 | |
|             if addr[5].opcode == SETUP_FINALLY:
 | |
|                 except_start = addr[6]
 | |
|                 except_end = addr[5].jump()
 | |
|             else:
 | |
|                 except_start = addr[5]
 | |
|                 except_end = left.next_start_except
 | |
|             d_body = SuiteDecompiler(except_start, except_end)
 | |
|             d_body.run()
 | |
|             left.add_except_clause(right, d_body.suite)
 | |
|             if addr[3].opcode != POP_TOP:
 | |
|                 # The exception is named
 | |
|                 d_exc_name = SuiteDecompiler(addr[3], addr[4])
 | |
|                 d_exc_name.stack.push(left)
 | |
|                 # This will store the name in left:
 | |
|                 d_exc_name.run()
 | |
|             # We're done with this except clause
 | |
|             return self.END_NOW
 | |
| 
 | |
|     def PRINT_EXPR(self, addr):
 | |
|         expr = self.stack.pop()
 | |
|         self.write("{}", expr)
 | |
|         
 | |
|     #
 | |
|     # Stack manipulation
 | |
|     #
 | |
| 
 | |
|     def POP_TOP(self, addr):
 | |
|         self.stack.pop().on_pop(self)
 | |
|     
 | |
|     def ROT_TWO(self, addr: Address):
 | |
|         # special case: x, y = z, t
 | |
| 
 | |
|         if addr[-1].opcode in (LOAD_ATTR, LOAD_GLOBAL, LOAD_NAME, BINARY_SUBSCR, BUILD_LIST):
 | |
|             next_stmt = addr.seek_forward((*(stmt_opcodes- unpack_stmt_opcodes), *pop_jump_if_opcodes, *else_jump_opcodes))
 | |
|             first = addr.seek_forward(unpack_stmt_opcodes, next_stmt)
 | |
|             second = first and first.seek_forward(unpack_stmt_opcodes, next_stmt)
 | |
|             if first and second and len({*[first.opcode, second.opcode]}) == 1:
 | |
|                 val = PyTuple(self.stack.pop(2))
 | |
|                 unpack = Unpack(val, 2)
 | |
|                 self.stack.push(unpack)
 | |
|                 self.stack.push(unpack)
 | |
|                 return
 | |
| 
 | |
|         tos1, tos = self.stack.pop(2)
 | |
|         self.stack.push(tos, tos1)
 | |
| 
 | |
|     def ROT_THREE(self, addr: Address):
 | |
|         # special case: x, y, z = a, b, c
 | |
|         next_stmt = addr.seek_forward(unpack_terminators)
 | |
|         rot_two = addr[1]
 | |
|         first = rot_two and rot_two.seek_forward(unpack_stmt_opcodes, next_stmt)
 | |
|         second = first and first.seek_forward(unpack_stmt_opcodes, next_stmt)
 | |
|         third = second and second.seek_forward(unpack_stmt_opcodes, next_stmt)
 | |
|         if first and second and third and len({*[first.opcode, second.opcode,third.opcode]}) == 1:
 | |
|             val = PyTuple(self.stack.pop(3))
 | |
|             unpack = Unpack(val, 3)
 | |
|             self.stack.push(unpack)
 | |
|             self.stack.push(unpack)
 | |
|             self.stack.push(unpack)
 | |
|             return addr[2]
 | |
|         else:
 | |
|             tos2, tos1, tos = self.stack.pop(3)
 | |
|             self.stack.push(tos, tos2, tos1)
 | |
| 
 | |
|     def DUP_TOP(self, addr):
 | |
|         self.stack.push(self.stack.peek())
 | |
| 
 | |
|     def DUP_TOP_TWO(self, addr):
 | |
|         self.stack.push(*self.stack.peek(2))
 | |
| 
 | |
|     #
 | |
|     # LOAD / STORE / DELETE
 | |
|     #
 | |
| 
 | |
|     # FAST
 | |
| 
 | |
|     def LOAD_FAST(self, addr, var_num):
 | |
|         name = self.code.varnames[var_num]
 | |
|         self.stack.push(name)
 | |
| 
 | |
|     def STORE_FAST(self, addr, var_num):
 | |
|         name = self.code.varnames[var_num]
 | |
|         self.store(name)
 | |
| 
 | |
|     def DELETE_FAST(self, addr, var_num):
 | |
|         name = self.code.varnames[var_num]
 | |
|         self.write("del {}", name)
 | |
| 
 | |
|     # DEREF
 | |
| 
 | |
|     def LOAD_DEREF(self, addr, i):
 | |
|         name = self.code.derefnames[i]
 | |
|         self.stack.push(name)
 | |
| 
 | |
|     def LOAD_CLASSDEREF(self, addr, i):
 | |
|         name = self.code.derefnames[i]
 | |
|         self.stack.push(name)
 | |
| 
 | |
|     def STORE_DEREF(self, addr, i):
 | |
|         name = self.code.derefnames[i]
 | |
|         if not self.code.iscellvar(i):
 | |
|             self.code.declare_nonlocal(name)
 | |
|         self.store(name)
 | |
| 
 | |
|     def DELETE_DEREF(self, addr, i):
 | |
|         name = self.code.derefnames[i]
 | |
|         if not self.code.iscellvar(i):
 | |
|             self.code.declare_nonlocal(name)
 | |
|         self.write("del {}", name)
 | |
| 
 | |
|     # GLOBAL
 | |
| 
 | |
|     def LOAD_GLOBAL(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.code.ensure_global(name)
 | |
|         self.stack.push(name)
 | |
| 
 | |
|     def STORE_GLOBAL(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.code.declare_global(name)
 | |
|         self.store(name)
 | |
| 
 | |
|     def DELETE_GLOBAL(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.declare_global(name)
 | |
|         self.write("del {}", name)
 | |
| 
 | |
|     # NAME
 | |
| 
 | |
|     def LOAD_NAME(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.stack.push(name)
 | |
| 
 | |
|     def STORE_NAME(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.store(name)
 | |
| 
 | |
|     def DELETE_NAME(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.write("del {}", name)
 | |
| 
 | |
|     # METHOD
 | |
|     def LOAD_METHOD(self, addr, namei):
 | |
|         expr = self.stack.pop()
 | |
|         attrname = self.code.names[namei]
 | |
|         self.stack.push(PyAttribute(expr, attrname))
 | |
| 
 | |
|     def CALL_METHOD(self, addr, argc, have_var=False, have_kw=False):
 | |
|         kw_argc = argc >> 8
 | |
|         pos_argc = argc
 | |
|         varkw = self.stack.pop() if have_kw else None
 | |
|         varargs = self.stack.pop() if have_var else None
 | |
|         kwargs_iter = iter(self.stack.pop(2 * kw_argc))
 | |
|         kwargs = list(zip(kwargs_iter, kwargs_iter))
 | |
|         posargs = self.stack.pop(pos_argc)
 | |
|         func = self.stack.pop()
 | |
|         if func is self.BUILD_CLASS:
 | |
|             # It's a class construction
 | |
|             # TODO: check the assert statement below is correct
 | |
|             assert not (have_var or have_kw)
 | |
|             func, name, *parents = posargs
 | |
|             self.stack.push(ClassStatement(func, name, parents, kwargs))
 | |
|         elif isinstance(func, PyComp):
 | |
|             # It's a list/set/dict comprehension or generator expression
 | |
|             assert not (have_var or have_kw)
 | |
|             assert len(posargs) == 1 and not kwargs
 | |
|             func.set_iterable(posargs[0])
 | |
|             self.stack.push(func)
 | |
|         elif posargs and isinstance(posargs[0], DecorableStatement):
 | |
|             # It's a decorator for a def/class statement
 | |
|             assert len(posargs) == 1 and not kwargs
 | |
|             defn = posargs[0]
 | |
|             defn.decorate(func)
 | |
|             self.stack.push(defn)
 | |
|         else:
 | |
|             # It's none of the above, so it must be a normal function call
 | |
|             func_call = PyCallFunction(func, posargs, kwargs, varargs, varkw)
 | |
|             self.stack.push(func_call)
 | |
| 
 | |
|     # ATTR
 | |
| 
 | |
|     def LOAD_ATTR(self, addr, namei):
 | |
|         expr = self.stack.pop()
 | |
|         attrname = self.code.names[namei]
 | |
|         self.stack.push(PyAttribute(expr, attrname))
 | |
| 
 | |
|     def STORE_ATTR(self, addr, namei):
 | |
|         expr = self.stack.pop()
 | |
|         attrname = self.code.names[namei]
 | |
|         self.store(PyAttribute(expr, attrname))
 | |
| 
 | |
|     def DELETE_ATTR(self, addr, namei):
 | |
|         expr = self.stack.pop()
 | |
|         attrname = self.code.names[namei]
 | |
|         self.write("del {}.{}", expr, attrname)
 | |
| 
 | |
|     # SUBSCR
 | |
| 
 | |
|     def STORE_SUBSCR(self, addr):
 | |
|         expr, sub = self.stack.pop(2)
 | |
|         self.store(PySubscript(expr, sub))
 | |
| 
 | |
|     def DELETE_SUBSCR(self, addr):
 | |
|         expr, sub = self.stack.pop(2)
 | |
|         self.write("del {}[{}]", expr, sub)
 | |
| 
 | |
|     # CONST
 | |
|     CONST_LITERALS = {
 | |
|         Ellipsis: PyName('...')
 | |
|     }
 | |
|     def LOAD_CONST(self, addr, consti):
 | |
|         const = self.code.consts[consti]
 | |
|         if const.val in self.CONST_LITERALS:
 | |
|             const = self.CONST_LITERALS[const.val]
 | |
|         self.stack.push(const)
 | |
| 
 | |
|     #
 | |
|     # Import statements
 | |
|     #
 | |
| 
 | |
|     def IMPORT_NAME(self, addr, namei):
 | |
|         name = self.code.names[namei]
 | |
|         level, fromlist = self.stack.pop(2)
 | |
|         self.stack.push(ImportStatement(name, level, fromlist))
 | |
|         # special case check for import x.y.z as w syntax which uses
 | |
|         # attributes and assignments and is difficult to workaround
 | |
|         i = 1
 | |
|         while addr[i].opcode == LOAD_ATTR: i = i + 1
 | |
|         if i > 1 and addr[i].opcode in (STORE_FAST, STORE_NAME):
 | |
|             return addr[i]
 | |
|         return None
 | |
| 
 | |
|     def IMPORT_FROM(self, addr: Address, namei):
 | |
|         name = self.code.names[namei]
 | |
|         self.stack.push(ImportFrom(name))
 | |
|         if addr[1].opcode == ROT_TWO:
 | |
|            return addr.seek_forward(STORE_NAME)
 | |
| 
 | |
| 
 | |
|     def IMPORT_STAR(self, addr):
 | |
|         self.POP_TOP(addr)
 | |
| 
 | |
|     #
 | |
|     # Function call
 | |
|     #
 | |
| 
 | |
|     def STORE_LOCALS(self, addr):
 | |
|         self.stack.pop()
 | |
|         return addr[3]
 | |
| 
 | |
|     def LOAD_BUILD_CLASS(self, addr):
 | |
|         self.stack.push(self.BUILD_CLASS)
 | |
| 
 | |
|     def RETURN_VALUE(self, addr):
 | |
|         value = self.stack.pop()
 | |
|         if self.code.flags.generator and isinstance(value, PyConst) and not value.val and not addr[-2]:
 | |
|             cond = PyConst(False)
 | |
|             body = SimpleStatement('yield None')
 | |
|             loop = WhileStatement(cond, body)
 | |
|             self.suite.add_statement(loop)
 | |
|             return
 | |
|         if isinstance(value, PyConst) and value.val is None:
 | |
|             if addr[1] is not None:
 | |
|                 if self.code.flags.generator and addr[3] and not self.code[0].seek_forward({YIELD_FROM, YIELD_VALUE}):
 | |
|                     self.write('yield')
 | |
|                 else:
 | |
|                     self.write("return")
 | |
|             return
 | |
|         if self.code.flags.iterable_coroutine:
 | |
|             self.write("yield {}", value)
 | |
|         else:
 | |
|             self.write("return {}", value)
 | |
|             if self.code.flags.generator:
 | |
|                 self.write('yield')
 | |
| 
 | |
|     def GET_YIELD_FROM_ITER(self, addr):
 | |
|         pass
 | |
| 
 | |
|     def YIELD_VALUE(self, addr):
 | |
|         if self.code.name == '<genexpr>':
 | |
|             return
 | |
|         value = self.stack.pop()
 | |
|         self.stack.push(PyYield(value))
 | |
| 
 | |
|     def YIELD_FROM(self, addr):
 | |
|         value = self.stack.pop()  # TODO:  from statement ?
 | |
|         value = self.stack.pop()
 | |
|         self.stack.push(PyYieldFrom(value))
 | |
| 
 | |
|     def CALL_FUNCTION_CORE(self, func, posargs, kwargs, varargs, varkw):
 | |
|         if func is self.BUILD_CLASS:
 | |
|             # It's a class construction
 | |
|             # TODO: check the assert statement below is correct
 | |
|             # assert not (have_var or have_kw)
 | |
|             func, name, *parents = posargs
 | |
|             self.stack.push(ClassStatement(func, name, parents, kwargs))
 | |
|         elif isinstance(func, PyComp):
 | |
|             # It's a list/set/dict comprehension or generator expression
 | |
|             # assert not (have_var or have_kw)
 | |
|             assert len(posargs) == 1 and not kwargs
 | |
|             func.set_iterable(posargs[0])
 | |
|             self.stack.push(func)
 | |
|         elif posargs and isinstance(posargs, list) and isinstance(posargs[0], DecorableStatement):
 | |
|             # It's a decorator for a def/class statement
 | |
|             assert len(posargs) == 1 and not kwargs
 | |
|             defn = posargs[0]
 | |
|             defn.decorate(func)
 | |
|             self.stack.push(defn)
 | |
|         else:
 | |
|             # It's none of the above, so it must be a normal function call
 | |
|             func_call = PyCallFunction(func, posargs, kwargs, varargs, varkw)
 | |
|             self.stack.push(func_call)
 | |
| 
 | |
|     def CALL_FUNCTION(self, addr, argc, have_var=False, have_kw=False):
 | |
|         if sys.version_info >= (3, 6):
 | |
|             pos_argc = argc
 | |
|             posargs = self.stack.pop(pos_argc)
 | |
|             func = self.stack.pop()
 | |
|             self.CALL_FUNCTION_CORE(func, posargs, [], None, None)
 | |
|         else:
 | |
|             kw_argc = argc >> 8
 | |
|             pos_argc = argc & 0xFF
 | |
|             varkw = self.stack.pop() if have_kw else None
 | |
|             varargs = self.stack.pop() if have_var else None
 | |
|             kwargs_iter = iter(self.stack.pop(2 * kw_argc))
 | |
|             kwargs = list(zip(kwargs_iter, kwargs_iter))
 | |
|             posargs = self.stack.pop(pos_argc)
 | |
|             func = self.stack.pop()
 | |
|             self.CALL_FUNCTION_CORE(func, posargs, kwargs, varargs, varkw)
 | |
| 
 | |
|     def CALL_FUNCTION_VAR(self, addr, argc):
 | |
|         self.CALL_FUNCTION(addr, argc, have_var=True)
 | |
| 
 | |
|     def CALL_FUNCTION_KW(self, addr, argc):
 | |
|         if sys.version_info >= (3, 6):
 | |
|             keys = self.stack.pop()
 | |
|             kwargc = len(keys.val)
 | |
|             kwarg_values = self.stack.pop(kwargc)
 | |
|             posargs = self.stack.pop(argc - kwargc)
 | |
|             func = self.stack.pop()
 | |
|             kwarg_dict = list(zip([PyName(k) for k in keys], kwarg_values))
 | |
|             self.CALL_FUNCTION_CORE(func, posargs, kwarg_dict, None, None)
 | |
|         else:
 | |
|             self.CALL_FUNCTION(addr, argc, have_kw=True)
 | |
| 
 | |
|     def CALL_FUNCTION_EX(self, addr, flags):
 | |
|         kwarg_unpacks = []
 | |
|         if flags & 1:
 | |
|             kwarg_unpacks = self.stack.pop()
 | |
| 
 | |
|         kwarg_dict = PyDict()
 | |
|         if isinstance(kwarg_unpacks,PyDict):
 | |
|             kwarg_dict = kwarg_unpacks
 | |
|             kwarg_unpacks = []
 | |
|         elif isinstance(kwarg_unpacks, list):
 | |
|             if len(kwarg_unpacks):
 | |
|                 if isinstance(kwarg_unpacks[0], PyDict):
 | |
|                     kwarg_dict = kwarg_unpacks[0]
 | |
|                     kwarg_unpacks = kwarg_unpacks[1:]
 | |
|         else:
 | |
|             kwarg_unpacks = [kwarg_unpacks]
 | |
| 
 | |
|         if any(filter(lambda kv: '.' in str(kv[0]), kwarg_dict.items)):
 | |
|             kwarg_unpacks.append(kwarg_dict)
 | |
|             kwarg_dict = PyDict()
 | |
| 
 | |
|         posargs_unpacks = self.stack.pop()
 | |
|         posargs = PyTuple([])
 | |
|         if isinstance(posargs_unpacks,PyTuple):
 | |
|             posargs = posargs_unpacks
 | |
|             posargs_unpacks = []
 | |
|         elif isinstance(posargs_unpacks, list):
 | |
|             if len(posargs_unpacks) > 0:
 | |
|                 if isinstance(posargs_unpacks[0], PyTuple):
 | |
|                     posargs = posargs_unpacks[0]
 | |
|                     posargs_unpacks = posargs_unpacks[1:]
 | |
|                 elif isinstance(posargs_unpacks[0], PyConst) and isinstance(posargs_unpacks[0].val, tuple):
 | |
|                     posargs = PyTuple(list(map(PyConst,posargs_unpacks[0].val)))
 | |
|                     posargs_unpacks = posargs_unpacks[1:]
 | |
| 
 | |
|         else:
 | |
|             posargs_unpacks = [posargs_unpacks]
 | |
| 
 | |
|         func = self.stack.pop()
 | |
|         self.CALL_FUNCTION_CORE(func, list(posargs.values), list(kwarg_dict.items), posargs_unpacks, kwarg_unpacks)
 | |
| 
 | |
|     def CALL_FUNCTION_VAR_KW(self, addr, argc):
 | |
|         self.CALL_FUNCTION(addr, argc, have_var=True, have_kw=True)
 | |
| 
 | |
|     # a, b, ... = ...
 | |
| 
 | |
|     def UNPACK_SEQUENCE(self, addr, count):
 | |
|         unpack = Unpack(self.stack.pop(), count)
 | |
|         for i in range(count):
 | |
|             self.stack.push(unpack)
 | |
| 
 | |
|     def UNPACK_EX(self, addr, counts):
 | |
|         rcount = counts >> 8
 | |
|         lcount = counts & 0xFF
 | |
|         count = lcount + rcount + 1
 | |
|         unpack = Unpack(self.stack.pop(), count, lcount)
 | |
|         for i in range(count):
 | |
|             self.stack.push(unpack)
 | |
| 
 | |
|     # Build operations
 | |
| 
 | |
|     def BUILD_SLICE(self, addr, argc):
 | |
|         assert argc in (2, 3)
 | |
|         self.stack.push(PySlice(self.stack.pop(argc)))
 | |
| 
 | |
|     def BUILD_TUPLE(self, addr, count):
 | |
|         values = [self.stack.pop() for i in range(count)]
 | |
|         values.reverse()
 | |
|         self.stack.push(PyTuple(values))
 | |
| 
 | |
|     def BUILD_TUPLE_UNPACK(self, addr, count):
 | |
|         values = []
 | |
|         for o in self.stack.pop(count):
 | |
|             if isinstance(o, PyTuple):
 | |
|                 values.extend(o.values)
 | |
|             else:
 | |
|                 values.append(PyStarred(o))
 | |
| 
 | |
|         self.stack.push(PyTuple(values))
 | |
| 
 | |
|     def BUILD_TUPLE_UNPACK_WITH_CALL(self, addr, count):
 | |
|         self.stack.push(self.stack.pop(count))
 | |
| 
 | |
|     def BUILD_LIST(self, addr, count):
 | |
|         values = [self.stack.pop() for i in range(count)]
 | |
|         values.reverse()
 | |
|         self.stack.push(PyList(values))
 | |
| 
 | |
|     def BUILD_LIST_UNPACK(self, addr, count):
 | |
|         values = []
 | |
|         for o in self.stack.pop(count):
 | |
|             if isinstance(o, PyTuple):
 | |
|                 values.extend(o.values)
 | |
|             else:
 | |
|                 values.append(PyStarred(o))
 | |
| 
 | |
|         self.stack.push(PyList(values))
 | |
| 
 | |
|     def BUILD_SET(self, addr, count):
 | |
|         values = [self.stack.pop() for i in range(count)]
 | |
|         values.reverse()
 | |
|         self.stack.push(PySet(values))
 | |
| 
 | |
|     def BUILD_SET_UNPACK(self, addr, count):
 | |
|         values = []
 | |
|         for o in self.stack.pop(count):
 | |
|             if isinstance(o, PySet):
 | |
|                 values.extend(o.values)
 | |
|             else:
 | |
|                 values.append(PyStarred(o))
 | |
| 
 | |
|         self.stack.push(PySet(values))
 | |
| 
 | |
|     def BUILD_MAP(self, addr, count):
 | |
|         d = PyDict()
 | |
|         if sys.version_info >= (3, 5):
 | |
|             for i in range(count):
 | |
|                 d.items.append(tuple(self.stack.pop(2)))
 | |
|             d.items = list(reversed(d.items))
 | |
|         self.stack.push(d)
 | |
| 
 | |
|     def BUILD_MAP_UNPACK(self, addr, count):
 | |
|         d = PyDict()
 | |
|         for i in range(count):
 | |
|             o = self.stack.pop()
 | |
|             if isinstance(o, PyDict):
 | |
|                 for item in reversed(o.items):
 | |
|                     k, v = item
 | |
|                     d.set_item(PyConst(k.val if isinstance(k, PyConst) else k.name), v)
 | |
|             else:
 | |
|                 d.items.append((PyStarred(PyStarred(o)),))
 | |
|         d.items = list(reversed(d.items))
 | |
|         self.stack.push(d)
 | |
| 
 | |
|     def BUILD_MAP_UNPACK_WITH_CALL(self, addr, count):
 | |
|         self.stack.push(self.stack.pop(count))
 | |
| 
 | |
|     def BUILD_CONST_KEY_MAP(self, addr, count):
 | |
|         keys = self.stack.pop()
 | |
|         vals = self.stack.pop(count)
 | |
|         dict = PyDict()
 | |
|         for i in range(count):
 | |
|             dict.set_item(PyConst(keys.val[i]), vals[i])
 | |
|         self.stack.push(dict)
 | |
| 
 | |
|     def STORE_MAP(self, addr):
 | |
|         v, k = self.stack.pop(2)
 | |
|         d = self.stack.peek()
 | |
|         d.set_item(k, v)
 | |
| 
 | |
|     # Comprehension operations - just create an expression statement
 | |
| 
 | |
|     def LIST_APPEND(self, addr, i):
 | |
|         self.POP_TOP(addr)
 | |
| 
 | |
|     def SET_ADD(self, addr, i):
 | |
|         self.POP_TOP(addr)
 | |
| 
 | |
|     def MAP_ADD(self, addr, i):
 | |
|         value, key = self.stack.pop(2)
 | |
|         self.stack.push(PyKeyValue(key, value))
 | |
|         self.POP_TOP(addr)
 | |
| 
 | |
|     # and operator
 | |
| 
 | |
|     def JUMP_IF_FALSE_OR_POP(self, addr: Address, target):
 | |
|         end_addr = addr.jump()
 | |
|         truthiness = not addr.seek_back_statement(POP_JUMP_IF_TRUE)
 | |
|         self.push_popjump(truthiness, end_addr, self.stack.pop(), addr)
 | |
|         left = self.pop_popjump()
 | |
|         if end_addr.opcode == ROT_TWO:
 | |
|             opc, arg = end_addr[-1]
 | |
|             if opc == JUMP_FORWARD and arg == 2:
 | |
|                 end_addr = end_addr[2]
 | |
|             elif opc == RETURN_VALUE or opc == JUMP_FORWARD:
 | |
|                 end_addr = end_addr[-1]
 | |
|                 d = SuiteDecompiler(addr[1], end_addr, self.stack)
 | |
|                 d.run()
 | |
|                 right = self.stack.pop()
 | |
|                 if isinstance(right, PyCompare) and right.extends(left):
 | |
|                     py_and = left.chain(right)
 | |
|                 else:
 | |
|                     py_and = PyBooleanAnd(left, right)
 | |
|                 self.stack.push(py_and)
 | |
|                 return end_addr[3]
 | |
| 
 | |
|         d = SuiteDecompiler(addr[1], end_addr, self.stack)
 | |
|         d.run()
 | |
|         # if end_addr.opcode == RETURN_VALUE:
 | |
|         #     return end_addr[2]
 | |
|         right = self.stack.pop()
 | |
|         if isinstance(right, PyCompare) and right.extends(left):
 | |
|             py_and = left.chain(right)
 | |
|         else:
 | |
|             py_and = PyBooleanAnd(left, right)
 | |
|         self.stack.push(py_and)
 | |
|         return end_addr
 | |
| 
 | |
|     # This appears when there are chained comparisons, e.g. 1 <= x < 10
 | |
| 
 | |
|     def JUMP_FORWARD(self, addr, delta):
 | |
|         ## if delta == 2 and addr[1].opcode == ROT_TWO and addr[2].opcode == POP_TOP:
 | |
|         ##     # We're in the special case of chained comparisons
 | |
|         ##     return addr[3]
 | |
|         ## else:
 | |
|         ##     # I'm hoping its an unused JUMP in an if-else statement
 | |
|         ##     return addr[1]
 | |
|         return addr.jump()
 | |
| 
 | |
|     # or operator
 | |
| 
 | |
|     def JUMP_IF_TRUE_OR_POP(self, addr, target):
 | |
|         end_addr = addr.jump()
 | |
|         self.push_popjump(True, end_addr, self.stack.pop(), addr)
 | |
|         left = self.pop_popjump()
 | |
|         d = SuiteDecompiler(addr[1], end_addr, self.stack)
 | |
|         d.run()
 | |
|         right = self.stack.pop()
 | |
|         self.stack.push(PyBooleanOr(left, right))
 | |
|         return end_addr
 | |
| 
 | |
|     #
 | |
|     # If-else statements/expressions and related structures
 | |
|     #
 | |
| 
 | |
|     def POP_JUMP_IF(self, addr: Address, target: int, truthiness: bool) -> Union[Address, None]:
 | |
|         jump_addr = addr.jump()
 | |
|         next_addr = addr[1]
 | |
| 
 | |
|         last_loop = addr.seek_back(SETUP_LOOP)
 | |
|         in_loop = last_loop and last_loop.jump() > addr
 | |
|         is_loop_condition = False
 | |
|         if in_loop:
 | |
|             end_addr = last_loop.jump()[-1]
 | |
|             end_cond = addr.seek_forward(stmt_opcodes).seek_back(pop_jump_if_opcodes)
 | |
|             while end_cond and end_cond.jump() != end_addr:
 | |
|                 end_cond = end_cond.seek_back(pop_jump_if_opcodes)
 | |
|             is_loop_condition = end_cond == addr
 | |
| 
 | |
|         end_of_loop = jump_addr.opcode == FOR_ITER or jump_addr[-1].opcode == SETUP_LOOP
 | |
|         if jump_addr.opcode == FOR_ITER:
 | |
|             # We are in a for-loop with nothing after the if-suite
 | |
|             # But take care: for-loops in generator expression do
 | |
|             # not end in POP_BLOCK, hence the test below.
 | |
|             jump_addr = jump_addr.jump()
 | |
|         elif end_of_loop:
 | |
|             # We are in a while-loop with nothing after the if-suite
 | |
|             jump_addr = jump_addr[-1].jump()[-1]
 | |
|         cond = self.stack.pop()
 | |
|         # chained compare
 | |
|         # ex:
 | |
|         # if x <= y <= z:
 | |
|         if addr[-3] and \
 | |
|                 addr[-1].opcode == COMPARE_OP and \
 | |
|                 addr[-2].opcode == ROT_THREE and \
 | |
|                 addr[-3].opcode == DUP_TOP:
 | |
|             if self.popjump_stack:
 | |
|                 c = self.pop_popjump()
 | |
|                 c = c.chain(cond)
 | |
|                 self.push_popjump(not truthiness, jump_addr, c, addr)
 | |
|             else:
 | |
|                 self.push_popjump(not truthiness, jump_addr, cond, addr)
 | |
|             return
 | |
| 
 | |
|         is_chained = isinstance(cond, PyCompare) and addr.seek_back(ROT_THREE, addr.seek_back(stmt_opcodes))
 | |
|         if is_chained and self.popjump_stack:
 | |
|             pj = self.pop_popjump()
 | |
|             if isinstance(pj, PyCompare):
 | |
|                 cond = pj.chain(cond)
 | |
| 
 | |
|         if not addr.is_else_jump and not is_loop_condition:
 | |
|             # Handle generator expressions with or clause
 | |
|             for_iter = addr.seek_back(FOR_ITER)
 | |
|             if for_iter:
 | |
|                 end_of_for = for_iter.jump()
 | |
|                 if end_of_for.addr > addr.addr:
 | |
|                     gen = jump_addr.seek_forward((YIELD_VALUE, LIST_APPEND), end_of_for)
 | |
|                     if gen:
 | |
|                         if not truthiness:
 | |
|                             truthiness = not truthiness
 | |
|                             if truthiness:
 | |
|                                 cond = PyNot(cond)
 | |
|                         self.push_popjump(truthiness, jump_addr, cond, addr)
 | |
|                         return None
 | |
| 
 | |
|             self.push_popjump(truthiness, jump_addr, cond, addr)
 | |
|             # Dictionary comprehension
 | |
|             if jump_addr.seek_forward(MAP_ADD):
 | |
|                 return None
 | |
| 
 | |
|             if addr.code.name=='<lambda>':
 | |
|                 return None
 | |
|             # Generator
 | |
|             if jump_addr.seek_forward(YIELD_VALUE, jump_addr.seek_forward(stmt_opcodes)):
 | |
|                 return None
 | |
| 
 | |
|             if jump_addr.seek_back(JUMP_IF_TRUE_OR_POP,jump_addr[-2]):
 | |
|                 return None
 | |
|             # Generator
 | |
|             if jump_addr.opcode != END_FINALLY and jump_addr[1] and jump_addr[1].opcode == JUMP_ABSOLUTE:
 | |
|                 return None
 | |
| 
 | |
|             next_addr = addr[1]
 | |
|             while next_addr and next_addr < jump_addr:
 | |
|                 if next_addr.opcode in stmt_opcodes:
 | |
|                     break
 | |
|                 if next_addr.opcode in pop_jump_if_opcodes:
 | |
|                     next_jump_addr = next_addr.jump()
 | |
|                     if next_jump_addr > jump_addr or \
 | |
|                             (next_jump_addr == jump_addr and jump_addr[-1].opcode in else_jump_opcodes) or \
 | |
|                             (next_jump_addr[-1].opcode == SETUP_LOOP):
 | |
|                         return None
 | |
|                     if next_addr[1] == jump_addr and addr.arg != next_addr.arg:
 | |
|                         return None
 | |
|                     if next_jump_addr.opcode == FOR_ITER:
 | |
|                         return None
 | |
|                     if next_addr.opcode == addr.opcode and next_addr.arg == addr.arg:
 | |
|                         return None
 | |
| 
 | |
| 
 | |
|                 if next_addr.opcode in (JUMP_IF_FALSE_OR_POP, JUMP_IF_TRUE_OR_POP):
 | |
|                     next_jump_addr = next_addr.jump()
 | |
|                     if next_jump_addr > jump_addr or (next_jump_addr == jump_addr and jump_addr[-1].opcode in else_jump_opcodes):
 | |
|                         return None
 | |
|                 next_addr = next_addr[1]
 | |
|             # if there are no nested conditionals and no else clause, write the true portion and jump ahead to the end of the conditional
 | |
|             cond = self.pop_popjump()
 | |
|             end_true = jump_addr
 | |
|             if jump_addr.opcode == JUMP_ABSOLUTE and in_loop:
 | |
|                 end_true = end_true.seek_back(JUMP_ABSOLUTE, addr)
 | |
|             if truthiness and not isinstance(cond, PyBooleanOr):
 | |
|                 cond = PyNot(cond)
 | |
|             d_true = SuiteDecompiler(addr[1], end_true)
 | |
|             d_true.run()
 | |
|             stmt = IfStatement(cond, d_true.suite, None)
 | |
|             self.suite.add_statement(stmt)
 | |
|             return end_true
 | |
| 
 | |
| 
 | |
|         end_true = jump_addr[-1]
 | |
| 
 | |
|         is_assert = \
 | |
|             end_true.opcode == RAISE_VARARGS and \
 | |
|             next_addr.opcode == LOAD_GLOBAL and \
 | |
|             next_addr.code.names[next_addr.arg].name == 'AssertionError'
 | |
| 
 | |
|         # Increase jump_addr to pop all previous jumps
 | |
|         self.push_popjump(truthiness, jump_addr[1], cond, addr)
 | |
|         cond = self.pop_popjump()
 | |
| 
 | |
|         if truthiness:
 | |
|             x = addr.seek_back(pop_jump_if_opcodes, addr.seek_back(stmt_opcodes))
 | |
| 
 | |
|             while x and x.jump() < addr.jump():
 | |
|                 x = x.seek_back(pop_jump_if_opcodes)
 | |
|             last_pj = addr.seek_back(pop_jump_if_opcodes)
 | |
|             if not (x is not None and x.jump() == addr.jump()):
 | |
|                 if last_pj and last_pj.arg != addr.arg and isinstance(cond, PyBooleanOr):
 | |
|                     if last_pj.opcode != addr.opcode:
 | |
|                         cond.right = PyNot(cond.right)
 | |
|                 elif end_true.opcode and not is_assert:
 | |
|                     cond = PyNot(cond)
 | |
| 
 | |
|         if end_true.opcode == RETURN_VALUE:
 | |
|             end_false = jump_addr.seek_forward(RETURN_VALUE)
 | |
|             if end_false and end_false[2] and end_false[2].opcode == RETURN_VALUE:
 | |
|                 d_true = SuiteDecompiler(addr[1], end_true[1])
 | |
|                 d_true.run()
 | |
|                 d_false = SuiteDecompiler(jump_addr, end_false[1])
 | |
|                 d_false.run()
 | |
|                 self.suite.add_statement(IfStatement(cond, d_true.suite, d_false.suite))
 | |
|                 self.last_addr = end_false[1]
 | |
|                 return max(d_false.last_addr, d_false.end_addr)
 | |
| 
 | |
|         if is_assert:
 | |
|             # cond = cond.operand if isinstance(cond, PyNot) else PyNot(cond)
 | |
|             d_true = SuiteDecompiler(addr[1], end_true)
 | |
|             d_true.run()
 | |
|             assert_pop = d_true.stack.pop()
 | |
|             assert_args = assert_pop.args if isinstance(assert_pop, PyCallFunction) else []
 | |
|             assert_arg_str = ', '.join(map(str,[cond, *assert_args]))
 | |
|             self.suite.add_statement(SimpleStatement(f'assert {assert_arg_str}'))
 | |
|             return end_true[1]
 | |
|         # - If the true clause ends in return, make sure it's included
 | |
|         # - If the true clause ends in RAISE_VARARGS, then it's an
 | |
|         # assert statement. For now I just write it as a raise within
 | |
|         # an if (see below)
 | |
|         if end_true.opcode in (RETURN_VALUE, RAISE_VARARGS, POP_TOP):
 | |
|             d_true = SuiteDecompiler(addr[1], end_true[1])
 | |
|             d_true.run()
 | |
|             self.suite.add_statement(IfStatement(cond, d_true.suite, Suite()))
 | |
|             return jump_addr
 | |
|         if is_chained and addr[1].opcode == JUMP_ABSOLUTE:
 | |
|             end_true = end_true[-2]
 | |
|         d_true = SuiteDecompiler(addr[1], end_true)
 | |
|         d_true.run()
 | |
|         if in_loop and not is_loop_condition and addr[1].opcode == JUMP_ABSOLUTE:
 | |
|             j = addr[1].jump()
 | |
|             l = last_loop[1]
 | |
|             while l.opcode not in stmt_opcodes:
 | |
|                 if l == j:
 | |
|                     d_true.suite.add_statement(SimpleStatement('continue'))
 | |
| 
 | |
|                     self.suite.add_statement(IfStatement(cond, d_true.suite, None))
 | |
|                     return addr[2]
 | |
|                 l = l[1]
 | |
| 
 | |
|         if jump_addr.opcode == POP_BLOCK and is_loop_condition:
 | |
|             # It's a while loop
 | |
|             stmt = WhileStatement(cond, d_true.suite)
 | |
|             self.suite.add_statement(stmt)
 | |
|             return jump_addr[1]
 | |
|         # It's an if-else (expression or statement)
 | |
|         if end_true.opcode == JUMP_FORWARD:
 | |
|             end_false = end_true.jump()
 | |
|         elif end_true.opcode == JUMP_ABSOLUTE:
 | |
|             end_false = end_true.jump()
 | |
|             if end_false.opcode == FOR_ITER:
 | |
|                 # We are in a for-loop with nothing after the else-suite
 | |
|                 end_false = end_false.jump()[-1]
 | |
|             elif end_false[-1].opcode == SETUP_LOOP:
 | |
|                 # We are in a while-loop with nothing after the else-suite
 | |
|                 end_false = end_false[-1].jump()[-1]
 | |
|             if end_false.opcode == RETURN_VALUE:
 | |
|                 end_false = end_false[1]
 | |
|         elif end_true.opcode == RETURN_VALUE:
 | |
|             # find the next RETURN_VALUE
 | |
|             end_false = jump_addr
 | |
|             while end_false.opcode != RETURN_VALUE:
 | |
|                 end_false = end_false[1]
 | |
|             end_false = end_false[1]
 | |
|         elif end_true.opcode == BREAK_LOOP:
 | |
|             # likely in a loop in a try/except
 | |
|             end_false = jump_addr
 | |
|         else:
 | |
|             end_false = jump_addr
 | |
|             # # normal statement
 | |
|             # raise Exception("#ERROR: Unexpected statement: {} | {}\n".format(end_true, jump_addr, jump_addr[-1]))
 | |
|             # # raise Unknown
 | |
|             # jump_addr = end_true[-2]
 | |
|             # stmt = IfStatement(cond, d_true.suite, None)
 | |
|             # self.suite.add_statement(stmt)
 | |
|             # return jump_addr or self.END_NOW
 | |
|         d_false = SuiteDecompiler(jump_addr, end_false)
 | |
|         d_false.run()
 | |
|         if d_true.stack and d_false.stack:
 | |
|             assert len(d_true.stack) == len(d_false.stack) == 1
 | |
|             # self.write("#ERROR: Unbalanced stacks {} != {}".format(len(d_true.stack),len(d_false.stack)))
 | |
|             assert not (d_true.suite or d_false.suite)
 | |
|             # this happens in specific if else conditions with assigments
 | |
|             true_expr = d_true.stack.pop()
 | |
|             false_expr = d_false.stack.pop()
 | |
|             self.stack.push(PyIfElse(cond, true_expr, false_expr))
 | |
|         else:
 | |
|             stmt = IfStatement(cond, d_true.suite, d_false.suite)
 | |
|             self.suite.add_statement(stmt)
 | |
|         return end_false or self.END_NOW
 | |
| 
 | |
|     def POP_JUMP_IF_FALSE(self, addr, target):
 | |
|         return self.POP_JUMP_IF(addr, target, truthiness=False)
 | |
| 
 | |
|     def POP_JUMP_IF_TRUE(self, addr, target):
 | |
|         return self.POP_JUMP_IF(addr, target, truthiness=True)
 | |
| 
 | |
|     def JUMP_ABSOLUTE(self, addr, target):
 | |
|         # print("*** JUMP ABSOLUTE ***", addr)
 | |
|         # return addr.jump()
 | |
| 
 | |
|         # TODO: print out continue if not final jump
 | |
|         jump_addr = addr.jump()
 | |
|         if jump_addr[-1].opcode == SETUP_LOOP:
 | |
|             end_addr = jump_addr + jump_addr[-1].arg
 | |
|             last_jump = self.scan_for_final_jump(jump_addr, end_addr[-1])
 | |
|             if last_jump != addr:
 | |
|                 pass
 | |
|         pass
 | |
| 
 | |
|     #
 | |
|     # For loops
 | |
|     #
 | |
| 
 | |
|     def GET_ITER(self, addr):
 | |
|         pass
 | |
| 
 | |
|     def FOR_ITER(self, addr: Address, delta):
 | |
|         if addr[-1] and addr[-1].opcode == RETURN_VALUE:
 | |
|             # Dead code
 | |
|             return self.END_NOW
 | |
|         iterable = self.stack.pop()
 | |
|         jump_addr = addr.jump()
 | |
|         end_body = jump_addr
 | |
|         if end_body.opcode != POP_BLOCK:
 | |
|             end_body = end_body[-1]
 | |
|         d_body = SuiteDecompiler(addr[1], end_body)
 | |
|         for_stmt = ForStatement(iterable)
 | |
|         d_body.stack.push(for_stmt)
 | |
|         d_body.run()
 | |
|         for_stmt.body = d_body.suite
 | |
|         loop = addr.seek_back(SETUP_LOOP)
 | |
|         # while loop:
 | |
|         #     outer_loop = loop.seek_back(SETUP_LOOP)
 | |
|         #     if outer_loop:
 | |
|         #         if outer_loop.jump().addr < loop.addr:
 | |
|         #             break
 | |
|         #         else:
 | |
|         #             loop = outer_loop
 | |
|         #     else:
 | |
|         #         break
 | |
|         end_addr = jump_addr
 | |
|         if loop and not jump_addr[1].opcode in else_jump_opcodes:
 | |
|             end_of_loop = loop.jump()[-1]
 | |
|             if end_of_loop.opcode != POP_BLOCK:
 | |
|                 else_start = end_of_loop.seek_back(POP_BLOCK)
 | |
|                 d_else = SuiteDecompiler(else_start, loop.jump())
 | |
|                 d_else.run()
 | |
|                 for_stmt.else_body = d_else.suite
 | |
|                 end_addr = loop.jump()
 | |
|         self.suite.add_statement(for_stmt)
 | |
|         return end_addr
 | |
| 
 | |
|     # Function creation
 | |
| 
 | |
|     def MAKE_FUNCTION_OLD(self, addr, argc, is_closure=False):
 | |
|         testType = self.stack.pop().val
 | |
|         if isinstance(testType, str):
 | |
|             code = Code(self.stack.pop().val, self.code)
 | |
|         else:
 | |
|             code = Code(testType, self.code)
 | |
|         closure = self.stack.pop() if is_closure else None
 | |
|         # parameter annotation objects
 | |
|         paramobjs = {}
 | |
|         paramcount = (argc >> 16) & 0x7FFF
 | |
|         if paramcount:
 | |
|             paramobjs = dict(zip(self.stack.pop().val, self.stack.pop(paramcount - 1)))
 | |
|         # default argument objects in positional order 
 | |
|         defaults = self.stack.pop(argc & 0xFF)
 | |
|         # pairs of name and default argument, with the name just below the object on the stack, for keyword-only parameters 
 | |
|         kwdefaults = {}
 | |
|         for i in range((argc >> 8) & 0xFF):
 | |
|             k, v = self.stack.pop(2)
 | |
|             if hasattr(k, 'name'):
 | |
|                 kwdefaults[k.name] = v
 | |
|             elif hasattr(k, 'val'):
 | |
|                 kwdefaults[k.val] = v
 | |
|             else:
 | |
|                 kwdefaults[str(k)] = v
 | |
|         func_maker = code_map.get(code.name, DefStatement)
 | |
|         self.stack.push(func_maker(code, defaults, kwdefaults, closure, paramobjs))
 | |
| 
 | |
|     def MAKE_FUNCTION_NEW(self, addr, argc, is_closure=False):
 | |
|         testType = self.stack.pop().val
 | |
|         if isinstance(testType, str):
 | |
|             code = Code(self.stack.pop().val, self.code)
 | |
|         else:
 | |
|             code = Code(testType, self.code)
 | |
|         closure = self.stack.pop() if is_closure else None
 | |
|         annotations = {}
 | |
|         kwdefaults = {}
 | |
|         defaults = {}
 | |
|         if argc & 8:
 | |
|             annotations = list(self.stack.pop())
 | |
|         if argc & 4:
 | |
|             annotations = self.stack.pop()
 | |
|             if isinstance(annotations, PyDict):
 | |
|                 annotations = {str(k[0].val).replace('\'', ''): str(k[1]) for k in annotations.items}
 | |
|         if argc & 2:
 | |
|             kwdefaults = self.stack.pop()
 | |
|             if isinstance(kwdefaults, PyDict):
 | |
|                 kwdefaults = {str(k[0].val): str(k[1] if isinstance(k[1], PyExpr) else PyConst(k[1])) for k in
 | |
|                               kwdefaults.items}
 | |
|             if not kwdefaults:
 | |
|                 kwdefaults = {}
 | |
|         if argc & 1:
 | |
|             defaults = list(map(lambda x: str(x if isinstance(x, PyExpr) else PyConst(x)), self.stack.pop()))
 | |
|         func_maker = code_map.get(code.name, DefStatement)
 | |
|         self.stack.push(func_maker(code, defaults, kwdefaults, closure, annotations, annotations))
 | |
| 
 | |
|     def MAKE_FUNCTION(self, addr, argc, is_closure=False):
 | |
|         if sys.version_info < (3, 6):
 | |
|             self.MAKE_FUNCTION_OLD(addr, argc, is_closure)
 | |
|         else:
 | |
|             self.MAKE_FUNCTION_NEW(addr, argc, is_closure)
 | |
| 
 | |
|     def LOAD_CLOSURE(self, addr, i):
 | |
|         # Push the varname.  It doesn't matter as it is not used for now.
 | |
|         self.stack.push(self.code.derefnames[i])
 | |
| 
 | |
|     def MAKE_CLOSURE(self, addr, argc):
 | |
|         self.MAKE_FUNCTION(addr, argc, is_closure=True)
 | |
| 
 | |
|     #
 | |
|     # Raising exceptions
 | |
|     #
 | |
| 
 | |
|     def RAISE_VARARGS(self, addr, argc):
 | |
|         # TODO: find out when argc is 2 or 3
 | |
|         # Answer: In Python 3, only 0, 1, or 2 argument (see PEP 3109)
 | |
|         if argc == 0:
 | |
|             self.write("raise")
 | |
|         elif argc == 1:
 | |
|             exception = self.stack.pop()
 | |
|             self.write("raise {}", exception)
 | |
|         elif argc == 2:
 | |
|             from_exc, exc = self.stack.pop(), self.stack.pop()
 | |
|             self.write("raise {} from {}".format(exc, from_exc))
 | |
|         else:
 | |
|             raise Unknown
 | |
| 
 | |
|     def EXTENDED_ARG(self, addr, ext):
 | |
|         # self.write("# ERROR: {} : {}".format(addr, ext) )
 | |
|         pass
 | |
| 
 | |
|     def WITH_CLEANUP(self, addr, *args, **kwargs):
 | |
|         # self.write("# ERROR: {} : {}".format(addr, args))
 | |
|         pass
 | |
| 
 | |
|     def WITH_CLEANUP_START(self, addr, *args, **kwargs):
 | |
|         pass
 | |
| 
 | |
|     def WITH_CLEANUP_FINISH(self, addr, *args, **kwargs):
 | |
|         jaddr = addr.jump()
 | |
|         return jaddr
 | |
| 
 | |
|     # Formatted string literals
 | |
|     def FORMAT_VALUE(self, addr, flags):
 | |
|         formatter = ''
 | |
|         if (flags & 0x03) == 0x01:
 | |
|             formatter = '!s'
 | |
|         elif (flags & 0x03) == 0x02:
 | |
|             formatter = '!r'
 | |
|         elif (flags & 0x03) == 0x03:
 | |
|             formatter = '!a'
 | |
|         if (flags & 0x04) == 0x04:
 | |
|             formatter = formatter + ':' + self.stack.pop().val
 | |
|         val = self.stack.pop()
 | |
|         f = PyFormatValue(val)
 | |
|         f.formatter = formatter
 | |
|         self.stack.push(f)
 | |
| 
 | |
|     def BUILD_STRING(self, addr, c):
 | |
|         params = self.stack.pop(c)
 | |
|         self.stack.push(PyFormatString(params))
 | |
| 
 | |
|     # Coroutines
 | |
|     def GET_AWAITABLE(self, addr: Address):
 | |
|         func: AwaitableMixin = self.stack.pop()
 | |
|         func.is_awaited = True
 | |
|         self.stack.push(func)
 | |
|         yield_op = addr.seek_forward(YIELD_FROM)
 | |
|         return yield_op[1]
 | |
| 
 | |
|     def BEFORE_ASYNC_WITH(self, addr: Address):
 | |
|         with_addr = addr.seek_forward(SETUP_ASYNC_WITH)
 | |
|         end_with = with_addr.jump()
 | |
|         with_stmt = WithStatement(self.stack.pop())
 | |
|         with_stmt.is_async = True
 | |
|         d_with = SuiteDecompiler(addr[1], end_with)
 | |
|         d_with.stack.push(with_stmt)
 | |
|         d_with.run()
 | |
|         with_stmt.suite = d_with.suite
 | |
|         self.suite.add_statement(with_stmt)
 | |
|         if sys.version_info <= (3, 4):
 | |
|             assert end_with.opcode == WITH_CLEANUP
 | |
|             assert end_with[1].opcode == END_FINALLY
 | |
|             return end_with[2]
 | |
|         else:
 | |
|             assert end_with.opcode == WITH_CLEANUP_START
 | |
|             assert end_with[1].opcode == GET_AWAITABLE
 | |
|             assert end_with[4].opcode == WITH_CLEANUP_FINISH
 | |
|             return end_with[5]
 | |
| 
 | |
|     def SETUP_ASYNC_WITH(self, addr: Address, arg):
 | |
|         pass
 | |
| 
 | |
|     def GET_AITER(self, addr: Address):
 | |
|         return addr[2]
 | |
| 
 | |
|     def GET_ANEXT(self, addr: Address):
 | |
|         iterable = self.stack.pop()
 | |
|         for_stmt = ForStatement(iterable)
 | |
|         for_stmt.is_async = True
 | |
|         jump_addr = addr[-1].jump()
 | |
|         d_body = SuiteDecompiler(addr[3], jump_addr[-1])
 | |
|         d_body.stack.push(for_stmt)
 | |
|         d_body.run()
 | |
|         jump_addr = jump_addr[-1].jump()
 | |
|         new_start = jump_addr
 | |
|         new_end = jump_addr[-2].jump()[-1]
 | |
|         d_body.start_addr = new_start
 | |
| 
 | |
|         d_body.end_addr = new_end
 | |
| 
 | |
|         d_body.run()
 | |
| 
 | |
|         for_stmt.body = d_body.suite
 | |
|         self.suite.add_statement(for_stmt)
 | |
|         new_end = new_end.seek_forward(POP_BLOCK)
 | |
|         return new_end
 | |
| 
 | |
| 
 | |
| def make_dynamic_instr(cls):
 | |
|     def method(self, addr):
 | |
|         cls.instr(self.stack)
 | |
| 
 | |
|     return method
 | |
| 
 | |
| 
 | |
| # Create unary operators types and opcode handlers
 | |
| for op, name, ptn, prec in unary_ops:
 | |
|     name = 'Py' + name
 | |
|     tp = type(name, (PyUnaryOp,), dict(pattern=ptn, precedence=prec))
 | |
|     globals()[name] = tp
 | |
|     setattr(SuiteDecompiler, op, make_dynamic_instr(tp))
 | |
| 
 | |
| # Create binary operators types and opcode handlers
 | |
| for op, name, ptn, prec, inplace_ptn in binary_ops:
 | |
|     # Create the binary operator
 | |
|     tp_name = 'Py' + name
 | |
|     tp = globals().get(tp_name, None)
 | |
|     if tp is None:
 | |
|         tp = type(tp_name, (PyBinaryOp,), dict(pattern=ptn, precedence=prec))
 | |
|         globals()[tp_name] = tp
 | |
| 
 | |
|     setattr(SuiteDecompiler, 'BINARY_' + op, make_dynamic_instr(tp))
 | |
|     # Create the in-place operation
 | |
|     if inplace_ptn is not None:
 | |
|         inplace_op = "INPLACE_" + op
 | |
|         tp_name = 'InPlace' + name
 | |
|         tp = type(tp_name, (InPlaceOp,), dict(pattern=inplace_ptn))
 | |
|         globals()[tp_name] = tp
 | |
|         setattr(SuiteDecompiler, inplace_op, make_dynamic_instr(tp))
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     import sys
 | |
| 
 | |
|     if len(sys.argv) == 1:
 | |
|         print('USAGE: {} <filename.pyc>'.format(sys.argv[0]))
 | |
|     else:
 | |
|         print(decompile(sys.argv[1]))
 |