一、模拟优化分析

1、优化前代码分析

1.1 负载轮询类

public class RoundRobinLoadBalancer<T> {
    private List<T> serverList;

    private int currentIndex = 0;

    public RoundRobinLoadBalancer(List<T> serverList) {
        this.serverList = serverList;
    }
		
  	// 获取下一个Server时,会根据索引自动选择下一个。实现多台服务器的闭环调用
    public synchronized T getNextServer() {
        if (currentIndex >= serverList.size()) {
            currentIndex = 0;
        }
        T server = serverList.get(currentIndex);
        currentIndex++;
        System.out.printf("当前请求的服务器是:%s\n", server.toString());
        return server;
    }
}

在获取下一个Server时,只实现了轮询,但是并未考虑负载均衡。
优化思路:在获取Server时使用计数器进行负载均衡判断,返回负载小的Server

1.2 初始化Server

  • 读取服务器地址,初始化到负载轮询器中,这时候serverList中初始化了4个server,通过@Bean注入容器
    // host:172.16.14.236:7000,172.16.14.237:4000,172.16.14.238:1000
	@Value("${nlp.algorithm.outline.draft.host}")
    String mianbi;

		@Bean
    RoundRobinLoadBalancer mianbiBalancer() {
        List serverList = new ArrayList<>();
        String[] split = mianbi.split(",");
        for (int i = 0; i < split.length; i++) {
            WebClient webClient = webClient(split[i]);
            serverList.add(webClient);
        }
        return new RoundRobinLoadBalancer(serverList);
    }

1.3 调用服务

  • 使用接口调用该方法,传入上一步初始化的roundRobinLoadBalancer对象,调用getNextServer方法,轮询获取WebClient对象后,作为参数传递,调用另一个postJson方法实现响应式请求远程服务器
    @Qualifier("mianbiBalancer")
    @Autowired
    RoundRobinLoadBalancer mianbiBalancer;

	 // 第一步:通过控制层接口调用该方法,并传递参数
   public Flux<ServerSentEvent<String>> generationChat(String requestBody) {
				// 传递参数、url地址、RoundRobinLoadBalancer对象
        return WebClientService.postJson(requestBody, NLPConstants.CHAT_ADDR, mianbiBalancer);
    }

// 第二步
public static Flux<ServerSentEvent<String>> postJson(String bodyData,String url,RoundRobinLoadBalancer roundRobinLoadBalancer){
        // 轮询获取WebClient对象
       WebClient webClient =  (WebClient) roundRobinLoadBalancer.getNextServer();
        // 调用方法
        return postJson(bodyData, url, webClient);
    }


		// 第三步③
    public static Flux<ServerSentEvent<String>> postJson(String bodyData, String url, WebClient webClient) {
        // 省略响应式请求,调用远程服务器......
        return sseFlux.delayElements(Duration.ofMillis(1));

    }

2、模拟多台服务器调用

  • 本地测试没有多台算法服务器,这里模拟调用三台服务器调用
  • 修改上述第三步③的方法

    /**
     * 模拟器远程请求服务器,服务器处理数据
     */
    public static Flux<ServerSentEvent<String>> postJson(RemoteServers remoteServers) {
        try {
            String ip = remoteServers.getIp();
            System.out.println(ip + "服务器响应中");
            // 模拟器每台服务器的响应时间不同  
            // 172.16.14.236:7000(7000ms),172.16.14.237:4000(4000ms),172.16.14.238:1000(1000ms)
            int delay = Integer.parseInt(ip.split(":")[1]);
            Thread.sleep(delay);
            System.out.println(ip + "服务器响应完成");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

3、调整其他代码(优化前)

3.1 创建RemoteServers类

  • 在初始化Server时,List集合中存储的是WebClient对象,这里创建一个RemoteServers对象,ip作为成员,方便打印日志
@Data
public class RemoteServers implements Comparable {
    String ip;
  
    public RemoteServers(String ip, int initialValue) {
        this.ip = ip;
    }

}

3.2 创建LoadBalancer类

  • 这里创建一个新的负载轮询类,用于初始化RemoteServers
  • 对于原本的RoundRobinLoadBalancer对象,这里只修改的T为RemoteServers
public class LoadBalancer {
    private final List<RemoteServers> serverList;

    private int currentIndex = 0;
  
    public LoadBalancer(List<RemoteServers> serverList) {
        this.serverList = serverList;
    }
		
  	// 轮询获取下一个Server
    public synchronized RemoteServers getNextServer() {
        if (currentIndex >= serverList.size()) {
            currentIndex = 0;
        }
        RemoteServers server = serverList.get(currentIndex);
        currentIndex++;
        System.out.printf("当前请求的服务器是:%s\n", server.getIp());
        return server;
    }

}

3.3 初始化Server

  • 这里初始化同上面的初始化Server一样,不过这里为了模拟,初始化的不是Webclient对象而是RemoteServers对象,参数为每台服务器的ip地址和初始化计数器为0
    // host:172.16.14.236:7000,172.16.14.237:4000,172.16.14.238:1000
		@Value("${nlp.algorithm.outline.draft.host}")
    String mianbi;

    @Bean
    LoadBalancer testBalancer() {
        List<RemoteServers> serverList = new ArrayList<>();
        String[] split = mianbi.split(",");
        for (String ip : split) {
            serverList.add(new RemoteServers(ip, 0));
        }
        return new LoadBalancer(serverList);
    }

3.4 模拟调用

  • 这里进行模拟多个客户端请求
   @GetMapping("/testGenerationChat1")
    public String testGenerationChat(int count) {
        Map<String, Object> map = new HashMap<>();
        map.put("messages", "网络安全");
        map.put("stream", true);
        map.put("messages_type", 0);
        String jsonString = JSON.toJSONString(map);
      
     		// 模拟调用 count次
        // 构建线程池
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<?>> futures = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            Future<?> future = executor.submit(() -> {
                // 测试调用接口!!!!!!!!!!!!!!!!
                this.generationChat(jsonString);
                long end = System.currentTimeMillis();
                System.out.println("当前线程" + Thread.currentThread().getId() + " 耗时:" + (end - start));
            });
            futures.add(future);
        }
        // 等待所有任务完成
        for (Future<?> future : futures) {
            try {
                try {
                    future.get(); // 获取任务结果,确保每个查询都已完成
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (ExecutionException e) {
                e.printStackTrace(); // 处理任务中的异常
            }
        }
        executor.shutdown(); // 关闭线程池
        try {
            executor.awaitTermination(20, TimeUnit.SECONDS); // 等待所有任务完成,或超时
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(count +"次请求总耗时:"+(System.currentTimeMillis()-start));
        return "访问完成";
    }

3.5 调用结果

1-优化前循环测试.png

2-优化前循环测试.png

4、添加计数器(优化后)

4.1 修改RemoteServers类

  • 添加count作为计数器,并添加计数器的加减方法,重写CompareTo方法,使用count计数器作为排序依据
@Data
public class RemoteServers implements Comparable {
    String ip;
    AtomicInteger count;

    public RemoteServers(String ip, int initialValue) {
        this.ip = ip;
        this.count = new AtomicInteger(initialValue);
    }

    /**
     * 计数器减1
     */
    public void countDecrement() {
        this.count.decrementAndGet();
    }

    /**
     * 计数器加1
     */
    public void countIncrement() {
        this.count.incrementAndGet();
    }

    @Override
    public int compareTo(Object o) {
        RemoteServers other = (RemoteServers) o;
        return Integer.compare(this.count.get(), other.count.get());
    }

}

4.2 修改LoadBalancer类

  • 通过判断count计数器,返回负载值最低的服务器
public class LoadBalancer {
    private final List<RemoteServers> serverList;

    public LoadBalancer(List<RemoteServers> serverList) {
        this.serverList = serverList;
    }

    public synchronized RemoteServers getNextServer() {
        RemoteServers remoteServers = serverList.stream().sorted().findFirst().orElse(serverList.get(0));
        String ip = remoteServers.getIp();
        AtomicInteger count = remoteServers.getCount();
        // 这里每返回一个服务器,需要为其count++
        remoteServers.countIncrement();
        System.out.println("当前请求的服务器是:"+ip+"负载值:"+count.get());
        return remoteServers;
    }

}

4.3 初始化Server

  • 这里初始化Server和 ”三、调整其它代码“里的初始化一致

4.4 模拟调用

  • 和上述调用方式一样

4.5 调用结果

3-优化后测试.png

4-优化后测试.png

二、实际优化测试

1、均衡测试

1.1 数据测试

使用不同的参数,用于模拟两台服务器处理数据消耗时间不同

  • “消防安全” 作为参数时,处理时间为2.50s

1-实际优化测试.png

  • “网络安全”作为参数时,处理时间为7.1s

2-实际优化测试.png

1.2 编写测试

测试过程:count设置为30次,多线程调用,分别测试使用轮询调用和负载均衡调用的效率

    @GetMapping("/testGenerationChat1")
    public String testGenerationChat(int count) {
        Map<String, Object> map = new HashMap<>();
        map.put("stream", true);
        map.put("messages_type", 0);
        // 构建线程池
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<?>> futures = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            // 对于不同的算法服务器,进行不用的数据计算,模拟不同服务器的效率不同
            if (i % 2 == 0) {
                map.put("messages", "网络安全");
            } else {
                map.put("messages", "消防安全");
            }
            String jsonString = JSON.toJSONString(map);
            Future<?> future = executor.submit(() -> {
                // 测试调用接口!!!!!!!!!!!!!!!!
                Flux<ServerSentEvent<String>> serverSentEventFlux = this.generationChat(jsonString);
                // 将Flux转换为Mono,并阻塞等待完成
                Mono.fromFuture(serverSentEventFlux.then().toFuture()).block();
            });
            futures.add(future);
        }
        // 等待所有任务完成
        for (Future<?> future : futures) {
            try {
                try {
                    future.get(); // 获取任务结果,确保每个查询都已完成
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (ExecutionException e) {
                e.printStackTrace(); // 处理任务中的异常
            }
        }
        // 最后统一关闭线程池
        try {
            // 关闭线程池
            executor.shutdown();
            executor.awaitTermination(20, TimeUnit.SECONDS); // 等待所有任务完成,或超时
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(count + "次请求总耗时:" + (System.currentTimeMillis() - start));
        return "访问完成";
    }

1.3 普通轮询的测试结果

5次接口调用,接口内30个线程

  • 一轮测试

01-一轮测试.png

  • 二轮测试

01-二轮测试.png

  • 三轮测试

01-三轮测试.png

  • 四轮测试

01-四轮测试.png

  • 五轮测试

01-五轮测试.png

1.4 均衡测试结果

5次接口调用,接口内30个线程

  • 一轮测试

02-一轮测试.png

  • 二轮测试

02-二轮测试.png

  • 三轮测试

02-三轮测试.png

  • 四轮测试

02-四轮测试.png

  • 五轮测试

02-五轮测试.png

2、限流测试

  • 通过对RemoteServers的count计数器判断,进行限流
    public static Flux<ServerSentEvent<String>> postJson(String bodyData, String url, LoadBalancer loadBalancer, int currentLimit) {
        // 发起POST请求并获取响应流
        RemoteServers remoteServers = loadBalancer.getNextServer();
        // 限流
        if (remoteServers.getCount().get() > currentLimit) {
            // 限流,计数器减1
            remoteServers.countDecrement();
            return Flux.error(new RuntimeException("Error: Server count limit exceeded"));
        } else {
            // 设置响应的媒体类型为text/event-stream
            return postJson(bodyData, url, remoteServers)
                    .doFinally(signalType -> remoteServers.countDecrement());
        }
    }
  • 实现限流

限流.png