3
\$\begingroup\$

The code below will scrape data from three APIs. The APIs are for property listings. So, for each listing, I will have one request. There will be around 20000 requests. That's why I use asyncio to make the requests. However, I will be banned by their server if I send requests too frequently. So, I use Tor and a random user agent to avoid that.

The data I scraped will be stored in MongoDB.

I learnt Python by myself. I am wondering if the code can be further improved.

import time
import math
import json
from bson import json_util #for import into mongodb
from datetime import datetime

import requests
from stem import Signal
from stem.control import Controller
from fake_useragent import UserAgent
import aiohttp
from aiohttp import ClientSession
import asyncio
from aiohttp_socks import SocksConnector, SocksVer

import pandas as pd

import pymongo
from pymongo import MongoClient

#https://boredhacking.com/tor-webscraping-proxy/
#https://www.unixmen.com/run-tor-service-arch-linux/
#https://www.sylvaindurand.org/use-tor-with-python/
#https://stem.torproject.org/faq.html (authenticate)

# MongoDB connection and the three collections this script writes to.
client = pymongo.MongoClient('localhost', 37017)
db = client.centaline
db_propertyInfo = db.propertyInfo
db_propertyDetails = db.propertyDetails
# NOTE(review): "buldingInfo" looks like a typo for "buildingInfo". It is left
# unchanged because renaming it would write to a different collection -- confirm
# before fixing.
db_buildingInfo = db.buldingInfo

# One timestamp per run: it is stamped on every inserted document and used
# later to select the documents inserted by this run.
current_time = datetime.now()

# URL template for the listing search; placeholders: postType, page, pageSize.
propertyInfo = "https://hkapi.centanet.com/api/FindProperty/MapV2.json?postType={}&order=desc&page={}&pageSize={}&pixelHeight=2220&pixelWidth=1080&sort=score&wholeTerr=1&platform=android"

# URL template for one listing's details; placeholder: listing id.
propertyDetails = "https://hkapi.centanet.com/api/FindProperty/DetailV2.json?id={}&platform=android"

# URL template for building information; placeholders: cblgcode, cestcode.
buildingInfo = "https://hkapi.centanet.com/api/PropertyInfo/Detail.json?cblgcode={}&cestcode={}&platform=android"

# Headers sent with every request. The header name must be "User-Agent"
# (hyphen); with an underscore the key is just an unknown header and the HTTP
# client falls back to its own default user agent.
# The random user agent is picked once, when this statement runs, so every
# request in a run shares it.
headers = {
    "User-Agent": UserAgent().random,
    "Host": "hkapi.centanet.com",
    "Content-Type": "application/json; charset=UTF-8",
}

def switchIP():
    """Send the NEWNYM signal to the Tor controller listening on port 9051.

    Opens a controller connection, authenticates with no explicit
    credentials, sends the signal and closes the connection again.
    """
    with Controller.from_port(port=9051) as tor_controller:
        tor_controller.authenticate()
        tor_controller.signal(Signal.NEWNYM)

def numOfRecords(postType):
    """Return the total number of listings reported for *postType*.

    Requests a single-item page of the listing search through the local Tor
    SOCKS proxy and adds up the "Count" field of every entry in the
    response's "DItems" list.

    Args:
        postType: Value substituted into the ``postType`` query parameter
            (the script uses 's' and 'r').

    Returns:
        The sum of all "Count" values; 0 when "DItems" is empty.

    Raises:
        requests.RequestException: on connection problems, timeout or a
            non-success HTTP status.
        KeyError: if the response has no "DItems" key.
    """
    switchIP()
    tor_proxy = 'socks5://localhost:9050'

    # The context manager closes the session (and its pooled connections)
    # even when the request or the parsing fails.
    with requests.session() as session:
        session.proxies = {'http': tor_proxy, 'https': tor_proxy}
        # page=1, pageSize=1: only the counters are needed, not the listings.
        # A timeout keeps a stalled Tor circuit from hanging the script forever.
        response = session.post(
            propertyInfo.format(postType, 1, 1),
            headers=headers,
            timeout=60,
        )
        response.raise_for_status()
        items = response.json()["DItems"]

    # Plain sum instead of a DataFrame: works for an empty list and treats a
    # missing/null "Count" as 0.
    totalRecords = sum((item.get("Count") or 0) for item in items)
    print("Total number of records for {}: {}".format(postType, totalRecords))

    return totalRecords

async def getPropertyInfo(postType, page, pageSize, session):
    """Fetch one page of the listing search and store its items in MongoDB.

    Every entry of the response's "AItems" list is stamped with the run
    timestamp and the source name, then inserted into ``db_propertyInfo``.
    After a successful page ``switchIP()`` is called. Any failure is printed
    and the page is requested again until it succeeds.

    Args:
        postType: Value for the ``postType`` query parameter.
        page: 1-based page number.
        pageSize: Number of listings per page.
        session: aiohttp client session used for the POST request.
    """
    url = propertyInfo.format(postType, page, pageSize)
    while True:
        try:
            async with session.post(url, headers=headers) as resp:
                print("started inserting property info")
                data = json.loads(await resp.text())
                for item in data['AItems']:
                    item["DateTime"] = current_time
                    item["Source"] = "centaline"
                    db_propertyInfo.insert_one(item)
                switchIP()
                break
        except Exception as e:
            # Deliberately broad: any network, parsing or database error
            # leads to a retry of the whole page.
            # NOTE(review): a failure after some items were inserted will
            # insert those items again on the retry.
            print(str(e))
            print("Retry Property Info")
            # Pause before retrying so a persistent failure does not turn
            # into a tight loop of requests against the server.
            await asyncio.sleep(1)

async def bound_getPropertyInfo(semaphore, postType, page, pageSize, session):
    """Run ``getPropertyInfo`` while holding one slot of *semaphore*.

    The slot is released when the call finishes, whether it returns or raises.
    """
    await semaphore.acquire()
    try:
        await getPropertyInfo(postType, page, pageSize, session)
    finally:
        semaphore.release()

# Number of listings requested per page of the listing search.
pageSize = 1
# Page counts derived from the live record totals (currently disabled):
#noOfPages_s = math.ceil(numOfRecords('s')/pageSize)
#noOfPages_r = math.ceil(numOfRecords('r')/pageSize)
# Fixed page counts used instead: one page for 's', none for 'r'.
noOfPages_s, noOfPages_r = 1, 0

async def run_info():
    """Download every listing page for post types 's' and 'r'.

    One task is scheduled per page (pages are numbered from 1), all sharing
    a single client session routed through the SOCKS proxy at
    localhost:9050. A semaphore lets at most 50 pages be processed at once.
    The coroutine returns after all tasks have finished.
    """
    proxy_url = 'socks5://localhost:9050'
    limiter = asyncio.Semaphore(50)
    page_counts = (('s', noOfPages_s), ('r', noOfPages_r))

    async with ClientSession(connector=SocksConnector.from_url(proxy_url)) as session:
        pending = []
        for post_type, page_total in page_counts:
            pending.extend(
                asyncio.ensure_future(
                    bound_getPropertyInfo(limiter, post_type, page_no, pageSize, session)
                )
                for page_no in range(1, page_total + 1)
            )

        await asyncio.gather(*pending)

# Phase 1: download the listing pages and store them in db_propertyInfo.
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run_info())
loop.run_until_complete(future)

# Listing IDs inserted by this run (selected by the run timestamp).
id_list = db_propertyInfo.distinct("ID", {"DateTime": current_time})

# Building codes must stay paired per listing. Two independent distinct()
# calls return unrelated lists (different lengths, different order), so
# zipping them later would combine codes that never occurred together.
# Instead, read both fields from the same document and de-duplicate the pairs.
# Assumes both codes are scalar (hashable) values -- TODO confirm.
_building_pairs = dict.fromkeys(
    (doc.get("CblgCode"), doc.get("Cestcode"))
    for doc in db_propertyInfo.find(
        {"DateTime": current_time},
        {"_id": 0, "CblgCode": 1, "Cestcode": 1},
    )
)
# Drop listings that lack either code; the building URL needs both.
_building_pairs = [
    (cblg, cest)
    for cblg, cest in _building_pairs
    if cblg is not None and cest is not None
]

# Parallel lists: cblgcode_list[i] and cestcode_list[i] belong together.
cblgcode_list = [cblg for cblg, _ in _building_pairs]
cestcode_list = [cest for _, cest in _building_pairs]

async def fetch(url, session, socks, db):
    """POST to *url*, store the JSON response in *db* and return it.

    The decoded response is stamped with the run timestamp and the source
    name before it is inserted. On any failure the request is retried,
    unless the response body contains "Sequence contains no elements", in
    which case the URL is given up.

    Args:
        url: Fully formatted API URL.
        session: aiohttp client session used for the request.
        socks: Unused; kept so existing callers keep working.
        db: MongoDB collection that receives the document.

    Returns:
        The stored document, or None when the URL was given up.
    """
    while True:
        # Reset per attempt so the error handler never sees a body left over
        # from an earlier attempt, or no body at all.
        body = None
        try:
            async with session.post(url, headers=headers) as resp:
                body = await resp.text()
                switchIP()
                # The raw text stays in `body`; the handler below needs the
                # string, not the decoded dict.
                data = json.loads(body)
                data["DateTime"] = current_time
                data["Source"] = "centaline"
                db.insert_one(data)
                print("Inserting Data")
                return data
        except Exception as e:
            # Deliberately broad: any network, parsing or database error
            # leads to a retry.
            print(str(e))
            if body is not None and "Sequence contains no elements" in body:
                # The server answered with this message; retrying is pointless.
                return None
            print(url)
            print("Retry One Property Details")
            # Pause before retrying so a persistent failure does not turn
            # into a tight loop of requests against the server.
            await asyncio.sleep(1)

async def bound_fetch(semaphore, url, session, socks, db):
    """Run ``fetch`` while holding one slot of *semaphore*; return its result.

    The slot is released when the call finishes, whether it returns or raises.
    """
    await semaphore.acquire()
    try:
        return await fetch(url, session, socks, db)
    finally:
        semaphore.release()

async def run():
    """Download the details of every listing and the info of every building.

    Schedules one task per entry of ``id_list`` (stored in
    ``db_propertyDetails``), followed by one task per pair produced by
    zipping ``cblgcode_list`` with ``cestcode_list`` (stored in
    ``db_buildingInfo``). All tasks share a single client session routed
    through the SOCKS proxy at localhost:9050, and a semaphore lets at most
    100 of them run at once. The coroutine returns after all tasks have
    finished.
    """
    proxy_url = 'socks5://localhost:9050'
    limiter = asyncio.Semaphore(100)

    async with ClientSession(connector=SocksConnector.from_url(proxy_url)) as session:
        # (url, target collection) pairs, listing details first.
        jobs = [
            (propertyDetails.format(listing_id), db_propertyDetails)
            for listing_id in id_list
        ]
        jobs += [
            (buildingInfo.format(cblg, cest), db_buildingInfo)
            for cblg, cest in zip(cblgcode_list, cestcode_list)
        ]

        pending = [
            asyncio.ensure_future(bound_fetch(limiter, url, session, proxy_url, target))
            for url, target in jobs
        ]
        await asyncio.gather(*pending)

# Phase 2: download listing details and building info on the current event loop.
loop = asyncio.get_event_loop()
future = loop.create_task(run())
loop.run_until_complete(future)
\$\endgroup\$
3
  • 1
    \$\begingroup\$ Welcome to Code Review! This question lacks any indication of what the code is intended to achieve. To help reviewers give you better answers, please add sufficient context to your question, including a title that summarises the purpose of the code. We want to know why much more than how. The more you tell us about what your code is for, the easier it will be for reviewers to help you. The title needs an edit to simply state the task. \$\endgroup\$ Commented Nov 27, 2019 at 8:12
  • 1
    \$\begingroup\$ @TobySpeight Modified \$\endgroup\$ Commented Nov 27, 2019 at 8:22
  • 1
    \$\begingroup\$ This code is doing a lot. Depending on how the script is run, it could use torsocks which would do the tor routing for you. The script would then be invoked via torsocks python3 script.py, then the code for routing to tor could be removed. \$\endgroup\$ Commented Dec 1, 2019 at 22:57

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.