🔍 最小连接数算法核心思想

最小连接数(Least Connections)是一种动态负载均衡策略,将新请求分配给当前活跃连接数最少的后端服务器。相比轮询或随机算法,它能根据服务器实时负载动态调整,更适合处理耗时不均匀的请求(如长连接、大文件传输)。


💻 完整代码实现(线程安全版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OptimizedLeastConnectionsLoadBalancer {
private final Map<String, ServerStats> serverStats = new ConcurrentHashMap<>();
private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
private final String fallbackServer;

public OptimizedLeastConnectionsLoadBalancer(String fallbackServer) {
this.fallbackServer = fallbackServer;
}

public void addServer(String server) {
serverStats.putIfAbsent(server, new ServerStats());
}

public void removeServer(String server) {
serverStats.remove(server);
}

public String getServerWithLeastConnections() {
ServerStats selected = null;
String serverUrl = null;
int minLoad = Integer.MAX_VALUE;

for (Map.Entry<String, ServerStats> entry : serverStats.entrySet()) {
ServerStats stats = entry.getValue();
if (!stats.isHealthy.get()) continue;

int load = stats.connections.get();
if (load < minLoad) {
minLoad = load;
selected = stats;
serverUrl = entry.getKey();
}
}

if (selected != null) {
selected.connections.incrementAndGet();
return serverUrl;
}
return fallbackServer != null ? fallbackServer : null;
}


public void releaseConnection(String request, String server) {
System.out.println("Releasing: " + request + " connection to " + server);
ServerStats stats = serverStats.get(server);
if (stats != null) {
stats.connections.decrementAndGet();
}
}

public void startHealthChecks(long interval, TimeUnit unit) {
healthCheckScheduler.scheduleAtFixedRate(() -> {
for (Map.Entry<String, ServerStats> entry : serverStats.entrySet()) {
String server = entry.getKey();
ServerStats stats = entry.getValue();

// 模拟健康检查
boolean isHealthy = performHealthCheck(server);
stats.isHealthy.set(isHealthy);

if (!isHealthy) {
System.out.println("[Health Check] Server " + server + " is unhealthy");
}
}
}, 0, interval, unit);
}

public void stop() {
healthCheckScheduler.shutdown();
try {
healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private boolean performHealthCheck(String server) {
// 实际应用中应实现真正的健康检查
// 这里模拟90%成功率的健康检查
return Math.random() < 0.9;
}

private static class ServerStats {
final AtomicInteger connections = new AtomicInteger(0);
final AtomicBoolean isHealthy = new AtomicBoolean(true);

ServerStats() {
}
}
}

🧪 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.TimeUnit;

public class Demo {
public static void main(String[] args) {
OptimizedLeastConnectionsLoadBalancer balancer =
new OptimizedLeastConnectionsLoadBalancer("http://fallback-server");

balancer.addServer("http://server1:8080");
balancer.addServer("http://server2:8080");
balancer.addServer("http://server3:8080");

// 启动健康检查(每5秒一次)
balancer.startHealthChecks(5, TimeUnit.SECONDS);

// 模拟10个请求
for (int i = 0; i < 10; i++) {
String server = balancer.getServerWithLeastConnections();
System.out.println("Request " + (i+1) + " → " + server);
final String request = "Request " + (i+1);

// 模拟请求处理完成后释放连接
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
balancer.releaseConnection(request, server);
}).start();
}

// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down health checks...");
balancer.stop();
}));
}
}

💎 总结

  • 核心优势:最小连接数算法动态感知服务器负载,避免高负载服务器过载,适合处理异构服务器集群或耗时波动大的请求。
  • 注意事项:
    • 务必调用 releaseConnection() 释放连接,否则计数会持续增长导致负载评估失真。
    • 生产环境建议结合 NginxSpring Cloud Ribbon 等框架实现更完善的负载均衡。

示例代码已涵盖基础场景,实际部署时需根据业务扩展(如权重、健康检查)。通过动态感知与原子计数,本实现确保了高并发下的线程安全与精准调度。