Cleanup script #5

Open · wants to merge 10 commits into main
15 changes: 8 additions & 7 deletions api/api.py
@@ -5,7 +5,7 @@
from fake_headers import Headers
from requests_tor import RequestsTor as req_tor

-from .proxy import MosTransportBan
+from api.proxy import MosTransportBan
from config import PROXY_REUSE, TOR_PASSWORD, LIMIT_REPEAT, NUM_THREADS
from models import Stop
@@ -74,8 +74,9 @@ def get_station_info(self, **kwargs) -> dict:
        return station_data

    def current_ip(self):
-        r = self.make_req('https://api.ipify.org')
-        return r.content
+        # r = self.make_req('https://api.ipify.org')
+        # return r.content
+        pass

    def change_ip(self):
        """Rotates the proxy IP."""
@@ -98,12 +99,12 @@ def make_req(self, link, **kwargs):


class TorTransAPI(TransAPI):
-    def __init__(self, proxy_manager=None):
-        if NUM_THREADS <= 50:
-            self.PORTS = [i for i in range(9000, 9000 + NUM_THREADS)]
+    def __init__(self, proxy_manager=None, num_threads=NUM_THREADS):
+        if num_threads <= 50:
+            self.PORTS = [i for i in range(9000, 9000 + num_threads)]
            log.info(f'Using {len(self.PORTS)} SOCKS proxy addresses')
        else:
-            self.PORTS = [i for i in range(9000, 9049)]
+            self.PORTS = [i for i in range(9000, 9050)]
            log.info(f'Number of threads exceeds number of SOCKS ports. Using {len(self.PORTS)} unique addresses instead')

        self.proxy_manager = proxy_manager
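Note on the change above: the port logic assumes one Tor SOCKS port per worker thread, capped at the 50 ports (9000-9049) that this repo's Tor setup exposes, and the `9049` → `9050` edit fixes an off-by-one that previously yielded only 49 ports. A minimal sketch of the rule (the helper name is illustrative, not part of the PR):

```python
# Sketch only: mirrors TorTransAPI.__init__'s port selection.
def socks_ports(num_threads: int, base: int = 9000, max_ports: int = 50) -> list[int]:
    # One SOCKS port per thread while they fit; otherwise cap at the
    # available ports and let threads share.
    return list(range(base, base + min(num_threads, max_ports)))

assert socks_ports(10) == list(range(9000, 9010))
assert len(socks_ports(55)) == 50  # capped: ports 9000-9049
```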
22 changes: 12 additions & 10 deletions config.py
@@ -4,14 +4,16 @@

load_dotenv()

-THREAD_SLEEP = 1 # Thread sleep delay
-NUM_THREADS = 55 # Number of threads to create
-STATION_CSV = os.getcwd() + "/stop.csv" # CSV file describing the stops to be queried
+THREAD_SLEEP = 1  # Thread sleep delay
+NUM_THREADS = 55  # Number of threads to create
+STATION_CSV = os.getcwd() + "/stop.csv"  # CSV file describing the stops to be queried

typeDB = {
    logging.INFO: 'transmetrika',
    logging.DEBUG: 'transmetrika_test'
}


def getConnectStr(loglevel):
    return f"postgresql://{os.getenv('db_login')}:{os.getenv('db_pass')}" \
           f"@{os.getenv('db_host')}:{os.getenv('db_port')}/{typeDB[loglevel]}"
@@ -21,26 +23,26 @@ def getConnectStr(loglevel):
#            f"@{os.getenv('db_host')}:{os.getenv('db_port')}/{typeDB[logging.DEBUG]}"


-DB_ECHO = True # Whether to echo SQL queries to the console
+DB_ECHO = True  # Whether to echo SQL queries to the console

TOR_PASSWORD = os.getenv('tor_password')


PROXIES_FILE = "proxy.txt"  # default proxies file
TOR_RESTART_DELAY = 5  # delay after restarting the Tor service
-PROXY_REUSE = 5 # reuse each proxy N times
-LIMIT_REPEAT = 5 # maximum number of requests per stop
+PROXY_REUSE = 10  # reuse each proxy N times
+LIMIT_REPEAT = 5  # maximum number of requests per stop

-LEVEL = logging.INFO # logging level in normal mode
+LEVEL = logging.INFO  # logging level in normal mode
TIME_LIMIT = 9 * 60  # 9 min; program run-time limit


-EXAMPLE_RESULT_FILE = 'example_result.json' # example of a server response
+EXAMPLE_RESULT_FILE = 'example_result.json'  # example of a server response
NUMBER_OF_STOPS = 11783


headers = {'sec-ch-ua': 'Not;A Brand";v="95", "Google Chrome";v="95", "Chromium";v="95"', 'Host': 'moscowtransport.app',
           'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 '
-           'Safari/537.36'} # request headers
+           'Safari/537.36'}  # request headers

-DELAY_STOPS = 60 * 6 # delay before polling the stops again
+DELAY_STOPS = 60 * 6  # delay before polling the stops again
68 changes: 65 additions & 3 deletions db/db.py
@@ -1,8 +1,14 @@
-from sqlalchemy.orm import declarative_base
-from sqlalchemy import Column, Integer, String, Float
+import numpy as np
+import pandas as pd
+
+from time import time, asctime
+
from sqlalchemy import create_engine
-from cl_arguments import parser
+from sqlalchemy import Column, Integer, String, Float
+from sqlalchemy.orm import declarative_base

from config import DB_ECHO, getConnectStr
+from cl_arguments import parser

engine = create_engine(getConnectStr(parser.parse_args().loglevel), echo=DB_ECHO,
                       pool_size=10,
@@ -41,4 +47,60 @@ class Stop(Base):
    lat = Column(Float)


+class Cleaner:
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def get_csv(file='db.csv'):
+        print('Fetching DB...')
+        df = pd.read_sql_table('prediction_data', getConnectStr(parser.parse_args().loglevel), index_col='id')
+        print('DB fetched')
+        df.to_csv(file)
+
+    @staticmethod
+    def get_cleaned(read_from=None, write_to=None):
+        if read_from is not None:
+            df = pd.read_csv(read_from, header=0)
+        else:
+            print('Fetching DB...')
+            df = pd.read_sql_table('prediction_data', getConnectStr(parser.parse_args().loglevel))
+            print('DB fetched')
+        if write_to is not None:
+            df.to_csv(write_to, index=False)
+
+        start = time()
+        print(f'\nCleaner started at: {asctime()}')
+
+        df = df[df['byTelemetry'] == 1]
+        data_cleaned = np.array(df.columns)
+        i = 0
+
+        for stop in df['stop_id'].unique():
+            for route in df[df['stop_id'] == stop]['routePathId'].unique():
+                for bus in df[(df['stop_id'] == stop) &
+                              (df['routePathId'] == route)]['tmId'].unique():
+                    dups = df[
+                        (df['stop_id'] == stop) &
+                        (df['routePathId'] == route) &
+                        (df['tmId'] == bus)].sort_values('request_time', ascending=False)
+
+                    data_cleaned = np.vstack((data_cleaned, dups.to_numpy()[0, :]))
+
+                    for idx, dup in dups.iloc[1:, :].iterrows():
+                        if abs(dup['forecast_time'] - data_cleaned[-1, 3]) > 1200:
+                            data_cleaned = np.vstack((data_cleaned, dup.to_numpy()))
+                    i += 1
+                    if i % 1000 == 0:
+                        elapsed = (time() - start) / 60
+                        print(f'{i:,} items handled, {elapsed:.2f} min elapsed')
+
+        df_cleaned = pd.DataFrame(data_cleaned[1:], columns=data_cleaned[0])
+        df_cleaned.drop_duplicates(inplace=True)
+
+        elapsed = (time() - start) / 60
+        print(f'Cleaner finished at: {asctime()}\n{elapsed:.2f} min elapsed.')
+        return df_cleaned
+
+
Base.metadata.create_all(engine)
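Note on the new `Cleaner`: it deduplicates telemetry-based predictions. For each (stop_id, routePathId, tmId) triple it keeps the most recent record by `request_time`, then re-admits an older record only when its `forecast_time` differs by more than 1200 s (20 min) from the corresponding field of the last kept row, and drops exact duplicates at the end. A hypothetical driver script (file names are illustrative, not part of the PR):

```python
# Sketch only: how the Cleaner added in this PR might be invoked.
from db.db import Cleaner

Cleaner.get_csv('db.csv')                     # snapshot prediction_data to CSV
df = Cleaner.get_cleaned(read_from='db.csv')  # dedupe telemetry forecasts offline
df.to_csv('db_cleaned.csv', index=False)
```

One design observation: the triple-nested loop rescans `df` for every (stop, route, bus) combination, so a pandas `groupby` over the same three keys would likely do the same work in one pass.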
27 changes: 18 additions & 9 deletions main.py
@@ -3,6 +3,7 @@
from logging import getLogger
import queue
import threading
+from time import time

from sqlalchemy.orm import sessionmaker

@@ -12,6 +13,7 @@
from db.db import engine
from station import stops_coord
from utils import stops_list_to_queue
+from config import TIME_LIMIT

log = getLogger()
args = parser.parse_args()
@@ -23,15 +25,21 @@

def parser_thread():
    """The thread pulls stops from the queue and processes them."""
-    while True:
+    start = time()  # holds the starting time
+    elapsed = 0  # holds the number of seconds elapsed
+    TIME_LIMIT = args.time_limit
+    while elapsed < TIME_LIMIT:
        try:
            log.debug(f'{stops_queue.unfinished_tasks=}')
            stop_id = stops_queue.get(block=False)
            stops_queue.task_done()
        except queue.Empty:
-            log.debug("Finish. Queue is empty.")
+            log.debug(f"Finish. Queue is empty. Thread {threading.current_thread().name}")
            return
-        log.debug(f"Thread is working with {stop_id}")
+        log.debug(f"Thread {threading.current_thread().name} is working with {stop_id}")

        api.thread_runner(stop_id, session)
+        elapsed = time() - start


def wait_for_threads():
@@ -44,20 +52,21 @@ def wait_for_threads():
time_start = datetime.now()
log.info(f"Started at {time_start}.")

+stops_list = list(stops_coord(f_name=args.stations_csv))
+
+NUM_THREADS = args.threads
+NUM_THREADS = min(len(stops_list) - 1, NUM_THREADS)
+log.info(f"Creating {NUM_THREADS} threads")
+
if args.proxy_file:
    file_proxy = FileProxyManager(args.proxy_file)
    api = TransAPI(file_proxy)
elif args.tor:
    proxy = TorProxyManager()
-    api = TorTransAPI(proxy)
+    api = TorTransAPI(proxy, num_threads=NUM_THREADS)
else:
    api = TransAPI()

-stops_list = list(stops_coord(f_name=args.stations_csv))
-
-NUM_THREADS = args.threads
-NUM_THREADS = min(len(stops_list) - 1, NUM_THREADS)
-log.info(f"Creating {NUM_THREADS} threads")
-
if args.number_stops != -1:
    stops_list = stops_list[:args.number_stops]
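Two things happen in this file: the bootstrap is reordered so `stops_list` (and hence `NUM_THREADS`) is known before `TorTransAPI` is constructed with `num_threads=NUM_THREADS`, and `parser_thread` becomes a time-boxed loop that exits once `args.time_limit` seconds elapse or the queue drains (note that the local `TIME_LIMIT = args.time_limit` shadows the value imported from `config`). A condensed, runnable sketch of the worker pattern (`worker` and `handle` are illustrative names, not from the PR):

```python
# Sketch only: the time-boxed worker loop used by parser_thread.
import queue
from time import time

def worker(work_queue: queue.Queue, handle, time_limit: float) -> None:
    start = time()
    while time() - start < time_limit:  # stop cooperatively at the deadline
        try:
            item = work_queue.get(block=False)
            work_queue.task_done()
        except queue.Empty:
            return                      # queue drained: exit early
        handle(item)                    # process one stop
```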
2 changes: 2 additions & 0 deletions requirements.txt
@@ -35,3 +35,5 @@ tzdata==2021.5
tzlocal==4.1
urllib3==1.26.8
webencodings==0.5.1
+numpy==1.22.3
+pandas==1.4.3
2 changes: 1 addition & 1 deletion torrc
@@ -76,4 +76,4 @@ ControlPort 9051

## where your password is my_password
## don't forget to paste the password to .env
-## HashedControlPassword
+HashedControlPassword 16:DEA656775B49A8046028A39D37043ABBA7105B3642DA7CA8C18A56B9FE
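For reference (not part of the commit): hashes in this `16:…` format come from Tor itself, so a fresh value can be produced and pasted into torrc while the plaintext stays in `.env` as `tor_password`:

```
tor --hash-password my_password
```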
4 changes: 2 additions & 2 deletions utils.py
@@ -1,7 +1,7 @@
import signal
import time
from contextlib import contextmanager
-from multiprocessing import Queue
+# from multiprocessing import Queue
+from queue import Queue


def stops_list_to_queue(data: list, queue: Queue = None) -> Queue:
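This swap matters beyond style: main.py calls `stops_queue.task_done()` and reads `unfinished_tasks`, which exist on `queue.Queue` but not on `multiprocessing.Queue` (the latter targets processes, not threads). A minimal illustration:

```python
# Sketch only: queue.Queue supports the task-tracking API main.py relies on.
from queue import Queue

q: Queue = Queue()
q.put('stop-123')
q.get()
q.task_done()  # absent on multiprocessing.Queue
q.join()       # returns once every fetched item is marked done
```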