技术标签: zookeeper
目录
源码地址
client:https://gitee.com/pengzhang_master/rpc-zookeeper-client.git
server:https://gitee.com/pengzhang_master/rpc-zookeeper-server.git
这是基于netty的Demo级RPC通信小项目,为后面zookeeper的使用做铺垫。
创建并初始化spring容器
public class Bootstrap {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
((AnnotationConfigApplicationContext) context).start();
}
}
通过配置类添加bean
@Configuration
@ComponentScan(basePackages = "com.server")
public class SpringConfig {
@Bean(name="gpRpcServer")
public GpRpcServer gpRpcServer(){
return new GpRpcServer(8080);
}
}
传入8080端口,供后续暴露端口使用。GpRpcServer类实现接口ApplicationContextAware和InitializingBean,重写方法setApplicationContext(ApplicationContext applicationContext)和afterPropertiesSet()
setApplicationContext(ApplicationContext applicationContext)获取spring容器的上下文,从而获取所有加了@RpcService注解的类的对应的bean,从而将接口和服务放入缓存
afterPropertiesSet()在初始化GpRpcServer的bean的时候通过netty暴露服务端口
public class GpRpcServer implements ApplicationContextAware,InitializingBean {
private Map<String,Object> handlerMap=new HashMap();
private int port;
private IRegistryCenter registryCenter=new RegistryCenterWithZk();
public GpRpcServer(int port) {
this.port = port;
}
@Override
public void afterPropertiesSet() throws Exception {
//接收客户端的链接
EventLoopGroup bossGroup=new NioEventLoopGroup();
//处理已经被接收的链接
EventLoopGroup workerGroup=new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().
addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
addLast(new ObjectEncoder()).
addLast(new ProcessorHandler(handlerMap));
}
});
serverBootstrap.bind(port).sync();
}finally {
/* workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();*/
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//拿到所有加了@RpcService注解的类的对应的bean
Map<String,Object> serviceBeanMap=applicationContext.getBeansWithAnnotation(RpcService.class);
if(!serviceBeanMap.isEmpty()){
for(Object servcieBean:serviceBeanMap.values()){
//迭代bean,拿到注解
RpcService rpcService=servcieBean.getClass().getAnnotation((RpcService.class));
String serviceName=rpcService.value().getName();//拿到接口类定义
String version=rpcService.version(); //拿到版本号
if(!StringUtils.isEmpty(version)){
serviceName+="-"+version;
}
//放入缓存
handlerMap.put(serviceName,servcieBean);
//服务注册
registryCenter.registry(serviceName,getAddress()+":"+port);
}
}
}
private static String getAddress(){
InetAddress inetAddress=null;
try {
inetAddress=InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return inetAddress.getHostAddress();// 获得本机的ip地址
}
}
ProcessorHandler继承SimpleChannelInboundHandler<RpcRequest>,重写channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest),rpcRequest是netty接受的数据。
channelHandlerContext.writeAndFlush写出数据,ChannelHandlerContext代表了一个ChannelHandler和ChannelPipeline之间的关系,ChannelHandlerContext创建于ChannelHandler被载入到ChannelPipeline的时候,ChannelHandlerContext主要功能是管理在同一ChannelPipeline中各个ChannelHandler的交互。
invoke中通过接受的消息拿到客户端请求参数、类和方法名反射调用,获取调用结果。
public class ProcessorHandler extends SimpleChannelInboundHandler<RpcRequest> {
private Map<String,Object> handlerMap;
public ProcessorHandler(Map<String,Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
Object result=invoke(rpcRequest);
channelHandlerContext.writeAndFlush(result).addListener(ChannelFutureListener.CLOSE);
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射调用
String serviceName=request.getClassName();
String version=request.getVersion();
//增加版本号的判断
if(!StringUtils.isEmpty(version)){
serviceName+="-"+version;
}
Object service=handlerMap.get(serviceName);
if(service==null){
throw new RuntimeException("service not found:"+serviceName);
}
Object[] args=request.getParameters(); //拿到客户端请求的参数
Method method=null;
Class clazz=Class.forName(request.getClassName()); //跟去请求的类进行加载
method=clazz.getMethod(request.getMethodName(),request.getParamTypes()); //sayHello, saveUser找到这个类中的方法
Object result=method.invoke(service,args);//HelloServiceImpl 进行反射调用
return result;
}
}
创建并初始化spring容器,在上下文中拿到代理对象并进行调用。
public class App {
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = new
AnnotationConfigApplicationContext(SpringConfig.class);
RpcProxyClient rpcProxyClient = context.getBean(RpcProxyClient.class);
IHelloService iHelloService = rpcProxyClient.clientProxy
(IHelloService.class, "v2.0");
for (int i = 0; i < 100; i++) {
Thread.sleep(2000);
System.out.println(iHelloService.sayHello(1.0));
}
}
}
通过配置类添加bean
@Configuration
public class SpringConfig {
@Bean(name="rpcPRoxyClient")
public RpcProxyClient proxyClient(){
return new RpcProxyClient();
}
}
RpcProxyClient中的clientProxy返回代理对象
public class RpcProxyClient {
private IServiceDiscovery serviceDiscovery=new ServiceDiscoveryWithZk();
public <T> T clientProxy(final Class<T> interfaceCls, String version){
return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),
new Class<?>[]{interfaceCls},new RemoteInvocationHandler(serviceDiscovery,version));
}
}
在RemoteInvocationHandler.invoke中包装请求数据rpcRequest,包括执行方法和参数。再根据服务名加版本号从zookeeper中获取服务地址serviceAddress。根据服务地址发送数据包,并接受返回数据。
public class RemoteInvocationHandler implements InvocationHandler {
private IServiceDiscovery serviceDiscovery;
private String version;
public RemoteInvocationHandler(IServiceDiscovery serviceDiscovery,String version) {
this.serviceDiscovery=serviceDiscovery;
this.version=version;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//请求数据的包装
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParamTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
rpcRequest.setVersion(version);
String serviceName=rpcRequest.getClassName();
if(!StringUtils.isEmpty(version)){
serviceName=serviceName+"-"+version;
}
String serviceAddress=serviceDiscovery.discovery(serviceName);
//远程通信
RpcNetTransport netTransport=new RpcNetTransport(serviceAddress);
Object result=netTransport.send(rpcRequest);
return result;
}
}
RpcNetTransport.send中利用netty将数据包发送到对应的服务。
public class RpcNetTransport extends SimpleChannelInboundHandler<Object> {
private String serviceAddress;
public RpcNetTransport(String serviceAddress) {
this.serviceAddress = serviceAddress;
}
private final Object lock=new Object();
private Object result;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
this.result=o;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常:");
ctx.close();
}
public Object send(RpcRequest request){
NioEventLoopGroup eventLoogGroup=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(eventLoogGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().
addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
addLast(new ObjectEncoder()).
addLast(RpcNetTransport.this);
}
}).option(ChannelOption.TCP_NODELAY,true);
try {
String urls[]=serviceAddress.split(":");
ChannelFuture future=bootstrap.connect(urls[0],Integer.parseInt(urls[1])).sync();
future.channel().writeAndFlush(request).sync();
if(request!=null){
future.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
eventLoogGroup.shutdownGracefully();
}
return result;
}
}
在服务启动时,初始化zookeeper的连接。
public class RegistryCenterWithZk implements IRegistryCenter{
CuratorFramework curatorFramework =null;
{
//初始化zookeeper的连接, 会话超时时间是5s,衰减重试
curatorFramework = CuratorFrameworkFactory.builder().
connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).
namespace("registry")
.build();
curatorFramework.start();
}
、、、、、
在初始化GpRpcServer的bean的时候,在setApplicationContext中获取spring上下文拿到所有加了RpcService注解的bean,并将接口定义和对应的bean放入缓存,然后将服务注册到zookeeper。
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//拿到所有加了@RpcService注解的类的对应的bean
Map<String,Object> serviceBeanMap=applicationContext.getBeansWithAnnotation(RpcService.class);
if(!serviceBeanMap.isEmpty()){
for(Object servcieBean:serviceBeanMap.values()){
//迭代bean,拿到注解
RpcService rpcService=servcieBean.getClass().getAnnotation((RpcService.class));
String serviceName=rpcService.value().getName();//拿到接口类定义
String version=rpcService.version(); //拿到版本号
if(!StringUtils.isEmpty(version)){
serviceName+="-"+version;
}
//放入缓存
handlerMap.put(serviceName,servcieBean);
//服务注册
registryCenter.registry(serviceName,getAddress()+":"+port);
}
}
}
在将服务注册到zookeeper时,将服务名和ip、端口号获取并传入。将服务名创建为持久化节点,将ip:port创建为临时节点。
@Override
public void registry(String serviceName, String serviceAddress) {
String servicePath="/"+serviceName;
try {
//判断节点是否存在
if(curatorFramework.checkExists().forPath(servicePath)==null){
//创建持久化节点
curatorFramework.create().creatingParentsIfNeeded().
withMode(CreateMode.PERSISTENT).forPath(servicePath);
}
//serviceAddress: ip:port
String addressPath=servicePath+"/"+serviceAddress;
//创建临时节点
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(addressPath);
System.out.println("服务注册成功");
} catch (Exception e) {
e.printStackTrace();
}
}
在spring上下文中获取RpcProxyClient的bean,调用rpcProxyClient.clientProxy(IHelloService.class, "v2.0")获取代理对象。并调用sayHello方法。
在调用sayHello时实际调用的是invoke,在invoke中通过传入的serviceDiscovery调用discovery(String serviceName)获取服务地址,并完成远程通信,实际上就是将类名、方法名、参数封装后发给服务端,在服务端通过以上数据进行反射调用。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//请求数据的包装
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParamTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
rpcRequest.setVersion(version);
String serviceName=rpcRequest.getClassName();
if(!StringUtils.isEmpty(version)){
serviceName=serviceName+"-"+version;
}
String serviceAddress=serviceDiscovery.discovery(serviceName);
//远程通信
RpcNetTransport netTransport=new RpcNetTransport(serviceAddress);
Object result=netTransport.send(rpcRequest);
return result;
}
public Object send(RpcRequest request){
NioEventLoopGroup eventLoogGroup=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(eventLoogGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().
addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
addLast(new ObjectEncoder()).
addLast(RpcNetTransport.this);
}
}).option(ChannelOption.TCP_NODELAY,true);
try {
String urls[]=serviceAddress.split(":");
ChannelFuture future=bootstrap.connect(urls[0],Integer.parseInt(urls[1])).sync();
future.channel().writeAndFlush(request).sync();
if(request!=null){
future.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
eventLoogGroup.shutdownGracefully();
}
return result;
}
在ServiceDiscoveryWithZk中初始化zookeeper的连接。在discovery中完成服务地址的查找和根据已知的服务地址进行负载均衡,以及监听zookeeper上注册的服务节点,若感知到节点发生变化,则刷新缓存中的服务地址。
@Override
public String discovery(String serviceName) {
//完成了服务地址的查找(服务地址被删除)
String path="/"+serviceName; //registry/com.server.HelloService
if(serviceRepos.isEmpty()) {
try {
serviceRepos = curatorFramework.getChildren().forPath(path);
registryWatch(path);
} catch (Exception e) {
e.printStackTrace();
}
}
//针对已有的地址做负载均衡
LoadBalanceStrategy loadBalanceStrategy=new RandomLoadBalance();
return loadBalanceStrategy.selectHost(serviceRepos);
}
private void registryWatch(final String path) throws Exception {
PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
PathChildrenCacheListener nodeCacheListener= (curatorFramework1, pathChildrenCacheEvent) -> {
System.out.println("客户端收到节点变更的事件");
serviceRepos=curatorFramework1.getChildren().forPath(path);// 再次更新本地的缓存地址
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
文章浏览阅读517次。工作学习中经常遇到中断相关的issue,我一般会从 中断源 (ic)-- 驱动层 -- SDK 逐层分析。往往解决的比较快。举一个例子,下面给出分析过程。问题描述: 开机过程中指纹driver有大量异常中断触发log 问题分析: 首先,我们要确定的是,这些异常中断是怎么来的。有以下三种case。 1.软件误报2.cpu主动发出的信号3.指纹sensor发出的信号 针对软件误报,只需要 cat ..._sensor采集中断
文章浏览阅读82次。很多人知道斧子很牛,但就是用不好,所以好多人练了一段时间就放弃了,其实斧子练习有一些问题是需要注意的,下面就结合一下我个人练习斧子的经验,和大家分享一下,希望能对大家的练习提供一些帮助。1.拿起斧子要有信心和决心因为斧子有无视头盔的特点,所以面对其他武器,不应该有任何畏惧的心理(当然不能傻傻的向前冲然后白白送死),就像阿卡选手一样,要有一击毙命的信心和决心,因为在刀战战场上,心态很重要。有一个好的...
文章浏览阅读176次。开始前需要先准备好fdfs的环境https://blog.csdn.net/weixin_44834666/article/details/105686059一、安装py3Fdfs包pip install py3Fdfs二、编辑py文件from fdfs_client.client import Fdfs_client, get_tracker_conf# 创建客户端 client...._fastdfs python 上传
文章浏览阅读1.8k次,点赞2次,收藏5次。1.创建表空间create tablespace [表空间名:例如{newszgcp}]datafile [表空间所在地址:例如{'D:/StudySoft/OrServer/admin/orcl/newszgcp'}]size [表空间大小:100M ]autoextend on next [扩展:50M ] maxsize unlimited; 2.创建用户create..._plsql删除创建的用户
文章浏览阅读169次。GUI编程_p.add(new textfield(10))含义
文章浏览阅读191次。https://codeforces.com/contest/1119/problem/B不改变原数组的值,复制一份,要求1-k连续,则1-mid区间内判断子数组从大到小排序后,每两个相差不大,比较均匀 i+=2倒着的目的是前大后小 能装前一定能装后 尽量装更多的 累积高度和跳着累积高度和,如果<=h 可增加更多的瓶子,否则高度太多瓶子太多,减小瓶子数量const int maxn..._codeforces - 1119b
文章浏览阅读572次。matlab三相桥式电路仿真.doc 五邑大学 电 力电子技术 课程 设计报告 题 目: 三相桥式整流电路的 MATLAB 仿真 院 系 信息工程学院 专 业 自动化 班 级 140705 学 号 3114001891 学生姓名 杨煜基 指导教师 张建民 电力电子技术课程设计报告 2 三相桥式整流电路的MATLAB 仿真 一、 题目的要求和意义 整流电路是电力电子电路中出现最早的一种,它的作用是将..._三相全桥电路怎么在matlad画出
文章浏览阅读2.4k次。vscode怎么选择浏览器?vscode如何右键选择浏览器运行html文件?我们利用Vscode软件编写html的时候,一般都想右键选择html文件,然后直接选择浏览器运行,但是默认是没有的。相关文章教程推荐:vscode教程下面小编给大家分享一下如何设置。首先我们新建一个html文件,你可以用记事本编写一个,如下图所示接着将html文件导入到VsCode软件中,如下图所示我们直接在html文件中..._vscode运行php代码用特定浏览器
文章浏览阅读591次。[PHP] 纯文本查看 复制代码/*** excel文件导出** [url=home.php?mod=space&uid=952169]@Param[/url] array $data 需要生成excel文件的数组* $data = [* [NULL, 2010, 2011, 2012],//列名* ..._phpexcel 导出复杂表格
文章浏览阅读110次。1、详细描述一次加密通讯的过程,结合图示最佳。2、描述创建私有CA的过程,以及为客户端发来的证书请求进行办法证书。以下操作使用的2台服务器完成:服务器主机名IPCA服务器ca192.168.2.30httpd服务器httpd192.168.2.80在CA服务器上操作:创建所需要的文件[root@ca~]#cd/etc/pki/CA/#..._主进程负责生成n个子进程
文章浏览阅读828次。AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManagermanager]; manager.requestSerializer = [AFHTTPRequestSerializerserializer]; manager.responseSerializer = [AFHTTP_afnetworking multipartfile
文章浏览阅读562次。自考“C语言程序设计”模拟试题八一、选择题1.一个C语言程序是由( )构成。A.语句 B.行号 C.数据 D.函数2.下面标识符中正确的是( )。A.a#bc B.123ABC C.sime D.Y·M·D3.在C语言中,存储一个整型、字符型、双精度实型变量所需的字节数..._6*4/7 +7%3