A modern szoftverfejlesztés világában minden programozó találkozik olyan helyzetekkel, amikor a kód egyszerűen "megáll" és nem tudja folytatni a munkáját. Ez a jelenség nemcsak frusztráló, hanem komoly teljesítményproblémákat is okozhat az alkalmazásokban. A többszálú programozás és az összetett rendszerek térnyerésével ez a probléma egyre gyakoribbá válik.
A holtpont egy olyan állapot, amikor két vagy több folyamat kölcsönösen várakozik egymásra, miközben mindegyik olyan erőforrást tart magánál, amelyre a másiknak szüksége van. Ez a definíció egyszerűnek tűnik, de a valóságban sokféle formában megjelenhet, és számos különböző megközelítést igényel a megoldáshoz. A probléma megértése kulcsfontosságú minden fejlesztő számára, aki stabil és megbízható alkalmazásokat szeretne készíteni.
Az alábbiakban részletesen megvizsgáljuk ezt a komplex témát, bemutatjuk a leggyakoribb forgatókönyveket, és praktikus megoldásokat kínálunk. Megtudhatod, hogyan ismerheted fel a jeleket, milyen megelőzési stratégiákat alkalmazhatsz, és mit tehetsz, ha már bekövetkezett a probléma.
A holtpont alapjai és mechanizmusa
A programozásban fellépő holtpont megértéséhez először azt kell tisztáznunk, hogy milyen feltételek szükségesek a kialakulásához. Négy alapvető feltétel létezik, amelyek mindegyikének egyidejűleg jelen kell lennie:
- Kölcsönös kizárás: Az erőforrások nem oszthatók meg, egyszerre csak egy folyamat használhatja őket
- Tartás és várakozás: A folyamatok megtartják a már megszerzett erőforrásokat, miközben újabbakra várnak
- Nem megszakítható erőforrás-használat: Az erőforrásokat nem lehet erőszakkal elvenni a folyamatoktól
- Körkörös várakozás: A folyamatok olyan láncot alkotnak, ahol mindegyik a következőre vár
Hogyan alakul ki a holtpont?
A klasszikus példa két bankszámlás tranzakció egyidejű végrehajtása. Az első tranzakció zárolja az A számlát, majd megpróbálja zárolni a B számlát. Ugyanebben az időben a második tranzakció zárolja a B számlát, és megpróbálja zárolni az A számlát. Mindkét folyamat várakozik a másikra, és egyikük sem tud továbblépni.
# Problémás kód példa
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def transaction_1():
with lock_a:
print("Transaction 1: A számla zárolva")
time.sleep(1) # Szimulált munka
with lock_b:
print("Transaction 1: B számla zárolva")
def transaction_2():
with lock_b:
print("Transaction 2: B számla zárolva")
time.sleep(1) # Szimulált munka
with lock_a:
print("Transaction 2: A számla zárolva")
Ez a kód szinte garantáltan holtpontot fog okozni, ha mindkét függvényt párhuzamosan futtatjuk.
"A holtpont nem egy hiba, hanem egy tervezési probléma. A legjobb megoldás a megelőzés, nem a kezelés."
Holtpont típusai és megjelenési formái
Resource Deadlock (Erőforrás-holtpont)
Ez a leggyakoribb típus, amikor folyamatok fizikai vagy logikai erőforrásokért versenyeznek. Ilyenek lehetnek:
- Fájlok és adatbázis-zárak
- Memóriaterületek
- Hálózati kapcsolatok
- Hardveres eszközök
Communication Deadlock (Kommunikációs holtpont)
Ez akkor következik be, amikor folyamatok üzenetváltásra várnak egymástól. Tipikus esetei:
- Szinkron üzenetküldés során
- Pipeline-ok esetében
- Request-response mintáknál
Livelock és Starvation
Bár nem klasszikus holtpontok, de hasonló problémákat okoznak:
- Livelock: A folyamatok aktívak maradnak, de nem haladnak előre
- Starvation: Egy folyamat soha nem jut hozzá a szükséges erőforrásokhoz
| Holtpont típusa | Jellemzők | Tipikus környezet |
|---|---|---|
| Resource Deadlock | Fizikai erőforrások zárolása | Adatbázisok, fájlrendszerek |
| Communication Deadlock | Üzenetváltás blokkolása | Mikroszolgáltatások, RPC |
| Livelock | Aktív, de eredménytelen folyamatok | Konfliktus-feloldó algoritmusok |
| Starvation | Egyenlőtlen erőforrás-elosztás | Prioritásos ütemezők |
Felismerés és diagnosztika
Korai figyelmeztető jelek
A holtpont kialakulását gyakran megelőzik bizonyos jelek:
- Növekvő válaszidők: Az alkalmazás egyre lassabban reagál
- CPU-használat csökkenése: A processzorok várakoznak, nem dolgoznak
- Memóriahasználat stagnálása: Az alkalmazás nem halad előre
- Log üzenetek hiánya: A folyamatok nem írnak újabb bejegyzéseket
Monitoring és eszközök
Modern alkalmazásokban számos eszköz segíthet a holtpontok felismerésében:
import threading
import time
from collections import defaultdict
class DeadlockDetector:
def __init__(self):
self.waiting_graph = defaultdict(set)
self.lock = threading.Lock()
def add_wait_relationship(self, waiter, holder):
with self.lock:
self.waiting_graph[waiter].add(holder)
def detect_cycle(self):
visited = set()
rec_stack = set()
def dfs(node):
visited.add(node)
rec_stack.add(node)
for neighbor in self.waiting_graph[node]:
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in rec_stack:
return True
rec_stack.remove(node)
return False
for node in self.waiting_graph:
if node not in visited:
if dfs(node):
return True
return False
Profiling és telemetria
A modern alkalmazásokban elengedhetetlen a megfelelő telemetria beépítése:
- Lock contention metrikák: Mennyi időt töltenek a szálak várakozással
- Queue depth monitoring: A várakozó kérések száma
- Thread state tracking: Milyen állapotban vannak a szálak
"A legjobb holtpont-detektáló algoritmus az, amely soha nem fut le, mert a holtpont nem alakul ki."
Megelőzési stratégiák
Lock Ordering (Zár-rendezés)
Az egyik leghatékonyabb megelőzési módszer a zárak konzisztens sorrendben való megszerzése:
import threading
from contextlib import contextmanager
class OrderedLock:
def __init__(self, lock_id):
self.lock = threading.Lock()
self.lock_id = lock_id
def __lt__(self, other):
return self.lock_id < other.lock_id
@contextmanager
def acquire_locks(*locks):
"""Zárak megszerzése konzisztens sorrendben"""
sorted_locks = sorted(locks)
acquired_locks = []
try:
for lock in sorted_locks:
lock.lock.acquire()
acquired_locks.append(lock)
yield
finally:
for lock in reversed(acquired_locks):
lock.lock.release()
# Használat
lock_a = OrderedLock(1)
lock_b = OrderedLock(2)
def safe_transaction():
with acquire_locks(lock_a, lock_b):
# Biztonságos kód végrehajtása
print("Tranzakció végrehajtva")
Timeout mechanizmusok
A várakozási idő korlátozása megakadályozza a végtelen várakozást:
import threading
import time
def safe_acquire_with_timeout(lock, timeout=5.0):
"""Zár megszerzése időkorláttal"""
if lock.acquire(timeout=timeout):
try:
yield
finally:
lock.release()
else:
raise TimeoutError("Nem sikerült megszerezni a zárat időben")
# Használat
def transaction_with_timeout():
try:
with safe_acquire_with_timeout(lock_a):
with safe_acquire_with_timeout(lock_b, timeout=2.0):
# Biztonságos kód
pass
except TimeoutError:
print("Timeout miatt megszakítva - lehetséges holtpont")
Resource Allocation Graph algoritmus
Ez egy proaktív megközelítés, amely megelőzi a holtpont kialakulását:
- Wait-Die: Az idősebb folyamatok várhatnak, a fiatalabbak meghalnak
- Wound-Wait: Az idősebb folyamatok megsebesítik a fiatalabbakat
- Banker's Algorithm: Biztonságos állapotok fenntartása
| Stratégia | Előnyök | Hátrányok | Alkalmazási terület |
|---|---|---|---|
| Lock Ordering | Egyszerű, hatékony | Globális sorrend szükséges | Kis-közepes rendszerek |
| Timeout | Rugalmas, gyors felépülés | Hamis pozitívok | Valós idejű rendszerek |
| Resource Graph | Proaktív, elméleti garancia | Komplex implementáció | Kritikus rendszerek |
Feloldási technikák
Deadlock Breaking
Ha már bekövetkezett a holtpont, több stratégia alkalmazható:
Victim Selection (Áldozat kiválasztása)
class DeadlockResolver:
def __init__(self):
self.process_priorities = {}
self.resource_costs = {}
def select_victim(self, deadlocked_processes):
"""Legkevesebb költséggel járó folyamat kiválasztása"""
min_cost = float('inf')
victim = None
for process in deadlocked_processes:
cost = self.calculate_rollback_cost(process)
if cost < min_cost:
min_cost = cost
victim = process
return victim
def calculate_rollback_cost(self, process):
"""Visszaállítás költségének kiszámítása"""
return (
self.process_priorities.get(process, 1) *
self.resource_costs.get(process, 1)
)
Process Termination
A legdrasztikusabb, de néha szükséges megoldás:
import signal
import threading
class ProcessManager:
def __init__(self):
self.active_processes = {}
self.lock = threading.Lock()
def terminate_process(self, process_id, reason="Deadlock detected"):
"""Folyamat biztonságos leállítása"""
with self.lock:
if process_id in self.active_processes:
process = self.active_processes[process_id]
# Cleanup erőforrások
self.cleanup_resources(process)
# Folyamat leállítása
process.terminate()
# Log bejegyzés
self.log_termination(process_id, reason)
del self.active_processes[process_id]
def cleanup_resources(self, process):
"""Folyamat által használt erőforrások felszabadítása"""
for resource in process.held_resources:
resource.release()
Resource Preemption
Erőforrások erőszakos elvétele a folyamatoktól:
class ResourceManager:
def __init__(self):
self.resources = {}
self.allocations = defaultdict(list)
def preempt_resource(self, resource_id, from_process, to_process):
"""Erőforrás áthelyezése folyamatok között"""
if resource_id in self.allocations[from_process]:
# Állapot mentése
state = self.save_process_state(from_process)
# Erőforrás elvétele
self.allocations[from_process].remove(resource_id)
# Új tulajdonos beállítása
self.allocations[to_process].append(resource_id)
# Rollback információ tárolása
self.store_rollback_info(from_process, state)
return True
return False
"A holtpont feloldása mindig kompromisszumokkal jár. A cél a minimális károkozás mellett a rendszer működőképességének helyreállítása."
Gyakorlati implementációs minták
Producer-Consumer minta holtpontmentes megvalósítása
import threading
import queue
import time
class SafeProducerConsumer:
def __init__(self, buffer_size=10):
self.buffer = queue.Queue(maxsize=buffer_size)
self.running = threading.Event()
self.running.set()
def producer(self, producer_id):
"""Biztonságos producer implementáció"""
while self.running.is_set():
try:
item = f"Item-{producer_id}-{time.time()}"
# Timeout használata a holtpont elkerülésére
self.buffer.put(item, timeout=1.0)
print(f"Producer {producer_id}: {item} hozzáadva")
time.sleep(0.1)
except queue.Full:
print(f"Producer {producer_id}: Buffer tele, várakozás...")
except Exception as e:
print(f"Producer {producer_id}: Hiba - {e}")
break
def consumer(self, consumer_id):
"""Biztonságos consumer implementáció"""
while self.running.is_set():
try:
item = self.buffer.get(timeout=1.0)
print(f"Consumer {consumer_id}: {item} feldolgozva")
self.buffer.task_done()
time.sleep(0.2)
except queue.Empty:
print(f"Consumer {consumer_id}: Buffer üres, várakozás...")
except Exception as e:
print(f"Consumer {consumer_id}: Hiba - {e}")
break
def shutdown(self):
"""Biztonságos leállítás"""
self.running.clear()
# Buffer kiürítése
while not self.buffer.empty():
try:
self.buffer.get_nowait()
self.buffer.task_done()
except queue.Empty:
break
Database Connection Pool
Adatbázis-kapcsolatok kezelése holtpontmentes módon:
import threading
import time
from contextlib import contextmanager
class SafeConnectionPool:
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.available_connections = queue.Queue(maxsize=max_connections)
self.active_connections = set()
self.lock = threading.RLock()
# Kapcsolatok inicializálása
for i in range(max_connections):
conn = self.create_connection(f"conn_{i}")
self.available_connections.put(conn)
@contextmanager
def get_connection(self, timeout=5.0):
"""Kapcsolat megszerzése timeout-tal"""
connection = None
try:
connection = self.available_connections.get(timeout=timeout)
with self.lock:
self.active_connections.add(connection)
yield connection
except queue.Empty:
raise TimeoutError("Nem sikerült kapcsolatot szerezni")
finally:
if connection:
with self.lock:
self.active_connections.discard(connection)
self.available_connections.put(connection)
def create_connection(self, conn_id):
"""Mock kapcsolat létrehozása"""
return {"id": conn_id, "created_at": time.time()}
def health_check(self):
"""Pool állapotának ellenőrzése"""
with self.lock:
return {
"available": self.available_connections.qsize(),
"active": len(self.active_connections),
"total": self.max_connections
}
Async/Await mintázatok
Modern aszinkron programozásban:
import asyncio
import aiohttp
import logging
class AsyncResourceManager:
def __init__(self):
self.semaphores = {}
self.locks = {}
async def acquire_resource(self, resource_id, max_concurrent=5, timeout=10.0):
"""Aszinkron erőforrás-megszerzés"""
if resource_id not in self.semaphores:
self.semaphores[resource_id] = asyncio.Semaphore(max_concurrent)
semaphore = self.semaphores[resource_id]
try:
await asyncio.wait_for(semaphore.acquire(), timeout=timeout)
return True
except asyncio.TimeoutError:
logging.warning(f"Timeout acquiring resource {resource_id}")
return False
async def release_resource(self, resource_id):
"""Erőforrás felszabadítása"""
if resource_id in self.semaphores:
self.semaphores[resource_id].release()
async def batch_operation(self, operations, max_concurrent=10):
"""Párhuzamos műveletek végrehajtása korlátozással"""
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_operation(operation):
async with semaphore:
return await operation()
tasks = [limited_operation(op) for op in operations]
return await asyncio.gather(*tasks, return_exceptions=True)
"Az aszinkron programozás új lehetőségeket teremt a holtpontok elkerülésére, de új kihívásokat is hoz magával."
Tesztelési stratégiák
Chaos Engineering
Szándékos hibák bevezetése a rendszer ellenálló-képességének tesztelésére:
import random
import threading
import time
class ChaosDeadlockTester:
def __init__(self, system):
self.system = system
self.chaos_enabled = False
self.chaos_thread = None
def start_chaos(self):
"""Káosz tesztelés indítása"""
self.chaos_enabled = True
self.chaos_thread = threading.Thread(target=self._chaos_loop)
self.chaos_thread.start()
def stop_chaos(self):
"""Káosz tesztelés leállítása"""
self.chaos_enabled = False
if self.chaos_thread:
self.chaos_thread.join()
def _chaos_loop(self):
"""Káosz műveletek végrehajtása"""
while self.chaos_enabled:
chaos_type = random.choice([
'delay_operation',
'hold_resource_longer',
'random_failure',
'resource_contention'
])
getattr(self, chaos_type)()
time.sleep(random.uniform(1, 5))
def delay_operation(self):
"""Műveletek késleltetése"""
delay = random.uniform(0.1, 2.0)
time.sleep(delay)
def hold_resource_longer(self):
"""Erőforrások hosszabb ideig tartása"""
# Szimulált hosszabb erőforrás-használat
pass
def random_failure(self):
"""Véletlenszerű hibák"""
if random.random() < 0.1: # 10% esély
raise Exception("Chaos induced failure")
def resource_contention(self):
"""Erőforrás-verseny fokozása"""
# Több szál indítása párhuzamos erőforrás-hozzáférésre
pass
Load Testing
Terheléses tesztelés holtpontok felderítésére:
import concurrent.futures
import time
import statistics
class DeadlockLoadTester:
def __init__(self, target_system):
self.target_system = target_system
self.results = []
def run_load_test(self, num_threads=50, duration=60):
"""Terheléses teszt futtatása"""
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
# Munkaterhelés indítása
while time.time() - start_time < duration:
future = executor.submit(self._worker_task)
futures.append(future)
time.sleep(0.01) # Kis késleltetés a túlterhelés elkerülésére
# Eredmények gyűjtése
for future in concurrent.futures.as_completed(futures):
try:
result = future.result(timeout=10)
self.results.append(result)
except concurrent.futures.TimeoutError:
self.results.append({"status": "timeout", "error": "Task timeout"})
except Exception as e:
self.results.append({"status": "error", "error": str(e)})
def _worker_task(self):
"""Munkaszál feladat"""
start_time = time.time()
try:
# Szimulált munkaterhelés
self.target_system.perform_operation()
end_time = time.time()
return {
"status": "success",
"duration": end_time - start_time,
"thread_id": threading.current_thread().ident
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"thread_id": threading.current_thread().ident
}
def analyze_results(self):
"""Eredmények elemzése"""
successful = [r for r in self.results if r["status"] == "success"]
errors = [r for r in self.results if r["status"] == "error"]
timeouts = [r for r in self.results if r["status"] == "timeout"]
if successful:
durations = [r["duration"] for r in successful]
avg_duration = statistics.mean(durations)
median_duration = statistics.median(durations)
else:
avg_duration = median_duration = 0
return {
"total_operations": len(self.results),
"successful": len(successful),
"errors": len(errors),
"timeouts": len(timeouts),
"success_rate": len(successful) / len(self.results) if self.results else 0,
"avg_duration": avg_duration,
"median_duration": median_duration,
"error_details": errors[:10] # Első 10 hiba részletei
}
"A tesztelés nem garantálja a holtpontok hiányát, de jelentősen csökkenti a kockázatot."
Monitoring és riasztási rendszerek
Valós idejű monitoring
import threading
import time
import json
from collections import defaultdict, deque
from datetime import datetime
class DeadlockMonitor:
def __init__(self, alert_threshold=5.0):
self.alert_threshold = alert_threshold
self.metrics = defaultdict(deque)
self.alerts = []
self.monitoring = False
self.lock = threading.Lock()
def start_monitoring(self):
"""Monitoring indítása"""
self.monitoring = True
monitor_thread = threading.Thread(target=self._monitor_loop)
monitor_thread.daemon = True
monitor_thread.start()
def stop_monitoring(self):
"""Monitoring leállítása"""
self.monitoring = False
def record_wait_time(self, resource_id, wait_time):
"""Várakozási idő rögzítése"""
with self.lock:
self.metrics[f"wait_time_{resource_id}"].append({
"timestamp": datetime.now(),
"value": wait_time
})
# Régi adatok törlése (csak utolsó 100 mérés)
if len(self.metrics[f"wait_time_{resource_id}"]) > 100:
self.metrics[f"wait_time_{resource_id}"].popleft()
def record_lock_contention(self, lock_id, contention_level):
"""Lock verseny szintjének rögzítése"""
with self.lock:
self.metrics[f"contention_{lock_id}"].append({
"timestamp": datetime.now(),
"value": contention_level
})
def _monitor_loop(self):
"""Monitoring hurok"""
while self.monitoring:
self._check_for_anomalies()
time.sleep(1.0) # Ellenőrzés másodpercenként
def _check_for_anomalies(self):
"""Rendellenességek keresése"""
with self.lock:
for metric_name, values in self.metrics.items():
if not values:
continue
recent_values = [v["value"] for v in list(values)[-10:]]
if recent_values:
avg_value = sum(recent_values) / len(recent_values)
if avg_value > self.alert_threshold:
alert = {
"timestamp": datetime.now(),
"metric": metric_name,
"value": avg_value,
"threshold": self.alert_threshold,
"severity": "HIGH" if avg_value > self.alert_threshold * 2 else "MEDIUM"
}
self.alerts.append(alert)
self._send_alert(alert)
def _send_alert(self, alert):
"""Riasztás küldése"""
print(f"ALERT: {alert['severity']} - {alert['metric']} = {alert['value']:.2f}")
# Itt lehet email, webhook, stb. küldés
def get_metrics_report(self):
"""Metrikák jelentés generálása"""
with self.lock:
report = {
"timestamp": datetime.now().isoformat(),
"metrics": {},
"recent_alerts": self.alerts[-10:] if self.alerts else []
}
for metric_name, values in self.metrics.items():
if values:
recent_values = [v["value"] for v in list(values)[-10:]]
report["metrics"][metric_name] = {
"current": recent_values[-1] if recent_values else 0,
"average": sum(recent_values) / len(recent_values),
"max": max(recent_values),
"min": min(recent_values),
"count": len(values)
}
return report
Dashboard és vizualizáció
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
class MonitoringDashboard:
def __init__(self, monitor, port=8080):
self.monitor = monitor
self.port = port
def start_server(self):
"""Dashboard szerver indítása"""
server = HTTPServer(('localhost', self.port), self._make_handler())
print(f"Dashboard elérhető: http://localhost:{self.port}")
server.serve_forever()
def _make_handler(self):
monitor = self.monitor
class DashboardHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
html = self._generate_dashboard_html()
self.wfile.write(html.encode())
elif self.path == '/api/metrics':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
metrics = monitor.get_metrics_report()
self.wfile.write(json.dumps(metrics, default=str).encode())
def _generate_dashboard_html(self):
return """
<!DOCTYPE html>
<html>
<head>
<title>Deadlock Monitor Dashboard</title>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
</head>
<body>
<h1>Deadlock Monitoring Dashboard</h1>
<div id="metrics-chart" style="width:100%;height:400px;"></div>
<div id="alerts-table"></div>
<script>
function updateDashboard() {
fetch('/api/metrics')
.then(response => response.json())
.then(data => {
updateChart(data);
updateAlerts(data);
});
}
function updateChart(data) {
// Plotly chart frissítése
const traces = [];
for (const [name, metric] of Object.entries(data.metrics)) {
traces.push({
x: [new Date()],
y: [metric.current],
name: name,
type: 'scatter'
});
}
Plotly.newPlot('metrics-chart', traces);
}
function updateAlerts(data) {
const alertsDiv = document.getElementById('alerts-table');
let html = '<h2>Recent Alerts</h2><table border="1">';
html += '<tr><th>Time</th><th>Metric</th><th>Value</th><th>Severity</th></tr>';
data.recent_alerts.forEach(alert => {
html += `<tr>
<td>${alert.timestamp}</td>
<td>${alert.metric}</td>
<td>${alert.value.toFixed(2)}</td>
<td>${alert.severity}</td>
</tr>`;
});
html += '</table>';
alertsDiv.innerHTML = html;
}
// Automatikus frissítés 5 másodpercenként
setInterval(updateDashboard, 5000);
updateDashboard(); // Kezdeti betöltés
</script>
</body>
</html>
"""
return DashboardHandler
"A megfelelő monitoring nemcsak a problémák gyors felismerését teszi lehetővé, hanem megelőző információkat is szolgáltat."
Speciális esetek és megoldások
Distributed Deadlocks
Elosztott rendszerekben a holtpontok még komplexebbek:
import time
import uuid
from enum import Enum
class TransactionState(Enum):
ACTIVE = "active"
WAITING = "waiting"
COMMITTED = "committed"
ABORTED = "aborted"
class DistributedDeadlockDetector:
def __init__(self, node_id):
self.node_id = node_id
self.local_transactions = {}
self.wait_for_graph = {}
self.global_graph = {}
def start_transaction(self, resources_needed):
"""Elosztott tranzakció indítása"""
transaction_id = str(uuid.uuid4())
transaction = {
"id": transaction_id,
"node": self.node_id,
"state": TransactionState.ACTIVE,
"resources_needed": resources_needed,
"resources_held": [],
"started_at": time.time()
}
self.local_transactions[transaction_id] = transaction
return transaction_id
def request_resource(self, transaction_id, resource_id, owner_node=None):
"""Erőforrás kérése (lehet távoli is)"""
if owner_node and owner_node != self.node_id:
# Távoli erőforrás kérése
return self._request_remote_resource(transaction_id, resource_id, owner_node)
else:
# Helyi erőforrás kezelése
return self._request_local_resource(transaction_id, resource_id)
def _request_remote_resource(self, transaction_id, resource_id, owner_node):
"""Távoli erőforrás kérése"""
# Szimulált hálózati kommunikáció
message = {
"type": "resource_request",
"transaction_id": transaction_id,
"resource_id": resource_id,
"requesting_node": self.node_id
}
# Itt történne a tényleges hálózati küldés
response = self._simulate_network_call(owner_node, message)
if response["status"] == "waiting":
# Wait-for kapcsolat létrehozása
self.wait_for_graph[transaction_id] = {
"resource": resource_id,
"holder": response["holder_transaction"],
"holder_node": owner_node
}
self.local_transactions[transaction_id]["state"] = TransactionState.WAITING
return response["status"] == "granted"
def _simulate_network_call(self, target_node, message):
"""Hálózati hívás szimulációja"""
# Valós implementációban itt történne HTTP/gRPC/stb. hívás
return {
"status": "waiting",
"holder_transaction": "remote_tx_123",
"estimated_wait": 5.0
}
def detect_global_deadlock(self):
"""Globális holtpont detektálása"""
# Helyi wait-for gráf küldése más csomópontoknak
local_graph = self._build_local_wait_graph()
# Globális gráf összegyűjtése (szimulált)
global_graph = self._collect_global_graph(local_graph)
# Ciklus keresése a globális gráfban
return self._detect_cycle_in_global_graph(global_graph)
def _build_local_wait_graph(self):
"""Helyi wait-for gráf építése"""
graph = {}
for tx_id, wait_info in self.wait_for_graph.items():
graph[tx_id] = wait_info
return graph
def _collect_global_graph(self, local_graph):
"""Globális gráf összegyűjtése (szimulált)"""
# Valós implementációban itt történne a többi csomópont lekérdezése
return local_graph
def _detect_cycle_in_global_graph(self, graph):
"""Ciklus keresése a globális gráfban"""
visited = set()
rec_stack = set()
def dfs(node):
if node in rec_stack:
return True
if node in visited:
return False
visited.add(node)
rec_stack.add(node)
if node in graph:
holder = graph[node].get("holder_transaction")
if holder and dfs(holder):
return True
rec_stack.remove(node)
return False
for node in graph:
if node not in visited:
if dfs(node):
return True
return False
Lock-Free algoritmusok
Zárak nélküli megoldások:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class LockFreeCounter:
"""Lock-free számláló Compare-And-Swap használatával"""
def __init__(self, initial_value=0):
self._value = initial_value
self._lock = threading.Lock() # Csak a CAS szimulációjához
def increment(self):
"""Atomikus növelés"""
while True:
current = self._value
new_value = current + 1
if self._compare_and_swap(current, new_value):
return new_value
# Kis várakozás a retry előtt
time.sleep(0.0001)
def decrement(self):
"""Atomikus csökkentés"""
while True:
current = self._value
new_value = current - 1
if self._compare_and_swap(current, new_value):
return new_value
time.sleep(0.0001)
def _compare_and_swap(self, expected, new_value):
"""Compare-And-Swap szimuláció"""
with self._lock: # Valós CAS-ban ez atomikus lenne
if self._value == expected:
self._value = new_value
return True
return False
@property
def value(self):
return self._value
class LockFreeStack:
"""Lock-free verem implementáció"""
class Node:
def __init__(self, data, next_node=None):
self.data = data
self.next = next_node
def __init__(self):
self.head = None
self._lock = threading.Lock() # CAS szimulációhoz
def push(self, data):
"""Elem hozzáadása a verem tetejére"""
new_node = self.Node(data)
while True:
current_head = self.head
new_node.next = current_head
if self._compare_and_swap_pointer(current_head, new_node):
break
time.sleep(0.0001)
def pop(self):
"""Elem eltávolítása a verem tetejéről"""
while True:
current_head = self.head
if current_head is None:
return None
next_node = current_head.next
if self._compare_and_swap_pointer(current_head, next_node):
return current_head.data
time.sleep(0.0001)
def _compare_and_swap_pointer(self, expected, new_value):
"""Pointer CAS szimuláció"""
with self._lock:
if self.head is expected:
self.head = new_value
return True
return False
def is_empty(self):
return self.head is None
Wait-Free adatstruktúrák
import threading
from typing import Optional, Any
class WaitFreeQueue:
"""Wait-free sor implementáció"""
def __init__(self, capacity=1000):
self.capacity = capacity
self.buffer = [None] * capacity
self.head = 0
self.tail = 0
self.size = 0
# Thread-local változók
self.local_data = threading.local()
def enqueue(self, item: Any) -> bool:
"""Elem hozzáadása a sorhoz"""
# Thread-local pozíció inicializálása
if not hasattr(self.local_data, 'local_tail'):
self.local_data.local_tail = self.tail
local_tail = self.local_data.local_tail
# Hely foglalása
if (local_tail + 1) % self.capacity == self.head:
return False # Sor tele
# Elem elhelyezése
self.buffer[local_tail] = item
# Tail frissítése
self.local_data.local_tail = (local_tail + 1) % self.capacity
self.tail = self.local_data.local_tail
return True
def dequeue(self) -> Optional[Any]:
"""Elem eltávolítása a sorból"""
if not hasattr(self.local_data, 'local_head'):
self.local_data.local_head = self.head
local_head = self.local_data.local_head
# Üresség ellenőrzése
if local_head == self.tail:
return None
# Elem kiolvasása
item = self.buffer[local_head]
self.buffer[local_head] = None # Cleanup
# Head frissítése
self.local_data.local_head = (local_head + 1) % self.capacity
self.head = self.local_data.local_head
return item
def is_empty(self) -> bool:
return self.head == self.tail
def current_size(self) -> int:
if self.tail >= self.head:
return self.tail - self.head
else:
return self.capacity - self.head + self.tail
"A lock-free és wait-free algoritmusok teljesen kiküszöbölik a holtpontok lehetőségét, de implementálásuk jelentősen összetettebb."
Gyakran ismételt kérdések a holtpontokról
Mi a különbség a holtpont és a livelock között?
A holtpont esetén a folyamatok teljesen megállnak és várakoznak. Livelock esetén a folyamatok aktívak maradnak, de nem tudnak előrehaladni, mert folyamatosan reagálnak egymás akcióira.
Hogyan lehet felismerni egy holtpontot futás közben?
A leggyakoribb jelek: váratlanul megnövekedett válaszidők, csökkenő CPU-használat aktív folyamatok mellett, várakozó szálak nagy száma, és az alkalmazás "lefagyása".
Milyen szerepe van a timeout-oknak a holtpont-kezelésben?
A timeout-ok megakadályozzák a végtelen várakozást. Ha egy folyamat nem tudja megszerezni az erőforrást a megadott időn belül, feladja a kísérletet és más stratégiát választ.
Lehet-e teljesen elkerülni a holtpontokat?
Igen, lock-free és wait-free algoritmusokkal, vagy megfelelő tervezési mintákkal (például lock ordering) jelentősen csökkenthető vagy teljesen kiküszöbölhető a holtpontok kockázata.
Mikor érdemes process termination-t alkalmazni?
Csak végső esetben, amikor más módszerek nem működnek. Ilyenkor a legkisebb kárt okozó folyamatot kell kiválasztani, figyelembe véve a prioritást és a rollback költségeit.
Hogyan működnek a distributed deadlock detection algoritmusok?
Elosztott rendszerekben minden csomópont elküldi a helyi wait-for gráfját egy központi koordinátornak, vagy peer-to-peer módon megosztják egymással. A globális gráfban keresnek ciklusokat.
