import os
import sys
import time
from urllib.parse import urlsplit

import requests
import stem.process
from PySide6.QtCore import QThread, Signal
# `from stem import Signal` below rebinds the module-level name `Signal`,
# so the Qt signal class is also made available under an unambiguous alias.
from PySide6.QtCore import Signal as QtSignal
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                               QLabel, QLineEdit, QPushButton, QFileDialog, QProgressBar)
from requests.adapters import HTTPAdapter
from stem import Signal
from stem.control import Controller
from urllib3.util.retry import Retry
# Proxy mapping handed to requests: plain and TLS traffic both go through the
# SOCKS listener on localhost:9050. Per the requests documentation, the
# "socks5h" scheme asks the proxy (rather than the client) to resolve names.
TOR_PROXY = dict.fromkeys(('http', 'https'), 'socks5h://127.0.0.1:9050')
class DownloadThread(QThread):
    """Worker thread that downloads one URL to a file through the Tor SOCKS proxy.

    Signals:
        progress (int): percentage downloaded; stays 0 when the server sends
            no usable Content-Length.
        speed (str): average transfer rate since the start, as "<n> KB/s".
        finished (bool): emitted once when the work ends; True on success.
            NOTE(review): this reuses the name of QThread's built-in
            ``finished`` signal - confirm that shadowing is intended.
        error (str): human-readable messages. Despite the name it carries
            informational status lines as well as failures.
    """

    # Declared with QtSignal (PySide6.QtCore.Signal): the bare name `Signal`
    # is rebound at module level by `from stem import Signal`.
    progress = QtSignal(int)
    speed = QtSignal(str)
    finished = QtSignal(bool)
    error = QtSignal(str)

    # Address of the local Tor SOCKS listener; must agree with TOR_PROXY.
    TOR_SOCKS_ADDRESS = ('127.0.0.1', 9050)
    CHUNK_SIZE = 8192
    # Error bodies end up in a one-line status label, so keep them short.
    MAX_ERROR_BODY = 500

    def __init__(self, url, save_path):
        """Remember the source *url* and the destination file *save_path*."""
        super().__init__()
        self.url = url
        self.save_path = save_path

    def run(self):
        """Thread entry point: verify Tor, download, and report the outcome.

        Never raises; every failure is reported through ``error`` followed by
        ``finished(False)``.
        """
        if not self._tor_reachable():
            return
        try:
            self._download()
        except requests.exceptions.RequestException as e:
            body = self._describe_response(e.response)
            self._fail(f"Network error: {str(e)}\nResponse: {body}")
        except Exception as e:
            self._fail(f"Error: {str(e)}\nType: {type(e).__name__}")
        else:
            self.finished.emit(True)

    def _report(self, message):
        """Print *message* and forward it through the ``error`` signal."""
        print(message)
        self.error.emit(message)

    def _fail(self, message):
        """Report *message* and signal that the download did not succeed."""
        self._report(message)
        self.finished.emit(False)

    def _describe_response(self, response):
        """Return a short text form of an error response, or 'No response'."""
        # Compare against None explicitly: a requests Response is falsy for
        # 4xx/5xx statuses, which are exactly the ones that reach this point.
        if response is None:
            return 'No response'
        try:
            return response.text[:self.MAX_ERROR_BODY]
        except Exception:
            # The streamed connection has already been closed by the time the
            # exception is handled, so the body may no longer be readable.
            return f"HTTP {response.status_code} (body unavailable)"

    def _tor_reachable(self):
        """Check that PySocks is installed and the Tor SOCKS port accepts connections."""
        try:
            import socks  # noqa: F401 - imported only to prove PySocks exists
            import socket
            # The context manager closes the probe socket on every path.
            with socket.create_connection(self.TOR_SOCKS_ADDRESS, timeout=10):
                pass
        except ImportError:
            self._fail("SOCKS support not available. Please install PySocks.")
            return False
        except Exception as e:
            self._fail(f"Failed to connect to Tor: {str(e)}")
            return False
        self._report("Tor connection verified")
        return True

    def _download(self):
        """Stream ``self.url`` into ``self.save_path``, emitting progress and speed."""
        retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        with requests.Session() as session:
            for scheme in ('http://', 'https://'):
                session.mount(scheme, HTTPAdapter(max_retries=retries))

            self._report(f"Attempting to connect to: {self.url}")
            with session.get(self.url, stream=True, proxies=TOR_PROXY, timeout=30) as r:
                self._report(f"Connection established. Status code: {r.status_code}")
                r.raise_for_status()

                total_length = int(r.headers.get('content-length', 0))
                if total_length == 0:
                    self._report("Warning: Content length is 0 or not provided")

                downloaded = 0
                start_time = time.time()
                with open(self.save_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=self.CHUNK_SIZE):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        # Without a known total the bar cannot advance.
                        progress = int((downloaded / total_length) * 100) if total_length > 0 else 0
                        elapsed_time = time.time() - start_time
                        speed = downloaded / (elapsed_time * 1024) if elapsed_time > 0 else 0  # KB/s
                        self.progress.emit(progress)
                        self.speed.emit(f"{speed:.2f} KB/s")

        self._report(f"Download completed. Total bytes: {downloaded}")
class MainWindow(QMainWindow):
    """Main window: takes a URL and a target folder and downloads the file via Tor."""

    def __init__(self):
        """Build the widgets and initialise the Tor/download bookkeeping."""
        super().__init__()
        self.setWindowTitle("Tor Downloader")
        self.setGeometry(100, 100, 400, 200)

        # Create main widget and layout
        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout()

        # URL input
        self.url_input = QLineEdit()
        self.url_input.setPlaceholderText("Enter .onion URL")
        layout.addWidget(QLabel("Onion URL:"))
        layout.addWidget(self.url_input)

        # Save location (filled in only through the folder dialog)
        self.save_path_input = QLineEdit()
        self.save_path_input.setReadOnly(True)
        browse_button = QPushButton("Browse...")
        browse_button.clicked.connect(self.select_save_location)
        layout.addWidget(QLabel("Save Location:"))
        layout.addWidget(self.save_path_input)
        layout.addWidget(browse_button)

        # Progress bar
        self.progress_bar = QProgressBar()
        layout.addWidget(self.progress_bar)

        # Speed label (doubles as the status/error line)
        self.speed_label = QLabel("Speed: 0.00 KB/s")
        layout.addWidget(self.speed_label)

        # Download button
        self.download_button = QPushButton("Download")
        self.download_button.clicked.connect(self.start_download)
        layout.addWidget(self.download_button)

        main_widget.setLayout(layout)

        self.tor_process = None      # set only when this window launched Tor
        self.download_thread = None  # keeps the worker referenced while it runs
        self._last_message = ""      # most recent text passed to handle_error

    def select_save_location(self):
        """Ask for a destination folder and show it in the save-location field."""
        folder_path = QFileDialog.getExistingDirectory(self, "Select Save Folder")
        if folder_path:
            self.save_path_input.setText(folder_path)

    def start_tor(self):
        """Make sure Tor is available, launching it when necessary.

        Returns:
            True if an existing control port answered or a new Tor process was
            started, False otherwise (the failure is shown via handle_error).
        """
        # Probe for an already-running Tor. This is deliberately best effort:
        # any failure just means we fall through and launch our own process.
        try:
            with Controller.from_port(port=9051) as controller:
                controller.authenticate()
            print("Connected to existing Tor process")
            return True
        except Exception as e:
            print(f"No usable Tor control port ({e}); launching Tor")

        try:
            self.tor_process = stem.process.launch_tor_with_config(
                config={
                    'SocksPort': '9050',
                    'ControlPort': '9051',
                },
                take_ownership=True,
                timeout=300,
            )
        except Exception as e:
            msg = f"Failed to start Tor: {str(e)}"
            print(msg)
            self.handle_error(msg)
            return False
        print("Started new Tor process")
        return True

    def stop_tor(self):
        """Terminate the Tor process launched by this window, if there is one."""
        if self.tor_process:
            self.tor_process.terminate()
            print("Stopped Tor process")
            self.tor_process = None

    def start_download(self):
        """Validate the inputs, ensure Tor is up and start the worker thread."""
        url = self.url_input.text().strip()
        folder_path = self.save_path_input.text()
        # Validate before touching Tor so an empty form does not launch it.
        if not url or not folder_path:
            self.speed_label.setText("Please enter a URL and choose a save location.")
            return
        # Never replace a worker that is still running.
        if self.download_thread is not None and self.download_thread.isRunning():
            return

        # NOTE: start_tor runs on the GUI thread and may block while Tor boots.
        if not self.start_tor():
            return

        # Derive the filename from the URL path only, so query strings and
        # fragments do not end up in the name.
        try:
            filename = os.path.basename(urlsplit(url).path)
        except ValueError as e:
            self.speed_label.setText(f"Error generating filename: {str(e)}")
            return
        if filename in ('.', '..') or '.' not in filename:
            filename = f"download_{int(time.time())}"
        save_path = os.path.join(folder_path, filename)

        # Disable UI during download
        self.download_button.setEnabled(False)
        self.progress_bar.setValue(0)
        self._last_message = ""
        self.speed_label.setText(f"Downloading to: {save_path}")

        # Create and start download thread
        self.download_thread = DownloadThread(url, save_path)
        self.download_thread.progress.connect(self.progress_bar.setValue)
        self.download_thread.speed.connect(self.speed_label.setText)
        self.download_thread.finished.connect(self.download_finished)
        self.download_thread.error.connect(self.handle_error)
        self.download_thread.start()

    def download_finished(self, success):
        """Re-enable the UI and show the final outcome of the download."""
        self.download_button.setEnabled(True)
        if success:
            self.speed_label.setText("Download complete!")
        elif self._last_message:
            # Keep the reason visible instead of replacing it with a bare
            # failure notice.
            self.speed_label.setText(f"Download failed: {self._last_message}")
        else:
            self.speed_label.setText("Download failed!")

    def handle_error(self, message):
        """Show *message* on the status line.

        The worker sends informational messages through the same signal, so
        the Download button is only re-enabled here when no download is in
        progress; otherwise download_finished takes care of it.
        """
        print(f"UI Error: {message}")  # Print to console
        self._last_message = message
        self.speed_label.setText(message)
        if self.download_thread is None or not self.download_thread.isRunning():
            self.download_button.setEnabled(True)

    def closeEvent(self, event):
        """Stop the Tor process owned by this window before closing."""
        self.stop_tor()
        event.accept()
def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    # Propagate the event loop's return code as the process exit status.
    sys.exit(app.exec())


if __name__ == "__main__":
    main()