mirror of
https://git.photon.obnh.io/AXSY/whois.git
synced 2025-12-11 04:39:15 +00:00
Based on mwhois by Antonios A. Chariton. Modifications for SCION AS support by Olaf Baumert, Axpo Systems AG.
471 lines
14 KiB
Python
Executable File
471 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
|
|
import socket, os, re, json, sys
|
|
from time import strftime
|
|
from netaddr import IPAddress, IPNetwork
|
|
|
|
# Address the listening socket binds to.
LISTEN_ADDRESS = "::"  # Listen on all IPv6 addresses (includes IPv4 via IPv4-mapped IPv6 addresses)
# TCP port the listening socket binds to.
LISTEN_PORT = 43
# Upper bound passed to recv() when reading a client's query.
MAX_QUERY_SIZE = 128
# File that request log lines are appended to; an empty string disables logging.
LOGFILE = "/root/whois/mwhois.log"

# Print startup message
print(f"Starting mwhoisd on {LISTEN_ADDRESS}:{LISTEN_PORT}", flush=True)

# Line terminator used when building responses and log lines.
n = "\r\n"

# Make sure the flat-file database directories exist.  This is best effort:
# a failure is reported but does not stop the server from starting.
for _db_dir in ("db/ipv4", "db/domains", "db/as", "db/isd"):
    try:
        os.makedirs(_db_dir, exist_ok=True)
    except OSError as e:
        print(f"Could not create directory {_db_dir}: {str(e)}", flush=True)

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# Set socket option to reuse address
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Enable dual stack (IPv4 and IPv6)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
try:
    s.bind((LISTEN_ADDRESS, LISTEN_PORT))
    print(f"Successfully bound to {LISTEN_ADDRESS}:{LISTEN_PORT}", flush=True)
except Exception as e:
    print(f"Could not bind to {LISTEN_ADDRESS}:{LISTEN_PORT}: {str(e)}", flush=True)
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(2)
s.listen(5)  # Allow up to 5 connections in the queue
|
|
|
|
# Sanitize the query received
|
|
def sanitizeQuery(qr):
    """Normalise a raw WHOIS query string before it is classified.

    The query is lower-cased, stripped of path separators and of every
    carriage return / line feed (not only the exact ``"\\r\\n"`` pair, so
    LF-terminated queries work too), runs of dots are collapsed to a single
    dot, surrounding whitespace is removed and a leading ``"domain "``
    keyword, which some whois clients add, is dropped.

    Args:
        qr: The decoded query text as received from the client.

    Returns:
        The cleaned-up query string (possibly empty).
    """
    qr = qr.lower()

    # Remove separators and line endings first, so that removing them cannot
    # glue two single dots into a ".." afterwards (e.g. "./.").
    for unwanted in ("/", "\\", "\r", "\n"):
        qr = qr.replace(unwanted, "")

    # A single replace() pass leaves ".." behind for inputs such as "...",
    # so repeat until no consecutive dots remain.
    while ".." in qr:
        qr = qr.replace("..", ".")

    qr = qr.strip()

    # Remove "domain " prefix that some whois clients add
    if qr.startswith("domain "):
        qr = qr[7:].lstrip()

    return qr
|
|
|
|
# Check if the input is a valid IPv4 Address
|
|
def isIP(qr):
    """Return True when *qr* is a dotted-quad IPv4 address.

    The string must consist of exactly four dot-separated parts, each made
    only of the ASCII digits 0-9 and each with a value of at most 255.

    Args:
        qr: The sanitised query string.

    Returns:
        True for a valid IPv4 address, False otherwise.  Never raises for
        string input.
    """
    octets = qr.split(".")
    if len(octets) != 4:
        return False
    for octet in octets:
        # str.isdigit() also accepts characters such as "²" which int()
        # rejects with ValueError, so test against ASCII digits explicitly.
        if not octet or any(ch not in "0123456789" for ch in octet):
            return False
        if int(octet) > 255:
            return False
    return True
|
|
|
|
# Check if the input is a valid domain name
|
|
def isDomain(qr):
    """Return True when *qr* looks like a domain name.

    A valid name has at least two dot-separated labels; every label must be
    non-empty, consist solely of ``a-z``, ``0-9`` and ``-``, and must neither
    start nor end with a hyphen.

    Args:
        qr: The sanitised (already lower-cased) query string.

    Returns:
        True if every label is valid, False otherwise.
    """
    labels = qr.split(".")
    if len(labels) == 1:
        return False
    for label in labels:
        if label == "":
            return False
        if label[0] == "-" or label[-1] == "-":
            return False
        # fullmatch() checks the whole label; a prefix match would accept a
        # label that contains a newline followed by arbitrary characters.
        if not re.fullmatch(r"[a-z0-9-]+", label):
            return False
    return True
|
|
|
|
# Check if the input is a valid SCION AS identifier
|
|
def isAS(qr):
    """Classify *qr* as a SCION AS query and split it into AS and ISD parts.

    The forms are tried in this order:

    * colon form, e.g. ``"2:2:0"``
    * colon form prefixed with ``"as"``
    * ``ISD-AS`` with a colon-form AS, e.g. ``"76-2:2:0"``
    * ``ISD-AS`` with a decimal AS, e.g. ``"64-60284"``
    * a plain decimal number

    Each step of the decision is printed to stdout for diagnostics.

    Args:
        qr: The sanitised query string.

    Returns:
        A tuple ``(as_identifier, isd)``.  ``isd`` is None when the query
        did not carry one.  When nothing matches, ``(False, None)``.
    """
    def report(message):
        # Emit one diagnostic line and push it out immediately.
        print(message)
        sys.stdout.flush()

    colon_form = r'^\d+:\d+:\d+[a-f0-9]*$'

    report(f"isAS checking: '{qr}'")

    # Bare colon-separated identifier.
    if re.match(colon_form, qr):
        report(f" Matched colon format")
        return qr, None

    # Same identifier with a leading "as".
    without_prefix = qr[2:]
    if qr.startswith("as") and re.match(colon_form, without_prefix):
        report(f" Matched as-prefix with colon format")
        return without_prefix, None

    # ISD number, dash, colon-separated identifier.
    qualified = re.match(r'^(\d+)-(\d+:\d+:\d+[a-f0-9]*)$', qr)
    if qualified:
        report(f" Matched ISD-AS colon format")
        isd_value, as_value = qualified.group(1), qualified.group(2)
        report(f" Extracted ISD: {isd_value}, AS part: {as_value}")
        return as_value, isd_value

    # ISD number, dash, decimal AS number.
    qualified_decimal = re.match(r'^(\d+)-(\d+)$', qr)
    if qualified_decimal:
        report(f" Matched ISD-AS decimal format")
        isd_value, as_value = qualified_decimal.group(1), qualified_decimal.group(2)
        report(f" Extracted ISD: {isd_value}, AS number: {as_value}")
        return as_value, isd_value

    # Plain decimal number; no ISD is implied, so the caller receives None
    # for the ISD part.
    if re.match(r'^\d+$', qr):
        report(f" Matched plain decimal format: {qr}")
        return qr, None

    report(f" No match found")
    return False, None
|
|
|
|
# Check if the input is a valid ISD identifier
|
|
def isISD(qr):
    """Extract an ISD number from *qr* if the query names an ISD.

    Recognised forms are a bare number, ``"isd"`` followed by a number, and
    a number followed by a dash (e.g. ``"76-"``).

    Args:
        qr: The sanitised query string.

    Returns:
        The ISD number as a string, or False when *qr* is not an ISD query.
    """
    # Bare number.
    if qr.isdigit():
        return qr

    # "isd" prefix followed by digits: hand back only the digits.
    head, tail = qr[:3], qr[3:]
    if head == "isd" and tail.isdigit():
        return tail

    # Number followed by a dash: drop the final character.
    if re.match(r'^\d+\-$', qr):
        return qr[:-1]

    return False
|
|
|
|
# Check if an IP belongs to a CIDR IP block
|
|
def isIPinCIDR(ip, network):
    """Report whether *ip* lies inside the CIDR block *network*.

    Args:
        ip: Address, in a form accepted by ``netaddr.IPAddress``.
        network: Block, in a form accepted by ``netaddr.IPNetwork``.

    Returns:
        The result of the netaddr membership test.
    """
    address = IPAddress(ip)
    block = IPNetwork(network)
    return address in block
|
|
|
|
# Banner placed at the top of every response, followed by one empty line.
_BANNER = n.join((
    "# +-----------------------------------+",
    "# | AXPO SYSTEMS AG |",
    "# +-----------------------------------+",
    "# | served by modded mwhois |",
    "# +-----------------------------------+",
    "# | whois.as60284.net |",
    "# +-----------------------------------+",
)) + n + n


def _read_text(path):
    """Return the complete text content of the file at *path*."""
    with open(path, "r") as handle:
        return handle.read()


def _field_values(content, label):
    """Return the stripped values of every ``<label>:`` line in *content*.

    A line counts when it starts with the label immediately followed by a
    colon; the value is everything after that first colon.
    """
    prefix = label + ":"
    return [
        line.split(":", 1)[1].strip()
        for line in content.split("\n")
        if line.startswith(prefix)
    ]


def _as_records(seen=None):
    """Yield ``(file_name, content)`` for each distinct readable db/as/ file.

    Entries that resolve to an already visited real path are skipped, so a
    symlink and its target are reported only once.  *seen* may be a set of
    real paths to skip; it is updated in place.  Unreadable entries are
    reported on stdout and skipped.
    """
    seen = set() if seen is None else seen
    for name in sorted(os.listdir("db/as/")):
        path = "db/as/" + name
        real = os.path.realpath(path)
        if real in seen:
            continue
        seen.add(real)
        try:
            content = _read_text(path)
        except (OSError, ValueError) as e:
            print(f"Error reading AS file {name}: {str(e)}", flush=True)
            continue
        yield name, content


def _names_as(content, query_as):
    """Tell whether an AS record names *query_as* as identifier or decimal."""
    if query_as in _field_values(content, "AS Identifier"):
        return True
    # query_as is never empty, so an empty "AS decimal:" value cannot match.
    return query_as in _field_values(content, "AS decimal")


def _answer_ipv4(query):
    """Look up an IPv4 address; return ``(response_body, found)``."""
    for entry in os.listdir("db/ipv4/"):
        # File names hold the CIDR block with "-" in place of "/".
        cidr = entry.replace("-", "/")
        try:
            inside = isIPinCIDR(query, cidr)
        except Exception as e:
            # A stray file whose name is not a CIDR block must not abort
            # the whole request.
            print(f"Skipping IPv4 entry {entry}: {str(e)}", flush=True)
            continue
        if inside:
            return _read_text("db/ipv4/" + entry), True
    return n + "# IP Address was not found in the whois database" + n, False


def _answer_domain(query):
    """Look up a domain name; return ``(response_body, found)``."""
    # Compare against the directory listing instead of building a path from
    # the query and probing the file system with it.
    if query in os.listdir("db/domains/"):
        return _read_text("db/domains/" + query), True
    return n + "# Domain name was not found in the whois database" + n, False


def _answer_as(query_as, isd_id):
    """Look up a SCION AS; return ``(response_body, found)``.

    With *isd_id* set, only a record carrying that ISD is returned.  Without
    it, every distinct record naming the AS is returned.
    """
    not_found = n + "# SCION AS was not found in the whois database" + n
    direct = "db/as/" + query_as
    print(f"Looking for AS: {query_as}", flush=True)

    if isd_id:
        print(f"ISD specified: {isd_id}, looking for specific ISD-AS combination", flush=True)

        # First try direct file lookup
        if os.path.exists(direct):
            try:
                content = _read_text(direct)
            except (OSError, ValueError) as e:
                print(f"Error reading AS file {query_as}: {str(e)}", flush=True)
            else:
                if isd_id in _field_values(content, "ISD"):
                    print(f"Found matching ISD-AS combination in file: db/as/{query_as}", flush=True)
                    return content, True

        # If not found by direct lookup, scan all files
        for name, content in _as_records():
            if _names_as(content, query_as) and isd_id in _field_values(content, "ISD"):
                print(f"Found matching ISD-AS combination in file: db/as/{name}", flush=True)
                return content, True
        return not_found, False

    # No ISD specified, show all instances of this AS across all ISDs
    print(f"No ISD specified, looking for all instances of AS: {query_as}", flush=True)
    matches = []
    seen = set()

    # First try direct file lookup
    if os.path.exists(direct):
        print(f"File found: db/as/{query_as}", flush=True)
        # Remember the real path so the scan below does not add the same
        # record a second time through a symlink or its target.
        seen.add(os.path.realpath(direct))
        try:
            matches.append((query_as, _read_text(direct)))
        except (OSError, ValueError) as e:
            print(f"Error reading AS file {query_as}: {str(e)}", flush=True)

    # Also scan all files to find any other instances with the same AS
    for name, content in _as_records(seen):
        if _names_as(content, query_as):
            print(f"Found matching AS in file: db/as/{name}", flush=True)
            matches.append((name, content))

    if not matches:
        return not_found, False

    body = ""
    if len(matches) > 1:
        body = "% Multiple entries found for this AS identifier:" + n + n
    body += "".join(content + n + "---" + n for _, content in matches)

    # Create a symbolic link for future queries if we found exactly one match
    if len(matches) == 1 and not os.path.exists(direct):
        try:
            os.symlink(matches[0][0], direct)
            print(f"Created symlink from {query_as} to {matches[0][0]}", flush=True)
        except OSError as e:
            print(f"Error creating symlink: {str(e)}", flush=True)

    return body, True


def _answer_isd(isd_id, list_only):
    """Look up a SCION ISD; return ``(response_body, found)``.

    The ISD's own record (if any) is followed by the AS records that carry
    this ISD.  With *list_only* each AS is reduced to a one-line summary.
    """
    body = ""
    found = False

    # First check if we have a direct ISD file
    isd_path = "db/isd/" + isd_id
    if os.path.exists(isd_path):
        body += _read_text(isd_path) + n + n
        found = True

    # Now find all AS numbers associated with this ISD
    members = []
    for name, content in _as_records():
        if isd_id in _field_values(content, "ISD"):
            print(f"Found AS associated with ISD {isd_id}: {name}", flush=True)
            members.append(content)

    if members:
        found = True
        body += f"% AS Numbers associated with ISD {isd_id}:" + n + n
        for content in members:
            if list_only:
                # One summary line per AS; when a field occurs several
                # times the last occurrence wins, a missing field is empty.
                summary = {}
                for label in ("AS Identifier", "Organization", "Description"):
                    values = _field_values(content, label)
                    summary[label] = values[-1] if values else ""
                body += (
                    f"AS: {summary['AS Identifier']} - "
                    f"{summary['Organization']} - {summary['Description']}" + n
                )
            else:
                # Show full details for each AS
                body += content + n + "---" + n

    if not found:
        body += n + "# SCION ISD was not found in the whois database" + n
    return body, found


def _answer(query):
    """Classify *query* and return ``(response_body, log_label, found)``."""
    if isIP(query):
        body, found = _answer_ipv4(query)
        return body, "IPv4", found

    if isDomain(query):
        body, found = _answer_domain(query)
        return body, "Domain", found

    # Classify once and reuse the result instead of calling isAS() twice.
    as_id, isd_id = isAS(query)
    if as_id:
        print(f"isAS returned: AS={as_id}, ISD={isd_id} for query: {query}", flush=True)
        body, found = _answer_as(as_id, isd_id)
        return body, "SCION AS", found

    isd_number = isISD(query)
    if isd_number:
        # The trailing dash must be read from the query itself: isISD()
        # already returns the number without it.
        body, found = _answer_isd(isd_number, query.endswith("-"))
        return body, "SCION ISD", found

    body = n + "# Error. Unknown query type. Query is not IPv4, Domain, SCION ISD or SCION AS" + n
    return body, "Unrecognized", True


def _serve_client(con, adr):
    """Read one query from *con*, send the answer and append a log line.

    The connection is closed on every path, including when the lookup
    raises; such an exception still propagates to the caller.
    """
    log = "[" + strftime("%d/%m/%Y %H:%M:%S") + "] " + adr[0] + " - "
    try:
        try:
            raw = con.recv(MAX_QUERY_SIZE)
            if not raw:
                return
            # Decode bytes to string
            query_str = raw.decode('utf-8', errors='ignore')
            print(f"Raw query: '{query_str}'")
            query = sanitizeQuery(query_str)
            print(f"Sanitized query: '{query}'", flush=True)
        except Exception as e:
            print(f"Error receiving data: {str(e)}", flush=True)
            return

        body, label, found = _answer(query)

        try:
            # Encode string to bytes before sending; sendall() keeps
            # sending until the whole response is out, send() may stop early.
            con.sendall((_BANNER + body).encode('utf-8'))
        except OSError as e:
            print(f"Error sending response: {str(e)}", flush=True)
    finally:
        con.close()

    if LOGFILE != "":
        # Save to logs: one line per request, with CR/LF removed from the
        # client-supplied text so it cannot start a new log line.
        log += query_str.replace("\r", "").replace("\n", "") + " - " + label
        if not found:
            log += " (Not found)"
        log += n
        try:
            with open(LOGFILE, "a") as logfile:
                logfile.write(log)
        except (OSError, ValueError) as e:
            print(f"FAILED TO SAVE TO LOGFILE: {str(e)}", flush=True)


# Accept loop: serve one client at a time, forever.  Any error while
# handling a client is reported and the loop moves on to the next one.
while True:
    try:
        con, adr = s.accept()
        print(f"Connection from {adr[0]}:{adr[1]}", flush=True)
        _serve_client(con, adr)
    except Exception as e:
        print(f"Error in main loop: {str(e)}", flush=True)
|