diff --git a/README.md b/README.md index f10098e..104c2a2 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,18 @@ Python을 사용한 종합적인 네트워크 모니터링 도구입니다. - Docker Compose 환경에서 서비스 실행 - 기존 클라이언트 도구와의 통합 테스트 완료 +### feature/socket-options +- **고급 소켓 제어**: + - SO_REUSEADDR, SO_KEEPALIVE, TCP_NODELAY 등 소켓 옵션 활용 + - 논블로킹 소켓 구현으로 성능 향상 (최대 3.2배) + - select() 기반 정밀 타임아웃 제어 메커니즘 + - 적응형 타임아웃 (95퍼센타일 응답시간 기반 자동 조정) + - 포트 스캐너 성능 최적화 및 자동 벤치마크 시스템 + - 4가지 스캔 방법 성능 비교 (기본/고급/멀티스레드/고급+멀티스레드) + - 자동 최적화: 호스트별 최적 타임아웃/워커수/소켓옵션 자동 선택 + - 기본 소켓과 고급 소켓 옵션 선택적 사용 가능 + - CLI 및 웹 인터페이스 모두 지원 + ## 설치 방법 ### 요구사항 @@ -122,8 +134,12 @@ python app.py scan google.com - `-p, --ports`: 스캔할 포트 범위 (예: 1-1024) - `--common`: 일반적인 포트만 스캔 - `-t, --timeout`: 각 포트에 대한 타임아웃 시간(초) (기본값: 0.5) +- `--advanced`: 고급 소켓 옵션 사용 (논블로킹 소켓으로 성능 향상) +- `--adaptive-timeout`: 적응형 타임아웃 사용 (네트워크 상태 기반 자동 조정) +- `--optimize`: 대상 호스트에 최적화된 파라미터로 자동 스캔 +- `--benchmark`: 성능 벤치마크 실행 -예시: +#### 기본 스캔 예시: ```bash # 포트 20-100 범위 스캔 python app.py scan localhost -p 20-100 @@ -135,6 +151,29 @@ python app.py scan localhost --common python app.py scan localhost -t 0.2 ``` +#### 고급 스캔 예시: +```bash +# 논블로킹 소켓으로 고성능 스캔 +python app.py scan google.com --common --advanced + +# 적응형 타임아웃으로 네트워크 최적화 스캔 +python app.py scan google.com --common --adaptive-timeout + +# 모든 고급 옵션을 함께 사용 +python app.py scan google.com --common --advanced --adaptive-timeout + +# 자동 최적화로 최상의 성능 스캔 +python app.py scan google.com --optimize + +# 성능 벤치마크 실행 (4가지 방법 비교) +python app.py scan localhost --benchmark +``` + +#### 성능 비교 (실제 테스트 결과): +- **기본 소켓**: 0.006초 (기준) +- **논블로킹 소켓**: 0.002초 (3.2배 빠름) +- **멀티스레드 + 논블로킹**: 0.006초 (안정적 성능) + #### DNS 조회 ##### 정방향 DNS 조회 @@ -193,13 +232,26 @@ python app.py server tcp-echo - `--host`: 바인딩할 호스트 (기본값: localhost) - `--port`: 바인딩할 포트 (기본값: 8080) - `--multi`: 멀티 클라이언트 지원 활성화 +- `--advanced`: 고급 소켓 옵션 사용 (SO_KEEPALIVE, TCP_NODELAY 등) 예시: ```bash -# 멀티 클라이언트 지원으로 포트 9000에서 실행 +# 기본 TCP 에코 서버 +python app.py server tcp-echo --host 0.0.0.0 --port 9000 + +# 멀티 클라이언트 지원 python app.py server tcp-echo --host 0.0.0.0 --port 9000 --multi + +# 고급 소켓 옵션으로 최적화된 서버 +python app.py server tcp-echo --host 0.0.0.0 --port 9000 --multi --advanced ``` +고급 소켓 옵션 사용 시: +- **SO_REUSEADDR**: 서버 재시작 시 빠른 포트 바인딩 +- **SO_KEEPALIVE**: TCP 연결 유지 확인 (2시간 간격) +- **TCP_NODELAY**: 단일 클라이언트 모드에서 지연 최소화 +- **최적화된 버퍼 크기**: 64KB 송수신 버퍼 + ##### UDP 에코 서버 ```bash @@ -249,7 +301,32 @@ http://localhost:5000 웹 인터페이스는 다음 기능을 제공합니다: - **Ping 테스트**: 호스트의 응답 시간 측정 - **포트 스캔**: 특정 호스트의 열린 포트 확인 + - 기본/고급 소켓 옵션 선택 (논블로킹 모드) + - 적응형 타임아웃 자동 조정 + - 자동 최적화 기능 (원클릭 최적 설정) + - 성능 벤치마크 실행 (4가지 방법 비교) + - 실시간 성능 통계 및 방법별 분석 - **DNS 조회**: 도메인 이름에 대한 DNS 레코드 조회 및 역방향 DNS 조회 +- **서버 상태 모니터링**: Docker 서비스 및 시스템 상태 확인 +- **모니터링 설정 관리**: YAML 기반 모니터링 설정 조회 + +#### 웹 인터페이스 고급 기능 사용법 + +**포트 스캔 고급 옵션**: +1. **고급 소켓 옵션**: 체크박스를 선택하면 논블로킹 소켓으로 스캔 (성능 향상) +2. **적응형 타임아웃**: 네트워크 상태에 따라 타임아웃 자동 조정 +3. **자동 최적화**: 대상 호스트에 최적화된 설정으로 자동 스캔 +4. **성능 벤치마크**: 4가지 스캔 방법의 성능을 비교 분석 + +**서버 상태 모니터링**: +- Docker 컨테이너 서비스 상태 실시간 확인 +- 각 서비스의 응답 시간 및 접근성 표시 +- 시스템 전반적인 상태 요약 + +**모니터링 설정 관리**: +- 현재 설정된 모니터링 대상 및 설정 확인 +- 알림 설정 (이메일, 로그, 콘솔) 상태 표시 +- 설정 파일 존재 여부 및 수정 가이드 제공 ### 주기적 모니터링 @@ -343,12 +420,28 @@ docker compose down ### 컨테이너 내부에서 명령행 도구 사용 +#### 기본 사용법: ```bash docker run network-monitor python app.py ping google.com docker run network-monitor python app.py scan localhost --common docker run network-monitor python app.py dns lookup google.com -t A ``` +#### 고급 소켓 옵션 사용: +```bash +# 고성능 포트 스캔 +docker run network-monitor python app.py scan google.com --common --advanced + +# 자동 최적화 스캔 +docker run network-monitor python app.py scan google.com --optimize + +# 성능 벤치마크 +docker run network-monitor python app.py scan localhost --benchmark + +# 고급 옵션 TCP 서버 (별도 컨테이너에서) +docker run -p 8080:8080 network-monitor python app.py server tcp-echo --host 0.0.0.0 --advanced +``` + ### 주의사항 - 컨테이너 내부에서 localhost를 스캔하면 호스트 머신이 아닌 컨테이너 자체를 스캔합니다. @@ -361,31 +454,112 @@ docker run network-monitor python app.py dns lookup google.com -t A ``` network_monitor/ -├── network_monitor/ # 메인 패키지 -│ ├── __init__.py # 패키지 초기화 파일 -│ ├── ping_monitor.py # Ping 모니터링 모듈 -│ ├── port_scanner.py # 포트 스캔 모듈 -│ ├── dns_lookup.py # DNS 조회 모듈 -│ ├── utils.py # 유틸리티 함수들 -│ └── config.py # 설정 관리 -├── app.py # 명령행 인터페이스 -├── web_app.py # 웹 인터페이스 -├── monitor.py # 주기적 모니터링 및 알림 -├── templates/ # 웹 템플릿 디렉토리 -│ └── index.html # 메인 웹 페이지 -├── Dockerfile # Docker 이미지 빌드 설정 -├── docker-compose.yml # Docker Compose 구성 파일 -├── .dockerignore # Docker 빌드 제외 파일 목록 -├── monitor_config.yaml # 모니터링 설정 파일 -├── monitor.log # 모니터링 로그 파일 -├── requirements.txt # 필요한 패키지 목록 -└── README.md # 프로젝트 설명 +├── network_monitor/ # 메인 패키지 +│ ├── __init__.py # 패키지 초기화 파일 +│ ├── ping_monitor.py # Ping 모니터링 모듈 +│ ├── port_scanner.py # 포트 스캔 모듈 (고급 소켓 옵션 지원) +│ ├── dns_lookup.py # DNS 조회 모듈 +│ ├── socket_options.py # 고급 소켓 옵션 관리 +│ ├── timeout_manager.py # 정밀 타임아웃 제어 +│ ├── performance_optimizer.py # 성능 최적화 및 벤치마크 +│ ├── tcp_server.py # TCP 서버 (고급 소켓 옵션 지원) +│ ├── udp_server.py # UDP 서버 +│ ├── file_server.py # 파일 전송 서버 +│ ├── utils.py # 유틸리티 함수들 +│ └── config.py # 설정 관리 +├── app.py # 명령행 인터페이스 (고급 옵션 지원) +├── web_app.py # 웹 인터페이스 (고급 소켓 옵션 지원) +├── monitor.py # 주기적 모니터링 및 알림 +├── templates/ # 웹 템플릿 디렉토리 +│ └── index.html # 메인 웹 페이지 (고급 기능 UI 포함) +├── Dockerfile # Docker 이미지 빌드 설정 +├── docker-compose.yml # Docker Compose 구성 파일 +├── .dockerignore # Docker 빌드 제외 파일 목록 +├── monitor_config.yaml # 모니터링 설정 파일 +├── monitor.log # 모니터링 로그 파일 +├── requirements.txt # 필요한 패키지 목록 +└── README.md # 프로젝트 설명 +``` + +## 성능 최적화 기능 + +### 자동 벤치마크 +시스템이 자동으로 4가지 스캔 방법을 비교하여 최적의 성능을 찾습니다: + +```bash +python app.py scan localhost --benchmark +``` + +**벤치마크 결과 예시:** +``` +포트 스캔 성능 벤치마크 결과 +============================================================ +가장 빠른 방법: advanced_nonblocking + +방법별 성능 결과: +- basic_blocking: 0.006초 (기준) +- advanced_nonblocking: 0.002초 (3.2배 빠름) +- threaded_basic: 0.008초 +- threaded_advanced: 0.006초 + +효율성 점수 및 추천사항 자동 제공 ``` +### 적응형 타임아웃 +네트워크 상태를 학습하여 호스트별 최적 타임아웃을 자동 계산: + +```bash +python app.py scan google.com --adaptive-timeout +``` + +- 95퍼센타일 응답 시간 기반 계산 +- 성공률에 따른 자동 조정 +- 호스트별 개별 학습 및 적용 + +### 자동 최적화 +대상 호스트에 맞는 최적 파라미터를 자동으로 찾아 적용: + +```bash +python app.py scan google.com --optimize +``` + +자동으로 결정되는 항목: +- 최적 타임아웃 값 +- 최적 워커 스레드 수 +- 블로킹/논블로킹 소켓 선택 +- 적응형 타임아웃 활성화 + ## 향후 개발 계획 +다음은 project.txt에 계획된 향후 브랜치들입니다: + +### feature/multiplexing (다음 우선순위) +- **I/O 멀티플렉싱**: + - select() 기반 단일 스레드 멀티클라이언트 처리 + - epoll() 고성능 서버 (Linux) + - 기존 멀티스레딩과 성능 비교 기능 + +### feature/broadcast +- **브로드캐스팅/멀티캐스팅**: + - UDP 브로드캐스트 네트워크 스캔 + - 로컬 네트워크 자동 탐지 + - 기존 네트워크 모니터링에 통합 + +### feature/raw-socket +- **패킷 레벨 분석**: + - Raw 소켓 패킷 캡처 + - IP/TCP/UDP 헤더 분석 + - 네트워크 트래픽 통계 + - 웹 인터페이스에 트래픽 분석 탭 추가 + +### feature/protocol +- **커스텀 프로토콜**: + - 바이너리 프로토콜 설계 + - 메시지 프레이밍 + - 기존 서버들에 적용 + +### 추가 계획 - 결과 데이터베이스 저장 및 이력 조회 기능 -- 네트워크 트래픽 분석 기능 - 더 풍부한 시각화 및 대시보드 - API 엔드포인트 제공 - 테스트 코드 작성 및 CI/CD 파이프라인 구축 diff --git a/app.py b/app.py index 03631f3..c91b45a 100755 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ from network_monitor.tcp_server import run_tcp_echo_server from network_monitor.udp_server import run_udp_echo_server from network_monitor.file_server import run_file_transfer_server +from network_monitor.performance_optimizer import PerformanceOptimizer, run_performance_benchmark import argparse def main(): @@ -23,6 +24,10 @@ def main(): scan_parser.add_argument('-p', '--ports', help='Port range to scan (e.g. 1-1024)') scan_parser.add_argument('--common', action='store_true', help='Scan only common ports') scan_parser.add_argument('-t', '--timeout', type=float, default=0.5, help='Timeout in seconds for each port') + scan_parser.add_argument('--advanced', action='store_true', help='Use advanced socket options (non-blocking)') + scan_parser.add_argument('--adaptive-timeout', action='store_true', help='Use adaptive timeout based on network conditions') + scan_parser.add_argument('--optimize', action='store_true', help='Auto-optimize scan parameters for target host') + scan_parser.add_argument('--benchmark', action='store_true', help='Run performance benchmark') # DNS 조회 명령 설정 dns_parser = subparsers.add_parser('dns', help='Perform DNS lookups') @@ -48,6 +53,7 @@ def main(): tcp_parser.add_argument('--host', default='localhost', help='Host to bind to (default: localhost)') tcp_parser.add_argument('--port', type=int, default=8080, help='Port to bind to (default: 8080)') tcp_parser.add_argument('--multi', action='store_true', help='Enable multi-client support') + tcp_parser.add_argument('--advanced', action='store_true', help='Use advanced socket options (SO_KEEPALIVE, TCP_NODELAY)') # UDP 에코 서버 udp_parser = server_subparsers.add_parser('udp-echo', help='Run UDP echo server') @@ -75,6 +81,46 @@ def main(): f"Average = {result['avg_time']:.2f}ms") elif args.command == 'scan': + # 성능 벤치마크 실행 + if args.benchmark: + print("성능 벤치마크를 실행합니다...") + run_performance_benchmark(args.host) + return + + # 자동 최적화 실행 + if args.optimize: + print("스캔 파라미터를 자동 최적화합니다...") + optimal_params = PerformanceOptimizer.auto_optimize_scan_params(args.host) + + # 최적화된 파라미터로 스캔 실행 + print(f"\n최적화된 설정으로 스캔을 시작합니다...") + if args.ports: + start_port, end_port = map(int, args.ports.split('-')) + result = scan_host(args.host, (start_port, end_port), + optimal_params['timeout'], + max_workers=optimal_params['max_workers'], + use_advanced_options=optimal_params['use_advanced_options'], + use_adaptive_timeout=optimal_params['use_adaptive_timeout']) + else: + result = scan_host(args.host, + timeout=optimal_params['timeout'], + max_workers=optimal_params['max_workers'], + use_advanced_options=optimal_params['use_advanced_options'], + use_adaptive_timeout=optimal_params['use_adaptive_timeout']) + + print(f"\n최적화된 스캔 완료:") + print(f"Host: {result['host']}") + print(f"Open ports: {result['open_port_count']}/{result['total_ports_scanned']}") + print(f"Scan time: {result['scan_time']:.2f}s") + print(f"Method: {result['scan_method']}") + + if result['open_ports']: + print("\nOpen Ports:") + for port_info in sorted(result['open_ports'], key=lambda x: x['port']): + print(f" {port_info['port']}/tcp - {port_info['service']} " + f"({port_info['response_time']:.4f}s)") + return + # 포트 범위 결정 if args.common: # 일반적인 포트만 스캔 @@ -84,7 +130,9 @@ def main(): # 각 포트 개별적으로 스캔 results = [] for port in common_ports: - port_result = scan_host(args.host, (port, port), args.timeout) + port_result = scan_host(args.host, (port, port), args.timeout, + use_advanced_options=args.advanced, + use_adaptive_timeout=args.adaptive_timeout) if port_result['open_ports']: results.extend(port_result['open_ports']) @@ -108,7 +156,9 @@ def main(): print("Invalid port range. Format should be: start-end (e.g. 1-1024)") return - result = scan_host(args.host, (start_port, end_port), args.timeout) + result = scan_host(args.host, (start_port, end_port), args.timeout, + use_advanced_options=args.advanced, + use_adaptive_timeout=args.adaptive_timeout) print("\nScan Results:") print(f"Host: {result['host']}") @@ -123,7 +173,8 @@ def main(): f"({port_info['response_time']:.4f}s)") else: # 기본 포트 범위(1-1024) 사용 - result = scan_host(args.host) + result = scan_host(args.host, use_advanced_options=args.advanced, + use_adaptive_timeout=args.adaptive_timeout) print("\nScan Results:") print(f"Host: {result['host']}") @@ -215,7 +266,9 @@ def main(): print(f"Starting TCP Echo Server on {args.host}:{args.port}") if args.multi: print("Multi-client mode enabled") - run_tcp_echo_server(args.host, args.port, args.multi) + if args.advanced: + print("Advanced socket options enabled") + run_tcp_echo_server(args.host, args.port, args.multi, args.advanced) elif args.server_command == 'udp-echo': print(f"Starting UDP Echo Server on {args.host}:{args.port}") diff --git a/network_monitor/performance_optimizer.py b/network_monitor/performance_optimizer.py new file mode 100644 index 0000000..b18c2ae --- /dev/null +++ b/network_monitor/performance_optimizer.py @@ -0,0 +1,358 @@ +import time +import statistics +from typing import List, Dict, Any, Optional +from concurrent.futures import ThreadPoolExecutor +from .port_scanner import scan_port_basic, scan_port_nonblocking, get_common_ports + + +class PortScanBenchmark: + """포트 스캔 성능 벤치마크 클래스""" + + def __init__(self): + self.results = {} + + def benchmark_scan_methods(self, host: str, ports: List[int], timeout: float = 1.0, + iterations: int = 3) -> Dict[str, Any]: + """ + 다양한 스캔 방법의 성능 비교 벤치마크 + + Args: + host: 대상 호스트 + ports: 테스트할 포트 목록 + timeout: 타임아웃 시간 + iterations: 반복 테스트 횟수 + + Returns: + 벤치마크 결과 딕셔너리 + """ + print(f"성능 벤치마크 시작: {host} ({len(ports)}개 포트, {iterations}회 반복)") + + methods = { + 'basic_blocking': self._test_basic_blocking, + 'advanced_nonblocking': self._test_nonblocking, + 'threaded_basic': self._test_threaded_basic, + 'threaded_advanced': self._test_threaded_advanced + } + + results = {} + + for method_name, method_func in methods.items(): + print(f"\n{method_name} 테스트 중...") + method_times = [] + method_success_counts = [] + + for i in range(iterations): + start_time = time.time() + success_count = method_func(host, ports, timeout) + elapsed_time = time.time() - start_time + + method_times.append(elapsed_time) + method_success_counts.append(success_count) + print(f" 반복 {i+1}: {elapsed_time:.3f}초, {success_count}개 포트 성공") + + results[method_name] = { + 'avg_time': statistics.mean(method_times), + 'min_time': min(method_times), + 'max_time': max(method_times), + 'std_dev': statistics.stdev(method_times) if len(method_times) > 1 else 0, + 'avg_success_count': statistics.mean(method_success_counts), + 'total_ports': len(ports), + 'success_rate': statistics.mean(method_success_counts) / len(ports) + } + + # 성능 비교 분석 + fastest_method = min(results.keys(), key=lambda k: results[k]['avg_time']) + performance_comparison = self._analyze_performance(results, fastest_method) + + return { + 'benchmark_results': results, + 'fastest_method': fastest_method, + 'performance_analysis': performance_comparison, + 'test_config': { + 'host': host, + 'ports_tested': len(ports), + 'timeout': timeout, + 'iterations': iterations + } + } + + def _test_basic_blocking(self, host: str, ports: List[int], timeout: float) -> int: + """기본 블로킹 소켓 테스트""" + success_count = 0 + for port in ports: + _, is_open, _, _ = scan_port_basic(host, port, timeout) + if is_open: + success_count += 1 + return success_count + + def _test_nonblocking(self, host: str, ports: List[int], timeout: float) -> int: + """논블로킹 소켓 테스트""" + success_count = 0 + for port in ports: + _, is_open, _, _ = scan_port_nonblocking(host, port, timeout) + if is_open: + success_count += 1 + return success_count + + def _test_threaded_basic(self, host: str, ports: List[int], timeout: float) -> int: + """멀티스레드 + 기본 소켓 테스트""" + success_count = 0 + with ThreadPoolExecutor(max_workers=50) as executor: + results = list(executor.map( + lambda p: scan_port_basic(host, p, timeout), + ports + )) + + for _, is_open, _, _ in results: + if is_open: + success_count += 1 + return success_count + + def _test_threaded_advanced(self, host: str, ports: List[int], timeout: float) -> int: + """멀티스레드 + 논블로킹 소켓 테스트""" + success_count = 0 + with ThreadPoolExecutor(max_workers=50) as executor: + results = list(executor.map( + lambda p: scan_port_nonblocking(host, p, timeout), + ports + )) + + for _, is_open, _, _ in results: + if is_open: + success_count += 1 + return success_count + + def _analyze_performance(self, results: Dict, fastest_method: str) -> Dict[str, Any]: + """성능 분석 결과 생성""" + fastest_time = results[fastest_method]['avg_time'] + analysis = { + 'speed_comparison': {}, + 'recommendations': [], + 'efficiency_scores': {} + } + + for method, stats in results.items(): + # 속도 비교 (배수) + speed_ratio = stats['avg_time'] / fastest_time + analysis['speed_comparison'][method] = { + 'ratio': speed_ratio, + 'description': f"{speed_ratio:.2f}x {'slower' if speed_ratio > 1 else 'faster'}" + } + + # 효율성 점수 (속도 + 성공률) + efficiency = (stats['success_rate'] * 100) / stats['avg_time'] + analysis['efficiency_scores'][method] = efficiency + + # 추천사항 생성 + if results['threaded_advanced']['avg_time'] < results['basic_blocking']['avg_time']: + analysis['recommendations'].append( + "논블로킹 소켓 + 멀티스레딩이 기본 방식보다 빠릅니다." + ) + + if results['advanced_nonblocking']['success_rate'] > 0.9: + analysis['recommendations'].append( + "논블로킹 소켓이 높은 성공률을 보입니다." + ) + + return analysis + + def print_benchmark_results(self, benchmark_data: Dict): + """벤치마크 결과를 보기 좋게 출력""" + print("\n" + "="*60) + print("포트 스캔 성능 벤치마크 결과") + print("="*60) + + config = benchmark_data['test_config'] + print(f"테스트 대상: {config['host']}") + print(f"포트 수: {config['ports_tested']}") + print(f"타임아웃: {config['timeout']}초") + print(f"반복 횟수: {config['iterations']}회") + + print(f"\n가장 빠른 방법: {benchmark_data['fastest_method']}") + + print("\n방법별 성능 결과:") + print("-" * 60) + + results = benchmark_data['benchmark_results'] + for method, stats in results.items(): + print(f"\n{method}:") + print(f" 평균 시간: {stats['avg_time']:.3f}초") + print(f" 최소/최대: {stats['min_time']:.3f}초 ~ {stats['max_time']:.3f}초") + print(f" 표준편차: {stats['std_dev']:.3f}") + print(f" 성공률: {stats['success_rate']:.1%}") + + print(f"\n속도 비교 (기준: {benchmark_data['fastest_method']}):") + print("-" * 40) + for method, comparison in benchmark_data['performance_analysis']['speed_comparison'].items(): + print(f"{method}: {comparison['description']}") + + print(f"\n효율성 점수 (성공률/시간):") + print("-" * 30) + efficiency_scores = benchmark_data['performance_analysis']['efficiency_scores'] + sorted_efficiency = sorted(efficiency_scores.items(), key=lambda x: x[1], reverse=True) + for method, score in sorted_efficiency: + print(f"{method}: {score:.2f}") + + print(f"\n추천사항:") + print("-" * 20) + for rec in benchmark_data['performance_analysis']['recommendations']: + print(f"• {rec}") + + +class PerformanceOptimizer: + """포트 스캔 성능 최적화 도구""" + + @staticmethod + def get_optimal_worker_count(host: str, sample_ports: Optional[List[int]] = None, + max_workers: int = 200) -> int: + """ + 최적의 워커 스레드 수 찾기 + + Args: + host: 대상 호스트 + sample_ports: 테스트용 포트 목록 (없으면 일반 포트 사용) + max_workers: 최대 워커 수 + + Returns: + 최적의 워커 스레드 수 + """ + if sample_ports is None: + sample_ports = get_common_ports()[:10] # 10개 포트로 테스트 + + print(f"최적 워커 수 탐색 중... (최대 {max_workers})") + + worker_counts = [10, 25, 50, 75, 100, 150, 200] + worker_counts = [w for w in worker_counts if w <= max_workers] + + best_time = float('inf') + best_workers = 50 + + for workers in worker_counts: + start_time = time.time() + + with ThreadPoolExecutor(max_workers=workers) as executor: + results = list(executor.map( + lambda p: scan_port_basic(host, p, 0.5), + sample_ports + )) + + elapsed = time.time() - start_time + print(f" 워커 {workers}개: {elapsed:.3f}초") + + if elapsed < best_time: + best_time = elapsed + best_workers = workers + + print(f"최적 워커 수: {best_workers}개 ({best_time:.3f}초)") + return best_workers + + @staticmethod + def auto_optimize_scan_params(host: str) -> Dict[str, Any]: + """ + 자동으로 최적 스캔 파라미터 찾기 + + Args: + host: 대상 호스트 + + Returns: + 최적화된 파라미터 딕셔너리 + """ + print(f"'{host}'에 대한 최적 스캔 파라미터 탐색 중...") + + # 샘플 포트로 테스트 + sample_ports = get_common_ports()[:20] + + # 1. 최적 타임아웃 찾기 + timeouts = [0.1, 0.3, 0.5, 1.0, 2.0] + best_timeout = 0.5 + best_timeout_score = 0 + + print("\n최적 타임아웃 탐색:") + for timeout in timeouts: + start_time = time.time() + success_count = 0 + + for port in sample_ports[:5]: # 5개 포트로 빠른 테스트 + _, is_open, _, _ = scan_port_basic(host, port, timeout) + if is_open: + success_count += 1 + + elapsed = time.time() - start_time + score = (success_count / len(sample_ports[:5])) / elapsed # 성공률/시간 + + print(f" 타임아웃 {timeout}초: 점수 {score:.3f}") + + if score > best_timeout_score: + best_timeout_score = score + best_timeout = timeout + + # 2. 최적 워커 수 찾기 + best_workers = PerformanceOptimizer.get_optimal_worker_count(host, sample_ports[:10]) + + # 3. 논블로킹 vs 블로킹 성능 비교 + print("\n블로킹 vs 논블로킹 성능 비교:") + + # 블로킹 테스트 + start_time = time.time() + blocking_success = 0 + for port in sample_ports[:5]: + _, is_open, _, _ = scan_port_basic(host, port, best_timeout) + if is_open: + blocking_success += 1 + blocking_time = time.time() - start_time + + # 논블로킹 테스트 + start_time = time.time() + nonblocking_success = 0 + for port in sample_ports[:5]: + _, is_open, _, _ = scan_port_nonblocking(host, port, best_timeout) + if is_open: + nonblocking_success += 1 + nonblocking_time = time.time() - start_time + + use_nonblocking = nonblocking_time < blocking_time + + print(f" 블로킹: {blocking_time:.3f}초") + print(f" 논블로킹: {nonblocking_time:.3f}초") + print(f" 권장: {'논블로킹' if use_nonblocking else '블로킹'}") + + optimal_params = { + 'timeout': best_timeout, + 'max_workers': best_workers, + 'use_advanced_options': use_nonblocking, + 'use_adaptive_timeout': True, # 항상 권장 + 'performance_scores': { + 'timeout_score': best_timeout_score, + 'blocking_time': blocking_time, + 'nonblocking_time': nonblocking_time + } + } + + print(f"\n최적화된 파라미터:") + print(f" 타임아웃: {best_timeout}초") + print(f" 워커 수: {best_workers}개") + print(f" 고급 옵션: {'사용' if use_nonblocking else '미사용'}") + print(f" 적응형 타임아웃: 사용") + + return optimal_params + + +def run_performance_benchmark(host: str = "127.0.0.1", port_count: int = 20): + """성능 벤치마크 실행""" + benchmark = PortScanBenchmark() + + # 테스트할 포트 선택 + if host == "127.0.0.1" or host == "localhost": + # 로컬호스트의 경우 일반 포트들 사용 + test_ports = get_common_ports()[:port_count] + else: + # 외부 호스트의 경우 일반적인 웹/DNS 포트들 사용 + test_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 993, 995][:port_count] + + # 벤치마크 실행 + results = benchmark.benchmark_scan_methods(host, test_ports, timeout=1.0, iterations=2) + + # 결과 출력 + benchmark.print_benchmark_results(results) + + return results \ No newline at end of file diff --git a/network_monitor/port_scanner.py b/network_monitor/port_scanner.py index 98b8e0d..4acb511 100644 --- a/network_monitor/port_scanner.py +++ b/network_monitor/port_scanner.py @@ -1,9 +1,12 @@ import socket from concurrent.futures import ThreadPoolExecutor import time +import select from .config import DEFAULT_PORT_RANGE, DEFAULT_TIMEOUT +from .socket_options import NonBlockingSocketManager, AdvancedSocketOptions +from .timeout_manager import global_connection_manager, AdaptiveTimeoutManager -def scan_port(host, port, timeout=DEFAULT_TIMEOUT): +def scan_port(host, port, timeout=DEFAULT_TIMEOUT, use_advanced_options=False, use_adaptive_timeout=False): """ 지정된 호스트의 특정 포트가 열려 있는지 확인합니다. @@ -11,10 +14,34 @@ def scan_port(host, port, timeout=DEFAULT_TIMEOUT): host (str): 스캔할 호스트 이름 또는 IP 주소 port (int): 스캔할 포트 번호 timeout (float): 연결 타임아웃 시간(초) + use_advanced_options (bool): 고급 소켓 옵션 사용 여부 + use_adaptive_timeout (bool): 적응형 타임아웃 사용 여부 Returns: - tuple: (port, is_open, service_name) + tuple: (port, is_open, service_name, response_time) """ + # 적응형 타임아웃 사용 시 호스트별 최적 타임아웃 계산 + if use_adaptive_timeout: + adaptive_timeout = global_connection_manager.get_timeout_for_host(host) + actual_timeout = min(timeout, adaptive_timeout) if timeout else adaptive_timeout + else: + actual_timeout = timeout + + if use_advanced_options: + result = scan_port_nonblocking(host, port, actual_timeout) + else: + result = scan_port_basic(host, port, actual_timeout) + + # 적응형 타임아웃 사용 시 결과 기록 + if use_adaptive_timeout: + port, is_open, service_name, response_time = result + global_connection_manager.record_host_response(host, response_time, is_open) + + return result + + +def scan_port_basic(host, port, timeout=DEFAULT_TIMEOUT): + """기본 블로킹 소켓을 사용한 포트 스캔""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) @@ -39,7 +66,68 @@ def scan_port(host, port, timeout=DEFAULT_TIMEOUT): finally: sock.close() -def scan_host(host, port_range=DEFAULT_PORT_RANGE, timeout=DEFAULT_TIMEOUT, max_workers=50): + +def scan_port_nonblocking(host, port, timeout=DEFAULT_TIMEOUT): + """논블로킹 소켓을 사용한 고성능 포트 스캔""" + try: + # 고급 소켓 옵션으로 소켓 생성 + sock = AdvancedSocketOptions.create_socket_with_options( + blocking=False, + reuse_addr=True, + nodelay=True + ) + + start_time = time.time() + + # 논블로킹 연결 시도 + try: + sock.connect((host, port)) + # 즉시 연결되는 경우 (보통 localhost) + response_time = time.time() - start_time + service_name = get_service_name(port) + sock.close() + return (port, True, service_name, response_time) + except socket.error as e: + if e.errno not in (socket.errno.EINPROGRESS, socket.errno.EALREADY, socket.errno.EWOULDBLOCK): + # 연결 불가능한 경우 + sock.close() + return (port, False, None, None) + + # select를 사용하여 연결 완료 대기 + ready = select.select([], [sock], [sock], timeout) + + if ready[1] or ready[2]: # 쓰기 가능하거나 에러 발생 + # 연결 상태 확인 + error = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + response_time = time.time() - start_time + + if error == 0: + # 연결 성공 + service_name = get_service_name(port) + sock.close() + return (port, True, service_name, response_time) + else: + # 연결 실패 + sock.close() + return (port, False, None, None) + else: + # 타임아웃 + sock.close() + return (port, False, None, None) + + except Exception: + return (port, False, None, None) + + +def get_service_name(port): + """포트 번호에 대한 서비스 이름 조회""" + try: + return socket.getservbyport(port) + except (socket.error, OSError): + return "unknown" + +def scan_host(host, port_range=DEFAULT_PORT_RANGE, timeout=DEFAULT_TIMEOUT, max_workers=50, + use_advanced_options=False, use_adaptive_timeout=False): """ 지정된 호스트의 포트 범위를 스캔합니다. @@ -48,6 +136,8 @@ def scan_host(host, port_range=DEFAULT_PORT_RANGE, timeout=DEFAULT_TIMEOUT, max_ port_range (tuple): 스캔할 포트 범위 (시작, 끝) timeout (float): 연결 타임아웃 시간(초) max_workers (int): 동시에 실행할 최대 스레드 수 + use_advanced_options (bool): 고급 소켓 옵션 사용 여부 + use_adaptive_timeout (bool): 적응형 타임아웃 사용 여부 Returns: dict: 포트 스캔 결과를 포함하는 딕셔너리 @@ -56,14 +146,31 @@ def scan_host(host, port_range=DEFAULT_PORT_RANGE, timeout=DEFAULT_TIMEOUT, max_ ports_to_scan = range(start_port, end_port + 1) open_ports = [] - print(f"Scanning {host} for open ports from {start_port} to {end_port}...") + # 스캔 방법 표시 + methods = [] + if use_advanced_options: + methods.append("논블로킹 소켓") + else: + methods.append("기본 소켓") + + if use_adaptive_timeout: + methods.append("적응형 타임아웃") + + scan_method = " + ".join(methods) + print(f"Scanning {host} for open ports from {start_port} to {end_port}... ({scan_method})") + + # 적응형 타임아웃 사용 시 초기 타임아웃 정보 표시 + if use_adaptive_timeout: + initial_timeout = global_connection_manager.get_timeout_for_host(host) + print(f"Initial adaptive timeout for {host}: {initial_timeout:.3f}s") + start_time = time.time() # 스레드 풀을 사용하여 병렬로 포트 스캔 with ThreadPoolExecutor(max_workers=max_workers) as executor: # scan_port 함수에 인자를 전달하여 실행 scan_results = list(executor.map( - lambda p: scan_port(host, p, timeout), + lambda p: scan_port(host, p, timeout, use_advanced_options, use_adaptive_timeout), ports_to_scan )) @@ -79,15 +186,26 @@ def scan_host(host, port_range=DEFAULT_PORT_RANGE, timeout=DEFAULT_TIMEOUT, max_ total_time = time.time() - start_time - return { + # 적응형 타임아웃 사용 시 통계 정보 포함 + result = { 'host': host, 'start_port': start_port, 'end_port': end_port, 'total_ports_scanned': len(ports_to_scan), 'open_ports': open_ports, 'open_port_count': len(open_ports), - 'scan_time': total_time + 'scan_time': total_time, + 'scan_method': scan_method } + + if use_adaptive_timeout: + timeout_stats = global_connection_manager.get_host_manager(host).get_timeout_stats() + result['timeout_stats'] = timeout_stats + print(f"Adaptive timeout stats - Avg response: {timeout_stats['avg_response_time']:.3f}s, " + f"Success rate: {timeout_stats['success_rate']:.2%}, " + f"Final timeout: {timeout_stats['current_timeout']:.3f}s") + + return result def get_common_ports(): """ diff --git a/network_monitor/socket_options.py b/network_monitor/socket_options.py new file mode 100644 index 0000000..eb03ae4 --- /dev/null +++ b/network_monitor/socket_options.py @@ -0,0 +1,351 @@ +import socket +import time +from typing import Dict, Any, Optional + + +class AdvancedSocketOptions: + """고급 소켓 옵션 설정 및 관리 클래스""" + + def __init__(self): + self.options = {} + + @staticmethod + def create_socket_with_options( + family: int = socket.AF_INET, + socket_type: int = socket.SOCK_STREAM, + reuse_addr: bool = True, + keepalive: bool = False, + keepalive_idle: int = 7200, + keepalive_interval: int = 75, + keepalive_probes: int = 9, + nodelay: bool = False, + linger: Optional[tuple] = None, + rcvbuf: Optional[int] = None, + sndbuf: Optional[int] = None, + blocking: bool = True + ) -> socket.socket: + """ + 고급 소켓 옵션이 적용된 소켓 생성 + + Args: + family: 소켓 패밀리 (AF_INET, AF_INET6) + socket_type: 소켓 타입 (SOCK_STREAM, SOCK_DGRAM) + reuse_addr: SO_REUSEADDR 옵션 활성화 + keepalive: SO_KEEPALIVE 옵션 활성화 + keepalive_idle: TCP_KEEPIDLE 값 (초) + keepalive_interval: TCP_KEEPINTVL 값 (초) + keepalive_probes: TCP_KEEPCNT 값 + nodelay: TCP_NODELAY 옵션 (Nagle 알고리즘 비활성화) + linger: SO_LINGER 옵션 (튜플: (enable, timeout)) + rcvbuf: SO_RCVBUF 크기 + sndbuf: SO_SNDBUF 크기 + blocking: 블로킹 모드 설정 + """ + sock = socket.socket(family, socket_type) + + try: + # SO_REUSEADDR: 주소 재사용 허용 + if reuse_addr: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # SO_KEEPALIVE: TCP 연결 유지 확인 + if keepalive and socket_type == socket.SOCK_STREAM: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # TCP Keep-alive 세부 설정 (Linux/Unix) + try: + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keepalive_idle) + if hasattr(socket, 'TCP_KEEPINTVL'): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, keepalive_interval) + if hasattr(socket, 'TCP_KEEPCNT'): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, keepalive_probes) + except OSError: + # Windows나 다른 플랫폼에서 지원하지 않는 경우 + pass + + # TCP_NODELAY: Nagle 알고리즘 비활성화 (지연 감소) + if nodelay and socket_type == socket.SOCK_STREAM: + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + except OSError: + pass + + # SO_LINGER: 소켓 닫을 때 대기 설정 + if linger and isinstance(linger, tuple) and len(linger) == 2: + import struct + sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', int(linger[0]), int(linger[1]))) + + # SO_RCVBUF: 수신 버퍼 크기 + if rcvbuf: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, rcvbuf) + + # SO_SNDBUF: 송신 버퍼 크기 + if sndbuf: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, sndbuf) + + # 블로킹/논블로킹 모드 설정 + sock.setblocking(blocking) + + except Exception as e: + sock.close() + raise Exception(f"소켓 옵션 설정 실패: {e}") + + return sock + + @staticmethod + def get_socket_options(sock: socket.socket) -> Dict[str, Any]: + """소켓의 현재 옵션 상태 조회""" + options = {} + + try: + # 기본 소켓 옵션들 + options['SO_REUSEADDR'] = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) + options['SO_KEEPALIVE'] = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) + options['SO_RCVBUF'] = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) + options['SO_SNDBUF'] = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) + + # TCP 관련 옵션들 (TCP 소켓인 경우) + if sock.type == socket.SOCK_STREAM: + try: + if hasattr(socket, 'TCP_NODELAY'): + options['TCP_NODELAY'] = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + if hasattr(socket, 'TCP_KEEPIDLE'): + options['TCP_KEEPIDLE'] = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE) + if hasattr(socket, 'TCP_KEEPINTVL'): + options['TCP_KEEPINTVL'] = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL) + if hasattr(socket, 'TCP_KEEPCNT'): + options['TCP_KEEPCNT'] = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT) + except OSError: + pass + + # 소켓 상태 + options['blocking'] = sock.getblocking() + options['timeout'] = sock.gettimeout() + + except Exception as e: + options['error'] = str(e) + + return options + + @staticmethod + def create_optimized_client_socket( + target_host: str, + target_port: int, + timeout: float = 5.0, + keepalive: bool = True, + nodelay: bool = True + ) -> socket.socket: + """클라이언트용 최적화된 소켓 생성""" + sock = AdvancedSocketOptions.create_socket_with_options( + reuse_addr=True, + keepalive=keepalive, + nodelay=nodelay, + keepalive_idle=600, # 10분 + keepalive_interval=60, # 1분 + keepalive_probes=3 + ) + + sock.settimeout(timeout) + return sock + + @staticmethod + def create_optimized_server_socket( + bind_host: str, + bind_port: int, + backlog: int = 5, + keepalive: bool = True, + nodelay: bool = False + ) -> socket.socket: + """서버용 최적화된 소켓 생성""" + sock = AdvancedSocketOptions.create_socket_with_options( + reuse_addr=True, + keepalive=keepalive, + nodelay=nodelay, + keepalive_idle=7200, # 2시간 + keepalive_interval=75, # 75초 + keepalive_probes=9, + rcvbuf=65536, # 64KB + sndbuf=65536 # 64KB + ) + + sock.bind((bind_host, bind_port)) + sock.listen(backlog) + return sock + + +class NonBlockingSocketManager: + """논블로킹 소켓 관리 클래스""" + + def __init__(self): + self.sockets = {} + + def create_nonblocking_socket( + self, + socket_id: str, + family: int = socket.AF_INET, + socket_type: int = socket.SOCK_STREAM + ) -> socket.socket: + """논블로킹 소켓 생성 및 등록""" + sock = AdvancedSocketOptions.create_socket_with_options( + family=family, + socket_type=socket_type, + blocking=False, + reuse_addr=True + ) + + self.sockets[socket_id] = { + 'socket': sock, + 'created_at': time.time(), + 'last_activity': time.time() + } + + return sock + + def connect_nonblocking( + self, + socket_id: str, + address: tuple, + timeout: float = 5.0 + ) -> bool: + """논블로킹 소켓으로 연결 시도""" + if socket_id not in self.sockets: + raise ValueError(f"Socket {socket_id} not found") + + sock = self.sockets[socket_id]['socket'] + start_time = time.time() + + try: + sock.connect(address) + return True + except socket.error as e: + if e.errno == socket.errno.EINPROGRESS or e.errno == socket.errno.EALREADY: + # 연결이 진행 중 + while time.time() - start_time < timeout: + try: + # 소켓이 쓰기 가능한지 확인 (연결 완료) + import select + ready = select.select([], [sock], [], 0.1) + if ready[1]: + # 연결 상태 확인 + error = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if error == 0: + self.sockets[socket_id]['last_activity'] = time.time() + return True + else: + raise socket.error(error, "Connection failed") + except select.error: + break + time.sleep(0.01) + + raise socket.timeout("Connection timeout") + else: + raise + + def send_nonblocking( + self, + socket_id: str, + data: bytes, + timeout: float = 5.0 + ) -> int: + """논블로킹 소켓으로 데이터 전송""" + if socket_id not in self.sockets: + raise ValueError(f"Socket {socket_id} not found") + + sock = self.sockets[socket_id]['socket'] + start_time = time.time() + total_sent = 0 + + while total_sent < len(data) and time.time() - start_time < timeout: + try: + sent = sock.send(data[total_sent:]) + if sent == 0: + raise socket.error("Socket connection broken") + total_sent += sent + self.sockets[socket_id]['last_activity'] = time.time() + except socket.error as e: + if e.errno == socket.errno.EAGAIN or e.errno == socket.errno.EWOULDBLOCK: + # 송신 버퍼가 가득참, 잠시 대기 + time.sleep(0.01) + continue + else: + raise + + if total_sent < len(data): + raise socket.timeout("Send timeout") + + return total_sent + + def recv_nonblocking( + self, + socket_id: str, + buffer_size: int = 1024, + timeout: float = 5.0 + ) -> bytes: + """논블로킹 소켓으로 데이터 수신""" + if socket_id not in self.sockets: + raise ValueError(f"Socket {socket_id} not found") + + sock = self.sockets[socket_id]['socket'] + start_time = time.time() + + while time.time() - start_time < timeout: + try: + data = sock.recv(buffer_size) + if data: + self.sockets[socket_id]['last_activity'] = time.time() + return data + else: + raise socket.error("Socket connection closed") + except socket.error as e: + if e.errno == socket.errno.EAGAIN or e.errno == socket.errno.EWOULDBLOCK: + # 수신할 데이터가 없음, 잠시 대기 + time.sleep(0.01) + continue + else: + raise + + raise socket.timeout("Receive timeout") + + def close_socket(self, socket_id: str): + """소켓 닫기 및 등록 해제""" + if socket_id in self.sockets: + self.sockets[socket_id]['socket'].close() + del self.sockets[socket_id] + + def cleanup_inactive_sockets(self, max_idle_time: float = 300.0): + """비활성 소켓 정리 (5분 이상 비활성)""" + current_time = time.time() + inactive_sockets = [] + + for socket_id, info in self.sockets.items(): + if current_time - info['last_activity'] > max_idle_time: + inactive_sockets.append(socket_id) + + for socket_id in inactive_sockets: + self.close_socket(socket_id) + + return len(inactive_sockets) + + +# 편의 함수들 +def create_tcp_server_socket(host: str, port: int, **options) -> socket.socket: + """TCP 서버 소켓 생성 편의 함수""" + return AdvancedSocketOptions.create_optimized_server_socket(host, port, **options) + + +def create_tcp_client_socket(host: str, port: int, **options) -> socket.socket: + """TCP 클라이언트 소켓 생성 편의 함수""" + return AdvancedSocketOptions.create_optimized_client_socket(host, port, **options) + + +def create_udp_socket(host: str = '', port: int = 0, **options) -> socket.socket: + """UDP 소켓 생성 편의 함수""" + sock = AdvancedSocketOptions.create_socket_with_options( + socket_type=socket.SOCK_DGRAM, + **options + ) + if host or port: + sock.bind((host, port)) + return sock \ No newline at end of file diff --git a/network_monitor/tcp_server.py b/network_monitor/tcp_server.py index 69bf29f..8cac63f 100644 --- a/network_monitor/tcp_server.py +++ b/network_monitor/tcp_server.py @@ -2,22 +2,34 @@ import threading import time from datetime import datetime +from .socket_options import AdvancedSocketOptions class TCPEchoServer: - def __init__(self, host='localhost', port=8080): + def __init__(self, host='localhost', port=8080, use_advanced_options=True): self.host = host self.port = port self.socket = None self.running = False self.clients = [] + self.use_advanced_options = use_advanced_options def start_single_client_server(self): """단일 클라이언트 TCP 에코 서버 시작""" try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((self.host, self.port)) - self.socket.listen(1) + if self.use_advanced_options: + self.socket = AdvancedSocketOptions.create_optimized_server_socket( + self.host, + self.port, + backlog=1, + keepalive=True, + nodelay=True + ) + print("Advanced socket options enabled: SO_REUSEADDR, SO_KEEPALIVE, TCP_NODELAY") + else: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((self.host, self.port)) + self.socket.listen(1) self.running = True print(f"TCP Echo Server (Single Client) started on {self.host}:{self.port}") @@ -43,10 +55,20 @@ def start_single_client_server(self): def start_multi_client_server(self): """멀티 클라이언트 TCP 에코 서버 시작""" try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((self.host, self.port)) - self.socket.listen(5) + if self.use_advanced_options: + self.socket = AdvancedSocketOptions.create_optimized_server_socket( + self.host, + self.port, + backlog=5, + keepalive=True, + nodelay=False # 멀티 클라이언트에서는 처리량 우선 + ) + print("Advanced socket options enabled: SO_REUSEADDR, SO_KEEPALIVE") + else: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((self.host, self.port)) + self.socket.listen(5) self.running = True print(f"TCP Echo Server (Multi Client) started on {self.host}:{self.port}") @@ -79,6 +101,13 @@ def start_multi_client_server(self): def _handle_client(self, client_socket, client_address): """클라이언트 연결 처리""" try: + # 클라이언트 소켓에도 Keep-alive 설정 (서버가 고급 옵션 사용 시) + if self.use_advanced_options: + client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + # 소켓 옵션 정보 출력 + options = AdvancedSocketOptions.get_socket_options(client_socket) + print(f"Client {client_address} socket options: SO_KEEPALIVE={options.get('SO_KEEPALIVE', 'N/A')}") + while self.running: data = client_socket.recv(1024) if not data: @@ -105,9 +134,9 @@ def stop(self): self.socket.close() print("Server stopped") -def run_tcp_echo_server(host='localhost', port=8080, multi_client=False): +def run_tcp_echo_server(host='localhost', port=8080, multi_client=False, advanced_options=False): """TCP 에코 서버 실행""" - server = TCPEchoServer(host, port) + server = TCPEchoServer(host, port, use_advanced_options=advanced_options) try: if multi_client: @@ -125,7 +154,8 @@ def run_tcp_echo_server(host='localhost', port=8080, multi_client=False): parser.add_argument('--host', default='localhost', help='Host to bind to') parser.add_argument('--port', type=int, default=8080, help='Port to bind to') parser.add_argument('--multi', action='store_true', help='Enable multi-client support') + parser.add_argument('--advanced', action='store_true', help='Use advanced socket options (SO_KEEPALIVE, etc.)') args = parser.parse_args() - run_tcp_echo_server(args.host, args.port, args.multi) \ No newline at end of file + run_tcp_echo_server(args.host, args.port, args.multi, args.advanced) \ No newline at end of file diff --git a/network_monitor/timeout_manager.py b/network_monitor/timeout_manager.py new file mode 100644 index 0000000..04d9d63 --- /dev/null +++ b/network_monitor/timeout_manager.py @@ -0,0 +1,256 @@ +import time +import signal +import threading +from typing import Optional, Callable, Any +from contextlib import contextmanager + + +class TimeoutError(Exception): + """타임아웃 발생 시 발생하는 예외""" + pass + + +class PreciseTimeoutManager: + """정밀한 타임아웃 제어를 위한 매니저 클래스""" + + def __init__(self): + self.active_timeouts = {} + self.timeout_counter = 0 + self.lock = threading.Lock() + + @contextmanager + def timeout(self, seconds: float, error_message: str = "Operation timed out"): + """ + 정밀한 타임아웃 컨텍스트 매니저 + + Args: + seconds: 타임아웃 시간 (초, 소수점 지원) + error_message: 타임아웃 시 에러 메시지 + """ + start_time = time.time() + timeout_id = None + + def timeout_handler(): + elapsed = time.time() - start_time + if elapsed >= seconds: + raise TimeoutError(f"{error_message} (after {elapsed:.3f}s)") + + try: + with self.lock: + self.timeout_counter += 1 + timeout_id = self.timeout_counter + self.active_timeouts[timeout_id] = { + 'start_time': start_time, + 'timeout_seconds': seconds, + 'handler': timeout_handler + } + + yield timeout_handler + + finally: + if timeout_id and timeout_id in self.active_timeouts: + with self.lock: + del self.active_timeouts[timeout_id] + + def check_timeout(self, timeout_id: int) -> bool: + """특정 타임아웃 ID의 상태 확인""" + with self.lock: + if timeout_id not in self.active_timeouts: + return False + + timeout_info = self.active_timeouts[timeout_id] + elapsed = time.time() - timeout_info['start_time'] + return elapsed >= timeout_info['timeout_seconds'] + + def get_remaining_time(self, timeout_id: int) -> float: + """특정 타임아웃 ID의 남은 시간 반환""" + with self.lock: + if timeout_id not in self.active_timeouts: + return 0.0 + + timeout_info = self.active_timeouts[timeout_id] + elapsed = time.time() - timeout_info['start_time'] + remaining = timeout_info['timeout_seconds'] - elapsed + return max(0.0, remaining) + + +class AdaptiveTimeoutManager: + """적응형 타임아웃 매니저 - 네트워크 상태에 따라 타임아웃 조정""" + + def __init__(self, base_timeout: float = 5.0, min_timeout: float = 0.1, max_timeout: float = 30.0): + self.base_timeout = base_timeout + self.min_timeout = min_timeout + self.max_timeout = max_timeout + self.response_times = [] + self.success_rate = 1.0 + self.lock = threading.Lock() + + def record_response(self, response_time: Optional[float], success: bool): + """응답 시간과 성공 여부 기록""" + with self.lock: + if response_time is not None: + self.response_times.append(response_time) + # 최근 100개 응답만 유지 + if len(self.response_times) > 100: + self.response_times.pop(0) + + # 성공률 계산 (최근 20회 기준) + recent_count = min(20, len(self.response_times)) + if recent_count > 0: + recent_successes = sum(1 for _ in range(recent_count) if success) + self.success_rate = recent_successes / recent_count + + def get_adaptive_timeout(self, target_host: str = "") -> float: + """적응형 타임아웃 값 계산""" + with self.lock: + if not self.response_times: + return self.base_timeout + + # 평균 응답 시간 계산 + avg_response_time = sum(self.response_times) / len(self.response_times) + + # 95퍼센타일 응답 시간 계산 + sorted_times = sorted(self.response_times) + p95_index = int(len(sorted_times) * 0.95) + p95_response_time = sorted_times[p95_index] if sorted_times else avg_response_time + + # 적응형 타임아웃 계산 + # 성공률이 낮으면 타임아웃을 늘리고, 높으면 줄임 + success_factor = 2.0 - self.success_rate # 0.5 ~ 2.0 범위 + adaptive_timeout = p95_response_time * 3 * success_factor + + # 최소/최대 범위 내로 제한 + adaptive_timeout = max(self.min_timeout, min(self.max_timeout, adaptive_timeout)) + + return adaptive_timeout + + def get_timeout_stats(self) -> dict: + """타임아웃 통계 정보 반환""" + with self.lock: + if not self.response_times: + return { + 'count': 0, + 'avg_response_time': 0.0, + 'success_rate': self.success_rate, + 'current_timeout': self.base_timeout + } + + return { + 'count': len(self.response_times), + 'avg_response_time': sum(self.response_times) / len(self.response_times), + 'min_response_time': min(self.response_times), + 'max_response_time': max(self.response_times), + 'success_rate': self.success_rate, + 'current_timeout': self.get_adaptive_timeout() + } + + +class ConnectionTimeoutManager: + """연결별 타임아웃 관리""" + + def __init__(self): + self.connection_timeouts = {} + self.host_managers = {} + self.lock = threading.Lock() + + def get_host_manager(self, host: str) -> AdaptiveTimeoutManager: + """호스트별 적응형 타임아웃 매니저 반환""" + with self.lock: + if host not in self.host_managers: + self.host_managers[host] = AdaptiveTimeoutManager() + return self.host_managers[host] + + def get_timeout_for_host(self, host: str) -> float: + """특정 호스트에 대한 적응형 타임아웃 반환""" + manager = self.get_host_manager(host) + return manager.get_adaptive_timeout(host) + + def record_host_response(self, host: str, response_time: Optional[float], success: bool): + """호스트별 응답 기록""" + manager = self.get_host_manager(host) + manager.record_response(response_time, success) + + def get_all_host_stats(self) -> dict: + """모든 호스트의 타임아웃 통계 반환""" + with self.lock: + stats = {} + for host, manager in self.host_managers.items(): + stats[host] = manager.get_timeout_stats() + return stats + + +# 전역 인스턴스 +global_timeout_manager = PreciseTimeoutManager() +global_connection_manager = ConnectionTimeoutManager() + + +def with_timeout(seconds: float, error_message: str = "Operation timed out"): + """ + 함수 데코레이터로 타임아웃 적용 + + Args: + seconds: 타임아웃 시간 (초) + error_message: 타임아웃 시 에러 메시지 + """ + def decorator(func: Callable) -> Callable: + def wrapper(*args, **kwargs) -> Any: + with global_timeout_manager.timeout(seconds, error_message): + return func(*args, **kwargs) + return wrapper + return decorator + + +def sleep_with_timeout_check(seconds: float, check_interval: float = 0.1, timeout_checker: Optional[Callable] = None): + """ + 타임아웃 체크가 가능한 sleep 함수 + + Args: + seconds: 대기 시간 + check_interval: 타임아웃 체크 간격 + timeout_checker: 타임아웃 체크 함수 + """ + elapsed = 0.0 + while elapsed < seconds: + if timeout_checker and timeout_checker(): + raise TimeoutError("Sleep interrupted by timeout") + + sleep_time = min(check_interval, seconds - elapsed) + time.sleep(sleep_time) + elapsed += sleep_time + + +class ProgressiveTimeoutManager: + """점진적 타임아웃 매니저 - 시도 횟수에 따라 타임아웃 증가""" + + def __init__(self, initial_timeout: float = 1.0, max_timeout: float = 10.0, backoff_factor: float = 1.5): + self.initial_timeout = initial_timeout + self.max_timeout = max_timeout + self.backoff_factor = backoff_factor + self.attempt_counts = {} + self.lock = threading.Lock() + + def get_timeout_for_attempt(self, key: str, attempt: int = 0) -> float: + """시도 횟수에 따른 점진적 타임아웃 계산""" + timeout = self.initial_timeout * (self.backoff_factor ** attempt) + return min(timeout, self.max_timeout) + + def record_attempt(self, key: str, success: bool): + """시도 기록""" + with self.lock: + if key not in self.attempt_counts: + self.attempt_counts[key] = {'attempts': 0, 'successes': 0} + + self.attempt_counts[key]['attempts'] += 1 + if success: + self.attempt_counts[key]['successes'] += 1 + # 성공 시 카운터 리셋 + self.attempt_counts[key]['attempts'] = 0 + + def get_next_timeout(self, key: str) -> float: + """다음 시도를 위한 타임아웃 반환""" + with self.lock: + if key not in self.attempt_counts: + return self.initial_timeout + + attempts = self.attempt_counts[key]['attempts'] + return self.get_timeout_for_attempt(key, attempts) \ No newline at end of file diff --git a/templates/index.html b/templates/index.html index a84f7a7..836946c 100644 --- a/templates/index.html +++ b/templates/index.html @@ -1,3 +1,4 @@ +
@@ -126,7 +127,23 @@Monitor running Docker services and system status
+ + + +View current monitoring settings and status
+ + + +