import socket import threading import json import sys import time from datetime import datetime, timezone class Server: def __init__(self, region, host, port, config_file): self.region = region self.host = host self.port = int(port) self.config_file = config_file self.is_running = True self.clients = [] self.lock = threading.Lock() self.neighbors = [] # neighbors: list of (region, host, port) self.region_peers = [] # servers in the same region (host, port) self.nation = None self.seen_messages = set() self.load_config() def load_config(self): with open(self.config_file, 'r') as f: all_servers = json.load(f) # Find this region info for region_info in all_servers: if region_info["region"] == self.region: self.nation = region_info.get("nation", None) # Collect region peers (all servers in this region) for server in region_info.get("servers", []): self.region_peers.append((server["host"], server["port"])) # Collect neighbors as (region, host, port) for neighbor in region_info.get("neighbors", []): h, p = neighbor.split(":") # We need to find region of this neighbor host:port from config neighbor_region = None for r in all_servers: for srv in r.get("servers", []): if srv["host"] == h and int(p) == srv["port"]: neighbor_region = r["region"] break if neighbor_region: break self.neighbors.append((neighbor_region, h, int(p))) break def start(self): print(f"[{self.region}] Server started on port {self.port}") threading.Thread(target=self.accept_clients, daemon=True).start() threading.Thread(target=self.listen_for_servers, daemon=True).start() def accept_clients(self): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((self.host, self.port)) server_socket.listen() while self.is_running: try: client_sock, _ = server_socket.accept() with self.lock: self.clients.append(client_sock) threading.Thread(target=self.handle_client, args=(client_sock,), daemon=True).start() except Exception as e: print(f"[{self.region}] Accept client error: {e}") def handle_client(self, client_sock): buffer = "" while True: try: data = client_sock.recv(4096) if not data: break buffer += data.decode() while '\n' in buffer: line, buffer = buffer.split('\n', 1) message = json.loads(line) self.process_message(message, from_client=True, client_sock=client_sock) except Exception: break with self.lock: if client_sock in self.clients: self.clients.remove(client_sock) client_sock.close() def listen_for_servers(self): # Listen on port+1000 for server-server communication server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((self.host, self.port + 1000)) server_socket.listen() while self.is_running: try: conn, _ = server_socket.accept() threading.Thread(target=self.handle_server_connection, args=(conn,), daemon=True).start() except Exception as e: print(f"[{self.region}] ⚠️ Accept server error: {e}") def handle_server_connection(self, conn): buffer = "" while True: try: data = conn.recv(4096) if not data: break buffer += data.decode() while '\n' in buffer: line, buffer = buffer.split('\n', 1) message = json.loads(line) self.process_message(message, from_client=False) except Exception: break conn.close() def process_message(self, message, from_client=False, client_sock=None): # Deduplicate messages msg_id = message.get("id") if msg_id in self.seen_messages: return self.seen_messages.add(msg_id) level = message.get("level", "local").lower() sender_region = message.get("region") timestamp = message.get("timestamp") sender = message.get("sender") content = message.get("content") print(f"[{self.region}] 📩 {level.upper()} message from {sender} @ {timestamp}: {content}") # Send to local clients except sender (if from client) self._send_to_local_clients(message, exclude_sock=client_sock if from_client else None) # Propagate based on level if level == "local": # Forward to all region peers except self for host, port in self.region_peers: if host == self.host and port == self.port: continue self._send_to_server(host, port + 1000, message) elif level == "national": if self.nation is None or sender_region is None: return # Determine sender's nation from config sender_nation = None with open(self.config_file, 'r') as f: all_servers = json.load(f) for s in all_servers: if s["region"] == sender_region: sender_nation = s.get("nation", None) break if sender_nation != self.nation: return # Forward to region peers for host, port in self.region_peers: if host == self.host and port == self.port: continue self._send_to_server(host, port + 1000, message) # Forward to neighbors with the same nation for region, host, port in self.neighbors: neighbor_nation = None with open(self.config_file, 'r') as f: all_servers = json.load(f) for s in all_servers: if s["region"] == region: neighbor_nation = s.get("nation", None) break if neighbor_nation == self.nation: self._send_to_server(host, port + 1000, message) elif level == "global": # Forward to all region peers except self for host, port in self.region_peers: if host == self.host and port == self.port: continue self._send_to_server(host, port + 1000, message) # Forward to all neighbors for region, host, port in self.neighbors: self._send_to_server(host, port + 1000, message) def _send_to_local_clients(self, message, exclude_sock=None): with self.lock: to_remove = [] for c in self.clients: if c == exclude_sock: continue try: c.sendall((json.dumps(message) + "\n").encode()) except Exception: to_remove.append(c) for c in to_remove: self.clients.remove(c) c.close() def _send_to_server(self, host, port, message): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(2) s.connect((host, port)) s.sendall((json.dumps(message) + "\n").encode()) except Exception: pass if __name__ == "__main__": if len(sys.argv) != 5: print("Usage: python server.py ") sys.exit(1) region = sys.argv[1] host = sys.argv[2] port = sys.argv[3] config_file = sys.argv[4] server = Server(region, host, port, config_file) server.start() try: while True: time.sleep(1) except KeyboardInterrupt: print(f"\n[{region}] Shutting down server...") server.is_running = False