The 600K-a-Year Watershed: Hand-Writing a Dubbo Framework on Top of Netty
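Before reading this article, it is best to read the related articles in this series first.
1. A detailed look at how network communication is implemented under a distributed microservice architecture (illustrated)
2. (A 600K-a-year skill) After five years on the job, do you really understand Netty and why you should use it? (in depth)
3. An in-depth look at the core components of Netty (diagrams and examples)
4. A detail BAT interviews always ask about: ByteBuf in Netty explained
5. How Netty solves the packet splitting and sticking problem, worked through many practical cases
6. Implementing a custom message protocol on Netty (protocol design and parsing in practice)
7. Serialization technologies: a thorough analysis with practical applications
8. Step by step: implementing a basic RPC framework on Netty (easy to follow)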
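In this article we continue to improve the basic Netty-based RPC implementation from the previous article. The main additions are:
Spring integration, for annotation-driven configuration
ZooKeeper integration, for service registration
A load-balancing implementation
To get the source code, follow the WeChat account 「跟着Mic学架构」 and reply 『rpc』.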
Adding annotation-driven configuration
The following modules need to change:
netty-rpc-protocol
netty-rpc-provider
netty-rpc-protocol
The classes changed in this module are shown in Figure 7-1.
Figure 7-1
The changes to the netty-rpc-protocol module are described below.
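Adding the service annotation
This annotation marks a class as a remote service. Because it is meta-annotated with @Component, an annotated class is also picked up by Spring component scanning.

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface GpRemoteService {}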
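The runtime check that this annotation enables can be tried without Spring. The sketch below is only an illustration: `RemoteServiceMarker`, `GreetingService`, and the other names are hypothetical stand-ins, and the `@Component` meta-annotation is left out so the example needs nothing beyond the JDK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @GpRemoteService (no Spring @Component here).
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface RemoteServiceMarker {}

interface GreetingService {
    String greet(String name);
}

@RemoteServiceMarker
class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// A class without the marker, for comparison.
class PlainService {}

class AnnotationCheck {
    // RUNTIME retention is what makes the marker visible to reflection.
    static boolean isRemoteService(Object bean) {
        return bean.getClass().isAnnotationPresent(RemoteServiceMarker.class);
    }
}
```

With `RetentionPolicy.CLASS` or `SOURCE` the same check would return false for every bean, which is why the annotation above declares `RUNTIME`.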
SpringRpcProviderBean
This class starts the NettyServer and records the mapping from service keys to beans.

    @Slf4j
    public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

        private final int serverPort;
        private final String serverAddress;

        public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
            this.serverPort = serverPort;
            InetAddress address = InetAddress.getLocalHost();
            this.serverAddress = address.getHostAddress();
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
            // Start the server on its own thread so this callback returns immediately.
            new Thread(() -> {
                try {
                    new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
                } catch (Exception e) {
                    log.error("start Netty Server Occur Exception,", e);
                }
            }).start();
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
                Method[] methods = bean.getClass().getDeclaredMethods();
                for (Method method : methods) {
                    // Key format: <name of the first implemented interface>.<method name>
                    String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod();
                    beanMethod.setBean(bean);
                    beanMethod.setMethod(method);
                    Mediator.beanMethodMap.put(key, beanMethod);
                }
            }
            return bean;
        }
    }
The next two classes manage the published beans and invoke them.
BeanMethod

    @Data
    public class BeanMethod {
        private Object bean;
        private Method method;
    }

Mediator
Holds the published beans and invokes them through reflection.

    public class Mediator {

        public static Map<String, BeanMethod> beanMethodMap = new ConcurrentHashMap<>();

        private volatile static Mediator instance = null;

        private Mediator() {
        }

        // Double-checked locking singleton.
        public static Mediator getInstance() {
            if (instance == null) {
                synchronized (Mediator.class) {
                    if (instance == null) {
                        instance = new Mediator();
                    }
                }
            }
            return instance;
        }

        public Object processor(RpcRequest rpcRequest) {
            String key = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
            BeanMethod beanMethod = beanMethodMap.get(key);
            if (beanMethod == null) {
                return null;
            }
            Object bean = beanMethod.getBean();
            Method method = beanMethod.getMethod();
            try {
                return method.invoke(bean, rpcRequest.getParams());
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
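The lookup-then-invoke pattern used by Mediator can be exercised in isolation. The following is a minimal JDK-only sketch, not the article's class: `MiniMediator`, `Registration`, and `EchoService` are hypothetical names, and failures are rethrown as `IllegalStateException` instead of being printed.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service used only for this illustration.
interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return "echo:" + text;
    }
}

class MiniMediator {
    // Pairs a target object with one of its methods, like BeanMethod above.
    private static final class Registration {
        final Object bean;
        final Method method;

        Registration(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    private static final Map<String, Registration> REGISTRATIONS = new ConcurrentHashMap<>();

    // Registers every method of the interface under "<interface name>.<method name>".
    static void publish(Class<?> iface, Object bean) {
        for (Method method : iface.getMethods()) {
            REGISTRATIONS.put(iface.getName() + "." + method.getName(), new Registration(bean, method));
        }
    }

    // Looks up the registration by key and invokes it reflectively; returns null for an unknown key.
    static Object invoke(String className, String methodName, Object... params) {
        Registration registration = REGISTRATIONS.get(className + "." + methodName);
        if (registration == null) {
            return null;
        }
        try {
            return registration.method.invoke(registration.bean, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed for " + methodName, e);
        }
    }
}
```

Because the key contains only the method name, two overloads of the same method would overwrite each other in the map. Adding the parameter types to the key is one way to avoid that.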
RpcServerProperties
Defines the configuration properties.

    @Data
    @ConfigurationProperties(prefix = "gp.rpc")
    public class RpcServerProperties {
        private int servicePort;
    }
RpcProviderAutoConfiguration
Defines the auto-configuration class.

    @Configuration
    @EnableConfigurationProperties(RpcServerProperties.class)
    public class RpcProviderAutoConfiguration {

        @Bean
        public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties)
                throws UnknownHostException {
            return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
        }
    }
Modifying RpcServerHandler
Change how the target is invoked: the handler now simply delegates to Mediator.

    public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
            RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
            // Reuse the request header and mark it as a response.
            Header header = msg.getHeader();
            header.setReqType(ReqType.RESPONSE.code());
            Object result = Mediator.getInstance().processor(msg.getContent());
            resProtocol.setHeader(header);
            RpcResponse response = new RpcResponse();
            response.setData(result);
            response.setMsg("success");
            resProtocol.setContent(response);
            ctx.writeAndFlush(resProtocol);
        }
    }
netty-rpc-provider
Two parts of this module change:
application.properties
NettyRpcProviderMain
NettyRpcProviderMain

    @ComponentScan(basePackages = {
            "com.example.spring.annotation",
            "com.example.spring.service",
            "com.example.service"})
    @SpringBootApplication
    public class NettyRpcProviderMain {

        public static void main(String[] args) throws Exception {
            SpringApplication.run(NettyRpcProviderMain.class, args);
        }
    }
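application.properties
Add one property, the port the Netty server listens on. It is bound to RpcServerProperties.servicePort through the gp.rpc prefix.

    gp.rpc.servicePort=20880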
UserServiceImpl
Publish this service by annotating it with @GpRemoteService.

    @GpRemoteService
    @Slf4j
    public class UserServiceImpl implements IUserService {

        @Override
        public String saveUser(String name) {
            log.info("begin saveUser:" + name);
            return "Save User Success!";
        }
    }
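Making the client annotation-driven
The client should also reference services through an annotation, so that the details of remote communication are hidden completely. The code structure is shown in Figure 7-2.
Figure 7-2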
Adding the client annotation
Create the following annotation in the annotation directory of the netty-rpc-protocol module.

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @Autowired
    public @interface GpRemoteReference {}
SpringRpcReferenceBean
A FactoryBean that builds the proxy used for remote communication.

    public class SpringRpcReferenceBean implements FactoryBean<Object> {

        private Class<?> interfaceClass;
        private Object object;
        private String serviceAddress;
        private int servicePort;

        @Override
        public Object getObject() throws Exception {
            return object;
        }

        // Init method: creates a JDK dynamic proxy for the service interface.
        public void init() {
            this.object = Proxy.newProxyInstance(
                    this.interfaceClass.getClassLoader(),
                    new Class<?>[]{this.interfaceClass},
                    new RpcInvokerProxy(this.serviceAddress, this.servicePort));
        }

        @Override
        public Class<?> getObjectType() {
            return this.interfaceClass;
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        public void setServiceAddress(String serviceAddress) {
            this.serviceAddress = serviceAddress;
        }

        public void setServicePort(int servicePort) {
            this.servicePort = servicePort;
        }
    }
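The proxy creation in `init()` relies on the JDK's `java.lang.reflect.Proxy`. As a rough illustration of what the caller receives, the sketch below swaps the network call for a handler that merely describes the invocation. `PingService`, `ProxySketch`, and `describingHandler` are hypothetical names, not part of the article's code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface used only for this illustration.
interface PingService {
    String ping(String who);
}

class ProxySketch {
    // Builds a proxy whose interface calls are all routed to the given handler.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    // Stand-in handler: instead of sending a request over the network,
    // it returns a description of the call it intercepted.
    static InvocationHandler describingHandler() {
        return (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName() + "." + method.getName() + Arrays.toString(args);
    }
}
```

Calling `ping("Mic")` on such a proxy never touches an implementation class. The handler sees the `Method` and the argument array, which is the information RpcInvokerProxy needs in order to build a request.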
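SpringRpcReferencePostProcessor
Injects dynamic proxies for remote beans. It implements three Spring callback interfaces:
BeanClassLoaderAware: receives the class loader used to load bean classes
BeanFactoryPostProcessor: runs after the Spring container has loaded the bean definitions and before any bean is instantiated
ApplicationContextAware: receives the ApplicationContext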
    @Slf4j
    public class SpringRpcReferencePostProcessor
            implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

        private ApplicationContext context;
        private ClassLoader classLoader;
        private RpcClientProperties clientProperties;

        // Bean definitions for remote references, keyed by field name.
        private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

        public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
            this.clientProperties = clientProperties;
        }

        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = applicationContext;
        }

        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            // Scan the fields of every bean class for @GpRemoteReference.
            for (String beanDefinitionname : beanFactory.getBeanDefinitionNames()) {
                BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionname);
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName != null) {
                    Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                    ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
                }
            }
            // Register one SpringRpcReferenceBean definition per annotated field.
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
            this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
                if (context.containsBean(beanName)) {
                    log.warn("SpringContext already register bean {}", beanName);
                    return;
                }
                registry.registerBeanDefinition(beanName, beanDefinition);
                log.info("registered RpcReferenceBean {} success.", beanName);
            });
        }

        private void parseRpcReference(Field field) {
            GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
            if (gpRemoteReference != null) {
                BeanDefinitionBuilder builder =
                        BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
                builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
                builder.addPropertyValue("interfaceClass", field.getType());
                builder.addPropertyValue("serviceAddress", clientProperties.getServiceAddress());
                builder.addPropertyValue("servicePort", clientProperties.getServicePort());
                BeanDefinition beanDefinition = builder.getBeanDefinition();
                rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
            }
        }
    }
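The field scan performed by `parseRpcReference` can be reproduced with plain reflection. This sketch assumes nothing from Spring: `RemoteRef`, `OrderService`, `SampleController`, and `ReferenceScanner` are hypothetical names, and the result is a simple map rather than a set of bean definitions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @GpRemoteReference.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface RemoteRef {}

interface OrderService {}

class SampleController {
    @RemoteRef
    private OrderService orderService;

    private String unrelated;
}

class ReferenceScanner {
    // Returns field name -> field type for every field annotated with @RemoteRef,
    // walking up the class hierarchy and stopping at Object.
    static Map<String, Class<?>> scan(Class<?> clazz) {
        Map<String, Class<?>> references = new LinkedHashMap<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (field.isAnnotationPresent(RemoteRef.class)) {
                    references.put(field.getName(), field.getType());
                }
            }
        }
        return references;
    }
}
```

Like the post-processor above, this keys results by field name only, so two annotated fields with the same name but different types would collide. Keying by the interface's class name is one possible alternative.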
An INIT_METHOD_NAME constant has to be added to RpcConstant.

    public class RpcConstant {
        // Total header length in bytes
        public final static int HEAD_TOTAL_LEN = 16;
        // Magic number
        public final static short MAGIC = 0xca;
        public static final String INIT_METHOD_NAME = "init";
    }
RpcClientProperties

    @Data
    public class RpcClientProperties {
        private String serviceAddress = "192.168.1.102";
        private int servicePort = 20880;
    }
RpcRefernceAutoConfiguration

    @Configuration
    public class RpcRefernceAutoConfiguration implements EnvironmentAware {

        private Environment environment;

        @Bean
        public SpringRpcReferencePostProcessor postProcessor() {
            String address = environment.getProperty("gp.serviceAddress");
            int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
            RpcClientProperties rc = new RpcClientProperties();
            rc.setServiceAddress(address);
            rc.setServicePort(port);
            return new SpringRpcReferencePostProcessor(rc);
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }
netty-rpc-consumer
Change the netty-rpc-consumer module as follows:
Turn the module into a Spring Boot project
Add the web dependency
Add a test class
Figure 7-3 The netty-rpc-consumer module
Adding the dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
HelloController

    @RestController
    public class HelloController {

        @GpRemoteReference
        private IUserService userService;

        @GetMapping("/test")
        public String test() {
            return userService.saveUser("Mic");
        }
    }
NettyConsumerMain

    @ComponentScan(basePackages = {
            "com.example.spring.annotation",
            "com.example.controller",
            "com.example.spring.reference"})
    @SpringBootApplication
    public class NettyConsumerMain {

        public static void main(String[] args) {
            SpringApplication.run(NettyConsumerMain.class, args);
        }
    }
application.properties
The keys must match the ones that RpcRefernceAutoConfiguration reads, gp.serviceAddress and gp.servicePort.

    gp.serviceAddress=192.168.1.102
    gp.servicePort=20880
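RpcRefernceAutoConfiguration passes the raw property value straight to `Integer.parseInt`, so a missing or misspelled key surfaces as a `NumberFormatException` with little context. A small defensive lookup, sketched here with `java.util.Properties` in place of Spring's `Environment`, reports the offending key instead. `ClientSettings` and `required` are hypothetical names.

```java
import java.util.Properties;

class ClientSettings {
    final String address;
    final int port;

    ClientSettings(String address, int port) {
        this.address = address;
        this.port = port;
    }

    // Reads both client properties and fails with the key name when one is absent.
    static ClientSettings from(Properties props) {
        String address = required(props, "gp.serviceAddress");
        int port = Integer.parseInt(required(props, "gp.servicePort").trim());
        return new ClientSettings(address, port);
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing required property " + key);
        }
        return value;
    }
}
```

The same idea carries over to Spring, where `Environment.getRequiredProperty` throws an `IllegalStateException` when the key cannot be resolved.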
Testing
Start Netty-Rpc-Server
Start Netty-Rpc-Consumer
If both start without errors, call HelloController (GET /test) to verify that the remote service can be reached.
Introducing a registry
Create a netty-rpc-registry module. The code structure is shown in Figure 7-4.
Adding the dependencies

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-x-discovery</artifactId>
        <version>4.2.0</version>
    </dependency>
IRegistryService

    public interface IRegistryService {

        // Register a service instance
        void register(ServiceInfo serviceInfo) throws Exception;

        // Remove a service instance
        void unRegister(ServiceInfo serviceInfo) throws Exception;

        // Look up one instance of the named service
        ServiceInfo discovery(String serviceName) throws Exception;
    }
ServiceInfo

    @Data
    public class ServiceInfo {
        private String serviceName;
        private String serviceAddress;
        private int servicePort;
    }
ZookeeperRegistryService

    @Slf4j
    public class ZookeeperRegistryService implements IRegistryService {

        private static final String REGISTRY_PATH = "/registry";

        // Curator's service discovery wrapper
        private final ServiceDiscovery<ServiceInfo> serviceDiscovery;
        private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

        public ZookeeperRegistryService(String registryAddress) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory
                    .newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
            // A CuratorFramework instance must be started before it is used.
            client.start();
            JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
            this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                    .client(client)
                    .serializer(serializer)
                    .basePath(REGISTRY_PATH)
                    .build();
            this.serviceDiscovery.start();
            loadBalance = new RandomLoadBalance();
        }

        @Override
        public void register(ServiceInfo serviceInfo) throws Exception {
            log.info("start registering service, {}", serviceInfo);
            ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                    .name(serviceInfo.getServiceName())
                    .address(serviceInfo.getServiceAddress())
                    .port(serviceInfo.getServicePort())
                    .payload(serviceInfo)
                    .build();
            serviceDiscovery.registerService(serviceInstance);
        }

        @Override
        public void unRegister(ServiceInfo serviceInfo) throws Exception {
            ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                    .name(serviceInfo.getServiceName())
                    .address(serviceInfo.getServiceAddress())
                    .port(serviceInfo.getServicePort())
                    .payload(serviceInfo)
                    .build();
            serviceDiscovery.unregisterService(serviceInstance);
        }

        @Override
        public ServiceInfo discovery(String serviceName) throws Exception {
            Collection<ServiceInstance<ServiceInfo>> serviceInstances =
                    serviceDiscovery.queryForInstances(serviceName);
            // Copy into a List instead of casting: queryForInstances is declared to return a Collection.
            ServiceInstance<ServiceInfo> serviceInstance =
                    loadBalance.select(new ArrayList<>(serviceInstances));
            if (serviceInstance != null) {
                return serviceInstance.getPayload();
            }
            return null;
        }
    }
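For unit tests that should not depend on a running ZooKeeper, the register / unRegister / discovery contract can be mimicked in memory. The sketch below is an assumption-laden stand-in, not a second implementation from the article: `Endpoint` replaces ServiceInfo, `InMemoryRegistry` does not implement IRegistryService, and instance selection is a plain random pick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for ServiceInfo.
class Endpoint {
    final String serviceName;
    final String host;
    final int port;

    Endpoint(String serviceName, String host, int port) {
        this.serviceName = serviceName;
        this.host = host;
        this.port = port;
    }
}

class InMemoryRegistry {
    // service name -> registered instances
    private final Map<String, List<Endpoint>> services = new ConcurrentHashMap<>();

    void register(Endpoint endpoint) {
        services.computeIfAbsent(endpoint.serviceName, k -> new CopyOnWriteArrayList<>()).add(endpoint);
    }

    // Removes every instance of the service that has the same host and port.
    void unRegister(Endpoint endpoint) {
        List<Endpoint> instances = services.get(endpoint.serviceName);
        if (instances != null) {
            instances.removeIf(e -> e.host.equals(endpoint.host) && e.port == endpoint.port);
        }
    }

    // Returns one randomly chosen instance, or null when none is registered.
    Endpoint discovery(String serviceName) {
        List<Endpoint> instances = services.get(serviceName);
        if (instances == null) {
            return null;
        }
        List<Endpoint> snapshot = new ArrayList<>(instances);
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }
}
```

Matching on host and port when unregistering is a deliberate choice in this sketch, so that a freshly built `Endpoint` with the same coordinates removes the earlier registration.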
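Introducing a load-balancing algorithm
Service discovery may return several instances of the same service, so a load-balancing algorithm is needed to choose one of them.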
ILoadBalance

    public interface ILoadBalance<T> {
        T select(List<T> servers);
    }
AbstractLoadBalance

    public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

        @Override
        public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) {
            if (servers == null || servers.size() == 0) {
                return null;
            }
            // With a single instance there is nothing to balance.
            if (servers.size() == 1) {
                return servers.get(0);
            }
            return doSelect(servers);
        }

        protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
    }
RandomLoadBalance

    public class RandomLoadBalance extends AbstractLoadBanalce {

        // java.util.Random is thread-safe, so one shared instance is enough.
        private final Random random = new Random();

        @Override
        protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
            int length = servers.size();
            return servers.get(random.nextInt(length));
        }
    }
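The article implements only the random strategy. As an illustration of how another strategy could fit the same `select(List<T>)` shape, here is a round-robin sketch. It is not part of the original code: `Balancer` is a hypothetical copy of ILoadBalance so the example compiles on its own, and `RoundRobinBalancer` is an assumed name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical copy of the ILoadBalance<T> contract, repeated so the sketch is self-contained.
interface Balancer<T> {
    T select(List<T> servers);
}

class RoundRobinBalancer<T> implements Balancer<T> {
    // Shared counter; each call takes the next position.
    private final AtomicInteger next = new AtomicInteger();

    @Override
    public T select(List<T> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

Round-robin gives an even spread when the instance list is stable. Random selection needs no shared state and behaves the same no matter how the list changes between calls.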
RegistryType

    public enum RegistryType {

        ZOOKEEPER((byte) 0),
        EUREKA((byte) 1);

        private byte code;

        RegistryType(byte code) {
            this.code = code;
        }

        public byte code() {
            return this.code;
        }

        // Returns null when no type matches the code.
        public static RegistryType findByCode(byte code) {
            for (RegistryType rt : RegistryType.values()) {
                if (rt.code() == code) {
                    return rt;
                }
            }
            return null;
        }
    }
RegistryFactory

    public class RegistryFactory {

        public static IRegistryService createRegistryService(String address, RegistryType registryType) {
            IRegistryService registryService = null;
            try {
                switch (registryType) {
                    case ZOOKEEPER:
                        registryService = new ZookeeperRegistryService(address);
                        break;
                    case EUREKA:
                        // Not implemented; null is returned for this type.
                        break;
                    default:
                        registryService = new ZookeeperRegistryService(address);
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return registryService;
        }
    }
Adding service registration on the server side
Modify the netty-rpc-protocol module to support the registry.
SpringRpcProviderBean
Compared with the earlier version, the changes are the new registryService field, the extra constructor argument, and the registration call in postProcessAfterInitialization. The service is registered once per bean, after its methods have been recorded.

    @Slf4j
    public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

        private final int serverPort;
        private final String serverAddress;
        private final IRegistryService registryService; // new: the registry

        public SpringRpcProviderBean(int serverPort, IRegistryService registryService)
                throws UnknownHostException {
            this.serverPort = serverPort;
            InetAddress address = InetAddress.getLocalHost();
            this.serverAddress = address.getHostAddress();
            this.registryService = registryService;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
            new Thread(() -> {
                try {
                    new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
                } catch (Exception e) {
                    log.error("start Netty Server Occur Exception,", e);
                }
            }).start();
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
                String serviceName = bean.getClass().getInterfaces()[0].getName();
                Method[] methods = bean.getClass().getDeclaredMethods();
                for (Method method : methods) {
                    String key = serviceName + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod();
                    beanMethod.setBean(bean);
                    beanMethod.setMethod(method);
                    Mediator.beanMethodMap.put(key, beanMethod);
                }
                // Register outside the loop so the service is not registered once per method.
                try {
                    ServiceInfo serviceInfo = new ServiceInfo();
                    serviceInfo.setServiceAddress(this.serverAddress);
                    serviceInfo.setServicePort(this.serverPort);
                    serviceInfo.setServiceName(serviceName);
                    registryService.register(serviceInfo);
                } catch (Exception e) {
                    log.error("register service {} failed", serviceName, e);
                }
            }
            return bean;
        }
    }
RpcServerProperties
Add the registry settings to RpcServerProperties.

    @Data
    @ConfigurationProperties(prefix = "gp.rpc")
    public class RpcServerProperties {
        private int servicePort;
        private byte registerType;
        private String registryAddress;
    }
RpcProviderAutoConfiguration
Create the registry service and inject it into the provider bean.

    @Configuration
    @EnableConfigurationProperties(RpcServerProperties.class)
    public class RpcProviderAutoConfiguration {

        @Bean
        public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties)
                throws UnknownHostException {
            // Build the registry service from the configured type and address.
            IRegistryService registryService = RegistryFactory.createRegistryService(
                    rpcServerProperties.getRegistryAddress(),
                    RegistryType.findByCode(rpcServerProperties.getRegisterType()));
            return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
        }
    }
application.properties
Update application.properties in netty-rpc-provider.

    gp.rpc.servicePort=20880
    gp.rpc.registerType=0
    gp.rpc.registryAddress=192.168.221.128:2181
Adding service discovery on the client side
The client needs more changes. All of the classes modified below belong to the netty-rpc-protocol module.
RpcClientProperties
Add options for the registry type and the registry address.

    @Data
    public class RpcClientProperties {
        private String serviceAddress = "192.168.1.102";
        private int servicePort = 20880;
        private byte registryType;
        private String registryAddress;
    }
Modifying NettyClient
The address used to be static. It is now obtained from the registry for each request.

    @Slf4j
    public class NettyClient {

        private final Bootstrap bootstrap;
        private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        public NettyClient() {
            log.info("begin init NettyClient");
            bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new RpcClientInitializer());
        }

        public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService)
                throws Exception {
            // Look up a provider by the interface name carried in the request.
            String serviceName = protocol.getContent().getClassName();
            ServiceInfo serviceInfo = registryService.discovery(serviceName);
            if (serviceInfo == null) {
                // discovery returns null when no instance is registered.
                throw new IllegalStateException("no provider found for service " + serviceName);
            }
            ChannelFuture future = bootstrap
                    .connect(serviceInfo.getServiceAddress(), serviceInfo.getServicePort())
                    .sync();
            future.addListener(listener -> {
                if (future.isSuccess()) {
                    log.info("connect rpc server {} success.", serviceInfo.getServiceAddress());
                } else {
                    log.error("connect rpc server {} failed.", serviceInfo.getServiceAddress());
                    future.cause().printStackTrace();
                    eventLoopGroup.shutdownGracefully();
                }
            });
            log.info("begin transfer data");
            future.channel().writeAndFlush(protocol);
        }
    }
Modifying RpcInvokerProxy
Replace the static IP address and port with an IRegistryService.

    @Slf4j
    public class RpcInvokerProxy implements InvocationHandler {

        IRegistryService registryService;

        public RpcInvokerProxy(IRegistryService registryService) {
            this.registryService = registryService;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            log.info("begin invoke target server");
            RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
            long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
            Header header = new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(),
                    ReqType.REQUEST.code(), requestId, 0);
            protocol.setHeader(header);
            RpcRequest request = new RpcRequest();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParams(args);
            protocol.setContent(request);
            NettyClient nettyClient = new NettyClient();
            // Store the future under the request id so the response can be matched to it.
            RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
            RequestHolder.REQUEST_MAP.put(requestId, future);
            nettyClient.sendRequest(protocol, this.registryService);
            // Block until the response for this request id arrives.
            return future.getPromise().get().getData();
        }
    }
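The request-id bookkeeping in `invoke` follows a common pattern: park a future under a unique id before sending, and complete it when the matching response arrives. The sketch below shows that pattern with `CompletableFuture` in place of Netty's promise. `PendingRequests` and its methods are hypothetical, and the timeout is an addition that the article's code does not have.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    long nextId() {
        return ids.incrementAndGet();
    }

    // Caller side: park a future under the id before the request is written.
    CompletableFuture<Object> register(long id) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    // I/O side: complete and remove the future that matches the response id.
    boolean complete(long id, Object data) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(data);
    }

    // Waits for the response, but gives up after the timeout instead of blocking forever.
    Object await(long id, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            pending.remove(id); // drop the entry so a missing response does not leak memory
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("request " + id + " did not complete", e);
        }
    }

    int size() {
        return pending.size();
    }
}
```

Removing the entry both on completion and on timeout keeps the map from growing when a provider never answers.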
SpringRpcReferenceBean
Modify the reference bean to carry the registry configuration.

    public class SpringRpcReferenceBean implements FactoryBean<Object> {

        private Class<?> interfaceClass;
        private Object object;
        // Registry settings replace the static address and port.
        private byte registryType;
        private String registryAddress;

        @Override
        public Object getObject() throws Exception {
            return object;
        }

        public void init() {
            IRegistryService registryService = RegistryFactory.createRegistryService(
                    this.registryAddress, RegistryType.findByCode(this.registryType));
            this.object = Proxy.newProxyInstance(
                    this.interfaceClass.getClassLoader(),
                    new Class<?>[]{this.interfaceClass},
                    new RpcInvokerProxy(registryService));
        }

        @Override
        public Class<?> getObjectType() {
            return this.interfaceClass;
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        public void setRegistryType(byte registryType) {
            this.registryType = registryType;
        }

        public void setRegistryAddress(String registryAddress) {
            this.registryAddress = registryAddress;
        }
    }
SpringRpcReferencePostProcessor

    @Slf4j
    public class SpringRpcReferencePostProcessor
            implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

        private ApplicationContext context;
        private ClassLoader classLoader;
        private RpcClientProperties clientProperties;

        // Bean definitions for remote references, keyed by field name.
        private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

        public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
            this.clientProperties = clientProperties;
        }

        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = applicationContext;
        }

        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            for (String beanDefinitionname : beanFactory.getBeanDefinitionNames()) {
                BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionname);
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName != null) {
                    Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                    ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
                }
            }
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
            this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
                if (context.containsBean(beanName)) {
                    log.warn("SpringContext already register bean {}", beanName);
                    return;
                }
                registry.registerBeanDefinition(beanName, beanDefinition);
                log.info("registered RpcReferenceBean {} success.", beanName);
            });
        }

        private void parseRpcReference(Field field) {
            GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
            if (gpRemoteReference != null) {
                BeanDefinitionBuilder builder =
                        BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
                builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
                builder.addPropertyValue("interfaceClass", field.getType());
                // Registry settings replace the static service address and port.
                builder.addPropertyValue("registryType", clientProperties.getRegistryType());
                builder.addPropertyValue("registryAddress", clientProperties.getRegistryAddress());
                BeanDefinition beanDefinition = builder.getBeanDefinition();
                rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
            }
        }
    }
RpcRefernceAutoConfiguration

    @Configuration
    public class RpcRefernceAutoConfiguration implements EnvironmentAware {

        private Environment environment;

        @Bean
        public SpringRpcReferencePostProcessor postProcessor() {
            String address = environment.getProperty("gp.serviceAddress");
            int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
            RpcClientProperties rc = new RpcClientProperties();
            rc.setServiceAddress(address);
            rc.setServicePort(port);
            rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
            rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
            return new SpringRpcReferencePostProcessor(rc);
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }
application.properties
Update the configuration in the netty-rpc-consumer module.

    gp.serviceAddress=192.168.1.102
    gp.servicePort=20880
    gp.registryType=0
    gp.registryAddress=192.168.221.128:2181
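Testing load balancing
Add a second startup class for the server side with a different port and start it. Then, without restarting the client, refresh the browser to see requests being spread across the two providers.
Figure 7-5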