• 目录:

    python 实现 http 代理


    前言

    这篇文章将会介绍一个 http 代理的实现过程…

    http 协议的文档

    关于网络协议的文档, 可以在这里查 https://www.ietf.org/ . 搜索文档编号就行. 目前关于http的文档有这几篇:

    • https://tools.ietf.org/html/rfc7230
    • https://tools.ietf.org/html/rfc7231
    • https://tools.ietf.org/html/rfc7232
    • https://tools.ietf.org/html/rfc7233
    • https://tools.ietf.org/html/rfc7234
    • https://tools.ietf.org/html/rfc7235

    项目 git 地址

    这个项目目前托管在 github 上: https://github.com/playay/http_proxy

    最原始的实现原理

    实现一个 http 代理的功能, 最容易想到的程序的运行过程, 可能是这样的:

    1. 接收 http 请求
    2. 解析请求头, 得到 host 和 uri
    3. 转发请求, 接收响应
    4. 把响应传回给请求方

    实现基本的功能

    按照前言中提到的最原始的实现原理, 我们来一步步实现.
    这里不考虑性能, 对于性能的改善, 会在改善性能这一章中介绍.

    创建工程

    一般来说, 一个目录就是一个工程, 这个目录下通常会有以下几个文件夹:

    • bin/ 可执行文件
    • lib/ 所有的第三方库
    • src/ 程序的源码
    • doc/ 项目文档

    我们写的是 python 代码, 就不需要 src 文件夹了, 可执行文件就是源码, 所以建个 http_proxy/bin/ 文件夹, 新建个 http_proxy.py 文件,这个工程就算创建好了.

    接收、解析、处理请求

    我们用 socket 接收 http 请求. 首先, 写出一个 socket 程序的模板:
    绑定、监听、启动新线程处理请求

    import logging
    logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%M:%S',
                    )
    import sys
    import socket
    import thread
    
    def proxyer(soc):
        request = soc.recv(4096)
        logging.info(request)
        
    def start():
        address = ('',int(sys.argv[1]))
        s = socket.socket()
        s.bind(address)
        s.listen(1024)
        while True:
            soc, add = s.accept()
            thread.start_new_thread(proxyer, (soc,))
           
    if __name__ == '__main__':
       start()
    

    如上所示, 我们做到了: 来一个请求, 就启动一个线程. 在新的线程里, 用 proxyer() 接收请求并打印了出来.


    这里有一个很重要的问题: 如何完整地接收请求.
    数据在用 socket 传输的过程中, 可能分多次传送, 具体原理可以参考 ip 或 tcp 报文的分段传输. 因为单个数据包的大小有限制, 如果数据大小超过这个限制就要分多次发送, 调用一次 soc.recv(4096), 就不能保证完整接收请求了, 另外如果单个数据包大小超过4096, soc.recv(4096) 也收不到完整数据. 所以, 我们需要循环调用 soc.recv(4096) .

    什么时候结束循环呢? 用 socket 接收 http 请求的数据, 最开始收到的肯定是请求头, 请求头以 \r\n\r\n 结束.
    如果是 GET、HEAD、CONNECT 方法的请求,接收完请求头就表示已经完整接收请求.
    所以对于 GET、HEAD、CONNECT 方法可以这样做:

    def proxyer(soc):
        request = ''
        got_header = False
        headers = {}
        while True:
            buf = soc.recv(4096)
            request = request + buf
            if not got_header and '\r\n\r\n' in request:
                got_header = True
                break
            if not buf:
                break
        if not '\r\n\r\n' in request:
            logging.warning('request err, close this task')
            soc.close()
            return
    

    但如果是 POST 方法, 在请求头接收完之后可能还有数据. http 协议在中, 有两种方式判断 POST 请求是否接收完整:

    1. 如果请求头中包含 Content-Length ,就用它来判断
    2. 如果请求头中包含 Transfer-Encoding , 就用它来判断

    如果两种都不包含, 默认用第二种方式, 如果两种都包含, 用第二种方式.
    Transfer-Encoding 在完整协议里有很多个可选值, 这里只当它是 chunked (实际上网时, 我只见过这一种). 它表示: 最后会发一个空的 socket 包来标记数据发送完毕.
    实际中我也只见过第一种方式的 POST 请求. 不管怎样, 解析请求头是必须的, 解析的代码如下:

    def parse_request_header(header):
        '''
        解析http请求头,
        成功,返回(host, port, method, uri, headers)
        失败,返回(None,None,None,None,None)
        '''
        lines = header.strip().split('\r\n')
        try:
            '''解析method和uri'''
            line0 = lines[0].split(' ')
            method = line0[0].upper()
            uri = line0[1]
        
            '''解析其他header'''
            headers = {}
            for i in range(1,len(lines)):
                line= lines[i].split(':')
                key = line.pop(0)
                value = ''.join(line)
                headers[key] = value.strip()
    
            '''处理目标主机和端口'''
            if method in ['CONNECT']:
                target_host_and_port = uri.split(':')
            else:
                target_host_and_port = headers['Host'].split(':')
            if len(target_host_and_port)==1:
                target_host = target_host_and_port[0]
                if method in ['CONNECT']: target_port = 443
                else: target_port = 80
            else:
                target_host = target_host_and_port[0]
                target_port = int(target_host_and_port[1].strip())
        except Exception, e: 
            logging.warning(str(type(e))+' '+str(e)+' err')
            return None,None,None,None,None
        return target_host, target_port, method, uri, headers
    

    有了解析请求头的方法, 就能在接收请求的时候, 加上对 POST 方法的支持:

    def proxyer(soc):
        '''接收http请求'''
        request = ''
        got_header = False
        headers = {}
        while True:
            buf = soc.recv(4096)
            request = request + buf
            if not got_header and '\r\n\r\n' in request:
                got_header = True
                request_header = request.split('\r\n\r\n')[0] + '\r\n\r\n'
                header_length = len(request_header)
                host, port, method, uri, headers = parse_request_header(request_header)
                if not host or not port or not method in ['HEAD','GET','POST','CONNECT']:
                    logging.warning('parser request err or method not support ,close this task')
                    soc.close()
                    return
                if method in ['GET','HEAD','CONNECT']:
                    break
            if got_header and method in ['POST']:
                if 'Content-Length' in headers:
                    if int(headers['Content-Length']) <= len(request)-header_length:
                        break
                else:
                    logging.warning('no Content-Length in POST request,close this task')
                    soc.close()
                    return
            if not buf:
                break
        if not '\r\n\r\n' in request:
            logging.warning('request err, close this task')
            soc.close()
            return
    

    对于 http 请求, 还有最后一步, 按照协议规定: 浏览器发给代理的请求头, 与正常的请求头是不一样的, 所以我们还要对请求头做一些修改, 才能转给目的主机.
    修改请求头的代码如下:

    def proxyer(soc):
        '''接收http请求'''
        ...
        
        '''按协议要求,修改请求头'''
        if method in ['GET','POST','HEAD']:
            request_header = re.sub('Proxy-Connection: .+\r\n','',request_header)
            request_header = re.sub('Connection: .+','',request_header)
            request_header = re.sub('\r\n\r\n','\r\nConnection: close\r\n\r\n',request_header)
            request_header = re.sub(uri,uri[uri.index('/',8):],request_header)
            request = request_header+request[header_length:]
    

    修改完了请求头, 就该用它去获取响应了, 但是 CONNECT 方法的请求比较特殊, 如果是 CONNECT 方法, 接下来要做的不是获取响应, 而是建立一条到目标主机的隧道. 所以我们准备好两个方法 do_proxy()do_tunnel() . 下一节中我们会分别实现这两个方法.

    def proxyer(soc):
        '''接收http请求'''
        ...
        
        '''按协议要求,修改请求头'''
        ...
        
        '''获取目标主机的http响应, 并转发响应包'''
        if method in ['CONNECT']:
            do_tunnel(host, port, soc)
        else:
            do_proxy(host, port, method, uri, headers, request, soc)
    

    得到响应并回传给请求方

    上一节中, 我们接收完了 http 请求, 并解析、处理了它. 算是完成了最原始的实现原理中的1、2两步. 最后留下了两个方法 do_proxy()do_tunnel() . 实现这两个方法, 就算是完成了剩下的3、4两步.

    先说下相对简短的 do_tunnel() . 要实现的是: 建立一条请求方到目标主机的隧道. 其实只要新建一个 socket 连到目标主机上, 然后把请求方的 socket 拿过来、对接上就OK了. 对接, 就是把一个 socket 收到的数据, 发给另一个 socket . 看代码:

    def dock_socket(recv, send, recv_from_response=False):
        try:
            while True:
                buf = recv.recv(4096)
                send.send(buf)
                if not buf:
                    break
        except Exception, e:
            recv.close()
            send.close()
            return
        if recv_from_response:
            recv.close()
            send.close()
        return
    

    因为我们只做短连接, 所以如果数据方向是响应方发给请求方的, 就可以 close 掉 socket 了. 这就是 recv_from_response 的含义
    基于 dock_socket() 方法, do_tunnel() 的实现如下:

    TUNNEL_OK = '''HTTP/1.1 200 Connection Established\r\nProxy-Connection: close\r\n\r\n'''
    
    def do_tunnel(host, port, soc):
        cos = socket.socket()
        try:
            cos.connect((host,port))
        except Exception, e:
            logging.warning('connect err'+host+':'+str(port))
            #soc.send(TUNNEL_FAIL)
            soc.close()
            return
        soc.send(TUNNEL_OK)
        thread.start_new_thread(dock_socket, (soc, cos, False))
        thread.start_new_thread(dock_socket, (cos, soc, True))
    

    隧道建立成功, 按照 http 协议要求, 要给请求方一个响应, 即第12行(倒数第三行)的 soc.send(TUNNEL_OK) .
    至此, 对 CONNECT 方法的 http 请求, 已经可以完整处理. 通过这个程序代理, 已经可以正常访问 https://www.baidu.com .


    接下来我们说 do_proxy() 方法, 要实现的功能就是:

    • 连接目标主机
    • 发送请求
    • 接收响应
    • 转发请求

    除了接收响应需要像接收请求时一样, 要注意如何完整接收报文之外. 并没有什么麻烦的地方.

    既然是像接收请求一样就收响应, 还是要先解析响应报文头:

    def parse_response_header(response_header):
        '''解析http响应报文头'''
        Transfer_Encoding = False
        Content_Length = 0
        status_code = 0
        lines = response_header.strip().split('\r\n')
        status_code = int(lines[0].split(' ')[1])
        
        headers = {}
        for i in range(1,len(lines)):
            line= lines[i].split(':')
            key = line.pop(0)
            value = ''.join(line)
            headers[key] = value.strip()
        
        return status_code, headers
    

    然后完成 do_proxy() 方法:

    def do_proxy(host, port, method, uri, request_headers, request, soc):
        '''获取目标主机的http应答, 并转发应答包'''      
        c = socket.socket()
        try:
            c.connect((host, port))
        except Exception, e:
            logging.warning(str(type(e))+' '+str(e)+' err')
            c.close()
            soc.send(str(type(e))+' '+str(e)+' err')
            soc.close()
            return
        try:   
            c.send(request)
            response = ''
            got_header = False
            headers = {}
            while True:
                buf = c.recv(4096)
                response = response + buf
                soc.send(buf)
                if not got_header and '\r\n\r\n' in response:
                    got_header = True
                    response_header = response.split('\r\n\r\n')[0] + '\r\n\r\n'
                    #logging.debug(response)
                    header_length = len(response_header)
                    status_code, headers = parser_response_header(response_header)
    
                if got_header:
                    '''没有内容,直接返回报文头就行'''
                    if method in ['HEAD']:
                        break
                    if method in ['GET', 'POST']:
                        if status_code in [204,301,302,303,304,307]:
                            break
                        '''正常的判断是否接收完响应的方式'''
                        if 'Transfer-Encoding' in headers:
                            if not buf:
                                logging.debug('not buf in tranfer-encoding')
                                break 
                        if 'Content-Length' in headers:
                            if int(headers['Content-Length']) <= len(response)-header_length:
                                break
                        if not 'Content-Length' in headers and not 'Transfer-Encoding' in headers and not buf:
                            logging.debug('not buf')
                            break 
                if not buf:
                    logging.error('response not buf')
                    break
        except Exception, e:
            logging.warning(str(type(e))+' '+str(e)+' err')
            c.close()
            soc.close()
            return
        c.close()
        soc.close()
    

    其实, 去掉各种 try, 还有对响应完整性的判断, 上面这段代码也就剩下:

    c = socket.socket()
    c.connect((host, port))
    buf = c.recv(4096)
    soc.send(buf)
    
    

    改善性能

    协程

    现在的程序还是多线程的模型,虽然功能已经实现。但是至少我自己在用的时候,明显感觉网速很慢。尤其是像腾讯、新浪这种门户网站的首页。一个页面的请求太他妈多了!!一下子就开了巨多的线程。线程相互之间切换的代价也是很大的,每个线程时间没做多少事,多数时间都在等待 IO。带宽没利用上多少,cpu 就快耗尽了,还都是耗在切换线程上,然后网速还很慢。

    有一种叫协程的东西。内部好象是用 select poll epoll 那三个实现的。具体实现还没花时间研究过(什么是epoll?)。它的效果大概是这样的:在一个线程里,有多个 socket,当一个 socket 在等待 IO 的时候,切换到其他的程序语句去执行。这个时候的切换,就相当于是函数调用的时候的切换,代价非常非常的小。用一个线程就能处理巨多的 socket ,充分利用 cpu 。

    实际用起来的效果,在代理运行在本地的时候,浏览器设置了代理,几乎感觉不到代理的存在。内存、cpu 等资源都占用得很少。部署到服务器上,带宽都耗尽了 cpu 资源都还剩余很多。

    我用的 gevent 这个第三方模块来实现协程,这个模块好像一直还没支持 python3 。反正我是一直没找到它的 python3 版本。

    时隔好久才来续写这篇文章,有点不想介绍 gevent 的使用了。这些可以直接看官方的文档。下面直接贴上代码,用 gevent 改写了两个地方。第一个是程序入口处:

    import gevent
    from gevent import socket
    from gevent.server import StreamServer
    
    if __name__ == '__main__':
        server = StreamServer(('', int(sys.argv[1])), proxyer)   
        server.start() 
        server.serve_forever() 
    

    还有一个是 do_tunnel() 方法:

    def do_tunnel(host, port, soc):
        c = socket.socket()
        try:
            c.connect((host,port))
        except Exception, e:
            logging.warning('connect err'+host+':'+str(port))
            #soc.send(TUNNEL_FAIL)
            soc.close()
            return
        soc.send(TUNNEL_OK)
        gevent.joinall([
            gevent.spawn(dock_socket, soc, c, False),
            gevent.spawn(dock_socket, c, soc, True),
        ])
    

    在不考虑底层实现的情况下,可以把协程当成线程来用,他们提供的使用方式都是相近的。

    多进程

    在一台多核 CPU 的电脑上, 与 CPU 核心数相同的进程数多数情况下能带来较大的性能.
    我们稍微修改一下程序的入口. 在为这个程序配置了监听的端口号之后, fork 出与 CPU 核心数相同个数的进程, 然后开始监听.

    from multiprocessing import cpu_count
    
    if __name__ == '__main__':
        server = StreamServer(('', int(sys.argv[1])), proxyer)   
        if cpu_count()>1: server.max_accept  = 1
        server.start() 
        pid_list = []
        for i in range(cpu_count()):
            pid = os.fork()
            if pid == 0: 
                del pid_list
                server.serve_forever() 
            else:
                pid_list.append(str(pid))
        with open(sys.path[0]+'/pid','w') as f:
            f.write('|'.join(pid_list))
        print('start processes: '+'|'.join(pid_list)+' ok')
    

    这样很容易想到一个问题, 当一个请求来的时候, 这几个进程都会接收这个请求并处理? 其实这叫做惊群现象, 网上有资料说: 在系统的内核层面会处理这个问题, 内核会负责分配这些请求到各个进程. 程序实际运行过程中, 来的每个请求确实也只被分配到一个进程中处理.