PF的代码审计日记_CTF篇_DAY_1


前言

自己代码审计太差了,决定开始一波学习,就先从CTF开始了,虽然之前刷过沙雕CVE,不过是直接危险函数,来快速找的,没有什么意义,不过挺大佬说,代码审计是不需要环境搭建的,纯白盒,所以这次就不黑白结合了。(虽然比赛都是黑白结合这样搞的)

DDCTF homebrew_event_loop

进入之后下载源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# -*- encoding: utf-8 -*-
# written in python 2.7
__author__ = 'garzon'

from flask import Flask, session, request, Response
import urllib

app = Flask(__name__)
# <hidden>
import uuid

# this part is hidden from the contestants of the challenge for reducing unnecessary complexity
app.secret_key = 'theDthN4y56S7u563jhjh56iRr785j5rHhj5hj5H4hjH53hbty857omnEi6H7vcv24rfcvr'
url_prefix = '/<string:prefix>'

valid_url_prefixs = {
'd5afe1f66747e857': '3v41_3v3nt_100p_aNd_fLASK_cOOk1e',
'd5af31f99147e857': '3v41_3v3nt_1OOp_aNd_fLASK_cOOk1e',
'd5af31f66147e657': '3v41_3v3nt_1O0p_aNd_fLASK_cOOk1e',
'd5af31f86147e857': '3v41_3v3nt_10Op_aNd_fLASK_cOOk1e',
'd5afe1f66147e857': '3v41_3v3nt_100p_aNd_fLASK_c0Ok1e',
'd5af31f96147e657': '3v41_3v3nt_100p_aNd_fLASK_cO0k1e',
'd5af31f66147e857': '3v41_3v3nt_100p_aNd_fLASK_c00k1e',
'd5af31f66741e857': '3v4l_3v3nt_100p_aNd_fLASK_cOOk1e',
'd5af33f66147e857': '3v41_3v3nt_100p_aNd_fLASK_cOOkle',
'd5af31f88147e857': '3v41_3v3nt_l00p_aNd_fLASK_cOOkle',
'd5af31f66177e857': '3v4l_3v3nt_100p_aNd_fLASK_cOOkle', }


def FLAG():
flag = valid_url_prefixs[request.user_prefix]
try:
with open('flag.log', 'ab') as f:
f.write(str((request.remote_addr, flag, session['log'])) + '\n')
except:
return 'Something went wrong, please contact admin of DDCTF to get your flag.'
return flag


original_app_route = app.route


def new_app_route(url_pattern, **kwargs):
def __dummy(f):
def _(*args, **kwargs):
if kwargs['prefix'] not in valid_url_prefixs: return '404 NOT FOUND', 404
request.user_prefix = kwargs['prefix']
del kwargs['prefix']
if len(args) == 0 and len(kwargs) == 0: return f()
if len(kwargs) == 0: return f(*args)
if len(args) == 0: return f(**kwargs)
return f(*args, **kwargs)

_.__name__ = str(uuid.uuid4())
return original_app_route(url_pattern, **kwargs)(_)

return __dummy


app.route = new_app_route

'''
#</hidden>
app.secret_key = '*********************' # censored
url_prefix = '{{URL_PREFIX}}'
def FLAG():
return 'FLAG_is_here_but_i_wont_show_you' # censored
#<hidden>
'''


# </hidden>

def trigger_event(event):
session['log'].append(event)
if len(session['log']) > 5: session['log'] = session['log'][-5:]
if type(event) == type([]):
request.event_queue += event
else:
request.event_queue.append(event)


def get_mid_str(haystack, prefix, postfix=None):
haystack = haystack[haystack.find(prefix) + len(prefix):]
if postfix is not None:
haystack = haystack[:haystack.find(postfix)]
return haystack


class RollBackException: pass


# <hidden>
# final payload: /?action:trigger_event%23;action:buy;5%23action:get_flag;
# </hidden>
def execute_event_loop():
valid_event_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_0123456789:;#')
resp = None
while len(request.event_queue) > 0:
event = request.event_queue[0] # `event` is something like "action:ACTION;ARGS0#ARGS1#ARGS2......"
request.event_queue = request.event_queue[1:]
if not event.startswith(('action:', 'func:')): continue
for c in event:
if c not in valid_event_chars: break
else:
is_action = event[0] == 'a'
action = get_mid_str(event, ':', ';')
args = get_mid_str(event, action + ';').split('#')
try:
event_handler = eval(action + ('_handler' if is_action else '_function'))
ret_val = event_handler(args)
except RollBackException:
if resp is None: resp = ''
resp += 'ERROR! All transactions have been cancelled. <br />'
resp += '<a href="./?action:view;index">Go back to index.html</a><br />'
session['num_items'] = request.prev_session['num_items']
session['points'] = request.prev_session['points']
break
except Exception, e:
if resp is None: resp = ''
# resp += str(e) # only for debugging
continue
if ret_val is not None:
if resp is None:
resp = ret_val
else:
resp += ret_val
if resp is None or resp == '': resp = ('404 NOT FOUND', 404)
session.modified = True
return resp


@app.route(url_prefix + '/')
def entry_point():
querystring = urllib.unquote(request.query_string)
request.event_queue = []
if querystring == '' or (not querystring.startswith('action:')) or len(querystring) > 100:
querystring = 'action:index;False#False'
if 'num_items' not in session:
session['num_items'] = 0
session['points'] = 3
session['log'] = []
request.prev_session = dict(session)
trigger_event(querystring)
return execute_event_loop()


# handlers/functions below --------------------------------------

def view_handler(args):
page = args[0]
html = ''
html += '[INFO] you have {} diamonds, {} points now.<br />'.format(session['num_items'], session['points'])
if page == 'index':
html += '<a href="./?action:index;True%23False">View source code</a><br />'
html += '<a href="./?action:view;shop">Go to e-shop</a><br />'
html += '<a href="./?action:view;reset">Reset</a><br />'
elif page == 'shop':
html += '<a href="./?action:buy;1">Buy a diamond (1 point)</a><br />'
elif page == 'reset':
del session['num_items']
html += 'Session reset.<br />'
html += '<a href="./?action:view;index">Go back to index.html</a><br />'
return html


def index_handler(args):
bool_show_source = args[0]
bool_download_source = args[1]
if bool_show_source == 'True':

source = open('eventLoop.py', 'r')
html = ''
if bool_download_source != 'True':
html += '<a href="./?action:index;True%23True">Download this .py file</a><br />'
html += '<a href="./?action:view;index">Go back to index.html</a><br />'

# <hidden>
is_hidden = False
# </hidden>
for line in source:
# <hidden>
if line.strip() == '#</hidden>':
is_hidden = False
continue
if line.strip() == '#<hidden>':
is_hidden = True
if is_hidden: continue
line = line.replace('{{URL_PREFIX}}', '/' + request.user_prefix)
# </hidden>
if bool_download_source != 'True':
html += line.replace('&', '&amp;').replace('\t', '&nbsp;' * 4).replace(' ', '&nbsp;').replace('<',
'&lt;').replace(
'>', '&gt;').replace('\n', '<br />')
else:
html += line
source.close()

if bool_download_source == 'True':
headers = {}
headers['Content-Type'] = 'text/plain'
headers['Content-Disposition'] = 'attachment; filename=serve.py'
return Response(html, headers=headers)
else:
return html
else:
trigger_event('action:view;index')


def buy_handler(args):
num_items = int(args[0])
if num_items <= 0: return 'invalid number({}) of diamonds to buy<br />'.format(args[0])
session['num_items'] += num_items
trigger_event(['func:consume_point;{}'.format(num_items), 'action:view;index'])


def consume_point_function(args):
point_to_consume = int(args[0])
if session['points'] < point_to_consume: raise RollBackException()
session['points'] -= point_to_consume


def show_flag_function(args):
flag = args[0]
# return flag # GOTCHA! We noticed that here is a backdoor planted by a hacker which will print the flag, so we disabled it.
return 'You naughty boy! ;) <br />'


def get_flag_handler(args):
if session['num_items'] >= 5:
trigger_event('func:show_flag;' + FLAG()) # show_flag_function has been disabled, no worries
trigger_event('action:view;index')


if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0')

开始分析

由于是flask所以我们从路由开始分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@app.route(url_prefix + '/')
def entry_point():
print(request.query_string)
querystring = urllib.unquote(request.query_string)
request.event_queue = []
if querystring == '' or (not querystring.startswith('action:')) or len(querystring) > 100:
querystring = 'action:index;False#False'
if 'num_items' not in session:
session['num_items'] = 0
session['points'] = 3
session['log'] = []
request.prev_session = dict(session)
trigger_event(querystring)
return execute_event_loop()

  1. 获取你的query_string然后进行url解码,并进行检查,如果非法就会强制赋值
  2. 如果session中没有num_items参数就给session设置一系列数据
  3. 记录一下当前session,以query_string为参数进入trigger_event
  4. 进入execute_event_loop()

跟进trigger_event

1
2
3
4
5
6
7
def trigger_event(event):
session['log'].append(event)
if len(session['log']) > 5: session['log'] = session['log'][-5:]
if type(event) == type([]):
request.event_queue += event
else:
request.event_queue.append(event)

1.将参数写入session[‘log’]中,如果日志比5大,则返回log的后五项
2.如果参数的参数是list,就直接加在当前list末尾,如果不是,就直接用append方法添加

跟进execute_event_loop

  1. 循环从中列表中取出事件,每次取出一个事件,且事件必须是action or func:开头,且不能包含非法字符
  2. 每个eventtype:name;arg1#arg2#arg3的形式
  3. 通过
    1
    2
    event_handler = eval(action + ('_handler' if is_action else '_function'))
    ret_val = event_handler(args)

动态拼接来执行函数

  1. 如果出现错误就回滚

查看一下动态拼接出来的函数

buy_handler

  1. 首先判断你要购买的数量是否是负数,负数直接报错
  2. 给你的钻石数量加上你要购买的数量
  3. 将消费扣款的函数加入队列进行

consume_point_function

  1. 参数传递你买了多少钻石
  2. 扣钱,如果钱不够,报错

get_flag_handler

  1. 如果你的钻石数量大于五个就把flag写入log中

解题

  • 看完上述函数,有一个很有趣的事情,buy_handler函数是用来购买钻石的,但是购买钻石是买完先给你,然后把扣钱加入队列,就是货到后付款,如果可以货到后直接调用getflag的函数,就可以得到flag然后在扣钱报错,但是此时我们已经将flag写入了session中

  • 实现的前提是,在队列中buy_handler后跟的是get_flag_handler,但是buy_handler会在调用时在队列中自动插入一个consume_point_function

tricks

1
valid_event_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_0123456789:;#')

合法字符中有#,且使用了eval拼接,在python中#代表了注释,所以#后面的一切将消失,所以动态拼接调用的函数就不必是handler或function结尾了

  • 这时候我们思考,我们可以任意调用,我们就可以调用trigger_event,且他可以传入列表,而split在分割时返回的也刚好是列表,所以我们可以按照自己的想法选择handler调用的顺序

payload:
?action:trigger_event%23;action:buy;5%23action:get_flag;

这样调用顺序就变为了 buy =>get_flag => consume_point