The code below scrapes data from three property-listing APIs. Each listing requires one request, so there will be around 20,000 requests in total, which is why I use asyncio to make them. However, the server will ban me if I send requests too frequently, so I route the traffic through Tor and send a random User-Agent header to avoid that.
The data I scraped will be stored in MongoDB.
I taught myself Python, and I would like to know whether the code can be improved further.
import time
import math
import json
from bson import json_util #for import into mongodb
from datetime import datetime
import requests
from stem import Signal
from stem.control import Controller
from fake_useragent import UserAgent
import aiohttp
from aiohttp import ClientSession
import asyncio
from aiohttp_socks import SocksConnector, SocksVer
import pandas as pd
import pymongo
from pymongo import MongoClient
#https://boredhacking.com/tor-webscraping-proxy/
#https://www.unixmen.com/run-tor-service-arch-linux/
#https://www.sylvaindurand.org/use-tor-with-python/
#https://stem.torproject.org/faq.html (authenticate)
# MongoDB connection and the three collections that receive scraped documents.
client = pymongo.MongoClient('localhost', 37017)
db = client.centaline
db_propertyInfo = db.propertyInfo
db_propertyDetails = db.propertyDetails
# NOTE(review): "buldingInfo" looks like a typo for "buildingInfo". It is kept
# unchanged because renaming it would write to a different collection.
db_buildingInfo = db.buldingInfo

# One timestamp for the whole run: it is stamped on every stored document and
# used later to select the documents written by this run.
current_time = datetime.now()

# URL templates; the "{}" placeholders are filled with str.format().
propertyInfo = "https://hkapi.centanet.com/api/FindProperty/MapV2.json?postType={}&order=desc&page={}&pageSize={}&pixelHeight=2220&pixelWidth=1080&sort=score&wholeTerr=1&platform=android"
propertyDetails = "https://hkapi.centanet.com/api/FindProperty/DetailV2.json?id={}&platform=android"
buildingInfo = "https://hkapi.centanet.com/api/PropertyInfo/Detail.json?cblgcode={}&cestcode={}&platform=android"

# Headers sent with every request. The key must be "User-Agent" (hyphen):
# the original "User_Agent" is a different header, so the random agent string
# was never sent as the real User-Agent.
headers = {
    "User-Agent": UserAgent().random,
    "Host": "hkapi.centanet.com",
    "Content-Type": "application/json; charset=UTF-8",
}
def switchIP():
    """Send the NEWNYM signal to the Tor controller listening on port 9051.

    Connects to the control port, authenticates with no explicit credentials,
    and issues the signal inside a ``with`` block.
    """
    with Controller.from_port(port=9051) as tor_control:
        tor_control.authenticate()
        tor_control.signal(Signal.NEWNYM)
def numOfRecords(postType):
    """Return the total number of records the listing API reports.

    Sends the NEWNYM signal via ``switchIP()``, then posts a single
    ``page=1, pageSize=1`` request through the local Tor SOCKS proxy and sums
    the ``Count`` field over the ``DItems`` entries of the JSON response.

    Args:
        postType: value substituted into the ``postType`` query parameter
            (the script uses ``'s'`` and ``'r'``).

    Returns:
        The summed ``Count`` value, or ``0`` when ``DItems`` is empty.
    """
    switchIP()
    tor_proxy = 'socks5://localhost:9050'
    # The context manager closes the session's pooled connections; the
    # original never closed the session.
    with requests.Session() as session:
        session.proxies = {'http': tor_proxy, 'https': tor_proxy}
        # page=1, pageSize=1: only the counts are needed, not the listings.
        resp = session.post(propertyInfo.format(postType, 1, 1), headers=headers)
        body = resp.text
    items = json.loads(body)["DItems"]
    if items:
        totalRecords = pd.DataFrame.from_records(items)["Count"].sum()
    else:
        # An empty frame has no "Count" column, so indexing it would raise.
        totalRecords = 0
    print("Total number of records for {}: {}".format(postType, totalRecords))
    return totalRecords
async def getPropertyInfo(postType, page, pageSize, session):
    """Download one page of listings and store its items in ``db_propertyInfo``.

    The request is retried until a page has been downloaded, parsed, and
    stored. Each item in the response's ``AItems`` list is tagged with the
    run timestamp and a fixed ``Source`` before insertion.

    Args:
        postType: value for the ``postType`` query parameter.
        page: 1-based page number to request.
        pageSize: number of listings per page.
        session: an open aiohttp ``ClientSession`` used for the POST.
    """
    url = propertyInfo.format(postType, page, pageSize)
    retry_delay = 1  # seconds to wait between attempts instead of hammering the server

    while True:
        try:
            async with session.post(url, headers=headers) as resp:
                print("started inserting property info")
                payload = json.loads(await resp.text())
            items = payload['AItems']
            for item in items:
                item["DateTime"] = current_time
                item["Source"] = "centaline"
            # insert_many rejects an empty list, so skip the call for empty pages.
            if items:
                db_propertyInfo.insert_many(items)
        except Exception as e:
            print(str(e))
            print("Retry Property Info")
            await asyncio.sleep(retry_delay)
            continue
        break

    # The signal is sent outside the retried section: previously a failure
    # here, after the page was already stored, re-ran the whole download and
    # inserted the same items again.
    try:
        switchIP()
    except Exception as e:
        print(str(e))
async def bound_getPropertyInfo(semaphore, postType, page, pageSize, session):
    """Run ``getPropertyInfo`` while holding one slot of *semaphore*.

    The slot is released whether the download finishes or raises.
    """
    await semaphore.acquire()
    try:
        await getPropertyInfo(postType, page, pageSize, session)
    finally:
        semaphore.release()
# Number of listings requested per page of the property-info API.
pageSize = 1
# Disabled: derive the page counts from the totals reported by numOfRecords().
#noOfPages_s = math.ceil(numOfRecords('s')/pageSize)
#noOfPages_r = math.ceil(numOfRecords('r')/pageSize)
# Hard-coded page counts used in place of the computed values above:
# one page for postType 's', none for postType 'r'.
noOfPages_s = 1
noOfPages_r = 0
async def run_info():
    """Download every configured listing page through the Tor SOCKS proxy.

    Schedules one ``bound_getPropertyInfo`` task per page, first for postType
    's' and then for 'r', with at most 50 running at a time, and waits for
    all of them to finish.
    """
    proxy_url = 'socks5://localhost:9050'
    connector = SocksConnector.from_url(proxy_url)
    limiter = asyncio.Semaphore(50)
    async with ClientSession(connector=connector) as session:
        # Pages are 1-based; all 's' pages are scheduled before any 'r' page.
        jobs = [
            (post_type, page_no)
            for post_type, page_count in (('s', noOfPages_s), ('r', noOfPages_r))
            for page_no in range(1, page_count + 1)
        ]
        pending = [
            asyncio.ensure_future(
                bound_getPropertyInfo(limiter, post_type, page_no, pageSize, session)
            )
            for post_type, page_no in jobs
        ]
        await asyncio.gather(*pending)
# Run the listing-download phase to completion before querying what it stored.
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run_info())
loop.run_until_complete(future)

# Listing IDs stored by this run.
id_list = db_propertyInfo.distinct("ID", {"DateTime": current_time})

# Collect the (CblgCode, Cestcode) pairs that actually occur together in a
# stored document. The original used two independent distinct() calls, whose
# results are unrelated in length and order, so zipping them later combined
# codes from different listings.
# NOTE(review): assumes both codes are scalar (hashable) values - confirm.
_code_pairs = dict.fromkeys(
    (doc.get("CblgCode"), doc.get("Cestcode"))
    for doc in db_propertyInfo.find(
        {"DateTime": current_time},
        {"_id": 0, "CblgCode": 1, "Cestcode": 1},
    )
)
# Skip documents missing either code; they cannot form a building-info URL.
_code_pairs = [pair for pair in _code_pairs if None not in pair]
# Two index-aligned lists, so that zip(cblgcode_list, cestcode_list) yields
# the real pairs.
cblgcode_list = [cblg for cblg, _ in _code_pairs]
cestcode_list = [cest for _, cest in _code_pairs]
async def fetch(url, session, socks, db):
    """POST to *url*, store the JSON response in *db*, and return it.

    The request is retried until a response has been parsed and stored. It
    gives up only when the response text contains
    "Sequence contains no elements".

    Args:
        url: fully formatted API URL.
        session: an open aiohttp ``ClientSession`` used for the POST.
        socks: unused; kept so existing callers keep working.
        db: collection object whose ``insert_one`` receives the document.

    Returns:
        The stored document (a dict), or ``None`` when the request was
        abandoned.
    """
    retry_delay = 1  # seconds to wait between attempts instead of hammering the server

    while True:
        # Reset per attempt so the error handler never sees a stale or
        # unbound response body.
        body = None
        try:
            async with session.post(url, headers=headers) as resp:
                body = await resp.text()
            switchIP()
            data = json.loads(body)
            data["DateTime"] = current_time
            data["Source"] = "centaline"
            db.insert_one(data)
            print("Inserting Data")
            # The original had `break` before `return data`, so the result
            # was never returned.
            return data
        except Exception as e:
            print(str(e))
            # Check the raw text, not `data`: `data` may be unbound (request
            # failed) or already a dict (after json.loads), and either case
            # made the original handler raise.
            if body is not None and "Sequence contains no elements" in body:
                return None
            print(url)
            print("Retry One Property Details")
            await asyncio.sleep(retry_delay)
async def bound_fetch(semaphore, url, session, socks, db):
    """Run ``fetch`` while holding one slot of *semaphore* and return its result.

    The slot is released whether the fetch finishes or raises.
    """
    await semaphore.acquire()
    try:
        outcome = await fetch(url, session, socks, db)
    finally:
        semaphore.release()
    return outcome
async def run():
    """Download property details and building info through the Tor SOCKS proxy.

    Schedules one ``bound_fetch`` task per entry of ``id_list`` (stored in
    ``db_propertyDetails``), followed by one per positional pair from
    ``cblgcode_list`` and ``cestcode_list`` (stored in ``db_buildingInfo``).
    At most 100 run at a time, and the function waits for all of them.
    """
    proxy_url = 'socks5://localhost:9050'
    connector = SocksConnector.from_url(proxy_url)
    limiter = asyncio.Semaphore(100)
    async with ClientSession(connector=connector) as session:
        detail_jobs = [
            (propertyDetails.format(listing_id), db_propertyDetails)
            for listing_id in id_list
        ]
        # NOTE(review): zip pairs the two lists by position, so this relies
        # on them being index-aligned - confirm.
        building_jobs = [
            (buildingInfo.format(cblg, cest), db_buildingInfo)
            for cblg, cest in zip(cblgcode_list, cestcode_list)
        ]
        pending = [
            asyncio.ensure_future(
                bound_fetch(limiter, target_url, session, proxy_url, collection)
            )
            for target_url, collection in detail_jobs + building_jobs
        ]
        await asyncio.gather(*pending)
# Run the detail/building download phase (run()) to completion on the event loop.
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
`torsocks`, which would do the Tor routing for you. The script would then be invoked via `torsocks python3 script.py`, and the code for routing to Tor could then be removed.