Initial commit: mwhois with SCION AS support and decimal AS conversion

Based on mwhois by Antonios A. Chariton
Modifications for SCION AS support by Olaf Baumert, Axpo Systems AG
This commit is contained in:
Olaf Baumert
2025-06-03 11:01:02 +00:00
commit 34c631a06d
340 changed files with 212460 additions and 0 deletions

470
mwhoisd Executable file
View File

@@ -0,0 +1,470 @@
#!/usr/bin/env python3
import socket, os, re, json, sys
from time import strftime
from netaddr import IPAddress, IPNetwork
LISTEN_ADDRESS = "::" # Listen on all IPv6 addresses (includes IPv4 via IPv4-mapped IPv6 addresses)
LISTEN_PORT = 43
MAX_QUERY_SIZE = 128
LOGFILE = "/root/whois/mwhois.log"
# Print startup message
print(f"Starting mwhoisd on {LISTEN_ADDRESS}:{LISTEN_PORT}")
sys.stdout.flush()
n = "\r\n"
os.system("mkdir -p db/ipv4")
os.system("mkdir -p db/domains")
os.system("mkdir -p db/as")
os.system("mkdir -p db/isd")
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# Set socket option to reuse address
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Enable dual stack (IPv4 and IPv6)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
try:
s.bind((LISTEN_ADDRESS, LISTEN_PORT))
print(f"Successfully bound to {LISTEN_ADDRESS}:{LISTEN_PORT}")
sys.stdout.flush()
except Exception as e:
print(f"Could not bind to {LISTEN_ADDRESS}:{LISTEN_PORT}: {str(e)}")
sys.stdout.flush()
exit(2)
s.listen(5) # Allow up to 5 connections in the queue
# Sanitize the query received
def sanitizeQuery(qr):
qr = qr.lower()
qr = qr.replace("..", ".")
qr = qr.replace("/", "")
qr = qr.replace("\\", "")
qr = qr.replace(n, "")
# Remove "domain " prefix that some whois clients add
if qr.startswith("domain "):
qr = qr[7:]
return qr
# Check if the input is a valid IPv4 Address
def isIP(qr):
bytez = qr.split(".")
if(len(bytez) != 4):
return False
for byt in bytez:
if(not byt.isdigit()):
return False
ipaddr = [int(bytez[0]), int(bytez[1]), int(bytez[2]), int(bytez[3])]
for num in ipaddr:
if(num < 0 or num > 255):
return False
return True
# Check if the input is a valid domain name
def isDomain(qr):
if(len(qr.split(".")) == 1):
return False
zones = qr.split(".")
ac = re.compile(r"^[a-z0-9\.-]+\n")
for zone in zones:
if(zone == ""):
return False
if(zone[0] == "-"):
return False
if(zone[-1] == "-"):
return False
if(ac.match(zone + "\n")):
pass
else:
return False
return True
# Check if the input is a valid SCION AS identifier
def isAS(qr):
print(f"isAS checking: '{qr}'")
sys.stdout.flush()
# Check if format matches SCION AS identifier (e.g., "2:2:0")
if re.match(r'^\d+:\d+:\d+[a-f0-9]*$', qr):
print(f" Matched colon format")
sys.stdout.flush()
return qr, None # Return AS identifier and no ISD
# Also check if the query is just "as" followed by the identifier
if qr.startswith("as") and re.match(r'^\d+:\d+:\d+[a-f0-9]*$', qr[2:]):
print(f" Matched as-prefix with colon format")
sys.stdout.flush()
return qr[2:], None # Return the actual AS identifier and no ISD
# Check if format includes ISD number with colon format (e.g., "76-2:2:0")
isd_as_match = re.match(r'^(\d+)-(\d+:\d+:\d+[a-f0-9]*)$', qr)
if isd_as_match:
print(f" Matched ISD-AS colon format")
sys.stdout.flush()
isd_part = isd_as_match.group(1)
as_part = isd_as_match.group(2)
print(f" Extracted ISD: {isd_part}, AS part: {as_part}")
sys.stdout.flush()
return as_part, isd_part # Return both AS part and ISD part
# Check if format is ISD-AS without colons (e.g., "64-60284")
isd_as_simple = re.match(r'^(\d+)-(\d+)$', qr)
if isd_as_simple:
print(f" Matched ISD-AS decimal format")
sys.stdout.flush()
# Extract the ISD and AS number
isd_number = isd_as_simple.group(1)
as_number = isd_as_simple.group(2)
print(f" Extracted ISD: {isd_number}, AS number: {as_number}")
sys.stdout.flush()
return as_number, isd_number
# Check if format is just a decimal AS number (including 2-digit ones)
if re.match(r'^\d+$', qr):
print(f" Matched plain decimal format: {qr}")
sys.stdout.flush()
# Special handling for decimal AS numbers to ensure they're properly identified
# across all ISDs they might belong to
return qr, None
print(f" No match found")
sys.stdout.flush()
return False, None
# Check if the input is a valid ISD identifier
def isISD(qr):
# Check if format is just a number
if qr.isdigit():
return qr
# Check if format is "isd" followed by a number
if qr.startswith("isd") and qr[3:].isdigit():
return qr[3:] # Return just the ISD number
# Check if format is "ISD-" (e.g., "76-") to list all AS numbers in that ISD
if re.match(r'^\d+\-$', qr):
return qr[:-1] # Return just the ISD number without the dash
return False
# Check if an IP belongs to a CIDR IP block
def isIPinCIDR(ip, network):
return IPAddress(ip) in IPNetwork(network)
while True:
try:
con, adr = s.accept()
print(f"Connection from {adr[0]}:{adr[1]}")
sys.stdout.flush()
log = "[" + strftime("%d/%m/%Y %H:%M:%S") + "] " + adr[0] + " - "
try:
query = con.recv(MAX_QUERY_SIZE)
if not query:
con.close()
continue
# Decode bytes to string
query_str = query.decode('utf-8', errors='ignore')
log = log + query_str.replace("\r\n", "").replace("\n", "") + " - "
print(f"Raw query: '{query_str}'")
query = sanitizeQuery(query_str)
print(f"Sanitized query: '{query}'")
except Exception as e:
print(f"Error receiving data: {str(e)}")
sys.stdout.flush()
con.close()
continue
rsp = "# +-----------------------------------+" + n
rsp = rsp + "# | AXPO SYSTEMS AG |" + n
rsp = rsp + "# +-----------------------------------+" + n
rsp = rsp + "# | served by modded mwhois |" + n
rsp = rsp + "# +-----------------------------------+" + n
rsp = rsp + "# | whois.as60284.net |" + n
rsp = rsp + "# +-----------------------------------+" + n
rsp = rsp + n
if(isIP(query)):
# WHOIS IPv4
log = log + "IPv4"
ipdb = os.listdir("db/ipv4/")
found = False
for ipe in ipdb:
ipe = ipe.replace("-", "/")
if(isIPinCIDR(query, ipe)):
dd = open("db/ipv4/" + ipe.replace("/", "-"), "r")
rsp = rsp + dd.read()
dd.close()
found = True
log = log + n
break
if(found == False):
rsp = rsp + n
rsp = rsp + "# IP Address was not found in the whois database" + n
log = log + " (Not found)" + n
elif(isDomain(query)):
# WHOIS Domain
log = log + "Domain" + n
domaindb = os.listdir("db/domains/")
found = False
for domain in domaindb:
if(query == domain):
dd = open("db/domains/" + domain, "r")
rsp = rsp + dd.read()
dd.close()
found = True
log = log + n
break
if(found == False):
rsp = rsp + n
rsp = rsp + "# Domain name was not found in the whois database" + n
log = log + " (Not found)" + n
elif(isAS(query)[0]):
# WHOIS SCION AS
log = log + "SCION AS" + n
# If isAS returned the actual AS identifier and possibly an ISD
as_id, isd_id = isAS(query)
print(f"isAS returned: AS={as_id}, ISD={isd_id} for query: {query}")
sys.stdout.flush()
if isinstance(as_id, str):
query_as = as_id
else:
query_as = query
print(f"Looking for AS: {query_as}")
sys.stdout.flush()
# List all files in the AS directory for debugging
print(f"Files in db/as/: {os.listdir('db/as/')}")
sys.stdout.flush()
found = False
matching_files = []
# If ISD is specified, we only want to show that specific ISD-AS combination
if isd_id:
print(f"ISD specified: {isd_id}, looking for specific ISD-AS combination")
sys.stdout.flush()
# First try direct file lookup
if os.path.exists("db/as/" + query_as):
with open("db/as/" + query_as, "r") as f:
content = f.read()
for line in content.split("\n"):
if line.startswith("ISD:"):
file_isd = line.split(":", 1)[1].strip()
if file_isd == isd_id:
print(f"Found matching ISD-AS combination in file: db/as/{query_as}")
sys.stdout.flush()
rsp = rsp + content
found = True
break
# If not found by direct lookup, scan all files
if not found:
for as_file in os.listdir("db/as/"):
try:
with open(f"db/as/{as_file}", "r") as f:
content = f.read()
as_match = False
isd_match = False
# Check if AS matches
for line in content.split("\n"):
if line.startswith("AS Identifier:"):
as_identifier = line.split(":", 1)[1].strip()
if as_identifier == query_as:
as_match = True
elif line.startswith("ISD:"):
file_isd = line.split(":", 1)[1].strip()
if file_isd == isd_id:
isd_match = True
if as_match and isd_match:
print(f"Found matching ISD-AS combination in file: db/as/{as_file}")
sys.stdout.flush()
rsp = rsp + content
found = True
break
except Exception as e:
print(f"Error reading AS file {as_file}: {str(e)}")
sys.stdout.flush()
else:
# No ISD specified, show all instances of this AS across all ISDs
print(f"No ISD specified, looking for all instances of AS: {query_as}")
sys.stdout.flush()
# First try direct file lookup
if os.path.exists("db/as/" + query_as):
print(f"File found: db/as/{query_as}")
sys.stdout.flush()
matching_files.append(query_as)
found = True
# Also scan all files to find any other instances with the same AS
for as_file in os.listdir("db/as/"):
if as_file == query_as:
continue # Already added this one
try:
with open(f"db/as/{as_file}", "r") as f:
content = f.read()
# Look for AS Identifier match
for line in content.split("\n"):
if line.startswith("AS Identifier:"):
as_identifier = line.split(":", 1)[1].strip()
if as_identifier == query_as:
print(f"Found matching AS in file: db/as/{as_file}")
sys.stdout.flush()
matching_files.append(as_file)
found = True
break
# Look for AS decimal line if no match yet
if as_file not in matching_files:
for line in content.split("\n"):
if line.startswith("AS decimal:"):
decimal_value = line.split(":", 1)[1].strip()
# Handle both empty and non-empty decimal values
if decimal_value and decimal_value == query_as:
print(f"Found matching AS decimal in file: db/as/{as_file}")
sys.stdout.flush()
matching_files.append(as_file)
found = True
break
except Exception as e:
print(f"Error reading AS file {as_file}: {str(e)}")
sys.stdout.flush()
# Add all matching files to response
if matching_files:
rsp = rsp + "% Multiple entries found for this AS identifier:" + n + n
for as_file in matching_files:
try:
with open(f"db/as/{as_file}", "r") as f:
rsp = rsp + f.read() + n + "---" + n
except Exception as e:
print(f"Error reading AS file {as_file}: {str(e)}")
sys.stdout.flush()
# Create a symbolic link for future queries if we found exactly one match
if len(matching_files) == 1 and not os.path.exists(f"db/as/{query_as}"):
try:
os.symlink(matching_files[0], f"db/as/{query_as}")
print(f"Created symlink from {query_as} to {matching_files[0]}")
sys.stdout.flush()
except Exception as e:
print(f"Error creating symlink: {str(e)}")
sys.stdout.flush()
if not found:
rsp = rsp + n
rsp = rsp + "# SCION AS was not found in the whois database" + n
log = log + " (Not found)" + n
elif(isISD(query)):
# WHOIS SCION ISD
log = log + "SCION ISD" + n
# If isISD returned the actual ISD number (in case of "isd" prefix or "ISD-" format)
isd_id = isISD(query)
if isinstance(isd_id, str):
query = isd_id
found = False
# Check if this is a request to list all AS numbers in an ISD (format: "ISD-")
is_list_all_as = query.endswith("-")
if is_list_all_as:
query = query[:-1] # Remove the trailing dash
# First check if we have a direct ISD file
if os.path.exists("db/isd/" + query):
dd = open("db/isd/" + query, "r")
isd_info = dd.read()
dd.close()
found = True
log = log + n
# Add the ISD information to the response
rsp = rsp + isd_info + n + n
# Now find all AS numbers associated with this ISD
associated_as = []
for as_file in os.listdir("db/as/"):
try:
with open(f"db/as/{as_file}", "r") as f:
content = f.read()
# Look for ISD match
for line in content.split("\n"):
if line.startswith("ISD:"):
file_isd = line.split(":", 1)[1].strip()
if file_isd == query:
print(f"Found AS associated with ISD {query}: {as_file}")
sys.stdout.flush()
associated_as.append((as_file, content))
break
except Exception as e:
print(f"Error reading AS file {as_file}: {str(e)}")
sys.stdout.flush()
if associated_as:
found = True
rsp = rsp + f"% AS Numbers associated with ISD {query}:" + n + n
for as_file, content in associated_as:
# If this is a request to list all AS numbers, just show a summary
if is_list_all_as:
as_identifier = ""
organization = ""
description = ""
for line in content.split("\n"):
if line.startswith("AS Identifier:"):
as_identifier = line.split(":", 1)[1].strip()
elif line.startswith("Organization:"):
organization = line.split(":", 1)[1].strip()
elif line.startswith("Description:"):
description = line.split(":", 1)[1].strip()
rsp = rsp + f"AS: {as_identifier} - {organization} - {description}" + n
else:
# Show full details for each AS
rsp = rsp + content + n + "---" + n
if not found:
rsp = rsp + n
rsp = rsp + "# SCION ISD was not found in the whois database" + n
log = log + " (Not found)" + n
else:
# Unrecognized
log = log + "Unrecognized" + n
rsp = rsp + n
rsp = rsp + "# Error. Unknown query type. Query is not IPv4, Domain, SCION ISD or SCION AS" + n
try:
# Encode string to bytes before sending
con.send(rsp.encode('utf-8'))
except Exception as e:
print(f"Error sending response: {str(e)}")
sys.stdout.flush()
finally:
con.close()
if(LOGFILE!=""):
# Save to logs
try:
d = open(LOGFILE, "a+")
d.write(log)
d.close()
except Exception as e:
print(f"FAILED TO SAVE TO LOGFILE: {str(e)}")
sys.stdout.flush()
except Exception as e:
print(f"Error in main loop: {str(e)}")
sys.stdout.flush()