"""Async interactions with the Nexus Telegram bot.
The routines here handle authentication, command dispatch, and output parsing
for the Nexus search bot. They are structured around ``Telethon`` event loops
so they can be driven from the CLI without blocking other concurrent work.
"""
# Python script to interact with Nexus bot on Telegram
from telethon import TelegramClient, events
import asyncio
import json
import os
import sys
from datetime import datetime
import platform
import argparse
import logging
from pathlib import Path
from telethon import connection
import requests
from bs4 import BeautifulSoup
import random
import socket
import ssl
import aiohttp
from urllib.parse import urlparse
import socks
from aiohttp_socks import ProxyConnector
import re
import select
import time
import readline
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import shutil
from datetime import timedelta
import datetime as dt # Add this import at the top if not already present
import itertools
import getpass
from . import getpapers, proxy_config
if platform.system() == 'Windows':
import msvcrt
# You need to get API credentials from https://my.telegram.org
TG_API_ID = "" # Replace with your actual API ID
TG_API_HASH = "" # Replace with your actual API hash
PHONE = "" # Replace with your phone number
BOT_USERNAME = "SciNexBot" # Replace with Nexus bot username
SESSION_FILE = "" # Path to save the session file
# Global variables for logging
verbose_mode = False
logger = None
[docs]
def setup_logging(log_file=None, verbose=False):
"""Setup logging configuration"""
global logger, verbose_mode
verbose_mode = verbose
# Create logger
logger = logging.getLogger('TelegramBot')
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
# Clear any existing handlers
logger.handlers.clear()
# Create formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler if log_file is specified
if log_file:
# Create log directory if it doesn't exist
log_dir = os.path.dirname(log_file)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.info(f"Logging to file: {log_file}")
[docs]
def debug_print(message):
"""Print debug message if verbose mode is enabled"""
if verbose_mode and logger:
logger.debug(message)
elif verbose_mode:
print(f"DEBUG: {message}")
[docs]
def info_print(message):
"""Print info message"""
if logger:
logger.info(message)
else:
print(message)
[docs]
def error_print(message):
"""Print error message"""
if logger:
logger.error(message)
else:
print(f"ERROR: {message}")
# Set file paths based on operating system
[docs]
def get_file_paths():
"""Get the appropriate file paths based on the operating system, using a single config dir for all except downloads."""
system = platform.system()
home = os.path.expanduser("~")
if system == "Windows":
# Windows: Use AppData\Local\getscipapers\nexus as config dir
config_dir = os.path.join(os.environ.get("LOCALAPPDATA", home), "getscipapers", "nexus")
elif system == "Darwin": # macOS
# macOS: Use ~/Library/Application Support/getscipapers/nexus as config dir
config_dir = os.path.join(home, "Library", "Application Support", "getscipapers", "nexus")
else: # Linux and other Unix-like systems
# Linux: Use ~/.config/getscipapers/nexus as config dir
config_dir = os.path.join(home, ".config", "getscipapers", "nexus")
# Download dir is always ~/Downloads/getscipapers/nexus
download_dir = os.path.join(home, "Downloads", "getscipapers", "nexus")
# Log file in config dir/logs
log_dir = os.path.join(config_dir, "logs")
os.makedirs(config_dir, exist_ok=True)
os.makedirs(log_dir, exist_ok=True)
os.makedirs(download_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d")
default_log_file = os.path.join(log_dir, f"telegram_bot_{timestamp}.log")
return {
"session": os.path.join(config_dir, "telegram_session.session"),
"credentials": os.path.join(config_dir, "credentials.json"),
"proxy": os.path.join(config_dir, "proxy.json"),
"log": default_log_file,
"download": download_dir
}
# Update file paths to use the platform-specific paths
file_paths = get_file_paths()
SESSION_FILE = file_paths["session"]
CREDENTIALS_FILE = file_paths["credentials"]
DEFAULT_PROXY_FILE = file_paths["proxy"]
DEFAULT_LOG_FILE = file_paths["log"]
DEFAULT_DOWNLOAD_DIR = file_paths["download"]
[docs]
def get_free_proxies():
"""Retrieve and store free proxies using the shared proxy helper."""
info_print("Retrieving free proxies from free-proxy-list.net...")
settings = proxy_config.auto_discover_proxy(
config_path=DEFAULT_PROXY_FILE,
save_list=True,
verbose=verbose_mode,
)
return settings.enabled
[docs]
def test_proxy_speed(ip, port, timeout=10):
"""
Test proxy speed by making a simple HTTP request through the proxy
Args:
ip: Proxy IP address
port: Proxy port
timeout: Request timeout in seconds
Returns:
Response time in milliseconds (0 if failed)
"""
try:
proxy_url = f"http://{ip}:{port}"
proxies = {
'http': proxy_url,
'https': proxy_url
}
# Test URL - use a fast, reliable endpoint
test_urls = [
'http://httpbin.org/ip',
'http://icanhazip.com',
'http://ipinfo.io/ip'
]
for test_url in test_urls:
try:
debug_print(f"Testing proxy {ip}:{port} with {test_url}")
start_time = time.time()
response = requests.get(
test_url,
proxies=proxies,
timeout=timeout,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
)
end_time = time.time()
if response.status_code == 200:
response_time_ms = (end_time - start_time) * 1000
debug_print(f"Proxy {ip}:{port} responded in {response_time_ms:.0f}ms")
return response_time_ms
else:
debug_print(f"Proxy {ip}:{port} returned status {response.status_code}")
except requests.RequestException as e:
debug_print(f"Proxy {ip}:{port} failed with {test_url}: {str(e)}")
continue
# If all test URLs failed
debug_print(f"Proxy {ip}:{port} failed all connectivity tests")
return 0
except Exception as e:
debug_print(f"Error testing proxy {ip}:{port}: {str(e)}")
return 0
[docs]
def load_proxy_config(proxy):
"""Load proxy configuration from file or dict"""
if isinstance(proxy, str):
debug_print(f"Loading proxy configuration from file: {proxy}")
try:
with open(proxy, 'r') as f:
proxy_config = json.load(f)
info_print(f"Loaded proxy configuration from: {proxy}")
debug_print(f"Proxy config: {proxy_config}")
return proxy_config
except (FileNotFoundError, json.JSONDecodeError) as e:
error_print(f"Error loading proxy configuration file: {e}")
info_print("Attempting to fetch new proxy configuration...")
if get_free_proxies():
debug_print("Successfully fetched new proxy, retrying load...")
try:
with open(proxy, 'r') as f:
proxy_config = json.load(f)
info_print(f"Loaded new proxy configuration from: {proxy}")
debug_print(f"New proxy config: {proxy_config}")
return proxy_config
except (FileNotFoundError, json.JSONDecodeError) as retry_e:
error_print(f"Error loading newly fetched proxy configuration: {retry_e}")
return None
else:
error_print("Failed to fetch new proxy configuration")
return None
return proxy
[docs]
async def test_proxy_telegram_connection(proxy_config, timeout=10):
"""
Test if a proxy can successfully connect to Telegram
Based on OONI probe methodology for testing Telegram connectivity
"""
if not proxy_config:
debug_print("No proxy configuration provided for testing")
return {"success": False, "error": "No proxy configuration"}
debug_print(f"Testing proxy connection to Telegram: {proxy_config['addr']}:{proxy_config['port']}")
# Telegram endpoints to test (similar to OONI probe)
telegram_endpoints = [
("149.154.175.50", 443), # DC2 (primary)
("149.154.167.51", 443), # DC4
("149.154.175.100", 443), # DC2 alt
("95.161.76.100", 443), # DC1
]
# Telegram web endpoints
telegram_web_endpoints = [
"https://web.telegram.org",
"https://telegram.org",
"https://core.telegram.org"
]
results = {
"success": False,
"proxy": f"{proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}",
"tcp_connect": {},
"web_connectivity": {},
"error": None
}
try:
# Test 1: Direct TCP connection to Telegram servers
debug_print("Testing TCP connections to Telegram servers...")
tcp_success_count = 0
for host, port in telegram_endpoints:
try:
debug_print(f"Testing TCP connection to {host}:{port}")
# Create socket connection through proxy
if proxy_config['type'].lower() == 'socks5':
try:
sock = socks.socksocket()
sock.set_proxy(socks.SOCKS5, proxy_config['addr'], proxy_config['port'],
username=proxy_config.get('username'),
password=proxy_config.get('password'))
sock.settimeout(timeout)
sock.connect((host, port))
sock.close()
results["tcp_connect"][f"{host}:{port}"] = {"success": True, "time": "< timeout"}
tcp_success_count += 1
debug_print(f"✓ TCP connection successful to {host}:{port}")
except ImportError:
debug_print("PySocks not available for SOCKS5 testing, skipping TCP test")
results["tcp_connect"][f"{host}:{port}"] = {"success": False, "error": "PySocks not available"}
except Exception as e:
results["tcp_connect"][f"{host}:{port}"] = {"success": False, "error": str(e)}
debug_print(f"✗ TCP connection failed to {host}:{port}: {e}")
else:
# For HTTP proxies, we'll test via web connectivity instead
debug_print(f"Skipping direct TCP test for HTTP proxy, will test via web connectivity")
results["tcp_connect"][f"{host}:{port}"] = {"success": None, "note": "HTTP proxy - tested via web"}
except Exception as e:
results["tcp_connect"][f"{host}:{port}"] = {"success": False, "error": str(e)}
debug_print(f"✗ TCP connection failed to {host}:{port}: {e}")
# Test 2: Web connectivity to Telegram websites
debug_print("Testing web connectivity to Telegram websites...")
web_success_count = 0
# Setup proxy for aiohttp
proxy_url = None
proxy_auth = None
if proxy_config['type'].lower() in ['http', 'https']:
proxy_url = f"http://{proxy_config['addr']}:{proxy_config['port']}"
if proxy_config.get('username') and proxy_config.get('password'):
proxy_auth = aiohttp.BasicAuth(proxy_config['username'], proxy_config['password'])
elif proxy_config['type'].lower() == 'socks5':
proxy_url = f"socks5://{proxy_config['addr']}:{proxy_config['port']}"
if proxy_config.get('username') and proxy_config.get('password'):
proxy_url = f"socks5://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['addr']}:{proxy_config['port']}"
connector = None
if proxy_url:
try:
# Try to create connector with proxy support
if proxy_config['type'].lower() == 'socks5':
try:
connector = ProxyConnector.from_url(proxy_url)
debug_print(f"Using SOCKS5 connector: {proxy_url}")
except ImportError:
debug_print("aiohttp-socks not available, falling back to basic connector")
connector = aiohttp.TCPConnector()
else:
connector = aiohttp.TCPConnector()
except Exception as e:
debug_print(f"Error creating connector: {e}")
connector = aiohttp.TCPConnector()
timeout_config = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout_config
) as session:
for url in telegram_web_endpoints:
try:
debug_print(f"Testing web connectivity to {url}")
start_time = asyncio.get_event_loop().time()
kwargs = {}
if proxy_url and proxy_config['type'].lower() in ['http', 'https']:
kwargs['proxy'] = proxy_url
if proxy_auth:
kwargs['proxy_auth'] = proxy_auth
async with session.get(url, **kwargs) as response:
end_time = asyncio.get_event_loop().time()
response_time = round((end_time - start_time) * 1000, 2) # ms
if response.status == 200:
# Check if response contains Telegram-specific content
content = await response.text()
telegram_indicators = ['telegram', 'Telegram', 'MTProto', 'telegram.org']
has_telegram_content = any(indicator in content for indicator in telegram_indicators)
results["web_connectivity"][url] = {
"success": True,
"status_code": response.status,
"response_time_ms": response_time,
"has_telegram_content": has_telegram_content
}
web_success_count += 1
debug_print(f"✓ Web connectivity successful to {url} ({response_time}ms)")
else:
results["web_connectivity"][url] = {
"success": False,
"status_code": response.status,
"response_time_ms": response_time,
"error": f"HTTP {response.status}"
}
debug_print(f"✗ Web connectivity failed to {url}: HTTP {response.status}")
except asyncio.TimeoutError:
results["web_connectivity"][url] = {
"success": False,
"error": "Timeout",
"response_time_ms": timeout * 1000
}
debug_print(f"✗ Web connectivity timeout to {url}")
except Exception as e:
results["web_connectivity"][url] = {
"success": False,
"error": str(e)
}
debug_print(f"✗ Web connectivity failed to {url}: {e}")
# Determine overall success
total_tcp_tests = len([r for r in results["tcp_connect"].values() if r.get("success") is not None])
total_web_tests = len(telegram_web_endpoints)
tcp_success_rate = tcp_success_count / max(total_tcp_tests, 1)
web_success_rate = web_success_count / total_web_tests
# Consider proxy working if at least 50% of web tests pass
# (TCP tests are optional depending on proxy type)
if web_success_rate >= 0.5:
results["success"] = True
info_print(f"✓ Proxy connectivity test PASSED - Web: {web_success_count}/{total_web_tests}, TCP: {tcp_success_count}/{total_tcp_tests}")
else:
results["success"] = False
results["error"] = f"Low success rate - Web: {web_success_rate:.1%}, TCP: {tcp_success_rate:.1%}"
info_print(f"✗ Proxy connectivity test FAILED - {results['error']}")
return results
except Exception as e:
error_msg = f"Proxy test failed: {str(e)}"
error_print(error_msg)
debug_print(f"Proxy test exception: {type(e).__name__}: {str(e)}")
results["success"] = False
results["error"] = error_msg
return results
[docs]
async def test_and_select_working_proxy():
"""Test multiple proxies in parallel and select the first working one for Telegram"""
info_print("Testing proxy connectivity to Telegram servers...")
# First try to get fresh proxies
if not get_free_proxies():
error_print("Failed to fetch proxy list")
return None
# Load the proxy list
proxy_list_file = Path(DEFAULT_PROXY_FILE).with_name(
f"{Path(DEFAULT_PROXY_FILE).stem}{proxy_config.PROXY_LIST_SUFFIX}"
)
try:
with open(proxy_list_file, 'r') as f:
proxy_data = json.load(f)
working_proxies = proxy_data.get('working') or proxy_data.get('all_proxies', [])
if not working_proxies:
error_print("No proxies available for testing")
return None
test_proxy_list = working_proxies[:10]
info_print(f"Testing top {len(test_proxy_list)} fastest proxies for Telegram connectivity in parallel...")
info_print("Will use the first working proxy found...")
# Create tasks for parallel testing
async def test_single_proxy(proxy_info, index):
"""Test a single proxy and return result with index"""
try:
addr = proxy_info.get('addr') or proxy_info.get('ip')
port = proxy_info.get('port')
if not addr or not port:
return None, index, proxy_info
debug_print(
f"Testing proxy {index}: {addr}:{port} ({proxy_info.get('country', 'Unknown')}) - {proxy_info.get('speed_ms', 'N/A')}ms"
)
candidate = {
'type': proxy_info.get('type', 'http'),
'addr': addr,
'port': port,
'username': proxy_info.get('username'),
'password': proxy_info.get('password'),
}
test_result = await test_proxy_telegram_connection(candidate, timeout=15)
if test_result['success']:
info_print(f"✓ Proxy {index} WORKS: {addr}:{port}")
return candidate, index, proxy_info
debug_print(f"✗ Proxy {index} failed: {test_result.get('error', 'Unknown error')}")
return None, index, proxy_info
except Exception as e:
debug_print(f"✗ Proxy {index} error: {str(e)}")
return None, index, proxy_info
# Create tasks for all proxies
tasks = [
test_single_proxy(proxy_info, i + 1)
for i, proxy_info in enumerate(test_proxy_list)
]
# Convert coroutines to tasks for proper cancellation
actual_tasks = [asyncio.create_task(coro) for coro in tasks]
# Run tasks in parallel and return when first one succeeds
try:
for task in asyncio.as_completed(actual_tasks):
proxy_config, _, proxy_info = await task
if proxy_config: # Found a working proxy
addr = proxy_info.get('addr') or proxy_info.get('ip')
port = proxy_info.get('port')
info_print(f"Selected working proxy: {addr}:{port} ({proxy_info.get('country', 'Unknown')})")
# Cancel remaining tasks
for remaining_task in actual_tasks:
if not remaining_task.done():
remaining_task.cancel()
# Save the working proxy configuration
with open(DEFAULT_PROXY_FILE, 'w') as f:
json.dump(proxy_config, f, indent=2)
info_print(f"Working proxy configuration saved to: {DEFAULT_PROXY_FILE}")
return proxy_config
# If we reach here, no working proxy was found
error_print("No working proxies found for Telegram")
return None
except Exception as e:
error_print(f"Error during parallel proxy testing: {e}")
debug_print(f"Parallel testing error: {type(e).__name__}: {str(e)}")
return None
except Exception as e:
error_print(f"Error testing proxies: {e}")
debug_print(f"Proxy testing error: {type(e).__name__}: {str(e)}")
return None
[docs]
async def test_telegram_connection(api_id, api_hash, phone_number, session_file=SESSION_FILE, proxy=None):
"""
Test connection to Telegram servers with comprehensive diagnostics
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
session_file: Name of the session file
proxy: Proxy configuration dict or file path
"""
print("\n" + "="*70)
print("TELEGRAM CONNECTION TEST")
print("="*70)
# Test 1: Proxy connectivity (if configured)
if proxy:
print("🔧 Step 1: Testing proxy configuration...")
# Load proxy configuration
proxy_config = load_proxy_config(proxy)
if proxy_config is None:
error_print("✗ Failed to load proxy configuration")
return
info_print(f"Using proxy: {proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}")
# Test proxy connectivity to Telegram
proxy_test_result = await test_proxy_telegram_connection(proxy_config, timeout=15)
if proxy_test_result['success']:
print("✅ Proxy connectivity: PASSED")
# Show detailed proxy test results if verbose
if verbose_mode:
print(" 📊 Proxy Test Details:")
tcp_results = proxy_test_result.get('tcp_connect', {})
for endpoint, result in tcp_results.items():
status = "✓" if result.get('success') else "✗"
print(f" {status} TCP {endpoint}: {result.get('error', 'OK')}")
web_results = proxy_test_result.get('web_connectivity', {})
for url, result in web_results.items():
status = "✓" if result.get('success') else "✗"
time_info = f" ({result.get('response_time_ms', 0)}ms)" if result.get('response_time_ms') else ""
print(f" {status} WEB {url}{time_info}")
else:
print("❌ Proxy connectivity: FAILED")
error_print(f"Proxy test error: {proxy_test_result.get('error', 'Unknown error')}")
print(" Try running with --no-proxy to test direct connection")
else:
print("🔧 Step 1: No proxy configured - testing direct connection")
print()
# Test 2: Session file validation
print("🔧 Step 2: Checking session file...")
if os.path.exists(session_file):
file_size = os.path.getsize(session_file)
file_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(session_file))
print(f"✅ Session file exists: {session_file}")
print(f" 📏 File size: {file_size} bytes")
print(f" 📅 Last modified: {file_age.days} days ago")
if file_size < 100:
print(" ⚠️ Warning: Session file seems unusually small")
if file_age.days > 30:
print(" ⚠️ Warning: Session file is quite old, may need re-authentication")
else:
print("❌ Session file not found")
print(f" Expected location: {session_file}")
print(" Run with --create-session to create a new session")
return
print()
# Test 3: Telegram client connection
print("🔧 Step 3: Testing Telegram client connection...")
# Load proxy configuration
proxy_config = load_proxy_config(proxy) if proxy else None
# Create client
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
try:
# Test connection
start_time = datetime.now()
debug_print("Starting Telegram client for connection test...")
await client.start()
connect_time = (datetime.now() - start_time).total_seconds()
print(f"✅ Client connection: SUCCESSFUL ({connect_time:.2f}s)")
# Test authorization
if await client.is_user_authorized():
print("✅ User authorization: VALID")
# Get user info
try:
me = await client.get_me()
print(f" 👤 User: {me.first_name} {me.last_name or ''} (@{me.username or 'no_username'})")
print(f" 📱 Phone: {me.phone or 'not_available'}")
print(f" 🆔 User ID: {me.id}")
except Exception as e:
debug_print(f"Could not get user info: {e}")
print(" ℹ️ User info: Could not retrieve")
else:
print("❌ User authorization: EXPIRED")
print(" Run with --create-session to re-authenticate")
return
print()
# Test 4: Bot connectivity
print("🔧 Step 4: Testing bot connectivity...")
try:
bot_entity = await client.get_entity(BOT_USERNAME)
print(f"✅ Bot resolution: Found @{BOT_USERNAME}")
print(f" 🤖 Bot ID: {bot_entity.id}")
print(f" 📝 Bot Name: {getattr(bot_entity, 'first_name', 'N/A')}")
# Test sending a simple message
print(" 📤 Testing message send...")
test_message = "/start"
start_time = datetime.now()
result = await client.send_message(BOT_USERNAME, test_message)
send_time = (datetime.now() - start_time).total_seconds()
print(f"✅ Message send: SUCCESSFUL ({send_time:.2f}s)")
print(f" 🆔 Message ID: {result.id}")
# Wait briefly for potential reply
print(" ⏳ Waiting for bot response...")
await asyncio.sleep(3)
# Check for recent messages
message_count = 0
async for message in client.iter_messages(bot_entity, limit=3):
if message.date >= result.date:
message_count += 1
if message_count == 1:
print(f"✅ Bot response: RECEIVED")
response_text = message.text[:100] + "..." if len(message.text) > 100 else message.text
print(f" 💬 Response: {response_text}")
if message.reply_markup and message.reply_markup.rows:
button_count = sum(len(row.buttons) for row in message.reply_markup.rows)
print(f" 🔘 Buttons: {button_count} available")
break
if message_count == 0:
print("⚠️ Bot response: No immediate response (may be normal)")
except Exception as e:
print(f"❌ Bot connectivity: FAILED")
error_print(f"Bot test error: {str(e)}")
debug_print(f"Bot test exception: {type(e).__name__}: {str(e)}")
print()
# Test 5: Network performance
print("🔧 Step 5: Network performance test...")
try:
# Test multiple small operations
start_time = datetime.now()
operations = 0
# Get dialogs (conversations)
async for dialog in client.iter_dialogs(limit=5):
operations += 1
performance_time = (datetime.now() - start_time).total_seconds()
if performance_time < 2.0:
print(f"✅ Network performance: EXCELLENT ({performance_time:.2f}s for {operations} ops)")
elif performance_time < 5.0:
print(f"✅ Network performance: GOOD ({performance_time:.2f}s for {operations} ops)")
elif performance_time < 10.0:
print(f"⚠️ Network performance: SLOW ({performance_time:.2f}s for {operations} ops)")
else:
print(f"❌ Network performance: VERY SLOW ({performance_time:.2f}s for {operations} ops)")
except Exception as e:
print(f"⚠️ Network performance: Could not test ({str(e)})")
except Exception as e:
print("❌ Client connection: FAILED")
error_print(f"Connection test error: {str(e)}")
debug_print(f"Connection test exception: {type(e).__name__}: {str(e)}")
# Provide troubleshooting suggestions
print("\n🔧 Troubleshooting suggestions:")
if "proxy" in str(e).lower():
print(" • Check proxy configuration and connectivity")
print(" • Try with --no-proxy for direct connection")
if "auth" in str(e).lower() or "login" in str(e).lower():
print(" • Run with --create-session to re-authenticate")
if "network" in str(e).lower() or "timeout" in str(e).lower():
print(" • Check your internet connection")
print(" • Try using a proxy with --proxy-config-file")
finally:
debug_print("Disconnecting client after connection test...")
await client.disconnect()
print()
print("="*70)
print("CONNECTION TEST COMPLETED")
print("="*70)
[docs]
async def decide_proxy_usage(api_id, api_hash, phone_number, session_file=SESSION_FILE, proxy_file=DEFAULT_PROXY_FILE, print_result=True):
"""
Decide whether to use a proxy for Telegram connection.
If connection works without proxy, return None (no proxy).
If not, try default proxy file. If that fails, select a new proxy and try again.
Returns:
None if no proxy needed,
proxy_file if proxy is needed,
False if neither works.
"""
if print_result:
info_print("Testing Telegram connection without proxy...")
try:
client = create_telegram_client(api_id, api_hash, session_file, proxy=None)
await client.start(phone=phone_number if phone_number else None)
is_auth = await client.is_user_authorized()
await client.disconnect()
if is_auth:
if print_result:
info_print("Direct connection to Telegram works. Proxy is not needed.")
return None
else:
if print_result:
info_print("Direct connection failed (not authorized). Trying default proxy configuration...")
except Exception as e:
if print_result:
info_print(f"Direct connection failed: {e}. Trying default proxy configuration...")
# Try default proxy file if it exists
if os.path.exists(proxy_file):
try:
proxy_config = load_proxy_config(proxy_file)
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
await client.start(phone=phone_number if phone_number else None)
is_auth = await client.is_user_authorized()
await client.disconnect()
if is_auth:
if print_result:
info_print("Connection via default proxy works. Proxy will be used.")
return proxy_file
else:
if print_result:
info_print("Connection failed with default proxy (not authorized). Will try to find a new working proxy...")
except Exception as e:
if print_result:
info_print(f"Connection failed with default proxy: {e}. Will try to find a new working proxy...")
# Try to find a new working proxy
working_proxy = await test_and_select_working_proxy()
if not working_proxy:
if print_result:
error_print("Could not find a working proxy for Telegram")
return False
try:
proxy_config = load_proxy_config(proxy_file)
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
await client.start(phone=phone_number if phone_number else None)
is_auth = await client.is_user_authorized()
await client.disconnect()
if is_auth:
if print_result:
info_print("Connection via new proxy works. Proxy will be used.")
return proxy_file
else:
if print_result:
error_print("Connection failed with new proxy (not authorized).")
return False
except Exception as e:
if print_result:
error_print(f"Connection failed with new proxy: {e}")
return False
[docs]
def create_telegram_client(api_id, api_hash, session_file=SESSION_FILE, proxy=None):
"""Create TelegramClient with or without proxy"""
if proxy:
debug_print(f"Using proxy: {proxy['type']}://{proxy['addr']}:{proxy['port']}")
if proxy['type'].lower() == 'socks5':
return TelegramClient(
session_file, api_id, api_hash,
proxy=(proxy['type'], proxy['addr'], proxy['port'],
proxy.get('username'), proxy.get('password')),
connection=connection.ConnectionTcpMTProxyRandomizedIntermediate
)
else: # http proxy
return TelegramClient(
session_file, api_id, api_hash,
proxy=(proxy['type'], proxy['addr'], proxy['port'],
proxy.get('username'), proxy.get('password'))
)
else:
return TelegramClient(session_file, api_id, api_hash)
[docs]
def create_message_handler(bot_entity):
"""Create message handler for bot replies"""
bot_reply = None
async def handler(event):
nonlocal bot_reply
debug_print(f"Received message from bot: ID={event.message.id}, Text={event.message.text[:100]}...")
buttons = extract_button_info(event.message.reply_markup)
bot_reply = {
"message_id": event.message.id,
"date": event.message.date.timestamp(),
"text": event.message.text,
"buttons": buttons
}
debug_print(f"Bot reply captured: {len(buttons)} buttons found")
return handler, lambda: bot_reply
[docs]
async def wait_for_reply(get_bot_reply, timeout=30):
"""Wait for bot reply with timeout"""
elapsed = 0
debug_print("Waiting for bot reply...")
while get_bot_reply() is None and elapsed < timeout:
await asyncio.sleep(0.1)
elapsed += 0.1
if int(elapsed) != int(elapsed - 0.1): # Print every second
debug_print(f"Waiting for reply... {int(elapsed)}s / {timeout}s")
if int(elapsed) % 5 == 0: # Print progress every 5 seconds
debug_print(f"Still waiting... {int(elapsed)}s")
return get_bot_reply()
[docs]
async def handle_search_message(get_bot_reply, set_bot_reply):
"""Handle 'searching...' message and wait for actual result"""
bot_reply = get_bot_reply()
if bot_reply and "searching..." in bot_reply.get("text", "").lower():
debug_print("Searching by Nexus bot, waiting for final result...")
debug_print("Detected 'searching...' message, resetting bot_reply to wait for actual result")
set_bot_reply(None) # Reset to wait for the next message
# Wait for the actual search result (extended timeout)
return await wait_for_reply(get_bot_reply, timeout=30)
return bot_reply
[docs]
async def fetch_recent_messages(client, bot_entity, sent_message):
"""Fetch recent messages from bot if no immediate reply"""
debug_print("No immediate reply received, checking for recent messages...")
debug_print("Attempting to fetch recent messages from bot...")
await asyncio.sleep(5) # Wait a bit more
message_count = 0
seen_messages = set() # Track message IDs to avoid duplicates
async for message in client.iter_messages(bot_entity, limit=5):
message_count += 1
# Skip if we've already processed this message
if message.id in seen_messages:
debug_print(f"Skipping duplicate message ID: {message.id}")
continue
seen_messages.add(message.id)
debug_print(f"Checking message {message_count}: ID={message.id}, Date={message.date}, Text={message.text[:50]}...")
# Check if this message is newer than our sent message
if message.date >= sent_message.date:
debug_print("Found newer message from bot!")
buttons = extract_button_info(message.reply_markup)
bot_reply = {
"message_id": message.id,
"date": message.date.timestamp(),
"text": message.text,
"buttons": buttons
}
debug_print("Found recent message from bot!")
return bot_reply
debug_print(f"No newer messages found among {len(seen_messages)} unique recent messages")
return None
[docs]
async def send_message_to_bot(api_id, api_hash, phone_number, bot_username, message, session_file=SESSION_FILE, proxy=None, limit=None):
"""
Send a message from your user account to a Telegram bot and wait for its reply.
Args:
api_id: Your Telegram API ID (get from my.telegram.org)
api_hash: Your Telegram API hash
phone_number: Your phone number
bot_username: Bot's username (e.g., 'your_bot_name')
message: Message text to send (search query or DOI)
session_file: Name of the session file to save/load
proxy: Proxy configuration dict or file path (see create_telegram_client)
limit: Maximum number of search results to fetch (default: 1 for DOI, 5 for search; can be set by user)
Returns:
dict: {
"ok": True if successful, False or "error" key otherwise,
"sent_message": {
"message_id": int,
"date": float (timestamp),
"text": str
},
"bot_reply": {
"message_id": int,
"date": float (timestamp),
"text": str, # reply text, possibly concatenated for search
"buttons": list of dicts with button info (text, type, callback_data/url)
}
}
If an error occurs, returns {"error": "..."}.
"""
debug_print(f"Initializing TelegramClient with session file: {session_file}")
# Load proxy configuration
proxy_config = load_proxy_config(proxy)
if proxy and proxy_config is None:
return {"error": f"Error loading proxy configuration"}
# Create client
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
# Define all result markers
result_markers = ["🔬 **", "🔖 **", "📚 **"]
try:
# Check if session file exists
debug_print(f"Checking for session file: {session_file}")
if not os.path.exists(session_file):
error_print(f"Session file '{session_file}' not found!")
info_print("You need to create a session first by running this script interactively once.")
info_print("After that, the session will be saved and you can run without manual input.")
return {"error": "Session file not found. Run script interactively first to create session."}
debug_print("Using cached session...")
if proxy_config:
debug_print(f"Connecting through proxy: {proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}")
debug_print("Starting Telegram client...")
await client.start()
debug_print("Client started successfully")
# Verify we're connected
debug_print("Checking user authorization...")
if not await client.is_user_authorized():
error_print("Session expired or not authorized")
return {"error": "Session expired. Please delete the session file and run interactively to re-authenticate."}
debug_print("User authorized successfully")
# Get the bot entity
debug_print(f"Getting bot entity for: {bot_username}")
bot_entity = await client.get_entity(bot_username)
debug_print(f"Bot entity retrieved: {bot_entity.id}")
# Create message handler
# Set up handler closure for getting and setting bot_reply
def create_setter():
bot_reply_value = [None]
async def new_handler(event):
debug_print(f"Received message from bot: ID={event.message.id}, Text={event.message.text[:100]}...")
buttons = extract_button_info(event.message.reply_markup)
bot_reply_value[0] = {
"message_id": event.message.id,
"date": event.message.date.timestamp(),
"text": event.message.text,
"buttons": buttons
}
debug_print(f"Bot reply captured: {len(buttons)} buttons found")
def get_reply():
return bot_reply_value[0]
def set_reply(value):
bot_reply_value[0] = value
return new_handler, get_reply, set_reply
handler, get_bot_reply, set_bot_reply = create_setter()
client.on(events.NewMessage(from_users=bot_entity))(handler)
# Determine if message is a DOI
is_doi = bool(re.match(r'^10\.\d+/.+', message.strip()))
# Use user-specified limit if provided, else default logic
if limit is not None:
reply_limit = int(limit)
else:
reply_limit = 1 if is_doi else 5
# Check if message is a command (starts with / and is not a DOI)
is_command = message.strip().startswith("/") and not is_doi
# Send message to the bot
debug_print(f"Sending message to bot: '{message}'")
result = await client.send_message(bot_username, message)
debug_print(f"Message sent successfully. Message ID: {result.id}")
# Wait for bot reply
bot_reply = await wait_for_reply(get_bot_reply, timeout=30)
bot_reply = await handle_search_message(get_bot_reply, set_bot_reply)
if bot_reply is None:
bot_reply = await fetch_recent_messages(client, bot_entity, result)
# If message is a command, just return the reply as is
if is_command:
if bot_reply:
response = {
"ok": True,
"sent_message": {
"message_id": result.id,
"date": result.date.timestamp(),
"text": result.text
},
"bot_reply": bot_reply
}
debug_print(f"Command response prepared. Bot reply: {bot_reply.get('text', 'No text')[:50]}")
return response
else:
return {"error": "No reply received from bot for command."}
if is_doi:
# Handle DOI: just return the first reply
if bot_reply:
response = {
"ok": True,
"sent_message": {
"message_id": result.id,
"date": result.date.timestamp(),
"text": result.text
},
"bot_reply": bot_reply
}
debug_print(f"DOI response prepared. Bot reply: {bot_reply.get('text', 'No text')[:50]}")
return response
else:
return {"error": "No reply received from bot for DOI."}
else:
# Handle search (not DOI)
bot_replies = []
if bot_reply:
bot_replies.append(bot_reply)
else:
return {"error": "No reply received from bot for search."}
# --- Extract total number of results from Nexus reply ---
total_results = None
if bot_reply and isinstance(bot_reply, dict):
text = bot_reply.get("text", "")
match = re.search(r"__([\d,]+)\s+results__", text)
if match:
total_results_str = match.group(1).replace(",", "")
try:
total_results = int(total_results_str)
info_print(f"Total results found in Nexus: {total_results:,}")
except Exception:
info_print(f"Total results found in Nexus: {match.group(1)}")
else:
debug_print("Could not extract total results from bot reply text.")
# Try to determine the number of current results from the text
n_results = sum(bot_reply.get("text", "").count(marker) for marker in result_markers)
debug_print(f"Detected {n_results} search results in bot reply text using markers {result_markers}")
# If the number of results already exceeds the limit, stop here
if n_results >= reply_limit:
# Split by all markers, keep only the first <limit> results, then join back
text = bot_reply.get("text", "")
marker_positions = []
for marker in result_markers:
idx = 0
while True:
idx = text.find(marker, idx)
if idx == -1:
break
marker_positions.append((idx, marker))
idx += len(marker)
marker_positions.sort()
if len(marker_positions) > reply_limit:
cut_idx = marker_positions[reply_limit][0]
concatenated_text = text[:cut_idx]
else:
concatenated_text = text
bot_reply_final = dict(bot_reply)
bot_reply_final["text"] = concatenated_text
response = {
"ok": True,
"sent_message": {
"message_id": result.id,
"date": result.date.timestamp(),
"text": result.text
},
"bot_reply": bot_reply_final
}
debug_print(f"Response prepared early due to enough results. Bot reply count: {len(bot_replies)}")
return response
# Try to fetch more results if limit not reached
seen_search_callbacks = set()
all_texts = [bot_reply["text"]] if bot_reply and "text" in bot_reply else []
def count_all_markers(texts):
return sum(sum(t.count(marker) for marker in result_markers) for t in texts)
current_count = count_all_markers(all_texts)
while current_count < reply_limit:
last_reply = bot_replies[-1] if bot_replies else None
if not last_reply or not last_reply.get("buttons"):
break
# Find all callback buttons whose text contains ">" and callback_data like "/search_<number>"
search_buttons = []
for btn in last_reply["buttons"]:
btn_text = btn.get("text", "")
cb_data = btn.get("callback_data") or btn.get("data")
if cb_data and ">" in btn_text:
if isinstance(cb_data, bytes):
cb_data_str = cb_data.decode(errors="ignore")
else:
cb_data_str = str(cb_data)
if re.match(r"^/search_\d+$", cb_data_str):
search_buttons.append((btn, cb_data_str))
found = False
for btn, cb_data_str in search_buttons:
if cb_data_str not in seen_search_callbacks:
seen_search_callbacks.add(cb_data_str)
cb_data = btn.get("callback_data") or btn.get("data")
info_print(f"Clicking search button (text contains '>') to fetch more results: {btn.get('text', '')}")
try:
await client.disconnect()
click_result = await click_callback_button(
api_id, api_hash, phone_number, bot_username,
last_reply["message_id"], cb_data, session_file, proxy
)
await client.start()
debug_print(f"Search button {btn.get('text', '')} clicked successfully. Result: {click_result}")
info_print("Fetching new results...")
new_reply = await fetch_recent_messages(client, bot_entity, result)
debug_print(f"New reply fetched: {new_reply.get('text', 'No text')[:50]}...")
if new_reply and new_reply.get("text"):
all_texts.append(new_reply["text"])
bot_replies.append(new_reply)
found = True
except Exception as e:
error_print(f"Error clicking search button: {str(e)}")
break # Only click one button per loop
if not found:
break
concatenated_text = "\n".join(all_texts) if all_texts else ""
current_count = sum(concatenated_text.count(marker) for marker in result_markers)
debug_print(f"Current total marker count: {current_count}, reply_limit: {reply_limit}")
if current_count >= reply_limit:
break
# Concatenate all texts for final bot_reply
concatenated_text = "\n".join(all_texts) if all_texts else ""
# If the result contains more results than the <limit>, only fetch the first <limit> number of results
if reply_limit > 0:
marker_positions = []
for marker in result_markers:
idx = 0
while True:
idx = concatenated_text.find(marker, idx)
if idx == -1:
break
marker_positions.append((idx, marker))
idx += len(marker)
marker_positions.sort()
if len(marker_positions) > reply_limit:
cut_idx = marker_positions[reply_limit][0]
concatenated_text = concatenated_text[:cut_idx]
debug_print(f"Trimmed search results to first {reply_limit} entries.")
# Remove "__<number> results__" from the text
concatenated_text = re.sub(r"__[\d,]+\s+results__\s*", "", concatenated_text)
# Remove advertising or footer lines starting with an emoji (not our result markers)
lines = concatenated_text.splitlines()
filtered_lines = []
result_counter = 1
for line in lines:
stripped = line.strip()
if any(stripped.startswith(marker[:-3]) for marker in result_markers):
filtered_lines.append(f"[{result_counter}] {line}")
result_counter += 1
elif re.match(r"^[^\w\s]", stripped):
continue
else:
filtered_lines.append(line)
concatenated_text = "\n".join(filtered_lines)
# Only prepend this line if the original message is not "/profile"
concatenated_text = f"The first {reply_limit} results among {total_results} results found:\n\n" + concatenated_text
bot_reply_final = dict(bot_replies[0])
bot_reply_final["text"] = concatenated_text
response = {
"ok": True,
"sent_message": {
"message_id": result.id,
"date": result.date.timestamp(),
"text": result.text
},
"bot_reply": bot_reply_final
}
debug_print(f"Response prepared successfully. Bot reply count: {len(bot_replies)}")
info_print(response)
return response
# Fallback: just return the first reply if nothing else
response = {
"ok": True,
"sent_message": {
"message_id": result.id,
"date": result.date.timestamp(),
"text": result.text
},
"bot_reply": bot_replies[0]
}
return response
finally:
debug_print("Disconnecting client...")
await client.disconnect()
debug_print("Client disconnected")
[docs]
async def create_session(api_id, api_hash, phone_number, session_file=SESSION_FILE):
"""Create a new session file interactively"""
debug_print(f"Creating new session with file: {session_file}")
client = TelegramClient(session_file, api_id, api_hash)
try:
debug_print("Starting client for session creation...")
await client.start(phone_number)
info_print(f"Session created successfully! File saved as '{session_file}'")
info_print("You can now run the script without manual input.")
debug_print("Session creation completed successfully")
except Exception as e:
error_print(f"Error creating session: {e}")
debug_print(f"Session creation failed: {type(e).__name__}: {str(e)}")
finally:
debug_print("Disconnecting client after session creation...")
await client.disconnect()
[docs]
def handle_single_search_result(bot_reply):
"""
Handle a single search result based on whether the first callback button contains "Request"
Args:
bot_reply: Dictionary containing bot reply with buttons
Returns:
Dictionary with action type and relevant information
"""
if not bot_reply or not bot_reply.get("buttons"):
debug_print("No bot reply or buttons found")
return {
"action": "no_buttons",
"message": "No buttons available in the response"
}
buttons = bot_reply.get("buttons", [])
if not buttons:
debug_print("Empty buttons list")
return {
"action": "no_buttons",
"message": "No buttons available in the response"
}
# Find the first callback button
first_callback_button = None
for button in buttons:
if button.get("type") == "callback":
first_callback_button = button
break
if not first_callback_button:
debug_print("No callback buttons found")
return {
"action": "no_callback_buttons",
"message": "No callback buttons available in the response"
}
button_text = first_callback_button.get("text", "").strip()
callback_data = first_callback_button.get("callback_data") or first_callback_button.get("data")
debug_print(f"First callback button text: '{button_text}'")
# Check if the first callback button contains "Request"
if "request" in button_text.lower():
info_print(f"Found 'Request' callback button: {button_text}")
return {
"action": "request_callback",
"button_text": button_text,
"callback_data": callback_data,
"message_id": bot_reply.get("message_id"),
"message": f"Ready to click request button: {button_text}"
}
else:
info_print(f"First callback button does not contain 'Request': {button_text}")
return {
"action": "other_callback",
"button_text": button_text,
"callback_data": callback_data,
"message_id": bot_reply.get("message_id"),
"message": f"Other callback button found: {button_text}"
}
[docs]
async def download_telegram_file(client, message, download_path=None):
"""
Download a file from a Telegram message
Args:
client: TelegramClient instance
message: Telegram message containing the file
download_path: Path where to save the file (optional)
Returns:
Dictionary with download result
"""
try:
# Check if message has media
if not message.media:
return {"success": False, "error": "Message contains no media"}
# Get file name
filename = getattr(message.media, 'filename', None)
if not filename:
# Try to get filename from document attributes
if hasattr(message.media, 'document') and message.media.document.attributes:
for attr in message.media.document.attributes:
if hasattr(attr, 'file_name'):
filename = attr.file_name
break
# If still no filename, generate one
if not filename:
file_ext = ""
if hasattr(message.media, 'document'):
mime_type = message.media.document.mime_type
if mime_type == 'application/pdf':
file_ext = ".pdf"
elif 'image' in mime_type:
file_ext = ".jpg"
elif 'video' in mime_type:
file_ext = ".mp4"
filename = f"telegram_file_{message.id}{file_ext}"
# Set download path
if not download_path:
# Create downloads directory in user's home
downloads_dir = os.path.join(os.path.expanduser("~"), "Downloads", "TelegramFiles")
os.makedirs(downloads_dir, exist_ok=True)
download_path = os.path.join(downloads_dir, filename)
elif os.path.isdir(download_path):
download_path = os.path.join(download_path, filename)
# Get file size
file_size = getattr(message.media.document, 'size', 0) if hasattr(message.media, 'document') else 0
info_print(f"Starting download: {filename}")
info_print(f"File size: {file_size / (1024*1024):.2f} MB" if file_size > 0 else "File size: Unknown")
info_print(f"Download path: {download_path}")
# Download with progress
last_progress = 0
def progress_callback(current, total):
nonlocal last_progress
if total > 0:
progress = int((current / total) * 100)
if progress >= last_progress + 5: # Update every 5%
info_print(f"Download progress: {progress}% ({current / (1024*1024):.2f}/{total / (1024*1024):.2f} MB)")
last_progress = progress
# Perform the download
start_time = datetime.now()
path = await client.download_media(
message,
file=download_path,
progress_callback=progress_callback
)
end_time = datetime.now()
download_time = (end_time - start_time).total_seconds()
if path and os.path.exists(path):
actual_size = os.path.getsize(path)
speed_mbps = (actual_size / (1024*1024)) / max(download_time, 1)
info_print(f"✓ Download completed successfully!")
info_print(f"File saved to: {path}")
info_print(f"Download time: {download_time:.2f} seconds")
info_print(f"Average speed: {speed_mbps:.2f} MB/s")
return {
"success": True,
"file_path": path,
"filename": filename,
"file_size": actual_size,
"download_time": download_time,
"speed_mbps": speed_mbps
}
else:
return {"success": False, "error": "File download failed - file not found after download"}
except Exception as e:
error_print(f"Error downloading file: {str(e)}")
debug_print(f"Download error details: {type(e).__name__}: {str(e)}")
return {"success": False, "error": f"Download failed: {str(e)}"}
[docs]
async def handle_file_download_from_bot_reply(bot_reply, proxy=None):
"""
Handle file download from bot reply if it contains a document
Args:
bot_reply: Dictionary containing bot reply information
proxy: Proxy configuration (same format as other functions)
Returns:
Dictionary with download result or None if no file to download
"""
if not bot_reply:
debug_print("No bot reply for file download")
return None
# Load proxy configuration
proxy_config = load_proxy_config(proxy)
if proxy and proxy_config is None:
return {"success": False, "error": "Error loading proxy configuration"}
# Create client
client = create_telegram_client(TG_API_ID, TG_API_HASH, SESSION_FILE, proxy_config)
try:
# Start client
await client.start()
if not await client.is_user_authorized():
return {"success": False, "error": "Session expired"}
# Get the bot entity
bot_entity = await client.get_entity(BOT_USERNAME)
# Get the message by ID
message_id = bot_reply.get("message_id")
if not message_id:
return {"success": False, "error": "No message ID in bot reply"}
# Fetch the message
message = await client.get_messages(bot_entity, ids=message_id)
if not message:
return {"success": False, "error": "Could not fetch message"}
# Check if message contains a file
if not message.media:
debug_print("Message contains no media for download")
return None
info_print("File detected in bot reply, starting download...")
# Download the file
download_result = await download_telegram_file(client, message)
return download_result
except Exception as e:
error_print(f"Error handling file download: {str(e)}")
debug_print(f"File download handling error: {type(e).__name__}: {str(e)}")
return {"success": False, "error": f"File download handling failed: {str(e)}"}
finally:
await client.disconnect()
# Get user input with timeout
[docs]
async def load_credentials_from_file(credentials_path, print_result=True):
"""Load API credentials from JSON file, validate, and prompt user if invalid or missing."""
global TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME
def prompt_for_credentials():
if print_result:
print("\nPlease enter your Telegram API credentials.")
tg_api_id = get_input_with_timeout("API ID: ", timeout=30, default="", keep_origin=True)
if not tg_api_id:
if print_result:
error_print("No API ID entered. Exiting.")
return None
tg_api_hash = get_input_with_timeout("API Hash: ", timeout=30, default="", keep_origin=True)
if not tg_api_hash:
if print_result:
error_print("No API Hash entered. Exiting.")
return None
phone = get_input_with_timeout("Phone number (with country code): ", timeout=30, default="", keep_origin=True)
if not phone:
if print_result:
error_print("No phone number entered. Exiting.")
return None
bot_username = get_input_with_timeout("Bot username (default: SciNexBot): ", timeout=30, default="SciNexBot", keep_origin=True)
if not bot_username:
bot_username = "SciNexBot"
return {
"tg_api_id": tg_api_id,
"tg_api_hash": tg_api_hash,
"phone": phone,
"bot_username": bot_username
}
async def validate_and_save(creds):
global TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME
TG_API_ID = creds.get("tg_api_id", TG_API_ID)
TG_API_HASH = creds.get("tg_api_hash", TG_API_HASH)
PHONE = creds.get("phone", PHONE)
BOT_USERNAME = creds.get("bot_username", BOT_USERNAME)
# Validate credentials
test_result = await test_credentials(TG_API_ID, TG_API_HASH, PHONE)
if not os.path.exists(DEFAULT_PROXY_FILE):
if print_result:
info_print(f"Proxy file not found: {DEFAULT_PROXY_FILE}")
info_print("Attempting to find a suitable free proxy...")
working_proxy = await test_and_select_working_proxy()
if working_proxy:
if print_result:
info_print("✓ Found and configured a working proxy")
else:
if print_result:
error_print("Could not find a working proxy for Telegram")
error_print("You can either:")
error_print("1. Try running again (will test different proxies)")
error_print("2. Run with --no-proxy to connect directly")
error_print("3. Provide a custom proxy configuration file")
return None
if not test_result.get("ok") and os.path.exists(DEFAULT_PROXY_FILE):
if print_result:
info_print("Credential test failed without proxy, retrying with proxy...")
test_result = await test_credentials(TG_API_ID, TG_API_HASH, PHONE, proxy=DEFAULT_PROXY_FILE)
if test_result.get("ok"):
if print_result:
info_print("Credentials validated successfully.")
# Save to default location if not already there or if different
save_needed = True
if os.path.exists(CREDENTIALS_FILE):
try:
with open(CREDENTIALS_FILE, 'r') as f:
existing = json.load(f)
if (
str(existing.get("tg_api_id", "")) == str(TG_API_ID)
and str(existing.get("tg_api_hash", "")) == str(TG_API_HASH)
and str(existing.get("phone", "")) == str(PHONE)
and str(existing.get("bot_username", "")) == str(BOT_USERNAME)
):
save_needed = False
except Exception:
save_needed = True
if save_needed:
try:
with open(CREDENTIALS_FILE, 'w') as f:
json.dump({
"tg_api_id": TG_API_ID,
"tg_api_hash": TG_API_HASH,
"phone": PHONE,
"bot_username": BOT_USERNAME
}, f, indent=2)
if print_result:
info_print(f"Credentials saved to: {CREDENTIALS_FILE}")
except Exception as e:
debug_print(f"Warning: Could not save credentials to default location: {e}")
return [TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME]
else:
if print_result:
error_print(f"Credential validation failed: {test_result.get('error', 'Unknown error')}")
return None
debug_print(f"Loading credentials from: {credentials_path}")
creds = None
if os.path.exists(credentials_path):
try:
with open(credentials_path, 'r') as f:
creds = json.load(f)
result = await validate_and_save(creds)
if result:
return result
else:
if print_result:
info_print("Credentials in file are invalid. Please re-enter.")
except Exception as e:
if print_result:
error_print(f"Error loading credentials file: {e}")
debug_print(f"Credentials loading error: {type(e).__name__}: {str(e)}")
creds = None
if credentials_path != CREDENTIALS_FILE and os.path.exists(CREDENTIALS_FILE):
try:
with open(CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
result = await validate_and_save(creds)
if result:
return result
else:
if print_result:
info_print("Credentials in default location are invalid. Please re-enter.")
except Exception as e:
if print_result:
error_print(f"Error loading credentials file: {e}")
debug_print(f"Credentials loading error: {type(e).__name__}: {str(e)}")
creds = None
for attempt in range(2):
creds = prompt_for_credentials()
if not creds:
if print_result:
error_print("No credentials provided. Exiting.")
return None
result = await validate_and_save(creds)
if result:
return result
else:
if print_result:
info_print("Credentials invalid. Please try again.")
if print_result:
error_print("Failed to provide valid credentials after multiple attempts or timeout.")
return None
[docs]
async def test_credentials(api_id, api_hash, phone_number, session_file=SESSION_FILE, proxy=None):
"""
Test if the provided Telegram API credentials are correct by attempting to connect and authorize.
Returns a dictionary with the result.
"""
result = {
"ok": False,
"error": None,
"user": None
}
try:
proxy_config = load_proxy_config(proxy) if proxy else None
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
await client.start(phone=phone_number if phone_number else None)
if await client.is_user_authorized():
me = await client.get_me()
result["ok"] = True
result["user"] = {
"id": me.id,
"first_name": me.first_name,
"last_name": me.last_name,
"username": me.username,
"phone": me.phone
}
else:
result["error"] = "Not authorized. Credentials may be invalid or session expired."
except Exception as e:
result["error"] = f"Credential test failed: {str(e)}"
finally:
try:
await client.disconnect()
except Exception:
pass
return result
[docs]
async def setup_proxy_configuration(proxy_arg):
"""Setup proxy configuration - load existing or find new working proxy"""
if proxy_arg is None:
debug_print("No proxy specified, connecting directly")
return None
info_print("Proxy option specified")
# If proxy is just the flag without value, use default
proxy_file = proxy_arg if proxy_arg else DEFAULT_PROXY_FILE
debug_print(f"Proxy file path: {proxy_file}")
# Check if proxy file exists
if os.path.exists(proxy_file):
info_print(f"Loading existing proxy configuration from: {proxy_file}")
proxy_config = load_proxy_config(proxy_file)
if proxy_config:
info_print("✓ Proxy configuration loaded successfully")
return proxy_file
else:
error_print(f"Failed to load proxy configuration from: {proxy_file}")
return False
else:
info_print(f"Proxy file not found: {proxy_file}")
info_print("Searching for a working proxy...")
# Try to find a working proxy
working_proxy = await test_and_select_working_proxy()
if working_proxy:
info_print("✓ Found and configured a working proxy")
return DEFAULT_PROXY_FILE
else:
error_print("Could not find a working proxy for Telegram")
error_print("You can either:")
error_print("1. Try running again (will test different proxies)")
error_print("2. Run with --no-proxy to connect directly")
error_print("3. Provide a custom proxy configuration file")
return False
[docs]
async def wait_and_download_file(click_result, proxy_to_use):
"""Wait for file upload to Telegram and download it"""
# Extract file size information from click_result (previously parsed from callback_data)
if verbose_mode:
print("Clicked button result:")
print(click_result)
# First check if file size was already extracted from callback_data
button_info = click_result.get("button_info", {})
file_size_mb = button_info.get("file_size_mb", 0)
if file_size_mb > 0:
size_unit = button_info.get("size_unit", "MB")
original_size = button_info.get("original_size", file_size_mb)
if verbose_mode:
info_print(f"Using file size from button: {original_size} {size_unit} ({file_size_mb:.2f} MB)")
else:
# Fallback: try to extract from bot response text
bot_text = click_result.get("bot_reply", {}).get("text", "")
# Handle MB and MiB separately since they are different units
mb_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:mb|MB|megabytes?)', bot_text, re.IGNORECASE)
mib_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:mib|MiB)', bot_text, re.IGNORECASE)
if mb_match:
file_size_mb = float(mb_match.group(1))
if verbose_mode:
info_print(f"Detected file size from bot text: {file_size_mb} MB")
elif mib_match:
# Convert MiB to MB: 1 MiB = 1.048576 MB
file_size_mib = float(mib_match.group(1))
file_size_mb = file_size_mib * 1.048576
if verbose_mode:
info_print(f"Detected file size from bot text: {file_size_mib} MiB ({file_size_mb:.2f} MB)")
else:
# Check for other size units
kb_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:kb|KB|kilobytes?)', bot_text, re.IGNORECASE)
kib_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:kib|KiB)', bot_text, re.IGNORECASE)
if kb_match:
file_size_mb = float(kb_match.group(1)) / 1000
if verbose_mode:
info_print(f"Detected file size from bot text: {file_size_mb:.2f} MB")
elif kib_match:
# Convert KiB to MB: 1 KiB = 0.001024 MB
file_size_kib = float(kib_match.group(1))
file_size_mb = file_size_kib * 0.001024
if verbose_mode:
info_print(f"Detected file size from bot text: {file_size_kib} KiB ({file_size_mb:.2f} MB)")
else:
# Default assumption for academic papers
file_size_mb = 5.0
if verbose_mode:
info_print("No file size detected, assuming 5 MB for academic paper")
# Calculate wait time based on file size
base_wait = 10
size_based_wait = int(file_size_mb * 5)
total_wait = max(base_wait, size_based_wait)
info_print(f"Waiting {total_wait} seconds for file preparation...")
# Wait with progress indication (only show progress in verbose mode)
for i in range(total_wait):
if verbose_mode and i % 5 == 0 and i > 0:
info_print(f"Still waiting... {i}/{total_wait} seconds")
await asyncio.sleep(1)
info_print("Checking for file...")
# Handle file download if the bot reply contains a file
download_result = await handle_file_download_from_bot_reply(
click_result.get("bot_reply"), proxy_to_use
)
if download_result and download_result.get("success"):
info_print("✓ File downloaded successfully!")
info_print(f"File saved to: {download_result['file_path']}")
if verbose_mode:
info_print(f"File size: {download_result['file_size'] / (1024*1024):.2f} MB")
info_print(f"Download speed: {download_result['speed_mbps']:.2f} MB/s")
elif download_result and not download_result.get("success"):
error_print(f"✗ File download failed: {download_result.get('error', 'Unknown error')}")
else:
debug_print("No file to download in bot reply")
[docs]
async def get_latest_messages_from_bot(api_id, api_hash, bot_username, session_file=SESSION_FILE, limit=10, proxy=None):
"""
Get the latest messages from a bot
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
bot_username: Bot's username
session_file: Name of the session file
limit: Maximum number of messages to retrieve (default: 10)
proxy: Proxy configuration dict or file path
Returns:
Dictionary with success status and messages list
"""
debug_print(f"Getting latest {limit} messages from bot: {bot_username}")
# Load proxy configuration
proxy_config = load_proxy_config(proxy)
if proxy and proxy_config is None:
return {"error": "Error loading proxy configuration"}
# Create client
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
try:
# Check if session file exists
if not os.path.exists(session_file):
error_print(f"Session file not found: {session_file}")
return {"error": "Session file not found. Run script interactively first to create session."}
debug_print("Starting client to get latest messages...")
if proxy_config:
info_print(f"Connecting through proxy: {proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}")
await client.start()
# Verify we're connected
if not await client.is_user_authorized():
error_print("Session expired or not authorized")
return {"error": "Session expired. Please delete the session file and run interactively to re-authenticate."}
# Get the bot entity
debug_print(f"Getting bot entity for: {bot_username}")
bot_entity = await client.get_entity(bot_username)
# Fetch messages
debug_print(f"Fetching latest {limit} messages from bot...")
messages = []
async for message in client.iter_messages(bot_entity, limit=limit):
# Extract button information
buttons = extract_button_info(message.reply_markup)
# Check if message has media
has_media = message.media is not None
media_type = None
if has_media:
if hasattr(message.media, 'document'):
media_type = "document"
elif hasattr(message.media, 'photo'):
media_type = "photo"
elif hasattr(message.media, 'video'):
media_type = "video"
else:
media_type = "other"
message_data = {
"message_id": message.id,
"date": message.date.timestamp(),
"date_formatted": message.date.strftime("%Y-%m-%d %H:%M:%S"),
"text": message.text,
"buttons": buttons,
"has_media": has_media,
"media_type": media_type,
"is_reply": message.reply_to is not None,
"views": getattr(message, 'views', None),
"forwards": getattr(message, 'forwards', None)
}
messages.append(message_data)
debug_print(f"Retrieved message {message.id}: {message.text[:50]}...")
info_print(f"Successfully retrieved {len(messages)} messages from {bot_username}")
return {
"ok": True,
"bot_username": bot_username,
"messages_count": len(messages),
"messages": messages
}
except Exception as e:
error_print(f"Error getting latest messages: {str(e)}")
debug_print(f"Exception details: {type(e).__name__}: {str(e)}")
return {"error": f"Error getting latest messages: {str(e)}"}
finally:
debug_print("Disconnecting client...")
await client.disconnect()
[docs]
async def get_user_profile(api_id, api_hash, phone_number, bot_username, session_file=SESSION_FILE, proxy=None):
"""
Get user profile information from Nexus bot by sending /profile command
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
session_file: Name of the session file
proxy: Proxy configuration dict or file path
Returns:
Dictionary with user profile information or error
"""
info_print("Getting user profile information from Nexus bot...")
debug_print("Sending /profile command to bot")
# Use the existing send_message_to_bot function to send /profile
profile_result = await send_message_to_bot(
api_id, api_hash, phone_number, bot_username,
"/profile", session_file, proxy
)
if not profile_result.get("ok"):
error_print(f"Failed to get profile: {profile_result.get('error', 'Unknown error')}")
return profile_result
bot_reply = profile_result.get("bot_reply")
if not bot_reply:
error_print("No reply received from bot for /profile command")
return {"error": "No reply received from bot for /profile command"}
profile_text = bot_reply.get("text", "")
debug_print(f"Profile response text: {profile_text[:200]}...")
# Parse profile information from the bot response
profile_info = {
"raw_response": profile_text,
"user_level": None,
"level_emoji": None,
"level_name": None,
"n_points": None,
"uploaded_count": None,
"leaderboard_position": None,
"orcid_url": None
}
# Extract information using regex patterns for the specific Nexus format
# Extract user level with emoji and name
level_pattern = r"User level:\s*([^\s]+)\s+(.+?)\s+with\s+(\d+)\s+n-points"
level_match = re.search(level_pattern, profile_text)
if level_match:
profile_info["level_emoji"] = level_match.group(1).strip()
profile_info["level_name"] = level_match.group(2).strip()
profile_info["n_points"] = int(level_match.group(3))
profile_info["user_level"] = f"{profile_info['level_emoji']} {profile_info['level_name']}"
debug_print(f"Extracted user level: {profile_info['user_level']} with {profile_info['n_points']} n-points")
# Extract uploaded count
uploaded_pattern = r"uploaded\s+(\d+)\s+books and papers"
uploaded_match = re.search(uploaded_pattern, profile_text)
if uploaded_match:
profile_info["uploaded_count"] = int(uploaded_match.group(1))
debug_print(f"Extracted uploaded count: {profile_info['uploaded_count']}")
# Extract leaderboard position
leaderboard_pattern = r"takes\s+(\d+)(?:st|nd|rd|th)\s+leaderboard position"
leaderboard_match = re.search(leaderboard_pattern, profile_text)
if leaderboard_match:
profile_info["leaderboard_position"] = int(leaderboard_match.group(1))
debug_print(f"Extracted leaderboard position: {profile_info['leaderboard_position']}")
# Extract OrcID URL
orcid_pattern = r"OrcID:\s*Link your OrcID\s*\(([^)]+)\)"
orcid_match = re.search(orcid_pattern, profile_text)
if orcid_match:
profile_info["orcid_url"] = orcid_match.group(1).strip()
debug_print(f"Extracted OrcID URL: {profile_info['orcid_url'][:50]}...")
info_print("Successfully retrieved and parsed user profile information")
debug_print(f"Profile info extracted: {profile_info}")
result = {
"ok": True,
"profile": profile_info,
"bot_reply": bot_reply,
"sent_message": profile_result.get("sent_message")
}
return result
[docs]
async def fetch_and_display_recent_messages(api_id, api_hash, bot_username, session_file=SESSION_FILE,
limit=10, proxy=None, display=True):
"""
Fetch recent messages from a bot and optionally display them
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
bot_username: Bot's username
session_file: Name of the session file
limit: Maximum number of messages to retrieve (default: 10, max: 100)
proxy: Proxy configuration dict or file path
display: Whether to display formatted results (default: True)
Returns:
Dictionary with success status and messages list
"""
# Validate and clamp limit
if limit < 1:
limit = 1
elif limit > 100: # Reasonable maximum to prevent excessive API calls
limit = 100
info_print(f"Limit clamped to maximum of 100 messages")
info_print(f"Fetching {limit} recent messages from @{bot_username}...")
# Use existing function to get messages
messages_result = await get_latest_messages_from_bot(
api_id, api_hash, bot_username, session_file, limit, proxy
)
# Display results if requested
if display:
format_messages_result(messages_result)
return messages_result
[docs]
async def fetch_nexus_aaron_messages(api_id, api_hash, phone_number, session_file=SESSION_FILE,
limit=10, proxy=None, display=True):
"""
Fetch recent messages from the @nexus_aaron bot specifically
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
session_file: Name of the session file
limit: Maximum number of messages to retrieve (default: 10, max: 100)
proxy: Proxy configuration dict or file path
display: Whether to display formatted results (default: True)
Returns:
Dictionary with success status and messages list from @nexus_aaron
"""
nexus_aaron_username = "nexus_aaron"
info_print(f"Fetching {limit} recent messages from @{nexus_aaron_username}...")
debug_print(f"Specialized function for Nexus Aaron bot messages")
# Use existing function to get messages from nexus_aaron
messages_result = await get_latest_messages_from_bot(
api_id, api_hash, nexus_aaron_username, session_file, limit, proxy
)
# Display results if requested with specialized formatting for nexus_aaron
if display and messages_result.get("ok"):
format_nexus_aaron_messages(messages_result)
elif display:
format_messages_result(messages_result)
return messages_result
[docs]
def get_publisher_name_from_doi(doi):
"""
Extract publisher name from DOI using Crossref API
Args:
doi: DOI string (e.g., "10.1038/nature12373")
Returns:
Publisher name string or None if not found
"""
if not doi or not isinstance(doi, str):
return None
# Extract publisher prefix from DOI (part between 10. and /)
doi_match = re.match(r'^10\.(\d+)/', doi.strip())
if not doi_match:
debug_print(f"Invalid DOI format for publisher extraction: {doi}")
return None
publisher_prefix = doi_match.group(1)
debug_print(f"Extracted publisher prefix from DOI {doi}: {publisher_prefix}")
try:
# Query Crossref API for publisher information
# Use the DOI to get work information which includes publisher
crossref_url = f"https://api.crossref.org/works/{doi}"
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; TelegramBot/1.0; mailto:your-email@example.com)'
}
debug_print(f"Querying Crossref API for DOI: {doi}")
response = requests.get(crossref_url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
# Extract publisher name from the response
work = data.get('message', {})
publisher = work.get('publisher')
if publisher:
debug_print(f"Found publisher name for DOI {doi}: {publisher}")
return publisher
else:
debug_print(f"No publisher information found in Crossref response for DOI {doi}")
# Fallback: try to get institution/organization info
institution = work.get('institution')
if institution and isinstance(institution, list) and len(institution) > 0:
inst_name = institution[0].get('name')
if inst_name:
debug_print(f"Found institution name as fallback for DOI {doi}: {inst_name}")
return inst_name
elif response.status_code == 404:
debug_print(f"DOI not found in Crossref database: {doi}")
return None
else:
debug_print(f"Crossref API error for DOI {doi}: HTTP {response.status_code}")
return None
except requests.RequestException as e:
debug_print(f"Error querying Crossref API for DOI {doi}: {str(e)}")
return None
except Exception as e:
debug_print(f"Unexpected error getting publisher name for DOI {doi}: {str(e)}")
return None
return None
[docs]
def parse_nexus_aaron_request(text):
"""
Parse a nexus_aaron request message to extract structured information
Args:
text: The raw message text from nexus_aaron
Returns:
Dictionary with parsed information
"""
request_info = {
'request_count': 'Unknown',
'pub_type': 'Unknown',
'doi': None,
'publisher_code': None,
'libstc_link': None,
'worldcat_link': None,
'raw_text': text
}
if not text:
return request_info
# Extract request count: #request (X)
request_match = re.search(r'#request \((\d+)\)', text)
if request_match:
request_info['request_count'] = request_match.group(1)
# Determine publication type by emoji
if '🔬' in text:
request_info['pub_type'] = 'Research Paper'
elif '📚' in text:
request_info['pub_type'] = 'Book'
elif '📖' in text:
request_info['pub_type'] = 'Book Chapter'
else:
request_info['pub_type'] = 'Unknown'
# Extract DOI
doi_match = re.search(r'(10\.\d+/[^\s\]]+)', text)
if doi_match:
request_info['doi'] = doi_match.group(1)
# Extract publisher code (e.g., #p_1177)
publisher_match = re.search(r'#p_(\d+)', text)
if publisher_match:
request_info['publisher_code'] = f"p_{publisher_match.group(1)}"
# Extract LibSTC link
libstc_match = re.search(r'\[🔬\]\((https://libstc\.cc/[^)]+)\)', text)
if not libstc_match:
libstc_match = re.search(r'\[📚\]\((https://libstc\.cc/[^)]+)\)', text)
if libstc_match:
request_info['libstc_link'] = libstc_match.group(1)
# Extract WorldCat link
worldcat_match = re.search(r'\[worldcat\]\((https://search\.worldcat\.org/[^)]+)\)', text)
if worldcat_match:
request_info['worldcat_link'] = worldcat_match.group(1)
return request_info
[docs]
def parse_nexus_aaron_upload(text):
"""
Parse a nexus_aaron upload/voting message to extract structured information
Args:
text: The raw message text from nexus_aaron upload
Returns:
Dictionary with parsed upload information
"""
upload_info = {
'title': 'Unknown',
'author': None,
'year': None,
'pages': None,
'pub_type': 'Unknown',
'doi': None,
'worldcat_link': None,
'isbn': None,
'libstc_link': None,
'raw_text': text
}
if not text:
return upload_info
# Determine publication type by emoji
if '🔬' in text:
upload_info['pub_type'] = 'Research Paper'
elif '📚' in text:
upload_info['pub_type'] = 'Book'
elif '📖' in text:
upload_info['pub_type'] = 'Book Chapter'
else:
upload_info['pub_type'] = 'Unknown'
# Extract title from **title** format
title_match = re.search(r'\*\*([^*]+)\*\*', text)
if title_match:
upload_info['title'] = title_match.group(1).strip()
# Extract year from (YYYY) or (YYYY-MM) format
year_match = re.search(r'\((\d{4})(?:-\d{2})?\)', text)
if year_match:
upload_info['year'] = year_match.group(1)
# Extract author name (appears after title and before year)
# Pattern: **Title** (year) \nAuthor pp. pages
author_match = re.search(r'\*\*[^*]+\*\*[^\\n]*\\n([^\\n]+?)(?:\s+pp\.\s+\d+)?', text)
if author_match:
upload_info['author'] = author_match.group(1).strip()
# Extract pages
pages_match = re.search(r'pp\.\s+(\d+)', text)
if pages_match:
upload_info['pages'] = pages_match.group(1)
# Extract DOI
doi_match = re.search(r'(10\.\d+/[^\s\]]+)', text)
if doi_match:
upload_info['doi'] = doi_match.group(1)
# Extract WorldCat link and ISBN
worldcat_match = re.search(r'\[isbn:(\d+)\]\((https://search\.worldcat\.org/[^)]+)\)', text)
if worldcat_match:
upload_info['isbn'] = worldcat_match.group(1)
upload_info['worldcat_link'] = worldcat_match.group(2)
# Extract LibSTC link
libstc_match = re.search(r'\[🔬\]\((https://libstc\.cc/[^)]+)\)', text)
if not libstc_match:
libstc_match = re.search(r'\[📚\]\((https://libstc\.cc/[^)]+)\)', text)
if libstc_match:
upload_info['libstc_link'] = libstc_match.group(1)
return upload_info
[docs]
async def upload_file_to_bot(api_id, api_hash, phone_number, bot_username, file_path, message="", session_file=SESSION_FILE, proxy=None):
"""
Upload a file to a Telegram bot with optional message
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
file_path: Path to the file to upload
message: Optional message to send with the file (default: "")
session_file: Name of the session file
proxy: Proxy configuration dict or file path
Returns:
Dictionary with upload result and bot reply
"""
debug_print(f"Uploading file to bot: {file_path}")
# Validate file exists
if not os.path.exists(file_path):
error_print(f"File not found: {file_path}")
return {"error": f"File not found: {file_path}"}
# Get file info
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
file_size_mb = file_size / (1024 * 1024)
info_print(f"Preparing to upload: {file_name}")
info_print(f"File size: {file_size_mb:.2f} MB")
# Load proxy configuration
proxy_config = load_proxy_config(proxy)
if proxy and proxy_config is None:
return {"error": "Error loading proxy configuration"}
# Create client
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
try:
# Check if session file exists
if not os.path.exists(session_file):
error_print(f"Session file not found: {session_file}")
return {"error": "Session file not found. Run script interactively first to create session."}
debug_print("Starting client for file upload...")
if proxy_config:
info_print(f"Connecting through proxy: {proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}")
await client.start()
# Verify we're connected
if not await client.is_user_authorized():
error_print("Session expired or not authorized")
return {"error": "Session expired. Please delete the session file and run interactively to re-authenticate."}
debug_print("User authorized successfully")
# Get the bot entity
debug_print(f"Getting bot entity for: {bot_username}")
bot_entity = await client.get_entity(bot_username)
debug_print(f"Bot entity retrieved: {bot_entity.id}")
# Create message handler for bot responses
handler, get_bot_reply = create_message_handler(bot_entity)
client.on(events.NewMessage(from_users=bot_entity))(handler)
# Upload progress callback
last_progress = 0
def progress_callback(current, total):
nonlocal last_progress
if total > 0:
progress = int((current / total) * 100)
if progress >= last_progress + 10: # Update every 10%
info_print(f"Upload progress: {progress}% ({current / (1024*1024):.2f}/{total / (1024*1024):.2f} MB)")
last_progress = progress
# Send file with optional message
debug_print(f"Starting file upload: {file_path}")
start_time = datetime.now()
result = await client.send_file(
bot_username,
file_path,
caption=message if message else None,
progress_callback=progress_callback
)
end_time = datetime.now()
upload_time = (end_time - start_time).total_seconds()
upload_speed_mbps = file_size_mb / max(upload_time, 1)
info_print(f"✓ File uploaded successfully!")
info_print(f"Upload time: {upload_time:.2f} seconds")
info_print(f"Upload speed: {upload_speed_mbps:.2f} MB/s")
info_print(f"Message ID: {result.id}")
# Wait for bot reply
bot_reply = await wait_for_reply(get_bot_reply, timeout=30)
# If no immediate reply, fetch recent messages
if bot_reply is None:
bot_reply = await fetch_recent_messages(client, bot_entity, result)
response = {
"ok": True,
"uploaded_file": {
"file_path": file_path,
"file_name": file_name,
"file_size": file_size,
"file_size_mb": file_size_mb,
"upload_time": upload_time,
"upload_speed_mbps": upload_speed_mbps,
"message_id": result.id,
"date": result.date.timestamp(),
"caption": message if message else None
},
"bot_reply": bot_reply
}
debug_print("File upload operation completed successfully")
return response
except Exception as e:
error_print(f"Error uploading file: {str(e)}")
debug_print(f"File upload exception: {type(e).__name__}: {str(e)}")
return {"error": f"Error uploading file: {str(e)}"}
finally:
debug_print("Disconnecting client after file upload...")
await client.disconnect()
[docs]
async def upload_file_to_nexus_aaron(api_id, api_hash, phone_number, file_path, message="", session_file=SESSION_FILE, proxy=None):
"""
Upload a file to the @nexus_aaron bot specifically
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
file_path: Path to the file to upload
message: Optional message to send with the file (default: "")
session_file: Name of the session file
proxy: Proxy configuration dict or file path
Returns:
Dictionary with upload result and bot reply from @nexus_aaron
"""
nexus_aaron_username = "nexus_aaron"
info_print(f"Uploading file to @{nexus_aaron_username}...")
debug_print(f"File path: {file_path}")
# Use the existing upload_file_to_bot function with nexus_aaron as target
upload_result = await upload_file_to_bot(
api_id, api_hash, phone_number, nexus_aaron_username,
file_path, message, session_file, proxy
)
if upload_result.get("ok"):
info_print(f"✓ Successfully uploaded file to @{nexus_aaron_username}")
# Add nexus_aaron specific information to the result
upload_result["target_bot"] = nexus_aaron_username
upload_result["upload_type"] = "nexus_aaron_contribution"
# Parse bot reply for nexus_aaron specific content
bot_reply = upload_result.get("bot_reply")
if bot_reply and bot_reply.get("text"):
reply_text = bot_reply["text"]
# Check for common nexus_aaron responses
if "thank you" in reply_text.lower() or "received" in reply_text.lower():
upload_result["status"] = "received"
info_print("File was received by nexus_aaron")
elif "voting" in reply_text.lower():
upload_result["status"] = "pending_voting"
info_print("File is pending community voting")
elif "error" in reply_text.lower() or "problem" in reply_text.lower():
upload_result["status"] = "error"
info_print("nexus_aaron reported an issue with the file")
else:
upload_result["status"] = "uploaded"
debug_print(f"Upload to nexus_aaron completed with status: {upload_result.get('status', 'unknown')}")
else:
error_print(f"✗ Failed to upload file to @{nexus_aaron_username}: {upload_result.get('error', 'Unknown error')}")
return upload_result
[docs]
async def simple_upload_to_nexus_aaron(file_path, verbose=False):
"""
Upload a file to the @nexus_aaron bot with minimal input.
If the file is a PDF, try to extract the DOI using getpapers.
If DOI extraction fails, prompt the user to enter a DOI manually (with timeout).
Args:
file_path (str): Path to the file to upload.
verbose (bool): If True, enable verbose output.
Returns:
dict: Upload result.
"""
# Optionally enable verbose mode and logging
if verbose:
setup_logging(DEFAULT_LOG_FILE, verbose=True)
info_print("Verbose mode enabled for simple upload.")
# Load credentials from default location
if not os.path.exists(CREDENTIALS_FILE):
error_print("Credentials file not found. Please run the script interactively to set up credentials.")
return {"error": "Credentials file not found."}
creds = None
try:
with open(CREDENTIALS_FILE, "r") as f:
creds = json.load(f)
except Exception as e:
error_print(f"Failed to load credentials: {e}")
return {"error": f"Failed to load credentials: {e}"}
api_id = creds.get("tg_api_id")
api_hash = creds.get("tg_api_hash")
phone = creds.get("phone")
# Use default session file and proxy if available
session_file = SESSION_FILE
proxy = await decide_proxy_usage(api_id, api_hash, phone, SESSION_FILE, DEFAULT_PROXY_FILE)
# Print proxy decision for debugging/visibility
if proxy is None:
info_print("Proxy decision: No proxy needed (direct connection)")
elif proxy is False:
info_print("Proxy decision: No working proxy found (connection may fail)")
else:
info_print(f"Proxy decision: Using proxy configuration: {proxy}")
# If file is a PDF, try to extract DOI for caption
caption = ""
doi = None
if file_path.lower().endswith(".pdf"):
try:
info_print(f"Attempting to extract DOI from PDF: {file_path}")
doi = getpapers.extract_doi_from_pdf(file_path)
if doi:
caption = f"DOI: {doi}"
info_print(f"Extracted DOI from PDF: {doi}")
else:
info_print("Could not extract DOI from PDF.")
except Exception as e:
debug_print(f"Could not extract DOI from PDF: {e}")
info_print("Could not extract DOI from PDF.")
# If DOI extraction failed, prompt user for manual input
if not doi:
user_doi = get_input_with_timeout(
"Enter DOI for this PDF (or leave blank to cancel): ",
timeout=60,
default="",
keep_origin=True
)
user_doi = user_doi.strip()
if not user_doi:
error_print("No DOI provided. Upload cancelled.")
return {"error": "No DOI provided. Upload cancelled."}
else:
caption = f"DOI: {user_doi}"
info_print(f"Using manually entered DOI: {user_doi}")
# Call upload_file_to_nexus_aaron
result = await upload_file_to_nexus_aaron(
api_id, api_hash, phone, file_path, caption, session_file, proxy
)
if verbose:
format_nexus_aaron_upload_result(result)
return result
[docs]
async def list_and_reply_to_nexus_aaron_message(api_id, api_hash, phone_number, session_file=SESSION_FILE, limit=10, proxy=None):
"""
List recent research request messages from @nexus_aaron, allow user to select one, and upload a file as reply
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
session_file: Name of the session file
limit: Maximum number of messages to retrieve (default: 10, max: 50)
proxy: Proxy configuration dict or file path
Returns:
Dictionary with operation result
"""
nexus_aaron_username = "nexus_aaron"
# Validate and clamp limit
if limit < 1:
limit = 1
elif limit > 50:
limit = 50
info_print("Message limit adjusted to maximum of 50 for better interaction")
info_print(f"Fetching up to {limit} recent messages from @{nexus_aaron_username} to find research requests...")
# Get more messages than requested to filter for research requests only
# We'll fetch up to 3x the limit to ensure we get enough research requests
fetch_limit = min(limit * 3, 100) # Cap at 100 to avoid excessive API calls
# Use existing function to get messages
messages_result = await get_latest_messages_from_bot(
api_id, api_hash, nexus_aaron_username, session_file, fetch_limit, proxy
)
if not messages_result.get("ok"):
error_print(f"Failed to fetch messages: {messages_result.get('error', 'Unknown error')}")
return messages_result
all_messages = messages_result.get("messages", [])
if not all_messages:
info_print("No messages found")
return {"error": "No messages found in the bot"}
# Filter for research request messages only
research_requests = []
for msg in all_messages:
text = msg.get('text', '')
if text.startswith('#request'):
research_requests.append(msg)
if len(research_requests) >= limit:
break
if not research_requests:
info_print("No research request messages found")
return {"error": "No research request messages found in recent messages"}
# Display research request messages for user selection
print("\n" + "="*80)
print("RESEARCH REQUESTS FROM @nexus_aaron - SELECT ONE TO REPLY")
print("="*80)
print(f"Found {len(research_requests)} recent research request messages. Select one to reply to:\n")
for i, msg in enumerate(research_requests, 1):
# Parse request information
request_info = parse_nexus_aaron_request(msg.get('text', ''))
print(f"[{i}] Message ID: {msg['message_id']}")
print(f" 📅 Date: {msg['date_formatted']}")
print(f" ⭐ Request Point: {request_info['request_count']}")
print(f" 📄 Type: {request_info['pub_type']}")
if request_info['doi']:
print(f" 🔗 DOI: {request_info['doi']}")
if request_info['publisher_code']:
print(f" 📖 Publisher: {request_info['publisher_code']}")
if request_info['libstc_link']:
print(f" 🔬 LibSTC: {request_info['libstc_link']}")
if request_info['worldcat_link']:
print(f" 📚 WorldCat: {request_info['worldcat_link']}")
# Additional message information
if msg.get('has_media'):
media_type = msg.get('media_type', 'unknown')
print(f" 📎 Media: {media_type}")
if msg.get('buttons'):
print(f" 🔘 Interactive elements: {len(msg['buttons'])} button(s)")
# Additional stats
stats = []
if msg.get('views'):
stats.append(f"👁️ {msg['views']:,} views")
if msg.get('forwards'):
stats.append(f"🔄 {msg['forwards']:,} forwards")
if msg.get('is_reply'):
stats.append("↩️ Reply")
if stats:
print(f" 📊 Stats: {' | '.join(stats)}")
print(" " + "─" * 76)
print()
# Get user selection
while True:
try:
selection = get_input_with_timeout(
f"Select a research request to reply to (1-{len(research_requests)}) or 'q' to quit: ",
timeout=60,
default='q'
)
if selection.lower() == 'q':
info_print("Operation cancelled by user")
return {"ok": True, "cancelled": True, "message": "Operation cancelled by user"}
selected_index = int(selection) - 1
if 0 <= selected_index < len(research_requests):
selected_message = research_requests[selected_index]
break
else:
print(f"Invalid selection. Please choose a number between 1 and {len(research_requests)}")
except ValueError:
print("Invalid input. Please enter a number or 'q' to quit")
print(f"\n✓ Selected message {selected_message['message_id']} from {selected_message['date_formatted']}")
# Show selected message details with enhanced formatting
request_info = parse_nexus_aaron_request(selected_message.get('text', ''))
print(f"📋 Selected Research Request [{selected_index+1}]")
print(f"📄 Publication Type: {request_info['pub_type']}")
if request_info['doi']:
print(f"🔗 DOI: {request_info['doi']}")
# Get file path for upload
while True:
file_path = get_input_with_timeout(
"Enter the full path to the file you want to upload as reply (or 'q' to quit): ",
timeout=120,
default='q',
keep_origin=True
)
if file_path.lower() == 'q':
info_print("File upload cancelled by user")
return {"ok": True, "cancelled": True, "message": "File upload cancelled by user"}
# Expand user path and resolve relative paths
file_path = os.path.expanduser(file_path.strip().strip('"\''))
file_path = os.path.abspath(file_path)
if os.path.exists(file_path) and os.path.isfile(file_path):
break
else:
print(f"File not found: {file_path}")
print("Please enter a valid file path or 'q' to quit")
# Get file info
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
file_name = os.path.basename(file_path)
print(f"\n📁 File selected: {file_name}")
print(f"📏 Size: {file_size_mb:.2f} MB")
# Validate file size
if file_size_mb > 2000: # 2GB Telegram limit
error_print(f"File is too large ({file_size_mb:.2f} MB). Telegram limit is 2GB.")
return {"error": f"File too large: {file_size_mb:.2f} MB exceeds 2GB limit"}
if file_size_mb > 50: # Warn for large files
print(f"⚠️ Large file ({file_size_mb:.2f} MB) may take time to upload")
# Get optional caption for the file
caption = get_input_with_timeout(
"Enter an optional caption for the file (or press Enter for no caption): ",
timeout=60,
default='',
keep_origin=True
)
if caption.strip():
print(f"📝 Caption: {caption}")
# Confirm upload
confirm = get_input_with_timeout(
f"Confirm upload '{file_name}' as reply to request [{selected_index+1}] (message {selected_message['message_id']})? [y/N]: ",
timeout=30,
default='n'
)
if confirm.lower() not in ['y', 'yes']:
info_print("Upload cancelled by user")
return {"ok": True, "cancelled": True, "message": "Upload cancelled by user"}
# Now perform the actual reply upload using existing proxy and client setup logic
proxy_config = load_proxy_config(proxy)
if proxy and proxy_config is None:
return {"error": "Error loading proxy configuration"}
client = create_telegram_client(api_id, api_hash, session_file, proxy_config)
try:
# Check if session file exists
if not os.path.exists(session_file):
error_print(f"Session file not found: {session_file}")
return {"error": "Session file not found. Run script interactively first to create session."}
debug_print("Starting client for reply upload...")
if proxy_config:
info_print(f"Connecting through proxy: {proxy_config['type']}://{proxy_config['addr']}:{proxy_config['port']}")
await client.start()
# Verify we're connected
if not await client.is_user_authorized():
error_print("Session expired or not authorized")
return {"error": "Session expired. Please delete the session file and run interactively to re-authenticate."}
# Get the bot entity
debug_print(f"Getting bot entity for: {nexus_aaron_username}")
bot_entity = await client.get_entity(nexus_aaron_username)
# Get the specific message to reply to
target_message = await client.get_messages(bot_entity, ids=selected_message['message_id'])
if not target_message:
return {"error": f"Could not fetch target message {selected_message['message_id']}"}
# Create message handler for bot responses
handler, get_bot_reply = create_message_handler(bot_entity)
client.on(events.NewMessage(from_users=bot_entity))(handler)
# Upload progress callback
last_progress = 0
def progress_callback(current, total):
nonlocal last_progress
if total > 0:
progress = int((current / total) * 100)
if progress >= last_progress + 10: # Update every 10%
info_print(f"Upload progress: {progress}% ({current / (1024*1024):.2f}/{total / (1024*1024):.2f} MB)")
last_progress = progress
# Send file as reply
info_print(f"Uploading '{file_name}' as reply to request [{selected_index+1}] (message {selected_message['message_id']})...")
start_time = datetime.now()
result = await client.send_file(
bot_entity,
file_path,
caption=caption if caption.strip() else None,
reply_to=target_message, # Reply to the selected message
progress_callback=progress_callback
)
end_time = datetime.now()
upload_time = (end_time - start_time).total_seconds()
upload_speed_mbps = file_size_mb / max(upload_time, 1)
info_print(f"✓ File uploaded successfully as reply!")
info_print(f"Upload time: {upload_time:.2f} seconds")
info_print(f"Upload speed: {upload_speed_mbps:.2f} MB/s")
info_print(f"Reply message ID: {result.id}")
response = {
"ok": True,
"selected_message": {
"message_id": selected_message['message_id'],
"date": selected_message['date_formatted'],
"request_count": request_info['request_count'],
"text": selected_message['text'][:200] + "..." if len(selected_message.get('text', '')) > 200 else selected_message.get('text', '')
},
"uploaded_file": {
"file_path": file_path,
"file_name": file_name,
"file_size": file_size,
"file_size_mb": file_size_mb,
"upload_time": upload_time,
"upload_speed_mbps": upload_speed_mbps,
"reply_message_id": result.id,
"date": result.date.timestamp(),
"caption": caption if caption.strip() else None
}
}
debug_print("Message reply with file upload completed successfully")
return response
except Exception as e:
error_print(f"Error in reply upload operation: {str(e)}")
debug_print(f"Reply upload exception: {type(e).__name__}: {str(e)}")
return {"error": f"Error in reply upload operation: {str(e)}"}
finally:
debug_print("Disconnecting client after reply upload operation...")
await client.disconnect()
[docs]
async def check_doi_availability_on_nexus(api_id, api_hash, phone_number, bot_username, doi, session_file=SESSION_FILE, proxy=None, download=False):
"""
Check if a DOI is available on Nexus by sending it to the bot and analyzing the response
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
doi: DOI number to check (e.g., "10.1038/nature12373")
session_file: Name of the session file
proxy: Proxy configuration dict or file path
download: If True, automatically download the paper if available (default: False)
Returns:
Dictionary with availability status and details, including download result if applicable
"""
info_print(f"Checking DOI availability on Nexus: {doi}")
debug_print(f"DOI to check: {doi}")
if download:
info_print("Auto-download enabled - will download paper if available")
# Validate DOI format
if not doi or not isinstance(doi, str):
return {"error": "Invalid DOI: DOI must be a non-empty string"}
# Clean and validate DOI format
doi = doi.strip()
if not re.match(r'^10\.\d+/.+', doi):
return {"error": f"Invalid DOI format: {doi}. DOI should start with '10.' followed by digits and a slash"}
# Send DOI to the bot
debug_print(f"Sending DOI query to {bot_username}: {doi}")
try:
send_result = await send_message_to_bot(
api_id, api_hash, phone_number, bot_username,
doi, session_file, proxy
)
if not send_result.get("ok"):
error_print(f"Failed to send DOI query: {send_result.get('error', 'Unknown error')}")
return {"error": f"Failed to send DOI query: {send_result.get('error', 'Unknown error')}"}
bot_reply = send_result.get("bot_reply")
if not bot_reply:
error_print("No reply received from bot for DOI query")
return {"error": "No reply received from bot for DOI query"}
reply_text = bot_reply.get("text", "").lower()
buttons = bot_reply.get("buttons", [])
debug_print(f"Bot reply text (first 200 chars): {reply_text[:200]}...")
debug_print(f"Number of buttons in reply: {len(buttons)}")
# Analyze the response to determine availability
availability_result = {
"doi": doi,
"available": False,
"status": "unknown",
"details": {},
"raw_response": bot_reply.get("text", ""),
"buttons": buttons,
"message_id": bot_reply.get("message_id"),
"download_requested": download
}
# Check for common "not found" or "no results" indicators
not_found_indicators = [
"no results found",
"not found",
"no matches",
"nothing found",
"0 results",
"no books or papers found",
"search returned no results"
]
if any(indicator in reply_text for indicator in not_found_indicators):
availability_result["status"] = "not_found"
availability_result["available"] = False
availability_result["details"]["reason"] = "DOI not found in Nexus database"
info_print(f"DOI {doi} is NOT available on Nexus (not found)")
debug_print("DOI marked as not found based on reply text indicators")
return availability_result
# Check for error messages
error_indicators = [
"error",
"invalid",
"malformed",
"cannot process",
"failed to search"
]
if any(indicator in reply_text for indicator in error_indicators):
availability_result["status"] = "error"
availability_result["available"] = False
availability_result["details"]["reason"] = "Error processing DOI query"
error_print(f"Error processing DOI {doi}")
debug_print("DOI query resulted in error based on reply text")
return availability_result
# If we have buttons, analyze them to determine availability
if buttons:
debug_print("Analyzing buttons to determine DOI availability...")
# Look for callback buttons (these typically indicate results)
callback_buttons = [btn for btn in buttons if btn.get("type") == "callback"]
if callback_buttons:
first_button = callback_buttons[0]
button_text = first_button.get("text", "").lower()
debug_print(f"First callback button text: '{button_text}'")
# Check if button indicates availability
if "request" in button_text:
# Paper is not available, needs to be requested
availability_result["status"] = "not_available_requestable"
availability_result["available"] = False
availability_result["details"]["reason"] = "Paper not available but can be requested"
availability_result["details"]["request_button"] = {
"text": first_button.get("text"),
"callback_data": first_button.get("callback_data") or first_button.get("data"),
"message_id": bot_reply.get("message_id")
}
info_print(f"DOI {doi} is NOT available on Nexus but can be requested")
debug_print("DOI can be requested based on button analysis")
elif any(word in button_text for word in ["download", "get", "pdf", "file"]):
# Paper is available for download
availability_result["status"] = "available"
availability_result["available"] = True
availability_result["details"]["reason"] = "Paper is available for download"
# Try to extract file size information
button_size_info = extract_file_size_from_button_text(first_button.get("text", ""))
callback_size_info = extract_file_size_from_callback_data(first_button.get("callback_data") or first_button.get("data"))
size_info = button_size_info or callback_size_info
if size_info:
availability_result["details"]["file_size_mb"] = size_info["size_mb"]
availability_result["details"]["file_size_unit"] = size_info["unit"]
availability_result["details"]["file_size_original"] = size_info["original_size"]
debug_print(f"Extracted file size: {size_info['original_size']} {size_info['unit']} ({size_info['size_mb']:.2f} MB)")
availability_result["details"]["download_button"] = {
"text": first_button.get("text"),
"callback_data": first_button.get("callback_data") or first_button.get("data"),
"message_id": bot_reply.get("message_id")
}
info_print(f"DOI {doi} is AVAILABLE on Nexus for download")
debug_print("DOI is available for download based on button analysis")
# Auto-download if requested
if download:
info_print(f"Auto-downloading paper for DOI: {doi}")
debug_print("Starting automatic download process...")
try:
# Click the download button using existing function
button_callback_data = first_button.get("callback_data") or first_button.get("data")
message_id = bot_reply.get("message_id")
debug_print(f"Clicking download button - Message ID: {message_id}, Callback: {button_callback_data}")
click_result = await click_callback_button(
api_id, api_hash, phone_number, bot_username,
message_id, button_callback_data, session_file, proxy
)
if click_result.get("ok"):
info_print("✓ Download button clicked successfully")
# Wait for file and download using existing function
debug_print("Waiting for file preparation and download...")
# Calculate wait time based on file size if available
file_size_mb = size_info.get("size_mb", 5.0) if size_info else 5.0
base_wait = 10
size_based_wait = int(file_size_mb * 5) # 5 seconds per MB
total_wait = max(base_wait, size_based_wait)
info_print(f"Waiting {total_wait} seconds for file preparation...")
await asyncio.sleep(total_wait)
# Handle file download from bot reply
download_result = await handle_file_download_from_bot_reply(
click_result.get("bot_reply"), proxy
)
if download_result and download_result.get("success"):
availability_result["download_result"] = {
"success": True,
"file_path": download_result["file_path"],
"file_name": download_result["filename"],
"file_size": download_result["file_size"],
"file_size_mb": download_result["file_size"] / (1024*1024),
"download_time": download_result["download_time"],
"speed_mbps": download_result["speed_mbps"]
}
info_print(f"✓ Auto-download completed successfully!")
info_print(f"File saved to: {download_result['file_path']}")
debug_print(f"Download stats - Size: {download_result['file_size'] / (1024*1024):.2f} MB, Speed: {download_result['speed_mbps']:.2f} MB/s")
elif download_result and not download_result.get("success"):
availability_result["download_result"] = {
"success": False,
"error": download_result.get("error", "Unknown download error")
}
error_print(f"✗ Auto-download failed: {download_result.get('error', 'Unknown error')}")
debug_print("File download from bot reply failed")
else:
# No file in bot reply, this might be normal for some responses
availability_result["download_result"] = {
"success": False,
"error": "No file received from bot after clicking download button"
}
info_print("⚠️ No file received after clicking download button (may be normal depending on bot response)")
debug_print("No file detected in bot reply after download button click")
else:
availability_result["download_result"] = {
"success": False,
"error": f"Failed to click download button: {click_result.get('error', 'Unknown error')}"
}
error_print(f"✗ Failed to click download button: {click_result.get('error', 'Unknown error')}")
debug_print("Download button click failed")
except Exception as download_error:
availability_result["download_result"] = {
"success": False,
"error": f"Auto-download error: {str(download_error)}"
}
error_print(f"✗ Auto-download error: {str(download_error)}")
debug_print(f"Auto-download exception: {type(download_error).__name__}: {str(download_error)}")
else:
# Unknown button type, but presence suggests some result
availability_result["status"] = "found_unknown"
availability_result["available"] = None # Uncertain
availability_result["details"]["reason"] = "DOI found but availability status unclear"
availability_result["details"]["first_button"] = {
"text": first_button.get("text"),
"callback_data": first_button.get("callback_data") or first_button.get("data"),
"message_id": bot_reply.get("message_id")
}
info_print(f"DOI {doi} found but availability status unclear")
debug_print("DOI found but button type unclear")
else:
# Has buttons but no callback buttons (might be URL buttons)
url_buttons = [btn for btn in buttons if btn.get("type") == "url"]
if url_buttons:
availability_result["status"] = "found_external_links"
availability_result["available"] = False
availability_result["details"]["reason"] = "DOI found with external links but not directly available"
availability_result["details"]["external_links"] = [
{"text": btn.get("text"), "url": btn.get("url")}
for btn in url_buttons
]
info_print(f"DOI {doi} found with external links but not available on Nexus")
debug_print("DOI found with external URL buttons")
else:
# Unknown button types
availability_result["status"] = "found_unknown_buttons"
availability_result["available"] = None
availability_result["details"]["reason"] = "DOI found with unknown button types"
info_print(f"DOI {doi} found but button types unclear")
debug_print("DOI found with unknown button types")
else:
# No buttons in response
if len(reply_text.strip()) > 10: # Substantial text response
availability_result["status"] = "found_text_only"
availability_result["available"] = None
availability_result["details"]["reason"] = "DOI found with text response but no interactive elements"
info_print(f"DOI {doi} found with text response but no buttons")
debug_print("DOI query returned text but no buttons")
else:
availability_result["status"] = "minimal_response"
availability_result["available"] = False
availability_result["details"]["reason"] = "Minimal response received"
info_print(f"DOI {doi} query returned minimal response")
debug_print("DOI query returned minimal response")
# Add search metadata
availability_result["search_metadata"] = {
"query_sent": doi,
"response_length": len(reply_text),
"button_count": len(buttons),
"callback_button_count": len([btn for btn in buttons if btn.get("type") == "callback"]),
"url_button_count": len([btn for btn in buttons if btn.get("type") == "url"]),
"timestamp": datetime.now().isoformat()
}
debug_print(f"DOI availability check completed. Status: {availability_result['status']}, Available: {availability_result['available']}")
if download and availability_result.get("download_result"):
download_success = availability_result["download_result"].get("success", False)
debug_print(f"Auto-download completed. Success: {download_success}")
return availability_result
except Exception as e:
error_print(f"Error checking DOI availability: {str(e)}")
debug_print(f"DOI availability check exception: {type(e).__name__}: {str(e)}")
return {"error": f"Error checking DOI availability: {str(e)}"}
[docs]
async def batch_check_doi_availability(api_id, api_hash, phone_number, bot_username, doi_list, session_file=SESSION_FILE, proxy=None, delay=2, download=False):
"""
Check availability of multiple DOIs on Nexus with rate limiting and optional auto-download
Args:
api_id: Your Telegram API ID
api_hash: Your Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
doi_list: List of DOI strings to check
session_file: Name of the session file
proxy: Proxy configuration dict or file path
delay: Delay in seconds between requests to avoid rate limiting (default: 2)
download: If True, automatically download papers that are available (default: False)
Returns:
Dictionary with batch results including download information
"""
if not doi_list or not isinstance(doi_list, list):
return {"error": "DOI list must be a non-empty list"}
info_print(f"Starting batch DOI availability check for {len(doi_list)} DOIs")
if download:
info_print("Auto-download enabled - will download available papers")
debug_print(f"Delay between requests: {delay} seconds")
batch_results = {
"total_dois": len(doi_list),
"processed": 0,
"available": 0,
"not_available": 0,
"requestable": 0,
"not_found": 0,
"errors": 0,
"downloaded": 0,
"download_errors": 0,
"results": [],
"downloads": [],
"summary": {},
"download_enabled": download,
"started_at": datetime.now().isoformat()
}
try:
for i, doi in enumerate(doi_list, 1):
info_print(f"Checking DOI {i}/{len(doi_list)}: {doi}")
# Check individual DOI with download option
result = await check_doi_availability_on_nexus(
api_id, api_hash, phone_number, bot_username,
doi, session_file, proxy, download=download
)
# Add to batch results
batch_results["results"].append(result)
batch_results["processed"] += 1
# Update counters based on result
if "error" in result:
batch_results["errors"] += 1
debug_print(f"DOI {doi} resulted in error: {result['error']}")
else:
status = result.get("status", "unknown")
available = result.get("available")
if status == "available":
batch_results["available"] += 1
# Check if download was attempted and track results
if download:
download_result = result.get("download_result")
if download_result:
if download_result.get("success"):
batch_results["downloaded"] += 1
batch_results["downloads"].append({
"doi": doi,
"success": True,
"file_path": download_result.get("file_path"),
"file_name": download_result.get("file_name"),
"file_size_mb": download_result.get("file_size_mb"),
"download_time": download_result.get("download_time"),
"speed_mbps": download_result.get("speed_mbps")
})
info_print(f"✓ Downloaded paper for DOI {doi}")
else:
batch_results["download_errors"] += 1
batch_results["downloads"].append({
"doi": doi,
"success": False,
"error": download_result.get("error", "Unknown download error")
})
info_print(f"✗ Failed to download paper for DOI {doi}")
else:
# Available but no download attempted (shouldn't happen if download=True)
debug_print(f"DOI {doi} available but no download result found")
elif status == "not_available_requestable":
batch_results["requestable"] += 1
batch_results["not_available"] += 1
elif status == "not_found":
batch_results["not_found"] += 1
batch_results["not_available"] += 1
else:
batch_results["not_available"] += 1
debug_print(f"DOI {doi} status: {status}, available: {available}")
# Rate limiting delay (except for last request)
if i < len(doi_list):
debug_print(f"Waiting {delay} seconds before next request...")
await asyncio.sleep(delay)
# Generate summary
batch_results["completed_at"] = datetime.now().isoformat()
batch_results["summary"] = {
"available_count": batch_results["available"],
"requestable_count": batch_results["requestable"],
"not_found_count": batch_results["not_found"],
"error_count": batch_results["errors"],
"success_rate": round((batch_results["processed"] - batch_results["errors"]) / batch_results["total_dois"] * 100, 2),
"availability_rate": round(batch_results["available"] / batch_results["total_dois"] * 100, 2) if batch_results["total_dois"] > 0 else 0
}
# Add download summary if download was enabled
if download:
batch_results["summary"]["downloaded_count"] = batch_results["downloaded"]
batch_results["summary"]["download_errors_count"] = batch_results["download_errors"]
batch_results["summary"]["download_success_rate"] = round(batch_results["downloaded"] / batch_results["available"] * 100, 2) if batch_results["available"] > 0 else 0
# Calculate total download statistics
successful_downloads = [d for d in batch_results["downloads"] if d.get("success")]
if successful_downloads:
total_size_mb = sum(d.get("file_size_mb", 0) for d in successful_downloads)
total_time = sum(d.get("download_time", 0) for d in successful_downloads)
avg_speed = sum(d.get("speed_mbps", 0) for d in successful_downloads) / len(successful_downloads)
batch_results["summary"]["total_downloaded_mb"] = round(total_size_mb, 2)
batch_results["summary"]["total_download_time"] = round(total_time, 2)
batch_results["summary"]["average_download_speed_mbps"] = round(avg_speed, 2)
info_print(f"Batch DOI check completed: {batch_results['available']} available, {batch_results['requestable']} requestable, {batch_results['not_found']} not found, {batch_results['errors']} errors")
if download:
info_print(f"Download results: {batch_results['downloaded']} successful, {batch_results['download_errors']} failed")
return batch_results
except Exception as e:
error_print(f"Error in batch DOI availability check: {str(e)}")
debug_print(f"Batch check exception: {type(e).__name__}: {str(e)}")
batch_results["error"] = f"Batch processing error: {str(e)}"
batch_results["completed_at"] = datetime.now().isoformat()
return batch_results
[docs]
async def download_from_nexus_bot(doi, download_dir=None, bot_username=None):
"""
Download a paper from Nexus based on DOI
Args:
doi: DOI string to search and download (e.g., "10.1038/nature12373")
download_dir: Target directory to save the file (optional, uses default if None)
bot_username: Bot username to use (optional, uses global BOT_USERNAME if None)
Returns:
Dictionary with download result and file information
"""
if not doi or not isinstance(doi, str):
return {"success": False, "error": "Invalid DOI: DOI must be a non-empty string"}
doi = doi.strip()
if not re.match(r'^10\.\d+/.+', doi):
return {"success": False, "error": f"Invalid DOI format: {doi}. DOI should start with '10.' followed by digits and a slash"}
target_bot = bot_username or BOT_USERNAME
info_print(f"Downloading from Nexus - DOI: {doi}")
debug_print(f"Target bot: {target_bot}")
debug_print(f"Download directory: {download_dir or 'default'}")
try:
# Step 1: Decide proxy usage (try direct connection first)
proxy_to_use = None
try:
client = create_telegram_client(TG_API_ID, TG_API_HASH, SESSION_FILE, proxy=None)
await client.start(phone=PHONE if PHONE else None)
is_auth = await client.is_user_authorized()
await client.disconnect()
if not is_auth:
proxy_to_use = await decide_proxy_usage(TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, DEFAULT_PROXY_FILE)
except Exception:
proxy_to_use = await decide_proxy_usage(TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, DEFAULT_PROXY_FILE)
# Step 2: Check DOI availability with auto-download enabled
info_print("Step 1: Checking DOI availability on Nexus...")
availability_result = await check_doi_availability_on_nexus(
TG_API_ID, TG_API_HASH, PHONE, target_bot,
doi, SESSION_FILE, proxy_to_use, download=True
)
if "error" in availability_result:
error_print(f"DOI availability check failed: {availability_result['error']}")
return {
"success": False,
"error": f"DOI availability check failed: {availability_result['error']}",
"doi": doi
}
status = availability_result.get("status", "unknown")
available = availability_result.get("available", False)
debug_print(f"DOI status: {status}, Available: {available}")
if not available:
if status == "not_available_requestable":
info_print("Paper is not available on Nexus but can be requested")
return {
"success": False,
"error": "Paper not available on Nexus - can be requested from community",
"doi": doi,
"status": "requestable",
"request_info": availability_result.get("details", {}).get("request_button")
}
elif status == "not_found":
info_print("DOI not found in Nexus database")
return {
"success": False,
"error": "DOI not found in Nexus database",
"doi": doi,
"status": "not_found"
}
else:
info_print(f"Paper not available - status: {status}")
return {
"success": False,
"error": f"Paper not available - status: {status}",
"doi": doi,
"status": status
}
download_result = availability_result.get("download_result")
if not download_result:
error_print("Paper is available but no download was attempted")
return {
"success": False,
"error": "Paper is available but download was not attempted",
"doi": doi,
"status": "available_no_download"
}
if not download_result.get("success"):
error_print(f"Download failed: {download_result.get('error', 'Unknown download error')}")
return {
"success": False,
"error": f"Download failed: {download_result.get('error', 'Unknown download error')}",
"doi": doi,
"status": "download_failed"
}
downloaded_file_path = download_result.get("file_path")
file_name = download_result.get("file_name") or download_result.get("filename")
file_size = download_result.get("file_size", 0)
file_size_mb = file_size / (1024 * 1024) if file_size else 0
if not downloaded_file_path or not os.path.exists(downloaded_file_path):
error_print("Downloaded file not found on disk")
return {
"success": False,
"error": "Downloaded file not found on disk",
"doi": doi,
"status": "file_missing"
}
info_print(f"✓ File downloaded successfully: {file_name}")
info_print(f"File size: {file_size_mb:.2f} MB")
debug_print(f"Downloaded to: {downloaded_file_path}")
final_file_path = downloaded_file_path
if download_dir:
target_dir = os.path.expanduser(download_dir.strip())
target_dir = os.path.abspath(target_dir)
debug_print(f"Target directory specified: {target_dir}")
try:
os.makedirs(target_dir, exist_ok=True)
debug_print(f"Target directory created/verified: {target_dir}")
target_file_path = os.path.join(target_dir, file_name)
if os.path.exists(target_file_path):
base_name, ext = os.path.splitext(file_name)
counter = 1
while os.path.exists(target_file_path):
new_name = f"{base_name}_{counter}{ext}"
target_file_path = os.path.join(target_dir, new_name)
counter += 1
info_print(f"File name conflict resolved: {os.path.basename(target_file_path)}")
debug_print(f"Original file exists, using: {target_file_path}")
shutil.move(downloaded_file_path, target_file_path)
final_file_path = target_file_path
info_print(f"✓ File moved to: {target_file_path}")
debug_print("File move operation completed successfully")
except Exception as move_error:
error_print(f"Warning: Could not move file to specified directory: {str(move_error)}")
debug_print(f"File move exception: {type(move_error).__name__}: {str(move_error)}")
info_print(f"File remains at: {downloaded_file_path}")
result = {
"success": True,
"doi": doi,
"file_path": final_file_path,
"file_name": os.path.basename(final_file_path),
"file_size": file_size,
"file_size_mb": file_size_mb,
"download_time": download_result.get("download_time", 0),
"speed_mbps": download_result.get("speed_mbps", 0),
"original_download_path": downloaded_file_path,
"moved_to_target_dir": download_dir is not None and final_file_path != downloaded_file_path,
"target_directory": download_dir,
"status": "downloaded"
}
details = availability_result.get("details", {})
if details.get("file_size_mb"):
result["expected_file_size_mb"] = details["file_size_mb"]
result["file_size_unit"] = details.get("file_size_unit", "MB")
info_print(f"✓ Download completed successfully!")
info_print(f"Final location: {final_file_path}")
debug_print(f"Download result: {result}")
return result
except Exception as e:
error_print(f"Error in download_from_nexus: {str(e)}")
debug_print(f"Download function exception: {type(e).__name__}: {str(e)}")
return {
"success": False,
"error": f"Download operation failed: {str(e)}",
"doi": doi,
"status": "error"
}
[docs]
async def request_paper_by_doi(api_id, api_hash, phone_number, bot_username, doi, session_file=SESSION_FILE, proxy=None):
"""
Request a paper from Nexus by DOI.
This will send the DOI to the bot, detect if a request is needed, and click the request button if available.
Args:
api_id: Telegram API ID
api_hash: Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
doi: DOI string to request (e.g., "10.1038/nature12373")
session_file: Session file name
proxy: Proxy configuration dict or file path
Returns:
dict: {
"ok": True if request sent, False or "error" otherwise,
"doi": <doi>,
"request_sent": True/False,
"details": ...,
}
"""
info_print(f"Requesting paper by DOI: {doi}")
# Step 1: Send DOI to the bot and get reply
send_result = await send_message_to_bot(
api_id, api_hash, phone_number, bot_username, doi, session_file, proxy
)
if not send_result.get("ok"):
return {"error": f"Failed to send DOI to bot: {send_result.get('error', 'Unknown error')}", "doi": doi}
bot_reply = send_result.get("bot_reply")
if not bot_reply or not bot_reply.get("buttons"):
return {"error": "No reply or no buttons found in bot response", "doi": doi}
# Step 2: Find the request button
request_button = None
for btn in bot_reply.get("buttons", []):
if btn.get("type") == "callback" and "request" in btn.get("text", "").lower():
request_button = btn
break
if not request_button:
return {"ok": False, "doi": doi, "request_sent": False, "details": "No request button found. Paper may already be available or not requestable."}
# Step 3: Click the request button
message_id = bot_reply.get("message_id")
callback_data = request_button.get("callback_data") or request_button.get("data")
click_result = await click_callback_button(
api_id, api_hash, phone_number, bot_username, message_id, callback_data, session_file, proxy
)
if click_result.get("ok"):
info_print("Paper request sent successfully.")
return {
"ok": True,
"doi": doi,
"request_sent": True,
"details": click_result
}
else:
return {
"ok": False,
"doi": doi,
"request_sent": False,
"details": click_result.get("error", "Unknown error")
}
[docs]
async def batch_request_papers_by_doi(api_id, api_hash, phone_number, bot_username, doi_list, session_file=SESSION_FILE, proxy=None, delay=2):
"""
Request multiple papers from Nexus by DOI.
For each DOI, sends the DOI to the bot, detects if a request is needed, and clicks the request button if available.
Args:
api_id: Telegram API ID
api_hash: Telegram API hash
phone_number: Your phone number (not used, kept for compatibility)
bot_username: Bot's username
doi_list: List of DOI strings to request
session_file: Session file name
proxy: Proxy configuration dict or file path
delay: Delay in seconds between requests (default: 2)
Returns:
dict: {
"total": int,
"requested": int,
"skipped": int,
"errors": int,
"results": list of per-DOI results
}
"""
if not doi_list or not isinstance(doi_list, list):
return {"error": "DOI list must be a non-empty list"}
info_print(f"Starting batch paper request for {len(doi_list)} DOIs")
results = []
requested = 0
skipped = 0
errors = 0
for i, doi in enumerate(doi_list, 1):
info_print(f"Requesting DOI {i}/{len(doi_list)}: {doi}")
try:
result = await request_paper_by_doi(
api_id, api_hash, phone_number, bot_username, doi, session_file, proxy
)
results.append(result)
if result.get("ok"):
requested += 1
info_print(f"✓ Requested paper for DOI: {doi}")
elif result.get("request_sent") is False:
skipped += 1
info_print(f"Skipped DOI (no request needed or possible): {doi}")
else:
errors += 1
error_print(f"Error requesting DOI {doi}: {result.get('error', result.get('details', 'Unknown error'))}")
except Exception as e:
errors += 1
error_print(f"Exception requesting DOI {doi}: {str(e)}")
results.append({"doi": doi, "error": str(e)})
if i < len(doi_list):
await asyncio.sleep(delay)
summary = {
"total": len(doi_list),
"requested": requested,
"skipped": skipped,
"errors": errors,
"results": results
}
info_print(f"Batch request completed: {requested} requested, {skipped} skipped, {errors} errors")
return summary
[docs]
async def request_papers_by_doi_list(doi_list):
"""
Request one or more papers by DOI using the Nexus bot.
Attempts direct connection first, falls back to proxy if needed.
Args:
doi_list (list): List of DOI strings.
Returns:
dict: Summary of request results.
"""
if not doi_list or not isinstance(doi_list, list):
return {"error": "Input must be a non-empty list of DOIs"}
# Load credentials from default location
if not os.path.exists(CREDENTIALS_FILE):
return {"error": "Credentials file not found. Please set up credentials first."}
try:
with open(CREDENTIALS_FILE, "r") as f:
creds = json.load(f)
api_id = creds.get("tg_api_id")
api_hash = creds.get("tg_api_hash")
phone = creds.get("phone")
bot_username = creds.get("bot_username", BOT_USERNAME)
except Exception as e:
return {"error": f"Failed to load credentials: {e}"}
# Try direct connection first
proxy_to_use = None
try:
client = create_telegram_client(api_id, api_hash, SESSION_FILE, proxy=None)
await client.start(phone=phone if phone else None)
is_auth = await client.is_user_authorized()
await client.disconnect()
if not is_auth:
proxy_to_use = await decide_proxy_usage(api_id, api_hash, phone, SESSION_FILE, DEFAULT_PROXY_FILE)
except Exception:
proxy_to_use = await decide_proxy_usage(api_id, api_hash, phone, SESSION_FILE, DEFAULT_PROXY_FILE)
# Use batch_request_papers_by_doi for all DOIs
result = await batch_request_papers_by_doi(
api_id, api_hash, phone, bot_username, doi_list, SESSION_FILE, proxy_to_use
)
return result
[docs]
def print_default_paths():
"""Print all default file and directory paths used by the script."""
print("\n" + "="*50)
print("DEFAULT FILE AND DIRECTORY PATHS")
print("="*50)
print(f"Session file: {SESSION_FILE}")
print(f"Credentials file: {CREDENTIALS_FILE}")
print(f"Proxy config file: {DEFAULT_PROXY_FILE}")
print(f"Proxy list file: {DEFAULT_PROXY_FILE.replace('.json', '_list.json')}")
print(f"Log file: {DEFAULT_LOG_FILE}")
print(f"Download directory: {DEFAULT_DOWNLOAD_DIR}")
print("="*50 + "\n")
[docs]
async def main():
global TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME
# Get the parent package name from the module's __name__
parent_package = __name__.split('.')[0] if '.' in __name__ else None
if parent_package is None:
program_name = 'nexus'
elif '_' in parent_package:
# If the parent package has an underscore, strip it
parent_package = parent_package[:parent_package.index('_')]
program_name = f"{parent_package} nexus"
# Parse command line arguments using argparse
parser = argparse.ArgumentParser(
prog=program_name,
description='Interact with Telegram bots, especially Nexus scientific paper bot',
epilog='''
Examples:
%(prog)s --search "artificial intelligence"
Search for papers on artificial intelligence
%(prog)s --check-doi 10.1038/nature12373
Check if a specific DOI is available on Nexus
%(prog)s --check-doi 10.1038/nature12373 --download
Check DOI availability and auto-download if available
%(prog)s --check-doi dois.txt --download
Check multiple DOIs from file and download available papers
%(prog)s --check-doi "10.1038/nature12373,10.1126/science.abc123"
Check multiple DOIs by comma-separated list
%(prog)s --check-doi "10.1038/nature12373 10.1126/science.abc123"
Check multiple DOIs by space-separated list
%(prog)s --user-info
Get your Nexus user profile information
%(prog)s --fetch-nexus-aaron 20
Fetch 20 recent messages from @nexus_aaron bot
%(prog)s --upload-to-nexus-aaron paper.pdf --upload-message "New research paper"
Upload a file to @nexus_aaron with optional message
%(prog)s --solve-requests 5
Help solve research requests from @nexus_aaron
%(prog)s --create-session
Create new Telegram session interactively
%(prog)s --test-connection --proxy proxy.json
Test connection with proxy configuration
%(prog)s --clear-proxy --clear-credentials
Clean up configuration files
%(prog)s --request-doi 10.1038/nature12373
Request a paper by DOI if not available
%(prog)s --request-doi "10.1038/nature12373,10.1126/science.abc123"
Request multiple papers by comma-separated DOIs
%(prog)s --request-doi "10.1038/nature12373 10.1126/science.abc123"
Request multiple papers by space-separated DOIs
%(prog)s --request-doi dois.txt
Request papers by DOIs listed in a file (one per line)
''',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-S', '--create-session', action='store_true',
help='Create a new session file interactively')
parser.add_argument('-c', '--credentials', type=str,
help='Path to credentials JSON file containing API credentials. '
'Example file content: {"tg_api_id": "12345678", "tg_api_hash": "abcd1234efgh5678", '
'"phone": "+1234567890", "bot_username": "SciNexBot"}')
parser.add_argument('-s', '--search', type=str,
default="",
help='Search query to send to the bot')
parser.add_argument('-L', '--search-limit', type=int, default=None,
help='Limit the number of search results returned when using --search')
parser.add_argument('--bot', type=str,
help='Bot username to interact with (overrides default)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output for debugging')
parser.add_argument('-l', '--log', type=str, nargs='?', const=DEFAULT_LOG_FILE,
help=f'Save output to log file (default: {DEFAULT_LOG_FILE})')
parser.add_argument('-p', '--proxy', type=str, nargs='?', const=DEFAULT_PROXY_FILE,
help=f'Path to proxy configuration JSON file (default: {DEFAULT_PROXY_FILE}). '
'Example file content: {"type": "http", "addr": "127.0.0.1", "port": 8080} '
'or {"type": "socks5", "addr": "127.0.0.1", "port": 1080, "username": "user", "password": "pass"}')
parser.add_argument('-P', '--no-proxy', action='store_true',
help='Disable proxy usage and connect directly')
parser.add_argument('-u', '--user-info', action='store_true',
help='Get and display user profile information from Nexus bot')
parser.add_argument('-X', '--clear-proxy', action='store_true',
help='Clear proxy configuration files (delete default proxy files)')
parser.add_argument('-C', '--clear-credentials', action='store_true',
help='Clear credentials configuration file (delete default credentials file)')
parser.add_argument('-f', '--fetch-nexus-aaron', type=int, nargs='?', const=10, metavar='LIMIT',
help='Fetch recent messages from @nexus_aaron bot (default: 10, max: 100)')
parser.add_argument('-t', '--test-connection', action='store_true',
help='Test connection to Telegram servers and proxy (if configured)')
parser.add_argument('-U', '--upload-to-nexus-aaron', type=str, metavar='FILE_PATH',
help='Upload a file to @nexus_aaron bot')
parser.add_argument('-m', '--upload-message', type=str, default="",
help='Optional message to send with the uploaded file (use with --upload-to-nexus-aaron)')
parser.add_argument('-R', '--solve-requests', type=int, nargs='?', const=10, metavar='LIMIT',
help='Reply to research requests from @nexus_aaron bot (default: 10, max: 50)')
parser.add_argument('-k', '--check-doi', type=str, metavar='DOI_OR_LIST_OR_FILE',
help='Check if a paper with the specified DOI(s) is available on Nexus. '
'Accepts a single DOI, a comma/space separated list, or a file path (one DOI per line).')
parser.add_argument('-b', '--batch-delay', type=float, default=2.0, metavar='SECONDS',
help='Delay between batch DOI checks to avoid rate limiting (default: 2.0 seconds)')
parser.add_argument('-d', '--download', action='store_true',
help='Automatically download papers if available (use with --check-doi)')
parser.add_argument('-r', '--request-doi', type=str, metavar='DOI_OR_LIST_OR_FILE',
help='Request a paper by DOI, a comma/space separated list of DOIs, or a file containing DOIs (one per line)')
parser.add_argument(
"--print-default",
action="store_true",
help="Print all default paths and configuration file locations used by the script"
)
args = parser.parse_args()
# Argument conflict checks
if args.proxy and args.no_proxy:
error_print("--proxy and --no-proxy cannot be specified at the same time.")
return
if args.proxy and args.clear_proxy:
error_print("--proxy and --clear-proxy cannot be specified at the same time.")
return
# Setup logging
setup_logging(args.log, args.verbose)
if args.verbose:
info_print("Verbose mode enabled")
if args.log:
info_print(f"Logging enabled to: {args.log}")
debug_print(f"Platform: {platform.system()}")
debug_print(f"Session file: {SESSION_FILE}")
debug_print(f"Default log file: {DEFAULT_LOG_FILE}")
# Handle --print-default before anything else
if args.print_default:
print_default_paths()
sys.exit(0)
# Handle clear-proxy and clear-credentials commands
if args.clear_proxy or args.clear_credentials:
# Only allow these options if no other actionable arguments are specified
actionable_args = [
args.create_session, args.search, args.user_info, args.check_doi, args.request_doi,
args.fetch_nexus_aaron, args.upload_to_nexus_aaron, args.solve_requests,
args.test_connection
]
if any(actionable_args):
error_print("--clear-proxy and --clear-credentials can only be used alone or together, not with other options.")
return
if args.clear_proxy:
info_print("Clearing proxy configuration files...")
proxy_files_to_clear = [
DEFAULT_PROXY_FILE,
DEFAULT_PROXY_FILE.replace('.json', '_list.json')
]
cleared_count = 0
for proxy_file in proxy_files_to_clear:
if os.path.exists(proxy_file):
try:
os.remove(proxy_file)
info_print(f"✓ Removed proxy file: {proxy_file}")
cleared_count += 1
except Exception as e:
error_print(f"✗ Failed to remove proxy file {proxy_file}: {e}")
else:
debug_print(f"Proxy file does not exist: {proxy_file}")
if cleared_count > 0:
info_print(f"Successfully cleared {cleared_count} proxy configuration files")
else:
info_print("No proxy configuration files found to clear")
if args.clear_credentials:
info_print("Clearing credentials configuration file...")
if os.path.exists(CREDENTIALS_FILE):
try:
os.remove(CREDENTIALS_FILE)
info_print(f"✓ Removed credentials file: {CREDENTIALS_FILE}")
info_print("Credentials cleared successfully")
except Exception as e:
error_print(f"✗ Failed to remove credentials file: {e}")
else:
info_print("No credentials file found to clear")
return
# Load credentials if specified, otherwise try default location
if args.credentials:
if not await load_credentials_from_file(args.credentials):
return
# Quit after loading credentials if no other actionable argument is specified
actionable_args = [
args.create_session, args.search, args.user_info, args.check_doi, args.request_doi,
args.fetch_nexus_aaron, args.upload_to_nexus_aaron, args.solve_requests,
args.test_connection, args.clear_proxy, args.clear_credentials
]
if not any(actionable_args):
info_print("Credentials loaded successfully.")
return
else:
# Try to load from default location
if os.path.exists(CREDENTIALS_FILE):
info_print(f"No credentials file for `nexus` module specified, trying default location: {CREDENTIALS_FILE}")
if not await load_credentials_from_file(CREDENTIALS_FILE):
debug_print("Failed to load credentials from default location")
else:
# Quit after loading credentials if no other actionable argument is specified
actionable_args = [
args.create_session, args.search, args.user_info, args.check_doi, args.request_doi,
args.fetch_nexus_aaron, args.upload_to_nexus_aaron, args.solve_requests,
args.test_connection, args.clear_proxy, args.clear_credentials
]
if not any(actionable_args):
info_print("Credentials loaded successfully.")
return
else:
debug_print(f"No credentials file found at default location: {CREDENTIALS_FILE}")
# Update bot username if specified
if args.bot:
BOT_USERNAME = args.bot
info_print(f"Using specified bot username: {BOT_USERNAME}")
debug_print(f"Bot username updated from command line argument")
else:
info_print(f"Using default bot username: {BOT_USERNAME}")
# Handle create-session command
if args.create_session:
info_print("Creating new session...")
await create_session(TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE)
return
# Determine proxy usage
proxy_to_use = None
if args.no_proxy:
info_print("Proxy disabled by --no-proxy flag")
proxy_to_use = None
elif args.proxy:
# Use the proxy config file specified by --proxy
proxy_to_use = args.proxy
info_print(f"Using proxy configuration file specified by --proxy: {proxy_to_use}")
else:
# Decide whether to use proxy
proxy_to_use = await decide_proxy_usage(TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, DEFAULT_PROXY_FILE)
if proxy_to_use is False: # Explicitly check for False (error case)
return
# Handle test-connection command
if args.test_connection:
info_print("Testing connection to Telegram servers...")
await test_telegram_connection(TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, proxy_to_use)
return
# Handle request-doi command (updated to support single DOI, list, or file)
if args.request_doi:
input_value = args.request_doi.strip()
doi_list = []
# Check if input is a file path
if os.path.isfile(input_value):
info_print(f"Reading DOIs from file: {input_value}")
try:
doi_list = getpapers.extract_dois_from_file(input_value)
if not doi_list:
info_print(f"No valid DOIs found in file: {input_value}")
except Exception as e:
error_print(f"Failed to extract DOIs from file: {e}")
return
else:
# Use extract_dois_from_text for comma/space separated input
info_print(f"Extracting DOIs from input: {input_value}")
doi_list = getpapers.extract_dois_from_text(input_value)
if not doi_list:
info_print(f"No valid DOIs found in input: {input_value}")
if not doi_list:
error_print("No valid DOIs provided for --request-doi")
return
if len(doi_list) == 1:
info_print(f"Requesting paper by DOI: {doi_list[0]}")
request_result = await request_paper_by_doi(
TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, doi_list[0], SESSION_FILE, proxy_to_use
)
if request_result.get("ok"):
info_print(f"✓ Paper request sent for DOI {doi_list[0]}")
print("Request details:")
print(request_result.get("details"))
else:
error_print(f"✗ Failed to request paper: {request_result.get('details', request_result.get('error', 'Unknown error'))}")
else:
info_print(f"Requesting papers for {len(doi_list)} DOIs...")
batch_result = await batch_request_papers_by_doi(
TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, doi_list, SESSION_FILE, proxy_to_use
)
print("\nBatch request summary:")
print(f" Total: {batch_result.get('total', 0)}")
print(f" Requested: {batch_result.get('requested', 0)}")
print(f" Skipped: {batch_result.get('skipped', 0)}")
print(f" Errors: {batch_result.get('errors', 0)}")
for res in batch_result.get("results", []):
doi = res.get("doi", "Unknown")
if res.get("ok"):
print(f" ✓ Requested: {doi}")
elif res.get("request_sent") is False:
print(f" - Skipped (no request needed): {doi}")
else:
print(f" ✗ Error: {doi} - {res.get('error', res.get('details', 'Unknown error'))}")
return
# Handle check-doi command (now supports single DOI, list, or file)
if args.check_doi:
input_value = args.check_doi.strip()
doi_list = []
# Check if input is a file path
if os.path.isfile(input_value):
info_print(f"Reading DOIs from file: {input_value}")
try:
doi_list = getpapers.extract_dois_from_file(input_value)
if not doi_list:
info_print(f"No valid DOIs found in file: {input_value}. Either the DOI is invaild or it cannot be verified on Crossref.")
except Exception as e:
error_print(f"Failed to extract DOIs from file: {e}")
return
else:
# Use extract_dois_from_text for comma/space separated input
info_print(f"Extracting DOIs from input: {input_value}")
doi_list = getpapers.extract_dois_from_text(input_value)
if not doi_list:
info_print(f"No valid DOIs found in input: {input_value}")
if not doi_list:
error_print("No valid DOIs provided for --check-doi")
return
download_enabled = args.download
if download_enabled:
info_print("Auto-download enabled - will download paper(s) if available")
# If only one DOI, do single check
if len(doi_list) == 1:
info_print(f"Checking DOI availability: {doi_list[0]}")
debug_print(f"DOI to check: {doi_list[0]}")
debug_print(f"Download enabled: {download_enabled}")
availability_result = await check_doi_availability_on_nexus(
TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, doi_list[0], SESSION_FILE, proxy_to_use, download=download_enabled
)
format_doi_availability_result(availability_result)
if "error" not in availability_result:
status = availability_result.get("status", "unknown")
if status == "available":
info_print("✓ DOI check completed - Paper is available on Nexus")
if download_enabled:
download_result = availability_result.get("download_result")
if download_result and download_result.get("success"):
info_print("✓ Paper downloaded successfully")
elif download_result and not download_result.get("success"):
error_print(f"✗ Download failed: {download_result.get('error', 'Unknown error')}")
elif download_enabled:
info_print("ℹ️ Download was requested but no download occurred")
elif status == "not_available_requestable":
info_print("✓ DOI check completed - Paper can be requested from Nexus")
elif status == "not_found":
info_print("✓ DOI check completed - Paper not found in Nexus database")
else:
info_print(f"✓ DOI check completed - Status: {status}")
else:
error_print(f"✗ DOI check failed: {availability_result.get('error', 'Unknown error')}")
return
else:
# Batch check
info_print(f"Batch checking {len(doi_list)} DOIs...")
batch_delay = args.batch_delay
if batch_delay < 0.5:
batch_delay = 0.5
info_print("Batch delay adjusted to minimum of 0.5 seconds")
elif batch_delay > 10:
batch_delay = 10
info_print("Batch delay adjusted to maximum of 10 seconds")
info_print(f"Using batch delay of {batch_delay} seconds between requests")
debug_print(f"Download enabled for batch: {download_enabled}")
batch_result = await batch_check_doi_availability(
TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, doi_list,
SESSION_FILE, proxy_to_use, batch_delay, download=download_enabled
)
format_batch_doi_results(batch_result)
if "error" not in batch_result:
summary = batch_result.get("summary", {})
available_count = batch_result.get("available", 0)
total_dois = batch_result.get("total_dois", 0)
info_print(f"✓ Batch DOI check completed: {available_count}/{total_dois} papers available on Nexus")
info_print(f"Success rate: {summary.get('success_rate', 0):.1f}%")
if download_enabled:
downloaded_count = batch_result.get("downloaded", 0)
download_errors = batch_result.get("download_errors", 0)
info_print(f"Download results: {downloaded_count} successful, {download_errors} failed")
if downloaded_count > 0:
total_downloaded_mb = summary.get("total_downloaded_mb", 0)
info_print(f"Total downloaded: {total_downloaded_mb:.2f} MB")
else:
error_print(f"✗ Batch DOI check failed: {batch_result.get('error', 'Unknown error')}")
return
# Validate --download flag usage
if args.download:
info_print("⚠️ Warning: --download flag is only effective when used with --check-doi")
debug_print("Download flag specified but no compatible command found")
# Handle fetch-nexus-aaron command
if args.fetch_nexus_aaron is not None:
info_print("Fetching messages from @nexus_aaron...")
# Validate and clamp limit
limit = args.fetch_nexus_aaron
if limit < 1:
limit = 1
info_print("Message limit adjusted to minimum of 1")
elif limit > 100:
limit = 100
info_print("Message limit adjusted to maximum of 100")
debug_print(f"Fetching {limit} messages from @nexus_aaron")
messages_result = await fetch_nexus_aaron_messages(
TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, limit, proxy_to_use, display=True
)
if messages_result.get("ok"):
info_print(f"Successfully fetched {messages_result.get('messages_count', 0)} messages from @nexus_aaron")
else:
error_print(f"Failed to fetch messages from @nexus_aaron: {messages_result.get('error', 'Unknown error')}")
return
# Handle upload-to-nexus-aaron command
if args.upload_to_nexus_aaron:
info_print("Uploading file to @nexus_aaron...")
file_path = args.upload_to_nexus_aaron
upload_message = args.upload_message
debug_print(f"File to upload: {file_path}")
debug_print(f"Upload message: '{upload_message}'")
# Validate file exists
if not os.path.exists(file_path):
error_print(f"File not found: {file_path}")
return
# Get file info for validation
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
file_name = os.path.basename(file_path)
info_print(f"File: {file_name}")
info_print(f"Size: {file_size_mb:.2f} MB")
# Warn for large files (Telegram has limits)
if file_size_mb > 2000: # 2GB limit
error_print(f"File is too large ({file_size_mb:.2f} MB). Telegram limit is 2GB.")
return
elif file_size_mb > 50: # Warn for files over 50MB
info_print(f"Warning: Large file ({file_size_mb:.2f} MB) may take time to upload")
# If the file is a PDF and no upload_message is specified, use simple_upload_to_nexus_aaron
if file_path.lower().endswith(".pdf") and not upload_message.strip():
upload_result = await simple_upload_to_nexus_aaron(file_path, verbose=args.verbose)
else:
# Perform the upload
upload_result = await upload_file_to_nexus_aaron(
TG_API_ID, TG_API_HASH, PHONE, file_path, upload_message, SESSION_FILE, proxy_to_use
)
# Display results with specialized formatting
format_nexus_aaron_upload_result(upload_result)
if upload_result.get("ok"):
info_print("✓ File upload to @nexus_aaron completed successfully")
else:
error_print(f"✗ File upload to @nexus_aaron failed: {upload_result.get('error', 'Unknown error')}")
return
# Handle solve-requests command
if args.solve_requests is not None:
info_print("Solving research requests from @nexus_aaron...")
# Validate and clamp limit
limit = args.solve_requests
if limit < 1:
limit = 1
info_print("Request limit adjusted to minimum of 1")
elif limit > 50:
limit = 50
info_print("Request limit adjusted to maximum of 50")
debug_print(f"Processing up to {limit} research requests from @nexus_aaron")
# Use the existing function to handle request solving
solve_result = await list_and_reply_to_nexus_aaron_message(
TG_API_ID, TG_API_HASH, PHONE, SESSION_FILE, limit, proxy_to_use
)
# Display results with specialized formatting
format_list_and_reply_result(solve_result)
if solve_result.get("ok") and not solve_result.get("cancelled"):
info_print("✓ Request solving completed successfully")
elif solve_result.get("cancelled"):
info_print("Request solving cancelled by user")
else:
error_print(f"✗ Request solving failed: {solve_result.get('error', 'Unknown error')}")
return
# Handle user-info command
if args.user_info:
info_print("Getting user profile information...")
debug_print("Starting user profile retrieval process...")
profile_result = await get_user_profile(TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, SESSION_FILE, proxy_to_use)
debug_print("User profile retrieval process completed")
format_profile_result(profile_result)
return
# Use the search query from arguments
message_to_send = args.search
info_print(f"Using search query: {message_to_send}")
debug_print(f"Search query length: {len(message_to_send)} characters")
debug_print("Starting message sending process...")
if proxy_to_use:
info_print(f"Connecting via proxy: {proxy_to_use}")
else:
info_print("Connecting directly (no proxy)")
# Pass search-limit to send_message_to_bot if specified
search_limit = args.search_limit if args.search_limit is not None else None
send_result = await send_message_to_bot(
TG_API_ID, TG_API_HASH, PHONE, BOT_USERNAME, message_to_send, SESSION_FILE, proxy_to_use, limit=search_limit
)
debug_print("Message sending process completed")
format_result(send_result)
# Handle button clicks after search results
if send_result.get("ok") and send_result.get("bot_reply"):
await process_callback_buttons(send_result["bot_reply"], proxy_to_use)
else:
debug_print("No valid bot reply to process for button clicks")
if __name__ == "__main__":
asyncio.run(main())