上一篇提到了调用的基础代码,写的很low啊,大家要是有啥优化指示可以直接评论啊谢谢。下面将进行爬虫的实战部分了。
重点声明:本文涉及到基站信息查询网站,这里提示只是学习和参考使用,勿进行商业的暴力用途,如有问责,请自行负责。
目录
一、待爬取网站调研
1、基本信息
2、我们想要实现的基本目的
3、关于基本目的实现的简单说明
二、代码实现部分
1、一些基础参数的设置
2、程序主体的设计
3、完整代码
4、爬取处理结果
一、待爬取网站调研
1、基本信息
上图中为基站定位查询的首页:http://www.minigps.org/cellsearch.html
直奔主题,输入参数mcc、mnc、lac、cid、VeriCode等四个参数,查询就可以通过Google地图了解相关基站的定位信息
2、我们想要实现的基本目的
- 自动访问网站
- 自动填充数据
- 自动获得验证码
- 自动发起post请求
- 自动解析返回的定位信息入数据库或写在文件上
- 循环以上的步骤
3、关于基本目的实现的简单说明
关于2步骤:如果只查询国内的基站定位信息,那么mcc、mnc值基本固定不变,如果想爬取其他国家的可以点击右侧mcc list等进行了解,这里只针对中国国内的。
关于3步骤:验证码只适用当前的一次查询,即时刷新,我们可以通过两种方式来处理。
一种为机器学习:我们写一个程序专门爬取验证码,并将验证码信息标记在验证的图片名称上,对图片进行灰度、二值处理等,进行训练。这个我做过简单的实验,效果不是很好,毕竟参数和测试数据不太好弄,只能暂时放弃。如果有大神做好了,还请不练赐教。
一种为OCR识图:我使用了python的一些OCR识图库,但是没有达到我预期的效果,也暂时放弃。最后通过调用百度的aipocr来进行识图,虽然这个概率也是很低,不过算是能够满足目前需求。综合对比,百度OCR表现还是最好的,其他厂商的没时间试验了。
关于4步骤:LAC和CID是可变参数,也是这几个参数里面的核心参数。据了解,这两个的参数范围大概是1~100000之间。目前找不到这两个参数之间的匹配设置规律,所以只能通过随机穷举的匹配方式来进行了。
关于post链接的获取方式:
一种为通过F12开发者工具进行观察获取
一种为网页源文件的解析:
首先我们在页面右击查看网页源代码,找到对应的form表单,或者参数设置位置,以及点击请求位置
首先通过上图,我们可以获得验证码的调用 action:./validatecodeservlet.do。紧接着对应一个js方法reloadImag()跟进发现还需配置随机数参数。
其次找到post请求的action
这里并没有提供js方法以及submit。但是我们可以通过id='sub'反向查询关于这个button的点击事件,我们可以通过进一步分析下图中的js点击监听事件来得到我们想要的信息。具体的js内容可以自行分析。
关于5步骤:通过观察返回的json数据来来解析出我们想要的数据。
二、代码实现部分
1、一些基础参数的设置
将需要的一些常量都收集起来统一管理是一个好习惯,由于我都写在一个.py中,所以我选择将其统一写在文件的开头位置。我已经将说明注释在了每个变量的上面部分。
2、程序主体的设计
分为三大部分:
- 头部参数配置
- 逻辑代码实现
- 主方法调用
方法说明:
只要通过main方法一步一步的跟方法,走一遍基本上就了解了,写的很low,很直白,分分钟就能搞明白呦。里面的注释写的还算详细吧,需要的大概都写了。
征求意见:
所有的主控逻辑都在一个类里面实现了,方法的位置,方法调用的流程,以及变量名称等还需要进一步优化。我个人认为这显然不是一个好的设计。不过也将就着能用吧。如果感兴趣可以提供建议,谢谢!
3、完整代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/6/11 11:09
# @Author : Hanxiaoshun@天谕传说
# @Site : www.shunzi666.cn
# @File : SpiderStationInfo.py
# @Software: PyCharmimport json
import re
import time
import random
import requests
import os
from aip import AipOcr
# pip install baidu-aip
from io import BytesIO
from PIL import Image""" 你的 APPID AK SK """
APP_ID = 'xxxxxx'
API_KEY = 'xxxxxxxxxxxxx'
SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
client = AipOcr(APP_ID, API_KEY, SECRET_KEY)# requests 的简单设置
requests.adapters.DEFAULT_RETRIES = 5
s = requests.session()
s.keep_alive = False# 设置lac 与 cid 集合准备在内存中进行组合去重用
lac_cid_array = []# 验证码图片目录,如果不存在则创建之在当前目录下
CAPT_PATH = "./capt/"
if not os.path.exists(CAPT_PATH):os.mkdir(CAPT_PATH)# 带爬取网站的首页
base_url = "http://www.minigps.org/cellsearch.html"
# 准备使用请求头
base_header = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36'
main_header = {'User-Agent': base_header}
# 获取验证码的请求链接
verify_URL = "http://www.minigps.net/validatecodeservlet.do"
# 请求数据的post链接
post_URL = "http://www.minigps.net/map/google/location"# 设置一些比较稳定的请求头信息,这个爬虫设置是非常重要的,
# 一些简单的反爬虫基本上会过滤请求头,如果是requests等爬虫工具的默认请求头,则很容易被禁
# 这样可以做到非常简单的伪装,以下是我简单搜集的请求头分享出来
main_user_agent = ['Mozilla/5.0 (Windows NT 6.3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE','Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0','Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/63.0','Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko','Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.33 Safari/537.36','Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0','Mozilla/5.0 (Windows NT 6.3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE']# 设置一些比较稳定的IP代理
main_proxies = ['https://106.56.102.22:8070','http://61.135.217.7:80','http://118.190.95.35:9001','https://211.159.171.58:80','https://106.56.102.228:8070']# 已经成功解析过的就不在爬取,以下为将成功解析过的注入到内存以备比对
lacs = []
with open("lac_success.lac", "r", encoding="utf-8") as foo:for line in foo.readlines():lacs.append(line)class SpiderStationInfo(object):"""使用request爬虫,以及百度api识图,获取http://www.minigps.net网站的基站信息"""def __init__(self):"""初始化参数信息"""self.cookies = requests.cookies.RequestsCookieJar()self.img_path = ""self.portUrl = ""self.verifyCode = 0self.payloadData = {}def goGet(self):"""获取get请求内容并更新cookie信息:return:"""response = requests.request('GET', verify_URL, cookies=self.cookies, verify=False)cookie_array = response.cookiesheader_array = response.headers# print(dict(cookie_array))# print(dict(header_array))self.cookies.update(response.cookies) # 保存cookiedef goPost(self, url, method, post_data):"""获取post请求信息并更新cookie信息:param url::param method::param post_data::return:"""response = requests.request(method, url, data=post_data, headers=main_header, cookies=self.cookies, verify=False) # 传递cookieself.cookies.update(response.cookies) # 保存cookiedef get_verify_code(self):""" 在线获取并解析验证码 """time.sleep(3)print("正在解析验证码。。。。。")verify_code = client.basicGeneralUrl(verify_URL + '?x=' + str(random.random())) # 调用远程OCR并获得结果if verify_code == 0:"""如果没有验证码字符数量,则解析失败"""return 0else:if len(verify_code) == 4:"""如果得到的验证码字符数量不为4,则解析失败"""return verify_codeelse:return 0def getPayloadData(self, lac_cid):"""构建请求参数体信息我们默认验证码识别失败,进行第二次查询,这样可以实现验证码试错轮询:param lac_cid::return:"""# verifyCode = self.getVerifyCode()if self.verifyCode != 0:print(f"解析验证码成功。。。。。{self.verifyCode}")self.payloadData = {"cell_towers": [{"age": 0,"cell_id": str(lac_cid['cid']),"location_area_code": str(lac_cid['lac']),"mobile_country_code": "460","mobile_network_code": "0","signal_strength": -65}],"host": "maps.google.com","verifycode": str(self.verifyCode),"version": "1.1.0"}else:print(f"正在重新解析验证码。。。。。{self.verifyCode}")time.sleep(2)""" 下载图片 (重新下载图片进行解析)"""self.capt_download()""" 读取图片内容 """self.img_code_localutils()def goto_search_single(self, lac_cid):"""开启流程作业:param lac_cid::return:"""try:if self.verifyCode == 0:print("verifyCode fail。。。")# 验证码识别错误直接放弃此次请求return 0, "verifyCode fail。。。"else:self.payloadData = {"cell_towers": [{"age": 0,"cell_id": str(lac_cid['cid']),"location_area_code": str(lac_cid['lac']),"mobile_country_code": "460","mobile_network_code": "0","signal_strength": -65}],"host": "maps.google.com","verifycode": str(self.verifyCode),"version": "1.1.0"}# 代理信息暂时可以不用,如果需要的话将其替换成自己的,并在接下了参数中填充即可proxy = "183.12.50.118:8080"proxies = {"http": proxy,"https": proxy,}# r = requests.post(post_URL, data=json.dumps(payloadData), headers=payloadHeader)# 将json字典压缩成requests请求头所能识别的格式dumpJsonData = json.dumps(self.payloadData)print(f"dumpJsonData = {dumpJsonData}")# main_header = {'User-Agent': str(random.choice(main_user_agent))}# 设置请求头信息代用main_header = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36",'Origin': "http://www.minigps.org",'Accept': 'application/json, text/javascript, */*; q=0.01','Host': 'www.minigps.net','Connection': 'keep-alive',"Content-Type": "application/json; charset=UTF-8","Accept-Encoding": "gzip, deflate"}kwargs = {'main_header': main_header,'main_cookie': None,'main_timeout': 25,}# current_session = requests.session()# response = requests.request("Post",# self.portUrl,# data=dumpJsonData,# headers=main_header,# cookies=self.cookies,# timeout=25,# allow_redirects=True)# 最终请求的完整构造response = requests.post(self.portUrl,data=dumpJsonData,headers=main_header,cookies=self.cookies,proxies=None,timeout=25,allow_redirects=True,verify=False)# proxies = proxies,# response = requests.post(post_URL, data=dumpJsonData, headers=payloadHeader, timeout=timeOut, proxies=proxies, allow_redirects=True)# 下面这种直接填充json参数的方式也OK# res = requests.post(post_URL, json=payloadData, headers=header)# print(f"responseTime::{datetime.datetime.now()},"# f"statusCode::{response.status_code},"# f"text::{response.text}")# json.dumps(response.json, sort_keys=True, indent=2) # 格式化缩进两格# jsonValue = json.dumps(response.json) # 格式化缩进两格# print(jsonValue)# 保存返回的请求的json数据,并过滤掉基本的错误返回结果if '基站信息不存在' not in response.text:if "verify code error." not in response.text:if response.status_code == 200:with open("lac_response.res", "a", encoding="UTF-8") as foo:foo.write(str(response.text) + "\n")# 返回成功状态及数据return 1, response.textelse:return 0, "err"else:return 2, "err"else:return 0, "err"except Exception as e:print(str(e))# 打印并返回错误信息及状态return 0, "err"# raise edef single_goto(self, lac_cid_dict):"""根据单条请求返回的参数和数据进行判断和解析写入结果文件:lac_success_random.lac:param lac_cid_dict::return:"""try:print(f"lac_cid:::{lac_cid_dict}")code, response_text = self.goto_search_single(lac_cid_dict)if code == 1:'''返回1,表示验证码识别正常,post请求参数正常,返回数据正常,等待进一步进行解析'''if response_text.__len__() > 0:json_object = json.loads(response_text) # 将字符串解析成json对象street = json_object['location']['address']['street']latitude = str(json_object['location']['latitude'])longitude = str(json_object['location']['longitude'])if str(street).__len__() > 0:with open("lac_success_random.lac", "a", encoding="UTF-8") as foo:foo.write(str(lac_cid_dict['lac']) +"\t" + str(lac_cid_dict['cid']) + "\t" + street + "\t" + latitude + "\t" + longitude + "\t"+ "\t" + str(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) + "\n")return 1else:with open("LAC_00_fail_ramdom.lac", "a", encoding="UTF-8") as foo:foo.write(str(lac_cid_dict) + "\n")return 0else:with open("lac_fail_random.lac", "a", encoding="UTF-8") as foo:foo.write(str(lac_cid_dict) + "\t" + str(lac_cid_dict['cid']) + "\n")return 0except Exception as e:# raise eprint(str(e))def capt_fetch(self, ):"""从网站获取验证码,将验证码转为Image对象:require requests: import requests:require time: import time:require BytesIO: from io import BytesIO:require Image: from PIL import Image:param::return capt: 一个Image对象"""# 从网站获取验证码# main_header = {'User-Agent': str(random.choice(main_user_agent))}# kwargs = {'main_header': main_header,# 'main_cookie': None,# 'main_timeout': 10,# }# current_session = requests.session()# response = current_session.get(verify_URL,# headers=kwargs['main_header'],# cookies=kwargs['main_cookie'],# timeout=kwargs['main_timeout'])# headers = main_header,response = requests.request('GET', url=verify_URL, cookies=self.cookies)self.cookies.update(response.cookies) # 保存更新cookie# capt_raw = requests.get(verify_URL)# 将二进制的验证码图片写入IO流f = BytesIO(response.content)# 将验证码转换为Image对象capt = Image.open(f)return captdef capt_download(self):"""将Image类型的验证码对象保存到本地:require Image: from PIL import Image:require os: import os:require capt_fetch(): 从nbsc网站获取验证码:require CAPT_PATH: 验证码保存路径n6m:param::return:"""try:# 开启图片下载程序capt = self.capt_fetch()# capt.show()# text = input("请输入验证码中的字符:")# 时间戳形式命名图片的一部分suffix = str(int(time.time() * 1e3))# 满足请求验证码的随机数请求参数信息time.sleep(random.random() * 5)# img_path = CAPT_PATH + text + "_" + suffix + ".jpg"img_path = CAPT_PATH + "_" + suffix + ".jpg"# 保存图片capt.save(img_path)# 返回验证码路径self.img_path = img_pathexcept Exception as e:print(str(e))# 如果下载和保存出现问题将返回错误代码self.img_path = 0def get_file_content(self, filePath):""" 注入图片到内存 """with open(filePath, 'rb') as fp:return fp.read()def img_code_localutils(self):""" 读取图片内容 """try:if self.img_path != 0:image = self.get_file_content(self.img_path)result = client.basicGeneral(image)if "words_result" in result:words = result['words_result']if words.__len__() > 0:word = words[0]value = word['words'].strip().replace(' ', '')value = value.lower()if value.__len__() == 4:"""如果验证码的长度不是4,排除掉"""pattern = re.compile(r'[0-9a-zA-Z]', re.I)none_flag = Falsefor x in str(value):"""如果验证码里面有除了数字和英文之外的另外文字,排除掉"""if pattern.match(x) is None:none_flag = Trueif none_flag:print(f"verifyCode——another code::{value}")self.verifyCode = 0else:""" 请求成功,并且返回4个字符"""print(f"verifyCode——ok::{value}")img_path_new = self.img_path.replace("capt/_", "capt/" + value + "_")os.rename(self.img_path, img_path_new)# img_path_new = CAPT_PATH + value + "_" + suffix + ".jpg"self.verifyCode = valueelse:print(f"verifyCode——fail::{value}")self.verifyCode = 0else:print(f"verifyCode—words—fail::{words}")self.verifyCode = 0else:self.verifyCode = 0else:self.verifyCode = 0except Exception as ocr_e:print(f"Exception as :{str(ocr_e)}")def start(self, lacRamdom=9779, cidRamdom=3721):# """ 下载图片 """self.capt_download()# """ 读取图片内容 """self.img_code_localutils()# 设置post请求链接self.portUrl = post_URL# 完成验证码识别的工作,开启单条测试的程序return self.single_goto({"lac": lacRamdom, "cid": cidRamdom})def process(self, i, lacRamdom, cidRamdom):"""解析返回数据结果的参数组合并录入参数组合状态文件中:param i: 测试记录数量:param lacRamdom::param cidRamdom::return:"""lac_cid_dict = {"lac": lacRamdom, "cid": cidRamdom}if ru.start(lacRamdom=lacRamdom, cidRamdom=cidRamdom) == 1:lac_cid_array.append(lac_cid_dict)print(f"查询第 {i} 条随机数据成功。。。,{lacRamdom},{cidRamdom}")with open("lac_suc_params.lac", "a", encoding="UTF-8") as foo:foo.write(str(lacRamdom)+ "\t" + str(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) + "\n")else:print(f"查询第 {i} 条随机数据失败---,{lacRamdom},{cidRamdom}")with open("lac_fail_params.lac", "a", encoding="UTF-8") as fox:fox.write(str(lacRamdom) + "\t"+ str(cidRamdom) + "\t"+ str(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) + "\n")def entrance(self):"""设置两个基本参数:lac 1~32000进行每股两秒钟循环递增穷举cid 取10万以内的随机数开启验证码识别以及构建请求头参数和返回结果处理的方法self.process(j, lac_random, cid_random)每次请求之间间隔2秒,来减少被识别的概率记录每每个组合的状态这个状态可能将受到验证码识别的概率干扰,效率下降很大:return:"""try:fail_num = 0success_num = 0with open("final_result.txt", "a", encoding="utf-8") as fr:fr.write("10000条 start " + "\t" + str(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) + "\n")for j in range(1, 32000):time.sleep(2)# lac_random = int(random.random() * 31000)lac_random = jcid_random = int(random.random() * 100000)lac_cid_dict = {"lac": lac_random, "cid": cid_random}if str(lac_random) in lacs:print(f"第 {j} 条已经被解析过,LAC::{lac_random}")else:if lac_cid_dict not in lac_cid_array:self.process(j, lac_random, cid_random)with open("final_result.txt", "a", encoding="utf-8") as fr:fr.write("成功::" + str(success_num)+ "\t" + "失败::" + str(fail_num)+ "\t" + str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) + "\n")# print(lac_cid_array)except Exception as e:# raise eprint(str(e))if __name__ == '__main__':""" 实例化工具类循环调用 10个轮回,如果全部调用成功,将尝试请求一共10*32000次每32000次之间等待60 * 30秒"""ru = SpiderStationInfo()try:for n in range(10):ru.entrance()#time.sleep(60 * 30)except Exception as e:# raise eprint(str(e))
4、爬取处理结果