#!/usr/bin/env python3 import socket, os, re, json, sys from time import strftime from netaddr import IPAddress, IPNetwork LISTEN_ADDRESS = "::" # Listen on all IPv6 addresses (includes IPv4 via IPv4-mapped IPv6 addresses) LISTEN_PORT = 43 MAX_QUERY_SIZE = 128 LOGFILE = "/root/whois/mwhois.log" # Print startup message print(f"Starting mwhoisd on {LISTEN_ADDRESS}:{LISTEN_PORT}") sys.stdout.flush() n = "\r\n" os.system("mkdir -p db/ipv4") os.system("mkdir -p db/domains") os.system("mkdir -p db/as") os.system("mkdir -p db/isd") s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) # Set socket option to reuse address s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Enable dual stack (IPv4 and IPv6) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) try: s.bind((LISTEN_ADDRESS, LISTEN_PORT)) print(f"Successfully bound to {LISTEN_ADDRESS}:{LISTEN_PORT}") sys.stdout.flush() except Exception as e: print(f"Could not bind to {LISTEN_ADDRESS}:{LISTEN_PORT}: {str(e)}") sys.stdout.flush() exit(2) s.listen(5) # Allow up to 5 connections in the queue # Sanitize the query received def sanitizeQuery(qr): qr = qr.lower() qr = qr.replace("..", ".") qr = qr.replace("/", "") qr = qr.replace("\\", "") qr = qr.replace(n, "") # Remove "domain " prefix that some whois clients add if qr.startswith("domain "): qr = qr[7:] return qr # Check if the input is a valid IPv4 Address def isIP(qr): bytez = qr.split(".") if(len(bytez) != 4): return False for byt in bytez: if(not byt.isdigit()): return False ipaddr = [int(bytez[0]), int(bytez[1]), int(bytez[2]), int(bytez[3])] for num in ipaddr: if(num < 0 or num > 255): return False return True # Check if the input is a valid domain name def isDomain(qr): if(len(qr.split(".")) == 1): return False zones = qr.split(".") ac = re.compile(r"^[a-z0-9\.-]+\n") for zone in zones: if(zone == ""): return False if(zone[0] == "-"): return False if(zone[-1] == "-"): return False if(ac.match(zone + "\n")): pass else: return False return True # Check if the input is a valid SCION AS identifier def isAS(qr): print(f"isAS checking: '{qr}'") sys.stdout.flush() # Check if format matches SCION AS identifier (e.g., "2:2:0") if re.match(r'^\d+:\d+:\d+[a-f0-9]*$', qr): print(f" Matched colon format") sys.stdout.flush() return qr, None # Return AS identifier and no ISD # Also check if the query is just "as" followed by the identifier if qr.startswith("as") and re.match(r'^\d+:\d+:\d+[a-f0-9]*$', qr[2:]): print(f" Matched as-prefix with colon format") sys.stdout.flush() return qr[2:], None # Return the actual AS identifier and no ISD # Check if format includes ISD number with colon format (e.g., "76-2:2:0") isd_as_match = re.match(r'^(\d+)-(\d+:\d+:\d+[a-f0-9]*)$', qr) if isd_as_match: print(f" Matched ISD-AS colon format") sys.stdout.flush() isd_part = isd_as_match.group(1) as_part = isd_as_match.group(2) print(f" Extracted ISD: {isd_part}, AS part: {as_part}") sys.stdout.flush() return as_part, isd_part # Return both AS part and ISD part # Check if format is ISD-AS without colons (e.g., "64-60284") isd_as_simple = re.match(r'^(\d+)-(\d+)$', qr) if isd_as_simple: print(f" Matched ISD-AS decimal format") sys.stdout.flush() # Extract the ISD and AS number isd_number = isd_as_simple.group(1) as_number = isd_as_simple.group(2) print(f" Extracted ISD: {isd_number}, AS number: {as_number}") sys.stdout.flush() return as_number, isd_number # Check if format is just a decimal AS number (including 2-digit ones) if re.match(r'^\d+$', qr): print(f" Matched plain decimal format: {qr}") sys.stdout.flush() # Special handling for decimal AS numbers to ensure they're properly identified # across all ISDs they might belong to return qr, None print(f" No match found") sys.stdout.flush() return False, None # Check if the input is a valid ISD identifier def isISD(qr): # Check if format is just a number if qr.isdigit(): return qr # Check if format is "isd" followed by a number if qr.startswith("isd") and qr[3:].isdigit(): return qr[3:] # Return just the ISD number # Check if format is "ISD-" (e.g., "76-") to list all AS numbers in that ISD if re.match(r'^\d+\-$', qr): return qr[:-1] # Return just the ISD number without the dash return False # Check if an IP belongs to a CIDR IP block def isIPinCIDR(ip, network): return IPAddress(ip) in IPNetwork(network) while True: try: con, adr = s.accept() print(f"Connection from {adr[0]}:{adr[1]}") sys.stdout.flush() log = "[" + strftime("%d/%m/%Y %H:%M:%S") + "] " + adr[0] + " - " try: query = con.recv(MAX_QUERY_SIZE) if not query: con.close() continue # Decode bytes to string query_str = query.decode('utf-8', errors='ignore') log = log + query_str.replace("\r\n", "").replace("\n", "") + " - " print(f"Raw query: '{query_str}'") query = sanitizeQuery(query_str) print(f"Sanitized query: '{query}'") except Exception as e: print(f"Error receiving data: {str(e)}") sys.stdout.flush() con.close() continue rsp = "# +-----------------------------------+" + n rsp = rsp + "# | AXPO SYSTEMS AG |" + n rsp = rsp + "# +-----------------------------------+" + n rsp = rsp + "# | served by modded mwhois |" + n rsp = rsp + "# +-----------------------------------+" + n rsp = rsp + "# | whois.as60284.net |" + n rsp = rsp + "# +-----------------------------------+" + n rsp = rsp + n if(isIP(query)): # WHOIS IPv4 log = log + "IPv4" ipdb = os.listdir("db/ipv4/") found = False for ipe in ipdb: ipe = ipe.replace("-", "/") if(isIPinCIDR(query, ipe)): dd = open("db/ipv4/" + ipe.replace("/", "-"), "r") rsp = rsp + dd.read() dd.close() found = True log = log + n break if(found == False): rsp = rsp + n rsp = rsp + "# IP Address was not found in the whois database" + n log = log + " (Not found)" + n elif(isDomain(query)): # WHOIS Domain log = log + "Domain" + n domaindb = os.listdir("db/domains/") found = False for domain in domaindb: if(query == domain): dd = open("db/domains/" + domain, "r") rsp = rsp + dd.read() dd.close() found = True log = log + n break if(found == False): rsp = rsp + n rsp = rsp + "# Domain name was not found in the whois database" + n log = log + " (Not found)" + n elif(isAS(query)[0]): # WHOIS SCION AS log = log + "SCION AS" + n # If isAS returned the actual AS identifier and possibly an ISD as_id, isd_id = isAS(query) print(f"isAS returned: AS={as_id}, ISD={isd_id} for query: {query}") sys.stdout.flush() if isinstance(as_id, str): query_as = as_id else: query_as = query print(f"Looking for AS: {query_as}") sys.stdout.flush() # List all files in the AS directory for debugging print(f"Files in db/as/: {os.listdir('db/as/')}") sys.stdout.flush() found = False matching_files = [] # If ISD is specified, we only want to show that specific ISD-AS combination if isd_id: print(f"ISD specified: {isd_id}, looking for specific ISD-AS combination") sys.stdout.flush() # First try direct file lookup if os.path.exists("db/as/" + query_as): with open("db/as/" + query_as, "r") as f: content = f.read() for line in content.split("\n"): if line.startswith("ISD:"): file_isd = line.split(":", 1)[1].strip() if file_isd == isd_id: print(f"Found matching ISD-AS combination in file: db/as/{query_as}") sys.stdout.flush() rsp = rsp + content found = True break # If not found by direct lookup, scan all files if not found: for as_file in os.listdir("db/as/"): try: with open(f"db/as/{as_file}", "r") as f: content = f.read() as_match = False isd_match = False # Check if AS matches for line in content.split("\n"): if line.startswith("AS Identifier:"): as_identifier = line.split(":", 1)[1].strip() if as_identifier == query_as: as_match = True elif line.startswith("ISD:"): file_isd = line.split(":", 1)[1].strip() if file_isd == isd_id: isd_match = True if as_match and isd_match: print(f"Found matching ISD-AS combination in file: db/as/{as_file}") sys.stdout.flush() rsp = rsp + content found = True break except Exception as e: print(f"Error reading AS file {as_file}: {str(e)}") sys.stdout.flush() else: # No ISD specified, show all instances of this AS across all ISDs print(f"No ISD specified, looking for all instances of AS: {query_as}") sys.stdout.flush() # First try direct file lookup if os.path.exists("db/as/" + query_as): print(f"File found: db/as/{query_as}") sys.stdout.flush() matching_files.append(query_as) found = True # Also scan all files to find any other instances with the same AS for as_file in os.listdir("db/as/"): if as_file == query_as: continue # Already added this one try: with open(f"db/as/{as_file}", "r") as f: content = f.read() # Look for AS Identifier match for line in content.split("\n"): if line.startswith("AS Identifier:"): as_identifier = line.split(":", 1)[1].strip() if as_identifier == query_as: print(f"Found matching AS in file: db/as/{as_file}") sys.stdout.flush() matching_files.append(as_file) found = True break # Look for AS decimal line if no match yet if as_file not in matching_files: for line in content.split("\n"): if line.startswith("AS decimal:"): decimal_value = line.split(":", 1)[1].strip() # Handle both empty and non-empty decimal values if decimal_value and decimal_value == query_as: print(f"Found matching AS decimal in file: db/as/{as_file}") sys.stdout.flush() matching_files.append(as_file) found = True break except Exception as e: print(f"Error reading AS file {as_file}: {str(e)}") sys.stdout.flush() # Add all matching files to response if matching_files: rsp = rsp + "% Multiple entries found for this AS identifier:" + n + n for as_file in matching_files: try: with open(f"db/as/{as_file}", "r") as f: rsp = rsp + f.read() + n + "---" + n except Exception as e: print(f"Error reading AS file {as_file}: {str(e)}") sys.stdout.flush() # Create a symbolic link for future queries if we found exactly one match if len(matching_files) == 1 and not os.path.exists(f"db/as/{query_as}"): try: os.symlink(matching_files[0], f"db/as/{query_as}") print(f"Created symlink from {query_as} to {matching_files[0]}") sys.stdout.flush() except Exception as e: print(f"Error creating symlink: {str(e)}") sys.stdout.flush() if not found: rsp = rsp + n rsp = rsp + "# SCION AS was not found in the whois database" + n log = log + " (Not found)" + n elif(isISD(query)): # WHOIS SCION ISD log = log + "SCION ISD" + n # If isISD returned the actual ISD number (in case of "isd" prefix or "ISD-" format) isd_id = isISD(query) if isinstance(isd_id, str): query = isd_id found = False # Check if this is a request to list all AS numbers in an ISD (format: "ISD-") is_list_all_as = query.endswith("-") if is_list_all_as: query = query[:-1] # Remove the trailing dash # First check if we have a direct ISD file if os.path.exists("db/isd/" + query): dd = open("db/isd/" + query, "r") isd_info = dd.read() dd.close() found = True log = log + n # Add the ISD information to the response rsp = rsp + isd_info + n + n # Now find all AS numbers associated with this ISD associated_as = [] for as_file in os.listdir("db/as/"): try: with open(f"db/as/{as_file}", "r") as f: content = f.read() # Look for ISD match for line in content.split("\n"): if line.startswith("ISD:"): file_isd = line.split(":", 1)[1].strip() if file_isd == query: print(f"Found AS associated with ISD {query}: {as_file}") sys.stdout.flush() associated_as.append((as_file, content)) break except Exception as e: print(f"Error reading AS file {as_file}: {str(e)}") sys.stdout.flush() if associated_as: found = True rsp = rsp + f"% AS Numbers associated with ISD {query}:" + n + n for as_file, content in associated_as: # If this is a request to list all AS numbers, just show a summary if is_list_all_as: as_identifier = "" organization = "" description = "" for line in content.split("\n"): if line.startswith("AS Identifier:"): as_identifier = line.split(":", 1)[1].strip() elif line.startswith("Organization:"): organization = line.split(":", 1)[1].strip() elif line.startswith("Description:"): description = line.split(":", 1)[1].strip() rsp = rsp + f"AS: {as_identifier} - {organization} - {description}" + n else: # Show full details for each AS rsp = rsp + content + n + "---" + n if not found: rsp = rsp + n rsp = rsp + "# SCION ISD was not found in the whois database" + n log = log + " (Not found)" + n else: # Unrecognized log = log + "Unrecognized" + n rsp = rsp + n rsp = rsp + "# Error. Unknown query type. Query is not IPv4, Domain, SCION ISD or SCION AS" + n try: # Encode string to bytes before sending con.send(rsp.encode('utf-8')) except Exception as e: print(f"Error sending response: {str(e)}") sys.stdout.flush() finally: con.close() if(LOGFILE!=""): # Save to logs try: d = open(LOGFILE, "a+") d.write(log) d.close() except Exception as e: print(f"FAILED TO SAVE TO LOGFILE: {str(e)}") sys.stdout.flush() except Exception as e: print(f"Error in main loop: {str(e)}") sys.stdout.flush()