Files
whois/mwhoisd
Olaf Baumert 34c631a06d Initial commit: mwhois with SCION AS support and decimal AS conversion
Based on mwhois by Antonios A. Chariton
Modifications for SCION AS support by Olaf Baumert, Axpo Systems AG
2025-06-03 11:01:02 +00:00

471 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
import socket, os, re, json, sys
from time import strftime
from netaddr import IPAddress, IPNetwork
# --- Daemon configuration -------------------------------------------------
LISTEN_ADDRESS = "::"  # Listen on all IPv6 addresses (includes IPv4 via IPv4-mapped IPv6 addresses)
LISTEN_PORT = 43
MAX_QUERY_SIZE = 128  # Maximum number of bytes read from a client per query
LOGFILE = "/root/whois/mwhois.log"  # Set to "" to disable the query log

# Print startup message
print(f"Starting mwhoisd on {LISTEN_ADDRESS}:{LISTEN_PORT}", flush=True)

# Line terminator used for every response and log line.
n = "\r\n"

# Make sure the database directories exist (relative to the working
# directory).  os.makedirs avoids spawning a shell and, unlike the previous
# `mkdir -p` shell-out, fails loudly at startup if a directory cannot be
# created instead of failing later on every lookup.
for _db_dir in ("db/ipv4", "db/domains", "db/as", "db/isd"):
    os.makedirs(_db_dir, exist_ok=True)

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# Set socket option to reuse address
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Enable dual stack (IPv4 and IPv6)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
try:
    s.bind((LISTEN_ADDRESS, LISTEN_PORT))
    print(f"Successfully bound to {LISTEN_ADDRESS}:{LISTEN_PORT}", flush=True)
except Exception as e:
    # Startup boundary: report the reason and terminate with status 2.
    print(f"Could not bind to {LISTEN_ADDRESS}:{LISTEN_PORT}: {str(e)}", flush=True)
    # sys.exit is always available; the bare exit() helper is only injected
    # by the `site` module and is meant for interactive sessions.
    sys.exit(2)
s.listen(5)  # Allow up to 5 connections in the queue
# Sanitize the query received
def sanitizeQuery(qr):
    """Normalise a raw whois query string before it is classified.

    The query is lower-cased, stripped of line terminators and path
    separators, has runs of dots collapsed to a single dot, and loses
    surrounding whitespace as well as the ``"domain "`` prefix that some
    whois clients prepend.

    Args:
        qr: The decoded query text as received from the client.

    Returns:
        The sanitised query string (possibly empty).
    """
    qr = qr.lower()
    # Drop CR and LF individually so that LF-only clients are handled too,
    # and remove both kinds of path separator.
    for unwanted in ("\r", "\n", "/", "\\"):
        qr = qr.replace(unwanted, "")
    # Collapse dots only after the separators are gone and repeat until no
    # ".." remains; a single pass leaves ".." behind for inputs such as
    # "..." or "./.".
    while ".." in qr:
        qr = qr.replace("..", ".")
    qr = qr.strip()
    # Remove "domain " prefix that some whois clients add
    if qr.startswith("domain "):
        qr = qr[7:].strip()
    return qr
# Check if the input is a valid IPv4 Address
def isIP(qr):
    """Return True if *qr* is a dotted-quad IPv4 address.

    The string must consist of exactly four dot-separated groups of ASCII
    digits, each with a numeric value of at most 255.

    Args:
        qr: The sanitised query string.

    Returns:
        True when *qr* is a valid IPv4 address, False otherwise.
    """
    octets = qr.split(".")
    if len(octets) != 4:
        return False
    # Match ASCII digits explicitly: str.isdigit() also accepts characters
    # such as "²" that int() cannot convert, which used to raise ValueError.
    return all(
        re.fullmatch(r"[0-9]+", octet) is not None and int(octet) <= 255
        for octet in octets
    )
# Check if the input is a valid domain name
def isDomain(qr):
    """Return True if *qr* looks like a domain name.

    A domain name here is two or more dot-separated labels, each made up
    solely of ``a-z``, ``0-9`` and ``-``, and neither starting nor ending
    with a hyphen.  The check is case-sensitive; callers are expected to
    pass an already lower-cased query.

    Args:
        qr: The sanitised query string.

    Returns:
        True when every label is valid, False otherwise.
    """
    labels = qr.split(".")
    if len(labels) < 2:
        return False
    for label in labels:
        # fullmatch validates the whole label (and rejects empty ones); a
        # prefix match would accept labels with an embedded newline followed
        # by arbitrary characters.
        if re.fullmatch(r"[a-z0-9-]+", label) is None:
            return False
        if label.startswith("-") or label.endswith("-"):
            return False
    return True
# Check if the input is a valid SCION AS identifier
def isAS(qr):
    """Classify *qr* as a SCION AS query and split it into AS and ISD parts.

    Recognised forms (the query is expected to be lower-cased already):

    * ``"2:0:2b"`` / ``"ff00:0:110"`` - three colon-separated hex groups
    * ``"as2:0:2b"``                  - the same with an ``as`` prefix
    * ``"76-2:0:2b"``                 - ISD number, dash, colon-format AS
    * ``"64-60284"``                  - ISD number, dash, decimal AS
    * ``"60284"``                     - plain decimal AS

    Each colon-separated group may now contain any hex digits; the previous
    pattern required decimal digits in the first two groups and a leading
    decimal digit in the third, which rejected identifiers such as
    ``ff00:0:110``.  Every string the old pattern accepted is still accepted,
    apart from non-ASCII digits and a trailing newline.

    Args:
        qr: The sanitised query string.

    Returns:
        A tuple ``(as_id, isd_id)``.  ``as_id`` is the AS identifier string,
        or ``False`` when *qr* is not an AS query.  ``isd_id`` is the ISD
        number as a string when the query specified one, otherwise ``None``.
    """
    print(f"isAS checking: '{qr}'", flush=True)
    # Three colon-separated groups of ASCII hex digits.
    colon_as = r'[0-9a-f]+:[0-9a-f]+:[0-9a-f]+'

    # Check if format matches SCION AS identifier (e.g., "2:2:0")
    if re.fullmatch(colon_as, qr):
        print(" Matched colon format", flush=True)
        return qr, None  # Return AS identifier and no ISD

    # Also check if the query is just "as" followed by the identifier
    if qr.startswith("as") and re.fullmatch(colon_as, qr[2:]):
        print(" Matched as-prefix with colon format", flush=True)
        return qr[2:], None  # Return the actual AS identifier and no ISD

    # Check if format includes ISD number with colon format (e.g., "76-2:2:0")
    isd_as_match = re.fullmatch(r'([0-9]+)-(' + colon_as + r')', qr)
    if isd_as_match:
        print(" Matched ISD-AS colon format", flush=True)
        isd_part, as_part = isd_as_match.groups()
        print(f" Extracted ISD: {isd_part}, AS part: {as_part}", flush=True)
        return as_part, isd_part  # Return both AS part and ISD part

    # Check if format is ISD-AS without colons (e.g., "64-60284")
    isd_as_simple = re.fullmatch(r'([0-9]+)-([0-9]+)', qr)
    if isd_as_simple:
        print(" Matched ISD-AS decimal format", flush=True)
        isd_number, as_number = isd_as_simple.groups()
        print(f" Extracted ISD: {isd_number}, AS number: {as_number}", flush=True)
        return as_number, isd_number

    # Check if format is just a decimal AS number (including 2-digit ones).
    # No ISD is returned, so the caller looks the AS up across all ISDs.
    if re.fullmatch(r'[0-9]+', qr):
        print(f" Matched plain decimal format: {qr}", flush=True)
        return qr, None

    print(" No match found", flush=True)
    return False, None
# Check if the input is a valid ISD identifier
def isISD(qr):
    """Extract the ISD number from an ISD query.

    Accepted forms are a bare number (``"76"``), a number with an ``isd``
    prefix (``"isd76"``), and a number followed by a dash (``"76-"``), the
    latter being the form used to list the AS numbers of an ISD.

    Args:
        qr: The sanitised query string.

    Returns:
        The ISD number as a string, or ``False`` if *qr* is not an ISD query.
    """
    # Bare number: the query already is the ISD number.
    if qr.isdigit():
        return qr

    # "isd" prefix: everything after the first three characters is the number.
    remainder = qr[3:]
    if qr.startswith("isd") and remainder.isdigit():
        return remainder

    # "<number>-": drop the final character to obtain the number.
    dash_form = re.match(r'^\d+\-$', qr)
    return qr[:-1] if dash_form else False
# Check if an IP belongs to a CIDR IP block
def isIPinCIDR(ip, network):
    """Return True if address *ip* lies inside the CIDR block *network*.

    Args:
        ip: An IP address string, e.g. ``"10.1.2.3"``.
        network: A CIDR block string, e.g. ``"10.0.0.0/8"``.

    Returns:
        True when *ip* is contained in *network*.  False when it is not, or
        when either argument cannot be parsed by netaddr.
    """
    # Imported here so the block does not depend on a change to the
    # module-level import line.
    from netaddr import AddrFormatError

    try:
        return IPAddress(ip) in IPNetwork(network)
    except (AddrFormatError, ValueError):
        # The network string is derived from a database file name; a stray
        # file that is not named after a CIDR block must not abort the
        # lookup, it simply does not match.
        return False
# ---------------------------------------------------------------------------
# Request handling
#
# The accept loop at the bottom hands every connection to _serve(), which
# reads one query, classifies it (IPv4, domain, SCION AS, SCION ISD) and
# answers from the flat-file database under db/.  The helpers below each do
# one lookup and return the record text, or None when nothing matched.
# ---------------------------------------------------------------------------


def _read_text(path):
    """Return the entire contents of the text file at *path*."""
    with open(path, "r") as handle:
        return handle.read()


def _field_values(content, label):
    """Return the stripped value of every line of *content* starting with *label*.

    *label* must include its trailing colon, e.g. ``"ISD:"``.
    """
    return [
        line.split(":", 1)[1].strip()
        for line in content.split("\n")
        if line.startswith(label)
    ]


def _last_field(content, label):
    """Return the value of the last *label* line in *content*, or "" if absent."""
    values = _field_values(content, label)
    return values[-1] if values else ""


def _iter_as_records():
    """Yield ``(file_name, real_path, content)`` for each readable file in db/as/.

    Entries that cannot be read are reported on stdout and skipped.
    ``real_path`` lets callers recognise symlinks that point at a record
    they have already seen.
    """
    for as_file in os.listdir("db/as/"):
        path = f"db/as/{as_file}"
        try:
            content = _read_text(path)
        except (OSError, UnicodeError) as e:
            print(f"Error reading AS file {as_file}: {str(e)}", flush=True)
            continue
        yield as_file, os.path.realpath(path), content


def _banner():
    """Return the fixed header that starts every response."""
    lines = [
        "# +-----------------------------------+",
        "# | AXPO SYSTEMS AG |",
        "# +-----------------------------------+",
        "# | served by modded mwhois |",
        "# +-----------------------------------+",
        "# | whois.as60284.net |",
        "# +-----------------------------------+",
        "",
    ]
    return n.join(lines) + n


def _lookup_ipv4(query):
    """Return the record of the first db/ipv4/ block containing *query*, or None."""
    from netaddr import AddrFormatError

    for file_name in os.listdir("db/ipv4/"):
        # Files are named after their CIDR block with "/" written as "-".
        try:
            inside = isIPinCIDR(query, file_name.replace("-", "/"))
        except (AddrFormatError, ValueError) as e:
            # A stray file that is not named after a CIDR block must not
            # break every IPv4 query.
            print(f"Skipping invalid IPv4 block file {file_name}: {str(e)}", flush=True)
            continue
        if inside:
            return _read_text("db/ipv4/" + file_name)
    return None


def _lookup_domain(query):
    """Return the record stored as db/domains/<query>, or None."""
    if query in os.listdir("db/domains/"):
        return _read_text("db/domains/" + query)
    return None


def _lookup_as_in_isd(as_id, isd_id):
    """Return the record of AS *as_id* inside ISD *isd_id*, or None."""
    print(f"ISD specified: {isd_id}, looking for specific ISD-AS combination", flush=True)

    # First try direct file lookup
    direct = "db/as/" + as_id
    if os.path.exists(direct):
        content = _read_text(direct)
        if isd_id in _field_values(content, "ISD:"):
            print(f"Found matching ISD-AS combination in file: db/as/{as_id}", flush=True)
            return content

    # If not found by direct lookup, scan all files.  The AS may be named by
    # its identifier or by its decimal form, as in the lookup without ISD.
    for as_file, _real_path, content in _iter_as_records():
        names_as = (
            as_id in _field_values(content, "AS Identifier:")
            or as_id in _field_values(content, "AS decimal:")
        )
        if names_as and isd_id in _field_values(content, "ISD:"):
            print(f"Found matching ISD-AS combination in file: db/as/{as_file}", flush=True)
            return content
    return None


def _lookup_as_any_isd(as_id):
    """Return every record of AS *as_id* across all ISDs, or None.

    A record matches when its file is named *as_id* or when its
    "AS Identifier:" or "AS decimal:" line equals *as_id*.  If exactly one
    record matches and no db/as/<as_id> entry exists yet, a symlink with
    that name is created for future direct lookups.
    """
    print(f"No ISD specified, looking for all instances of AS: {as_id}", flush=True)

    matches = []   # (file name, content), direct hit first
    seen = set()   # real paths already matched, so a symlink and its
                   # target are not reported as two entries
    for as_file, real_path, content in _iter_as_records():
        if real_path in seen:
            continue
        if as_file == as_id:
            print(f"File found: db/as/{as_id}", flush=True)
            matches.insert(0, (as_file, content))
        elif as_id in _field_values(content, "AS Identifier:"):
            print(f"Found matching AS in file: db/as/{as_file}", flush=True)
            matches.append((as_file, content))
        elif as_id in _field_values(content, "AS decimal:"):
            print(f"Found matching AS decimal in file: db/as/{as_file}", flush=True)
            matches.append((as_file, content))
        else:
            continue
        seen.add(real_path)

    if not matches:
        return None

    parts = []
    if len(matches) > 1:
        # Only announce multiple entries when there really are several.
        parts.append("% Multiple entries found for this AS identifier:" + n + n)
    parts.extend(content + n + "---" + n for _name, content in matches)

    # Create a symbolic link for future queries if we found exactly one match.
    # as_id has passed isAS, so it consists of hex digits and colons only.
    alias = f"db/as/{as_id}"
    if len(matches) == 1 and not os.path.lexists(alias):
        target = matches[0][0]
        try:
            os.symlink(target, alias)
            print(f"Created symlink from {as_id} to {target}", flush=True)
        except OSError as e:
            print(f"Error creating symlink: {str(e)}", flush=True)

    return "".join(parts)


def _lookup_isd(isd_id, summary_only):
    """Return the ISD record plus the ASes belonging to ISD *isd_id*, or None.

    With *summary_only* each AS is shown as a one-line summary instead of
    its full record.
    """
    parts = []

    # First check if we have a direct ISD file
    isd_path = "db/isd/" + isd_id
    if os.path.exists(isd_path):
        parts.append(_read_text(isd_path) + n + n)

    # Now find all AS numbers associated with this ISD
    members = []
    seen = set()
    for as_file, real_path, content in _iter_as_records():
        if real_path in seen or isd_id not in _field_values(content, "ISD:"):
            continue
        seen.add(real_path)
        print(f"Found AS associated with ISD {isd_id}: {as_file}", flush=True)
        members.append(content)

    if members:
        parts.append(f"% AS Numbers associated with ISD {isd_id}:" + n + n)
        for content in members:
            if summary_only:
                as_identifier = _last_field(content, "AS Identifier:")
                organization = _last_field(content, "Organization:")
                description = _last_field(content, "Description:")
                parts.append(f"AS: {as_identifier} - {organization} - {description}" + n)
            else:
                # Show full details for each AS
                parts.append(content + n + "---" + n)

    return "".join(parts) if parts else None


def _finish(label, record, missing_message):
    """Return ``(log_label, body)`` for a lookup that yielded *record* or None."""
    if record is None:
        return label + " (Not found)", n + missing_message + n
    return label, record


def _resolve(query):
    """Classify the sanitised *query* and return ``(log_label, body)``.

    ``body`` is the text that follows the banner in the response.
    """
    if isIP(query):
        return _finish("IPv4", _lookup_ipv4(query),
                       "# IP Address was not found in the whois database")

    if isDomain(query):
        return _finish("Domain", _lookup_domain(query),
                       "# Domain name was not found in the whois database")

    # NOTE: a bare decimal number is always treated as an AS number because
    # isAS is consulted before isISD.
    as_id, isd_id = isAS(query)
    if as_id:
        print(f"isAS returned: AS={as_id}, ISD={isd_id} for query: {query}", flush=True)
        print(f"Looking for AS: {as_id}", flush=True)
        # List all files in the AS directory for debugging
        print(f"Files in db/as/: {os.listdir('db/as/')}", flush=True)
        if isd_id:
            record = _lookup_as_in_isd(as_id, isd_id)
        else:
            record = _lookup_as_any_isd(as_id)
        return _finish("SCION AS", record,
                       "# SCION AS was not found in the whois database")

    isd = isISD(query)
    if isd:
        # "<ISD>-" asks for a summary of the ISD's ASes.  The dash has to be
        # read from the query itself because isISD returns the bare number.
        record = _lookup_isd(isd, summary_only=query.endswith("-"))
        return _finish("SCION ISD", record,
                       "# SCION ISD was not found in the whois database")

    return "Unrecognized", (
        n + "# Error. Unknown query type. Query is not IPv4, Domain, SCION ISD or SCION AS" + n
    )


def _serve(con, adr):
    """Answer one whois request on *con*.

    Returns the line to append to the query log, or None when no query was
    received.
    """
    print(f"Connection from {adr[0]}:{adr[1]}", flush=True)
    stamp = strftime("%d/%m/%Y %H:%M:%S")

    try:
        raw = con.recv(MAX_QUERY_SIZE)
    except OSError as e:
        print(f"Error receiving data: {str(e)}", flush=True)
        return None
    if not raw:
        return None

    # Decode bytes to string
    query_str = raw.decode('utf-8', errors='ignore')
    print(f"Raw query: '{query_str}'")
    query = sanitizeQuery(query_str)
    print(f"Sanitized query: '{query}'")

    log_label, body = _resolve(query)

    try:
        # sendall keeps writing until the whole response is out; a single
        # send() may transmit only part of a long answer.
        con.sendall((_banner() + body).encode('utf-8'))
    except OSError as e:
        print(f"Error sending response: {str(e)}", flush=True)

    # One log entry per line: strip CR/LF from the client-supplied text.
    logged_query = query_str.replace("\r", "").replace("\n", "")
    return f"[{stamp}] {adr[0]} - {logged_query} - {log_label}" + n


while True:
    try:
        con, adr = s.accept()
        # The context manager closes the connection even if handling fails.
        with con:
            log = _serve(con, adr)
        if log is not None and LOGFILE != "":
            # Save to logs
            try:
                with open(LOGFILE, "a+") as logfile:
                    logfile.write(log)
            except OSError as e:
                print(f"FAILED TO SAVE TO LOGFILE: {str(e)}", flush=True)
    except Exception as e:
        # Top-level boundary: report the failure and keep serving.
        print(f"Error in main loop: {str(e)}", flush=True)