搭建自己的代理IP池
发布时间丨2021-12-27 09:49:15作者丨zhaomeng浏览丨28
爬虫最长遇到的就是被封禁本机或者服务器的IP地址,对于爬虫攻城狮而言,这无疑是一个致命问题,采集的站点无法访问了,对于“可见即可采集的原则”。这个站点数据就拿不到了。但是工作还要继续,领导催的急呢,客户也有数据需求。这就促使开发人员开发出可以绕过目标网站的反爬虫检测,从而达到解决自己IP被封的尴尬问题!上一篇博客记录了是另一种原始的通过购买VPS(也有称为秒播的)不断拨号更换本机的IP地址,从而让目标主机的反爬虫检测不到我们的本机的IP访问的频率,也就无法锁定本机的IP,达到绕过反爬虫的检测目的。但是频繁的更换代IP,就需要不断的拨号,就会倒是网卡卡死,存在拨号失败的问题。本篇的文章,主要是通过搭建自己的代理IP池的方式来自己控制IP的切换。本片文章主要是记录自己搭建代理ip的过程以及使用docker部署自己搭建的代理IP的全过程!
第一步:需要整理清楚的就是代理IP池的使用逻辑!大致如下图:
IP检测的存在两种情况,第一中是检测代理IP的过期时间,每次获取到代理的IP就将当前过期时间与当前时间的值加上4秒(在过去时间还有四秒中的时候就不在使用了)进行相减,不为0就删除当前IP,并且重新获取新的IP存入代理IP池。第二种情况是:不对代理IP的过去时间进行检测,直接将ip:port存入代理IP池。判断IP是否有效的过程,放到了使用代理IP的代码中,当代理IP失效获取网站被封了,就需要重试获取直接删除当前不可用的代理IP,再获取新的有效的代理IP。
第二步:编写代理IP的服务器(获取代理IP---检测代理IP---存入代理IP池),并使用docker部署
代理IP获取有两种途径,第一种就是获取常见的获取免费的代理IP并将免费的代理IP经常检测有效性,存入代理池。这样的IP质量往往不是很高的,可以说是质量不足以应用到正式的项目中的。我使用的是付费购买的代理IP,这样的IP质量是很高的,可用性比较好。能够满足项目的正常运行。就是需要破点财,不过对于领导来说,这点钱那都不是事儿。本片博文先记录根据过期时间进行代理IP池的搭建,另外一种情况与之基本一直,只需要调整少量的代码即可,后面有时间也会再写一篇博文记录一下。
- 编写代理IP池的存储代码
class Test(object): def __init__(self): self.client = redis.Redis(host='localhost', port=6379, password='111') def proxy_len(self): """ 获取列表长度 :return: """ return self.client.llen('proxy:start_urls') def push_proxy(self, proxy): """ 将获取到的有效ip及过期时间写入redis :param proxy: :return: """ result = self.client.lpush('proxy:start_urls', json.dumps(proxy)) # print(f"导入数据:{proxy}") logging.info(f"导入数据:{proxy}") def delete_proxy(self, proxy): """ 删除无效ip :param proxy: :return: """ # print(f"删除无效ip:{proxy},重新获取中......") logging.info(f"删除无效ip:{proxy},重新获取中......") self.client.lrem(name='proxy:start_urls', count=0, value=json.dumps(proxy)) def get_random_proxy(self): """" 重redis中随机获取一个代理ip """ proxy_list = self.client.lrange('proxy:start_urls', 0, -1) return random.choice(proxy_list) def get_all_proxies(self): """ 获取所有代理ip 类型:list """ proxy_list = self.client.lrange('proxy:start_urls', 0, -1) return proxy_list
- 编写代理IP的获取代码(获取--检测(可以省掉)--存储)
class Proxy(object): def __init__(self): self.url = '付费代理IP地址' def get_random_ip(self): ip = Test().get_random_proxy() ip = json.loads(ip) # print(ip.get("expire_time")) logging.info(f"ip地址失效时间:{ip.get('expire_time')}") f = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) now = datetime.strptime(f, "%Y-%m-%d %H:%M:%S") + timedelta(seconds=4) # 当前时间加4秒 dt1 = datetime.strptime(ip.get("expire_time"), '%Y-%m-%d %H:%M:%S') # 有效期 diff = dt1 - now # print(diff.days) # logging.info(diff.days) if diff.days == 0: return ip else: self.delete_proxy(ip) time.sleep(1) self.get_random_ip() def proxies(self): """ 获取所有可用代理ip """ proxy_list = Test().get_all_proxies() return proxy_list def add_proxy(self, proxy): """ 将新获取的 ip 写入 redis :param proxy: :return: """ Test().push_proxy(proxy) # print(f"将新代理 {proxy} 写入 redis") logging.info(f"将新代理 {proxy} 写入 redis") def delete_proxy(self, proxy): """ 删除 redis 中不可用的 ip :param proxy: :return: """ Test().delete_proxy(proxy) # print(f"删除不可用的代理 {proxy}") logging.info(f"删除不可用的代理 {proxy}") def get_proxy(self): # 获取代理IP data = requests.get(url=self.url).json() if data.get('success') and data.get("data", ''): ip = f'{data.get("data")[0].get("ip")}:{data.get("data")[0].get("port")}' expire_time = data.get("data")[0].get('expire_time') result = {"ip": ip, "expire_time": expire_time} return result else: return {"msg": "获取代理IP失败......", "err": 1} def check_ip(self, proxy_ip): # 检测代理IP的可用性 # print(f"测试ip:{proxy_ip}") logging.info(f"测试ip:{proxy_ip}") test_url = 'http://www.baidu.com/' try: response = requests.get(test_url, proxies={"http": proxy_ip}, timeout=10, verify=False) if response.status_code == 200: # print(f"PROXY {proxy_ip} validing") logging.info(f"PROXY {proxy_ip} validing") return True else: return False except: # print(f"PROXY {proxy_ip} invalid") logging.info(f"PROXY {proxy_ip} invalid") return False
- 代理IP服务器的主程序
def main(): proxy = Proxy() # print(f"当前有效ip:{proxy.proxies()}") logging.info(f"当前有效ip:{proxy.proxies()}") while True: if Test().proxy_len() < 4: try: ip = proxy.get_proxy() # 获取代理IP except Exception as e: print(e) # print("[ip]: 获取新 ip 失败,等待 2 秒重新获取 ...") logging.info("[ip]: 获取新 ip 失败,等待 2 秒重新获取 ...") time.sleep(2) main() else: if ip.get("ip", ''): if proxy.check_ip(ip.get("ip")): # 检测代理IP的可用性 proxy.add_proxy(ip) # 存入代理池 else: # print(ip.get("msg")) logging.info(ip.get("msg"))
到这里整个代理IP的服务器就搭建好了,后面就是使用fastapi或者django编写一个接口:
第三步:Django编写代理IP的调用接口
Django 的项目创建就不在此处赘述了,后面有时间会单独写一篇详细的django项目
接口逻辑代理views.py如下:
from utils.proxy import Proxy
import json
import time
from django.http.response import JsonResponse, HttpResponse
def proxy(request):
# 获取随机的代理IP
if request.method == 'GET':
data = Proxy().get_random_ip()
return JsonResponse(data, safe=False)
def all_proxy(request):
# 获取所有代理IP
if request.method == 'GET':
proxy_list = Proxy.proxies()
return JsonResponse({"data": proxy_list}, json_dumps_params={'ensure_ascii': False})
接口路由地址如下:
url('random/proxy', views.proxy), # 获取代理
url('proxies', views.all_proxy) # 获取所有代理
到这里整个搭建个人代理IP池的项目就基本完成。最后就是将代理IP服务器单独使用docker部署,django的部署可以查看我的之前的博文!
最终的使用结果如下图:
文章属于原创,如有不正,欢迎指正!转载请注明出处!