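🔍 Core Idea of the Least Connections Algorithm
Least Connections is a dynamic load-balancing strategy: each new request goes to the backend server that currently has the fewest active connections. Unlike round-robin or random selection, it adapts to each server's real-time load, which makes it a better fit for requests whose processing time varies widely (for example, long-lived connections or large file transfers).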
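The selection rule itself fits in a few lines. The sketch below is only an illustration of the idea: the class `LeastConnectionsPick`, the server names, and the connection counts are all made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class LeastConnectionsPick {

    // Returns the server with the fewest active connections, or null for an empty map.
    // Ties go to the first entry in iteration order.
    static String pick(Map<String, Integer> activeConnections) {
        String best = null;
        int min = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : activeConnections.entrySet()) {
            if (e.getValue() < min) {
                min = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> active = new LinkedHashMap<>();
        active.put("server1", 12);
        active.put("server2", 3);
        active.put("server3", 7);
        System.out.println(pick(active)); // prints "server2"
    }
}
```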
💻 Full Implementation (Thread-Safe Version)
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OptimizedLeastConnectionsLoadBalancer {
    // Per-server state, keyed by server URL.
    private final Map<String, ServerStats> serverStats = new ConcurrentHashMap<>();
    private final ScheduledExecutorService healthCheckScheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Returned when no healthy server is available; may be null.
    private final String fallbackServer;

    public OptimizedLeastConnectionsLoadBalancer(String fallbackServer) {
        this.fallbackServer = fallbackServer;
    }

    public void addServer(String server) {
        serverStats.putIfAbsent(server, new ServerStats());
    }

    public void removeServer(String server) {
        serverStats.remove(server);
    }

    /** Picks the healthy server with the fewest active connections and counts the new connection. */
    public String getServerWithLeastConnections() {
        ServerStats selected = null;
        String serverUrl = null;
        int minLoad = Integer.MAX_VALUE;

        for (Map.Entry<String, ServerStats> entry : serverStats.entrySet()) {
            ServerStats stats = entry.getValue();
            if (!stats.isHealthy.get()) continue; // skip servers marked unhealthy

            int load = stats.connections.get();
            if (load < minLoad) {
                minLoad = load;
                selected = stats;
                serverUrl = entry.getKey();
            }
        }

        // The scan above and the increment below are separate steps, so concurrent
        // callers can occasionally pick the same server. The counters stay correct;
        // the selection is best-effort.
        if (selected != null) {
            selected.connections.incrementAndGet();
            return serverUrl;
        }
        return fallbackServer;
    }

    /** Call once per finished request so the counter keeps reflecting real load. */
    public void releaseConnection(String request, String server) {
        System.out.println("Releasing: " + request + " connection to " + server);
        ServerStats stats = serverStats.get(server);
        if (stats != null) {
            // Clamp at zero so a stray release cannot drive the count negative.
            stats.connections.updateAndGet(c -> Math.max(0, c - 1));
        }
    }

    /** Starts periodic health checks; the first check runs immediately. */
    public void startHealthChecks(long interval, TimeUnit unit) {
        healthCheckScheduler.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, ServerStats> entry : serverStats.entrySet()) {
                String server = entry.getKey();
                ServerStats stats = entry.getValue();

                boolean isHealthy = performHealthCheck(server);
                stats.isHealthy.set(isHealthy);

                if (!isHealthy) {
                    System.out.println("[Health Check] Server " + server + " is unhealthy");
                }
            }
        }, 0, interval, unit);
    }

    /** Stops the health-check scheduler and waits up to 5 seconds for it to finish. */
    public void stop() {
        healthCheckScheduler.shutdown();
        try {
            healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Placeholder: simulates a check that succeeds about 90% of the time.
    // Replace with a real probe (for example an HTTP or TCP check) in practice.
    private boolean performHealthCheck(String server) {
        return Math.random() < 0.9;
    }

    private static class ServerStats {
        final AtomicInteger connections = new AtomicInteger(0);
        final AtomicBoolean isHealthy = new AtomicBoolean(true);
    }
}
```
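In a selection method like the one above, scanning for the minimum and incrementing the chosen counter are two separate steps, so two threads calling at the same moment can both pick the same server. If stricter balancing matters, one option is to guard both steps with a single lock. The following is a minimal sketch under that assumption; `LockedLeastConnections` and its method names are hypothetical and not part of the class above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical variant for illustration: one lock guards scan + increment.
class LockedLeastConnections {
    private final Map<String, Integer> connections = new LinkedHashMap<>();

    synchronized void addServer(String server) {
        connections.putIfAbsent(server, 0);
    }

    // Picks the least-loaded server and counts the new connection as one atomic step.
    synchronized String acquire() {
        String best = null;
        int min = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : connections.entrySet()) {
            if (e.getValue() < min) {
                min = e.getValue();
                best = e.getKey();
            }
        }
        if (best != null) {
            connections.put(best, min + 1);
        }
        return best;
    }

    // Decrements the counter for a known server, never below zero.
    synchronized void release(String server) {
        connections.computeIfPresent(server, (k, v) -> Math.max(0, v - 1));
    }

    synchronized int active(String server) {
        return connections.getOrDefault(server, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        LockedLeastConnections lb = new LockedLeastConnections();
        lb.addServer("s1");
        lb.addServer("s2");
        lb.addServer("s3");

        Thread[] threads = new Thread[30];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(lb::acquire);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // With no releases, 30 acquisitions over 3 servers end up evenly spread.
        System.out.println(lb.active("s1") + " " + lb.active("s2") + " " + lb.active("s3")); // prints "10 10 10"
    }
}
```

The trade-off is that every selection now serializes on one lock. That is usually acceptable for a small server list, but it is a design choice to weigh against the lock-free, best-effort scan.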
🧪 Usage Example
```java
import java.util.concurrent.TimeUnit;

public class Demo {
    public static void main(String[] args) {
        OptimizedLeastConnectionsLoadBalancer balancer =
                new OptimizedLeastConnectionsLoadBalancer("http://fallback-server");

        balancer.addServer("http://server1:8080");
        balancer.addServer("http://server2:8080");
        balancer.addServer("http://server3:8080");

        // Run a health check every 5 seconds.
        balancer.startHealthChecks(5, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++) {
            String server = balancer.getServerWithLeastConnections();
            System.out.println("Request " + (i + 1) + " → " + server);
            final String request = "Request " + (i + 1);

            // Simulate one second of work, then release the connection.
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                balancer.releaseConnection(request, server);
            }).start();
        }

        // The scheduler thread keeps the JVM alive; press Ctrl+C to exit,
        // which runs this hook and stops the health checks.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down health checks...");
            balancer.stop();
        }));
    }
}
```
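💎 Summary
- Key advantage: the least connections algorithm tracks server load dynamically and steers new requests away from busy servers, which suits heterogeneous server clusters and requests whose processing time varies widely.
- Notes:
  - Always call `releaseConnection()` when a request finishes; otherwise the count keeps growing and the load estimate becomes distorted.
  - In production, consider combining this with Nginx or a framework such as Spring Cloud Ribbon for more complete load balancing.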
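One way to make the release hard to forget is to pair every acquisition with a `finally` block, so the counter is decremented even when the request fails. A minimal sketch, assuming a hypothetical `ConnectionCounter` class that stands in for one server's counter:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for one server's connection counter.
class ConnectionCounter {
    private final AtomicInteger active = new AtomicInteger();

    int active() {
        return active.get();
    }

    // Runs the task while holding one connection slot.
    // The slot is released even if the task throws.
    <T> T withConnection(Supplier<T> task) {
        active.incrementAndGet();
        try {
            return task.get();
        } finally {
            active.decrementAndGet();
        }
    }

    public static void main(String[] args) {
        ConnectionCounter counter = new ConnectionCounter();
        try {
            counter.withConnection(() -> {
                throw new IllegalStateException("backend failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("request failed: " + e.getMessage());
        }
        System.out.println("active = " + counter.active()); // prints "active = 0"
    }
}
```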
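The sample code covers the basic scenario; real deployments need extensions suited to the workload, such as server weights and a real health check in place of the simulated one. Atomic counters keep the per-server connection counts consistent under high concurrency, while server selection itself is best-effort: the scan for the minimum and the increment are separate steps, so concurrent requests can occasionally choose the same server.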
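As an example of the weight extension, weighted least connections picks the server with the lowest ratio of active connections to weight, so a larger machine can carry proportionally more load. The sketch below is one possible shape, not a definitive implementation: the `Server` class, its fields, and the sample weights are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of weighted least connections.
class WeightedLeastConnections {

    static final class Server {
        final String name;
        final int weight;   // relative capacity, assumed to be > 0
        int connections;    // current active connections

        Server(String name, int weight, int connections) {
            this.name = name;
            this.weight = weight;
            this.connections = connections;
        }
    }

    // Picks the server with the smallest connections/weight ratio, or null for an empty list.
    // Cross-multiplying avoids floating point: a/wa < b/wb  <=>  a*wb < b*wa for positive weights.
    static Server pick(List<Server> servers) {
        Server best = null;
        for (Server s : servers) {
            if (best == null
                    || (long) s.connections * best.weight < (long) best.connections * s.weight) {
                best = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(
                new Server("big", 4, 8),    // ratio 8/4 = 2.0
                new Server("small", 1, 3)); // ratio 3/1 = 3.0
        // Plain least connections would choose "small" (3 < 8); the weighted rule chooses "big".
        System.out.println(pick(servers).name); // prints "big"
    }
}
```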