简单的负载均衡优化
AI-摘要
Tianli GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
一、模拟优化分析
1、优化前代码分析
1.1 负载轮询类
public class RoundRobinLoadBalancer<T> {
private List<T> serverList;
private int currentIndex = 0;
public RoundRobinLoadBalancer(List<T> serverList) {
this.serverList = serverList;
}
// 获取下一个Server时,会根据索引自动选择下一个。实现多台服务器的闭环调用
public synchronized T getNextServer() {
if (currentIndex >= serverList.size()) {
currentIndex = 0;
}
T server = serverList.get(currentIndex);
currentIndex++;
System.out.printf("当前请求的服务器是:%s\n", server.toString());
return server;
}
}
在获取下一个Server时,只实现了轮询,但是并未考虑负载均衡。
优化思路:在获取Server时使用计数器进行负载均衡判断,返回负载小的Server
1.2 初始化Server
- 读取服务器地址,初始化到负载轮询器中,这时候serverList中初始化了4个server,通过@Bean注入容器
// host:172.16.14.236:7000,172.16.14.237:4000,172.16.14.238:1000
@Value("${nlp.algorithm.outline.draft.host}")
String mianbi;
@Bean
RoundRobinLoadBalancer mianbiBalancer() {
List serverList = new ArrayList<>();
String[] split = mianbi.split(",");
for (int i = 0; i < split.length; i++) {
WebClient webClient = webClient(split[i]);
serverList.add(webClient);
}
return new RoundRobinLoadBalancer(serverList);
}
1.3 调用服务
- 使用接口调用该方法,传入上一步初始化的roundRobinLoadBalancer对象,调用getNextServer方法,轮询获取WebClient对象后,作为参数传递,调用另一个postJson方法实现响应式请求远程服务器
@Qualifier("mianbiBalancer")
@Autowired
RoundRobinLoadBalancer mianbiBalancer;
// 第一步:通过控制层接口调用该方法,并传递参数
public Flux<ServerSentEvent<String>> generationChat(String requestBody) {
// 传递参数、url地址、RoundRobinLoadBalancer对象
return WebClientService.postJson(requestBody, NLPConstants.CHAT_ADDR, mianbiBalancer);
}
// 第二步
public static Flux<ServerSentEvent<String>> postJson(String bodyData,String url,RoundRobinLoadBalancer roundRobinLoadBalancer){
// 轮询获取WebClient对象
WebClient webClient = (WebClient) roundRobinLoadBalancer.getNextServer();
// 调用方法
return postJson(bodyData, url, webClient);
}
// 第三步③
public static Flux<ServerSentEvent<String>> postJson(String bodyData, String url, WebClient webClient) {
// 省略响应式请求,调用远程服务器......
return sseFlux.delayElements(Duration.ofMillis(1));
}
2、模拟多台服务器调用
- 本地测试没有多台算法服务器,这里模拟调用三台服务器调用
- 修改上述第三步③的方法
/**
* 模拟器远程请求服务器,服务器处理数据
*/
public static Flux<ServerSentEvent<String>> postJson(RemoteServers remoteServers) {
try {
String ip = remoteServers.getIp();
System.out.println(ip + "服务器响应中");
// 模拟器每台服务器的响应时间不同
// 172.16.14.236:7000(7000ms),172.16.14.237:4000(4000ms),172.16.14.238:1000(1000ms)
int delay = Integer.parseInt(ip.split(":")[1]);
Thread.sleep(delay);
System.out.println(ip + "服务器响应完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
}
3、调整其他代码(优化前)
3.1 创建RemoteServers类
- 在初始化Server时,List集合中存储的是WebClient对象,这里创建一个RemoteServers对象,ip作为成员,方便打印日志
@Data
public class RemoteServers implements Comparable {
String ip;
public RemoteServers(String ip, int initialValue) {
this.ip = ip;
}
}
3.2 创建LoadBalancer类
- 这里创建一个新的负载轮询类,用于初始化RemoteServers
- 对于原本的RoundRobinLoadBalancer对象,这里只修改的T为RemoteServers
public class LoadBalancer {
private final List<RemoteServers> serverList;
private int currentIndex = 0;
public LoadBalancer(List<RemoteServers> serverList) {
this.serverList = serverList;
}
// 轮询获取下一个Server
public synchronized RemoteServers getNextServer() {
if (currentIndex >= serverList.size()) {
currentIndex = 0;
}
RemoteServers server = serverList.get(currentIndex);
currentIndex++;
System.out.printf("当前请求的服务器是:%s\n", server.getIp());
return server;
}
}
3.3 初始化Server
- 这里初始化同上面的初始化Server一样,不过这里为了模拟,初始化的不是Webclient对象而是RemoteServers对象,参数为每台服务器的ip地址和初始化计数器为0
// host:172.16.14.236:7000,172.16.14.237:4000,172.16.14.238:1000
@Value("${nlp.algorithm.outline.draft.host}")
String mianbi;
@Bean
LoadBalancer testBalancer() {
List<RemoteServers> serverList = new ArrayList<>();
String[] split = mianbi.split(",");
for (String ip : split) {
serverList.add(new RemoteServers(ip, 0));
}
return new LoadBalancer(serverList);
}
3.4 模拟调用
- 这里进行模拟多个客户端请求
@GetMapping("/testGenerationChat1")
public String testGenerationChat(int count) {
Map<String, Object> map = new HashMap<>();
map.put("messages", "网络安全");
map.put("stream", true);
map.put("messages_type", 0);
String jsonString = JSON.toJSONString(map);
// 模拟调用 count次
// 构建线程池
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<?>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
Future<?> future = executor.submit(() -> {
// 测试调用接口!!!!!!!!!!!!!!!!
this.generationChat(jsonString);
long end = System.currentTimeMillis();
System.out.println("当前线程" + Thread.currentThread().getId() + " 耗时:" + (end - start));
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
try {
future.get(); // 获取任务结果,确保每个查询都已完成
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
e.printStackTrace(); // 处理任务中的异常
}
}
executor.shutdown(); // 关闭线程池
try {
executor.awaitTermination(20, TimeUnit.SECONDS); // 等待所有任务完成,或超时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(count +"次请求总耗时:"+(System.currentTimeMillis()-start));
return "访问完成";
}
3.5 调用结果


4、添加计数器(优化后)
4.1 修改RemoteServers类
- 添加count作为计数器,并添加计数器的加减方法,重写CompareTo方法,使用count计数器作为排序依据
@Data
public class RemoteServers implements Comparable {
String ip;
AtomicInteger count;
public RemoteServers(String ip, int initialValue) {
this.ip = ip;
this.count = new AtomicInteger(initialValue);
}
/**
* 计数器减1
*/
public void countDecrement() {
this.count.decrementAndGet();
}
/**
* 计数器加1
*/
public void countIncrement() {
this.count.incrementAndGet();
}
@Override
public int compareTo(Object o) {
RemoteServers other = (RemoteServers) o;
return Integer.compare(this.count.get(), other.count.get());
}
}
4.2 修改LoadBalancer类
- 通过判断count计数器,返回负载值最低的服务器
public class LoadBalancer {
private final List<RemoteServers> serverList;
public LoadBalancer(List<RemoteServers> serverList) {
this.serverList = serverList;
}
public synchronized RemoteServers getNextServer() {
RemoteServers remoteServers = serverList.stream().sorted().findFirst().orElse(serverList.get(0));
String ip = remoteServers.getIp();
AtomicInteger count = remoteServers.getCount();
// 这里每返回一个服务器,需要为其count++
remoteServers.countIncrement();
System.out.println("当前请求的服务器是:"+ip+"负载值:"+count.get());
return remoteServers;
}
}
4.3 初始化Server
- 这里初始化Server和 ”三、调整其它代码“里的初始化一致
4.4 模拟调用
- 和上述调用方式一样
4.5 调用结果


二、实际优化测试
1、均衡测试
1.1 数据测试
使用不同的参数,用于模拟两台服务器处理数据消耗时间不同
- “消防安全” 作为参数时,处理时间为2.50s

- “网络安全”作为参数时,处理时间为7.1s

1.2 编写测试
测试过程:count设置为30次,多线程调用,分别测试使用轮询调用和负载均衡调用的效率
@GetMapping("/testGenerationChat1")
public String testGenerationChat(int count) {
Map<String, Object> map = new HashMap<>();
map.put("stream", true);
map.put("messages_type", 0);
// 构建线程池
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<?>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
// 对于不同的算法服务器,进行不用的数据计算,模拟不同服务器的效率不同
if (i % 2 == 0) {
map.put("messages", "网络安全");
} else {
map.put("messages", "消防安全");
}
String jsonString = JSON.toJSONString(map);
Future<?> future = executor.submit(() -> {
// 测试调用接口!!!!!!!!!!!!!!!!
Flux<ServerSentEvent<String>> serverSentEventFlux = this.generationChat(jsonString);
// 将Flux转换为Mono,并阻塞等待完成
Mono.fromFuture(serverSentEventFlux.then().toFuture()).block();
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
try {
future.get(); // 获取任务结果,确保每个查询都已完成
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
e.printStackTrace(); // 处理任务中的异常
}
}
// 最后统一关闭线程池
try {
// 关闭线程池
executor.shutdown();
executor.awaitTermination(20, TimeUnit.SECONDS); // 等待所有任务完成,或超时
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(count + "次请求总耗时:" + (System.currentTimeMillis() - start));
return "访问完成";
}
1.3 普通轮询的测试结果
5次接口调用,接口内30个线程
- 一轮测试

- 二轮测试

- 三轮测试

- 四轮测试

- 五轮测试

1.4 均衡测试结果
5次接口调用,接口内30个线程
- 一轮测试

- 二轮测试

- 三轮测试

- 四轮测试

- 五轮测试

2、限流测试
- 通过对RemoteServers的count计数器判断,进行限流
public static Flux<ServerSentEvent<String>> postJson(String bodyData, String url, LoadBalancer loadBalancer, int currentLimit) {
// 发起POST请求并获取响应流
RemoteServers remoteServers = loadBalancer.getNextServer();
// 限流
if (remoteServers.getCount().get() > currentLimit) {
// 限流,计数器减1
remoteServers.countDecrement();
return Flux.error(new RuntimeException("Error: Server count limit exceeded"));
} else {
// 设置响应的媒体类型为text/event-stream
return postJson(bodyData, url, remoteServers)
.doFinally(signalType -> remoteServers.countDecrement());
}
}
- 实现限流

本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 leaflei
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果