zookeeper的使用及Demo级RPC通信_zkx-ppc-demo-程序员宅基地

技术标签: zookeeper  

目录

Demo级RPC通信

服务端

客户端

zookeeper的使用

服务端发布服务

客户端调用服务

源码地址

client:https://gitee.com/pengzhang_master/rpc-zookeeper-client.git

server:https://gitee.com/pengzhang_master/rpc-zookeeper-server.git

Demo级RPC通信

这是基于netty的Demo级RPC通信小项目,为后面zookeeper的使用做铺垫。

服务端

创建并初始化spring容器

public class Bootstrap {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
        ((AnnotationConfigApplicationContext) context).start();
    }
}

通过配置类添加bean

@Configuration
@ComponentScan(basePackages = "com.server")
public class SpringConfig {

    @Bean(name="gpRpcServer")
    public GpRpcServer gpRpcServer(){
        return new GpRpcServer(8080);
    }
}

传入8080端口,供后续暴露端口使用。GpRpcServer类实现接口ApplicationContextAware和InitializingBean,重写方法setApplicationContext(ApplicationContext applicationContext)和afterPropertiesSet()

setApplicationContext(ApplicationContext applicationContext)获取spring容器的上下文,从而获取所有加了@RpcService注解的类的对应的bean,从而将接口和服务放入缓存

afterPropertiesSet()在初始化GpRpcServer的bean的时候通过netty暴露服务端口

public class GpRpcServer implements ApplicationContextAware,InitializingBean {

    private Map<String,Object> handlerMap=new HashMap();

    private int port;

    private IRegistryCenter registryCenter=new RegistryCenterWithZk();

    public GpRpcServer(int port) {
        this.port = port;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //接收客户端的链接
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        //处理已经被接收的链接
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().
                                    addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
                                    addLast(new ObjectEncoder()).
                                    addLast(new ProcessorHandler(handlerMap));
                        }
                    });
            serverBootstrap.bind(port).sync();
        }finally {
           /* workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();*/
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //拿到所有加了@RpcService注解的类的对应的bean
        Map<String,Object> serviceBeanMap=applicationContext.getBeansWithAnnotation(RpcService.class);
        if(!serviceBeanMap.isEmpty()){
            for(Object servcieBean:serviceBeanMap.values()){
                //迭代bean,拿到注解
                RpcService rpcService=servcieBean.getClass().getAnnotation((RpcService.class));
                String serviceName=rpcService.value().getName();//拿到接口类定义
                String version=rpcService.version(); //拿到版本号
                if(!StringUtils.isEmpty(version)){
                    serviceName+="-"+version;
                }
                //放入缓存
                handlerMap.put(serviceName,servcieBean);
                //服务注册
                registryCenter.registry(serviceName,getAddress()+":"+port);
            }
        }
    }
    private static String getAddress(){
        InetAddress inetAddress=null;
        try {
            inetAddress=InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return inetAddress.getHostAddress();// 获得本机的ip地址
    }
}

ProcessorHandler继承SimpleChannelInboundHandler<RpcRequest>,重写channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest),rpcRequest是netty接受的数据。

channelHandlerContext.writeAndFlush写出数据,ChannelHandlerContext代表了一个ChannelHandler和ChannelPipeline之间的关系,ChannelHandlerContext创建于ChannelHandler被载入到ChannelPipeline的时候,ChannelHandlerContext主要功能是管理在同一ChannelPipeline中各个ChannelHandler的交互。

invoke中通过接受的消息拿到客户端请求参数、类和方法名反射调用,获取调用结果。

public class ProcessorHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private Map<String,Object> handlerMap;


    public ProcessorHandler(Map<String,Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
        Object result=invoke(rpcRequest);
        channelHandlerContext.writeAndFlush(result).addListener(ChannelFutureListener.CLOSE);

    }

    private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        //反射调用
        String serviceName=request.getClassName();
        String version=request.getVersion();
        //增加版本号的判断
        if(!StringUtils.isEmpty(version)){
            serviceName+="-"+version;
        }

        Object service=handlerMap.get(serviceName);
        if(service==null){
            throw new RuntimeException("service not found:"+serviceName);
        }
        Object[] args=request.getParameters(); //拿到客户端请求的参数
        Method method=null;
        Class clazz=Class.forName(request.getClassName()); //跟去请求的类进行加载
        method=clazz.getMethod(request.getMethodName(),request.getParamTypes()); //sayHello, saveUser找到这个类中的方法
        Object result=method.invoke(service,args);//HelloServiceImpl 进行反射调用
        return result;
    }
}

客户端

创建并初始化spring容器,在上下文中拿到代理对象并进行调用。

public class App {
    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = new
                AnnotationConfigApplicationContext(SpringConfig.class);
        RpcProxyClient rpcProxyClient = context.getBean(RpcProxyClient.class);

        IHelloService iHelloService = rpcProxyClient.clientProxy
                (IHelloService.class, "v2.0");
        for (int i = 0; i < 100; i++) {
            Thread.sleep(2000);
            System.out.println(iHelloService.sayHello(1.0));
        }
    }
}

通过配置类添加bean

@Configuration
public class SpringConfig {

    @Bean(name="rpcPRoxyClient")
    public RpcProxyClient proxyClient(){
        return new RpcProxyClient();
    }
}

RpcProxyClient中的clientProxy返回代理对象

public class RpcProxyClient {

    private IServiceDiscovery serviceDiscovery=new ServiceDiscoveryWithZk();

    public <T> T clientProxy(final Class<T> interfaceCls, String version){

        return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),
                new Class<?>[]{interfaceCls},new RemoteInvocationHandler(serviceDiscovery,version));
    }
}

在RemoteInvocationHandler.invoke中包装请求数据rpcRequest,包括执行方法和参数。再根据服务名加版本号从zookeeper中获取服务地址serviceAddress。根据服务地址发送数据包,并接受返回数据。

public class RemoteInvocationHandler implements InvocationHandler {

    private IServiceDiscovery serviceDiscovery;
    private String version;

    public RemoteInvocationHandler(IServiceDiscovery serviceDiscovery,String version) {
        this.serviceDiscovery=serviceDiscovery;
        this.version=version;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //请求数据的包装
        RpcRequest rpcRequest=new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParamTypes(method.getParameterTypes());
        rpcRequest.setParameters(args);
        rpcRequest.setVersion(version);
        String serviceName=rpcRequest.getClassName();
        if(!StringUtils.isEmpty(version)){
            serviceName=serviceName+"-"+version;
        }
        String serviceAddress=serviceDiscovery.discovery(serviceName);
        //远程通信
        RpcNetTransport netTransport=new RpcNetTransport(serviceAddress);
        Object result=netTransport.send(rpcRequest);

        return result;
    }
}

RpcNetTransport.send中利用netty将数据包发送到对应的服务。

public class RpcNetTransport extends SimpleChannelInboundHandler<Object>  {

    private String serviceAddress;

    public RpcNetTransport(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    private final Object lock=new Object();
    private Object result;


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        this.result=o;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常:");
        ctx.close();
    }

    public Object send(RpcRequest request){
        NioEventLoopGroup eventLoogGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(eventLoogGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().
                        addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
                        addLast(new ObjectEncoder()).
                        addLast(RpcNetTransport.this);
            }
        }).option(ChannelOption.TCP_NODELAY,true);
        try {
            String urls[]=serviceAddress.split(":");
            ChannelFuture future=bootstrap.connect(urls[0],Integer.parseInt(urls[1])).sync();
            future.channel().writeAndFlush(request).sync();

            if(request!=null){
                future.channel().closeFuture().sync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            eventLoogGroup.shutdownGracefully();
        }
        return result;
    }
}

zookeeper的使用

服务端发布服务

在服务启动时,初始化zookeeper的连接。

public class RegistryCenterWithZk implements IRegistryCenter{

    CuratorFramework curatorFramework =null;

    {
        //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
        curatorFramework = CuratorFrameworkFactory.builder().
                connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
                retryPolicy(new ExponentialBackoffRetry(1000, 3)).
                namespace("registry")
                .build();
        curatorFramework.start();
    }
、、、、、

在初始化GpRpcServer的bean的时候,在setApplicationContext中获取spring上下文拿到所有加了RpcService注解的bean,并将接口定义和对应的bean放入缓存,然后将服务注册到zookeeper。

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //拿到所有加了@RpcService注解的类的对应的bean
        Map<String,Object> serviceBeanMap=applicationContext.getBeansWithAnnotation(RpcService.class);
        if(!serviceBeanMap.isEmpty()){
            for(Object servcieBean:serviceBeanMap.values()){
                //迭代bean,拿到注解
                RpcService rpcService=servcieBean.getClass().getAnnotation((RpcService.class));
                String serviceName=rpcService.value().getName();//拿到接口类定义
                String version=rpcService.version(); //拿到版本号
                if(!StringUtils.isEmpty(version)){
                    serviceName+="-"+version;
                }
                //放入缓存
                handlerMap.put(serviceName,servcieBean);
                //服务注册
                registryCenter.registry(serviceName,getAddress()+":"+port);
            }
        }
    }

在将服务注册到zookeeper时,将服务名和ip、端口号获取并传入。将服务名创建为持久化节点,将ip:port创建为临时节点。

@Override
    public void registry(String serviceName, String serviceAddress) {
        String servicePath="/"+serviceName;
        try {
            //判断节点是否存在
            if(curatorFramework.checkExists().forPath(servicePath)==null){
                //创建持久化节点
                curatorFramework.create().creatingParentsIfNeeded().
                        withMode(CreateMode.PERSISTENT).forPath(servicePath);
            }
            //serviceAddress: ip:port
            String addressPath=servicePath+"/"+serviceAddress;
            //创建临时节点
            curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(addressPath);
            System.out.println("服务注册成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

客户端调用服务

在spring上下文中获取RpcProxyClient的bean,调用rpcProxyClient.clientProxy(IHelloService.class, "v2.0")获取代理对象。并调用sayHello方法。

在调用sayHello时实际调用的是invoke,在invoke中通过传入的serviceDiscovery调用discovery(String serviceName)获取服务地址,并完成远程通信,实际上就是将类名、方法名、参数封装后发给服务端,在服务端通过以上数据进行反射调用。

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //请求数据的包装
        RpcRequest rpcRequest=new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParamTypes(method.getParameterTypes());
        rpcRequest.setParameters(args);
        rpcRequest.setVersion(version);
        String serviceName=rpcRequest.getClassName();
        if(!StringUtils.isEmpty(version)){
            serviceName=serviceName+"-"+version;
        }
        String serviceAddress=serviceDiscovery.discovery(serviceName);
        //远程通信
        RpcNetTransport netTransport=new RpcNetTransport(serviceAddress);
        Object result=netTransport.send(rpcRequest);

        return result;
    }
public Object send(RpcRequest request){
        NioEventLoopGroup eventLoogGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(eventLoogGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().
                        addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
                        addLast(new ObjectEncoder()).
                        addLast(RpcNetTransport.this);
            }
        }).option(ChannelOption.TCP_NODELAY,true);
        try {
            String urls[]=serviceAddress.split(":");
            ChannelFuture future=bootstrap.connect(urls[0],Integer.parseInt(urls[1])).sync();
            future.channel().writeAndFlush(request).sync();

            if(request!=null){
                future.channel().closeFuture().sync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            eventLoogGroup.shutdownGracefully();
        }
        return result;
    }

在ServiceDiscoveryWithZk中初始化zookeeper的连接。在discovery中完成服务地址的查找和根据已知的服务地址进行负载均衡,以及监听zookeeper上注册的服务节点,若感知到节点发生变化,则刷新缓存中的服务地址。

@Override
    public String discovery(String serviceName) {
        //完成了服务地址的查找(服务地址被删除)
        String path="/"+serviceName; //registry/com.server.HelloService
        if(serviceRepos.isEmpty()) {
            try {
                serviceRepos = curatorFramework.getChildren().forPath(path);
                registryWatch(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //针对已有的地址做负载均衡
        LoadBalanceStrategy loadBalanceStrategy=new RandomLoadBalance();
        return loadBalanceStrategy.selectHost(serviceRepos);
    }
private void registryWatch(final String path) throws Exception {
        PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
        PathChildrenCacheListener nodeCacheListener= (curatorFramework1, pathChildrenCacheEvent) -> {
            System.out.println("客户端收到节点变更的事件");
            serviceRepos=curatorFramework1.getChildren().forPath(path);// 再次更新本地的缓存地址
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();

    }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_34908838/article/details/114006414

智能推荐

中断异常的分析思路_sensor采集中断-程序员宅基地

文章浏览阅读517次。工作学习中经常遇到中断相关的issue,我一般会从 中断源 (ic)-- 驱动层 -- SDK 逐层分析。往往解决的比较快。举一个例子,下面给出分析过程。问题描述: 开机过程中指纹driver有大量异常中断触发log 问题分析: 首先,我们要确定的是,这些异常中断是怎么来的。有以下三种case。 1.软件误报2.cpu主动发出的信号3.指纹sensor发出的信号 针对软件误报,只需要 cat ..._sensor采集中断

斧子的HTML5输出是什么,CF斧有什么使用技巧 斧子使用经验分享【详解】-程序员宅基地

文章浏览阅读82次。很多人知道斧子很牛,但就是用不好,所以好多人练了一段时间就放弃了,其实斧子练习有一些问题是需要注意的,下面就结合一下我个人练习斧子的经验,和大家分享一下,希望能对大家的练习提供一些帮助。1.拿起斧子要有信心和决心因为斧子有无视头盔的特点,所以面对其他武器,不应该有任何畏惧的心理(当然不能傻傻的向前冲然后白白送死),就像阿卡选手一样,要有一击毙命的信心和决心,因为在刀战战场上,心态很重要。有一个好的...

nginx配合fastdfs使用-----python上传测试_fastdfs python 上传-程序员宅基地

文章浏览阅读176次。开始前需要先准备好fdfs的环境https://blog.csdn.net/weixin_44834666/article/details/105686059一、安装py3Fdfs包pip install py3Fdfs二、编辑py文件from fdfs_client.client import Fdfs_client, get_tracker_conf# 创建客户端 client...._fastdfs python 上传

plsql创建用户,创建表空间以及删除用户,删除表空间_plsql删除创建的用户-程序员宅基地

文章浏览阅读1.8k次,点赞2次,收藏5次。1.创建表空间create tablespace [表空间名:例如{newszgcp}]datafile [表空间所在地址:例如{'D:/StudySoft/OrServer/admin/orcl/newszgcp'}]size [表空间大小:100M ]autoextend on next [扩展:50M ] maxsize unlimited; 2.创建用户create..._plsql删除创建的用户

Java SE GUI编程_p.add(new textfield(10))含义-程序员宅基地

文章浏览阅读169次。GUI编程_p.add(new textfield(10))含义

Codeforces 1119B 贪心二分_codeforces - 1119b-程序员宅基地

文章浏览阅读191次。https://codeforces.com/contest/1119/problem/B不改变原数组的值,复制一份,要求1-k连续,则1-mid区间内判断子数组从大到小排序后,每两个相差不大,比较均匀 i+=2倒着的目的是前大后小 能装前一定能装后 尽量装更多的 累积高度和跳着累积高度和,如果<=h 可增加更多的瓶子,否则高度太多瓶子太多,减小瓶子数量const int maxn..._codeforces - 1119b

随便推点

用matlab画三相桥式,matlab三相桥式电路仿真.doc-程序员宅基地

文章浏览阅读572次。matlab三相桥式电路仿真.doc 五邑大学 电 力电子技术 课程 设计报告 题 目: 三相桥式整流电路的 MATLAB 仿真 院 系 信息工程学院 专 业 自动化 班 级 140705 学 号 3114001891 学生姓名 杨煜基 指导教师 张建民 电力电子技术课程设计报告 2 三相桥式整流电路的MATLAB 仿真 一、 题目的要求和意义 整流电路是电力电子电路中出现最早的一种,它的作用是将..._三相全桥电路怎么在matlad画出

vscode浏览器怎么打开php,vscode怎么选择浏览器-程序员宅基地

文章浏览阅读2.4k次。vscode怎么选择浏览器?vscode如何右键选择浏览器运行html文件?我们利用Vscode软件编写html的时候,一般都想右键选择html文件,然后直接选择浏览器运行,但是默认是没有的。相关文章教程推荐:vscode教程下面小编给大家分享一下如何设置。首先我们新建一个html文件,你可以用记事本编写一个,如下图所示接着将html文件导入到VsCode软件中,如下图所示我们直接在html文件中..._vscode运行php代码用特定浏览器

phpexcel导出大量数据合并单元格_【PHP】通过PHPExcel导出比较复杂的表格-程序员宅基地

文章浏览阅读591次。[PHP] 纯文本查看 复制代码/*** excel文件导出** [url=home.php?mod=space&uid=952169]@Param[/url] array $data 需要生成excel文件的数组* $data = [* [NULL, 2010, 2011, 2012],//列名* ..._phpexcel 导出复杂表格

马哥运维学习作业(九)-程序员宅基地

文章浏览阅读110次。1、详细描述一次加密通讯的过程,结合图示最佳。2、描述创建私有CA的过程,以及为客户端发来的证书请求进行办法证书。以下操作使用的2台服务器完成:服务器主机名IPCA服务器ca192.168.2.30httpd服务器httpd192.168.2.80在CA服务器上操作:创建所需要的文件[root@ca~]#cd/etc/pki/CA/#..._主进程负责生成n个子进程

iOS AFNetworking上传图片到服务器_afnetworking multipartfile-程序员宅基地

文章浏览阅读828次。AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManagermanager]; manager.requestSerializer = [AFHTTPRequestSerializerserializer]; manager.responseSerializer = [AFHTTP_afnetworking multipartfile

一个C语言程序是由( d)构成,自考“C语言程序设计”模拟试题八-程序员宅基地

文章浏览阅读562次。自考“C语言程序设计”模拟试题八一、选择题1.一个C语言程序是由( )构成。A.语句 B.行号 C.数据 D.函数2.下面标识符中正确的是( )。A.a#bc B.123ABC C.sime D.Y·M·D3.在C语言中,存储一个整型、字符型、双精度实型变量所需的字节数..._6*4/7 +7%3

推荐文章

热门文章

相关标签