# website_bot/main.py

import requests
from bs4 import BeautifulSoup
import os
import time
import random
import hashlib
import threading
from PIL import Image
import io


class ImageDownloaderBot:
    def __init__(self, folder_name="downloaded_images"):
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        self.folder_name = folder_name
        self.downloaded_images = set()
        self.stop_bot = False
        self.thread = None

    def _get_image_hash(self, content):
        return hashlib.md5(content).hexdigest()

    def resize_image(self, content, base_width=300):
        image = Image.open(io.BytesIO(content))
        w_percent = base_width / float(image.size[0])
        h_size = int(float(image.size[1]) * float(w_percent))
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the equivalent resampling filter.
        image = image.resize((base_width, h_size), Image.LANCZOS)
        # JPEG has no alpha channel, so convert RGBA/palette images before saving.
        if image.mode != "RGB":
            image = image.convert("RGB")
        with io.BytesIO() as output:
            image.save(output, format="JPEG")
            return output.getvalue()

    def get_images_from_url(self, url):
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            img_tags = soup.find_all('img')
            count = 0
            for img_tag in img_tags:
                if self.stop_bot:
                    print("Stopping bot as requested.")
                    return
                img_url = img_tag.get('src', '')
                # Skip missing or relative src values; only absolute http(s) URLs are fetched.
                if not img_url or not img_url.startswith(('http:', 'https:')):
                    continue
                content = requests.get(img_url, headers=self.headers, timeout=10).content
                img_hash = self._get_image_hash(content)
                if img_hash in self.downloaded_images:
                    print(f"Skipped downloading duplicate image: {img_url}")
                    continue
                self.downloaded_images.add(img_hash)
                # Resize the image
                resized_content = self.resize_image(content)
                # Create a directory named after the page's domain
                directory = os.path.join(self.folder_name, url.split("//")[-1].split("/")[0])
                os.makedirs(directory, exist_ok=True)
                img_name = os.path.join(directory, os.path.basename(img_url))
                with open(img_name, 'wb') as f:
                    f.write(resized_content)
                count += 1
                # Pause between downloads to avoid hammering the server
                time.sleep(random.uniform(1, 3))
            print(f"Downloaded {count} new images from {url}.")
        except requests.RequestException as e:
            print(f"Error during requests to {url}: {e}")
        except Exception as e:
            print(f"Error: {e}")

    def start(self, url):
        if self.thread and self.thread.is_alive():
            print("Bot is already running.")
            return
        self.stop_bot = False
        self.thread = threading.Thread(target=self.get_images_from_url, args=(url,))
        self.thread.start()

    def stop(self):
        self.stop_bot = True
        if self.thread:
            self.thread.join()


bot = ImageDownloaderBot()
url = "YOUR_WEBSITE_URL_HERE"
bot.start(url)
# To stop the bot at any point, you can call:
# bot.stop()
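
# A minimal shutdown sketch (assumes YOUR_WEBSITE_URL_HERE above has been replaced
# with a real page URL): give the background thread some time to work, then request
# a clean stop, which sets the flag checked by the download loop and joins the thread.
#
# time.sleep(30)
# bot.stop()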