233 lines
8.4 KiB
Python
233 lines
8.4 KiB
Python
import socket
|
|
import threading
|
|
import json
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
class Server:
    """Chat relay server for one region.

    The server listens on two TCP ports: ``port`` for clients and
    ``port + 1000`` for other servers.  Both speak newline-delimited JSON.
    Each message is delivered to the locally connected clients and then
    forwarded according to its ``level`` field:

    * ``local``    -- to the other servers of this region;
    * ``national`` -- to the region's servers and to neighbours whose region
      belongs to the same nation (only when the sender's region does too);
    * ``global``   -- to the region's servers and to every neighbour.

    The topology comes from a JSON config file: a list of entries with the
    keys ``region``, ``nation``, ``servers`` (objects with ``host`` and
    ``port``) and ``neighbors`` (``"host:port"`` strings).
    """

    # Added to a server's client port to obtain its server-to-server port.
    SERVER_PORT_OFFSET = 1000

    def __init__(self, region, host, port, config_file):
        """Store the server identity and load the topology from *config_file*.

        Args:
            region: Name of the region this server belongs to.
            host: Address to bind both listening sockets to.
            port: Client port (anything ``int()`` accepts).
            config_file: Path of the JSON topology file.
        """
        self.region = region
        self.host = host
        self.port = int(port)
        self.config_file = config_file

        self.is_running = True
        self.clients = []
        self.lock = threading.Lock()  # guards self.clients and self.seen_messages
        self.neighbors = []  # neighbors: list of (region, host, port)
        self.region_peers = []  # servers in the same region (host, port)
        self.nation = None
        self.seen_messages = set()
        self.region_nations = {}  # region name -> nation, filled by load_config

        self.load_config()

    def load_config(self):
        """Read the config file and (re)build the topology attributes.

        Sets ``nation``, ``region_peers`` and ``neighbors`` from the first
        entry whose region matches ``self.region`` and fills
        ``region_nations`` for every region in the file, so message routing
        never has to touch the file again.  Calling it again replaces the
        previous values instead of appending to them.
        """
        with open(self.config_file, 'r') as f:
            all_servers = json.load(f)

        region_nations = {}
        endpoint_regions = {}
        for entry in all_servers:
            name = entry.get("region")
            if name is not None:
                # First entry wins, like a first-match linear search would.
                region_nations.setdefault(name, entry.get("nation"))
            for srv in entry.get("servers", []):
                endpoint = (srv.get("host"), srv.get("port"))
                endpoint_regions.setdefault(endpoint, name)
        self.region_nations = region_nations

        for entry in all_servers:
            if entry.get("region") != self.region:
                continue

            self.nation = entry.get("nation")

            # Every server of this region, this one included.
            self.region_peers = [
                (srv["host"], srv["port"]) for srv in entry.get("servers", [])
            ]

            neighbors = []
            for neighbor in entry.get("neighbors", []):
                # rsplit keeps hosts that themselves contain ':' intact.
                h, p = neighbor.rsplit(":", 1)
                # The region is None when the endpoint is not in the config.
                neighbors.append((endpoint_regions.get((h, int(p))), h, int(p)))
            self.neighbors = neighbors
            break

    def start(self):
        """Start the client and server listeners in daemon threads."""
        print(f"[{self.region}] Server started on port {self.port}")
        threading.Thread(target=self.accept_clients, daemon=True).start()
        threading.Thread(target=self.listen_for_servers, daemon=True).start()

    def accept_clients(self):
        """Accept client connections and give each one its own handler thread."""
        # The context manager closes the listening socket when the loop ends.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((self.host, self.port))
            server_socket.listen()

            while self.is_running:
                try:
                    client_sock, _ = server_socket.accept()
                    with self.lock:
                        self.clients.append(client_sock)
                    threading.Thread(
                        target=self.handle_client,
                        args=(client_sock,),
                        daemon=True,
                    ).start()
                except Exception as e:
                    print(f"[{self.region}] Accept client error: {e}")

    def handle_client(self, client_sock):
        """Process messages from one client until it disconnects or errors.

        Any error (socket failure, invalid UTF-8 or JSON, a failure while
        routing) ends the session.  The socket is always unregistered and
        closed.
        """
        try:
            for message in self._read_messages(client_sock):
                self.process_message(message, from_client=True, client_sock=client_sock)
        except Exception as e:
            print(f"[{self.region}] Client connection dropped: {e}")
        finally:
            with self.lock:
                if client_sock in self.clients:
                    self.clients.remove(client_sock)
            client_sock.close()

    def listen_for_servers(self):
        """Accept server-to-server connections on ``port + 1000``."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((self.host, self.port + self.SERVER_PORT_OFFSET))
            server_socket.listen()

            while self.is_running:
                try:
                    conn, _ = server_socket.accept()
                    threading.Thread(
                        target=self.handle_server_connection,
                        args=(conn,),
                        daemon=True,
                    ).start()
                except Exception as e:
                    print(f"[{self.region}] ⚠️ Accept server error: {e}")

    def handle_server_connection(self, conn):
        """Process messages sent by another server until the connection ends."""
        try:
            for message in self._read_messages(conn):
                self.process_message(message, from_client=False)
        except Exception as e:
            print(f"[{self.region}] Server connection dropped: {e}")
        finally:
            conn.close()

    @staticmethod
    def _read_messages(sock):
        """Yield each newline-terminated JSON message received on *sock*.

        Data is buffered as bytes and decoded one complete line at a time, so
        a multi-byte UTF-8 character split across two ``recv`` calls is
        handled correctly.  Stops at end of stream; an unterminated trailing
        fragment is discarded.
        """
        pending = b""
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                return
            pending += chunk
            # Everything before the last '\n' is complete; the rest waits.
            *complete, pending = pending.split(b"\n")
            for raw_line in complete:
                yield json.loads(raw_line.decode())

    @staticmethod
    def _dedupe_key(message):
        """Return the hashable key used to recognise a repeated message.

        Falls back to the canonical JSON text when the message has no ``id``,
        so distinct id-less messages are not mistaken for one another.
        """
        msg_id = message.get("id")
        if msg_id is not None:
            return msg_id
        return ("no-id", json.dumps(message, sort_keys=True))

    def process_message(self, message, from_client=False, client_sock=None):
        """Deliver *message* locally and forward it according to its level.

        Args:
            message: Decoded JSON object (dict) of the message.
            from_client: True when the message came straight from a client.
            client_sock: Socket of the originating client; it is skipped when
                the message is sent to local clients.
        """
        # Check-and-add under the lock: several connection threads can
        # receive the same message at the same time.
        key = self._dedupe_key(message)
        with self.lock:
            if key in self.seen_messages:
                return
            self.seen_messages.add(key)

        level = message.get("level", "local").lower()
        sender_region = message.get("region")
        timestamp = message.get("timestamp")
        sender = message.get("sender")
        content = message.get("content")

        print(f"[{self.region}] 📩 {level.upper()} message from {sender} @ {timestamp}: {content}")

        # Send to local clients except sender (if from client)
        self._send_to_local_clients(message, exclude_sock=client_sock if from_client else None)

        # Propagate based on level
        if level == "local":
            self._forward_to_region_peers(message)

        elif level == "national":
            if self.nation is None or sender_region is None:
                return
            # Only relay messages that originate from our own nation.
            if self.region_nations.get(sender_region) != self.nation:
                return

            self._forward_to_region_peers(message)

            # Forward to neighbors with the same nation
            for region, host, port in self.neighbors:
                if self.region_nations.get(region) == self.nation:
                    self._send_to_server(host, port + self.SERVER_PORT_OFFSET, message)

        elif level == "global":
            self._forward_to_region_peers(message)

            # Forward to all neighbors
            for _region, host, port in self.neighbors:
                self._send_to_server(host, port + self.SERVER_PORT_OFFSET, message)

    def _forward_to_region_peers(self, message):
        """Send *message* to every server of this region except this one."""
        for host, port in self.region_peers:
            if host == self.host and port == self.port:
                continue
            self._send_to_server(host, port + self.SERVER_PORT_OFFSET, message)

    def _send_to_local_clients(self, message, exclude_sock=None):
        """Send *message* to every connected client except *exclude_sock*.

        Clients whose socket fails are removed from the list and closed.
        """
        payload = (json.dumps(message) + "\n").encode()
        with self.lock:
            to_remove = []
            for c in self.clients:
                if c == exclude_sock:
                    continue
                try:
                    c.sendall(payload)
                except Exception:
                    to_remove.append(c)
            for c in to_remove:
                self.clients.remove(c)
                c.close()

    def _send_to_server(self, host, port, message):
        """Best-effort delivery of *message* to another server.

        Opens a short-lived connection with a 2-second timeout.  Failures are
        logged and never raised, so an unreachable peer cannot interrupt
        routing to the remaining ones.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(2)
                s.connect((host, port))
                s.sendall((json.dumps(message) + "\n").encode())
        except Exception as e:
            print(f"[{self.region}] Could not forward to {host}:{port}: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line: <region> <host> <port> <config_file>
    cli_args = sys.argv[1:]
    if len(cli_args) != 4:
        print("Usage: python server.py <region> <host> <port> <config_file>")
        sys.exit(1)

    region, host, port, config_file = cli_args

    server = Server(region, host, port, config_file)
    server.start()

    # The listeners run in daemon threads, so keep the main thread alive
    # until the user interrupts the process.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print(f"\n[{region}] Shutting down server...")
        server.is_running = False