本文共 9536 字,大约阅读时间需要 31 分钟。
淘宝的Tedis组件究竟是个啥呢?可能有一些朋友没有听过这个名字,有一些朋友会经常使用,那么今天我就来和大家深入分析一下,它的使用和原理。
一、Tedis简介
Tedis是另一个的java客户端,Tedis的目标是打造一个可在生产环境直接使用的高可用Redis解决方案。
特性如下:
二、Tedis能做啥
1、Tedis的原理
Tedis是对开源的Redis客户端组件Jedis进行的封装,在Jedis的基础上封装了一套更易于使用的byte api和object api接口,在部署上使用的是master-master结构,实现了多写与随机读的机制。既:每个写请求都会发到所有服务器上面,每个读请求随机读取一个服务器,当在某个服务器读取失败后,将该台服务器加到重试队列中,直到服务器恢复正常客户端请求才会重新访问到该服务器。2、典型的应用场景
二、Tedis的简单使用
可以先在Maven中依赖下面的jar包,当然还有更高版本的。
com.taobao.common tedis-group 1.1.0
最简单的使用方式就是这样,只需要声明一个TedisGroup类并且初始化,然后就可以用ValueCommands进行set,get等操作了。唯一有点别扭的可能就是Tedis自己封装了一些命令类,需要我们自己去定义然后使用。
Group tedisGroup = new TedisGroup(appName, version);tedisGroup.init();ValueCommands valueCommands = new DefaultValueCommands(tedisGroup.getTedis());// 写入一条数据valueCommands.set(1, "test", "test value object");// 读取一条数据valueCommands.get(1, "test");
三、Tedis的源码分析
1、Tedis的包结构
2、tedis-group源码解析
我们看源码往往是通过使用的类开始的,也就是上面提到的TedisGroup类开始,注:TedisGroup类是一个非常核心的类,有一个很重要的方法就是init和一个很重要的内部类RedisGroupInvocationHandler,在TedisGroup中依赖了二个重要的类,ConfigManager和RedisCommands。
ConfigManager解析
RedisCommands解析
@Override public Listsort(byte[] key, SortParams params) { try { if (params == null) { client.sort(key); } else { client.sort(key, params); } return client.getBinaryMultiBulkReply(); } catch (Exception ex) { throw new TedisException(ex); } } @Override public Boolean ping() { client.ping(); return PONG.equals(client.getStatusCodeReply()); } @Override public Long del(byte[]... keys) { client.del(keys); return client.getIntegerReply(); } @Override public Boolean exists(byte[] key) { client.exists(key); return client.getIntegerReply() == 1; } @Override public Set keys(byte[] pattern) { client.keys(pattern); return new HashSet (client.getBinaryMultiBulkReply()); } @Override public Boolean persist(byte[] key) { client.persist(key); return client.getIntegerReply() == 1; } @Override public Boolean move(byte[] key, int dbIndex) { client.move(key, dbIndex); return client.getIntegerReply() == 1; } @Override public byte[] randomKey() { client.randomKey(); return client.getBinaryBulkReply(); } @Override public byte[] get(byte[] key) { client.get(key); return client.getBinaryBulkReply(); } @Override public Boolean set(byte[] key, byte[] value) { client.set(key, value); return OK.equals(client.getStatusCodeReply()); }
这个类使用的是动态代理的模式,实现的是InvocationHandler接口,核心方法依然是
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
另外还有一个关键的注解就是Process,在RedisCommands的实现类Tedis中有使用,比如: @Process(Policy.WRITE) public Long zAdd(@ShardKey byte[] key, Tuple... value) { client.zadd(key, value); return client.getIntegerReply(); }
标识了这个注解的方法在代理中会进行判断是写还是读操作,如果是写操作则会读取所有Redis配置以循环的方式逐一插入数据,如果其中一个Redis报错则记录日志抛出异常,如果是读操作则采用RandomRouter的方式随机从Redis列表中选取一个进行读取操作。具体部分源码实现如下:
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { long time = System.currentTimeMillis(); String name = method.getName(); Router rr = cm.getRouter(); Process annotation = method.getAnnotation(Process.class); Throwable exception = null; if (annotation.value() == Policy.READ) { while (rr.getRouteData().props.size() > 0) { Single s = rr.route(); try { Object result = method.invoke(s.getTedis(), args); statLog(name, true, time); return result; } catch (Throwable t) { statLog(name, false, time); exception = t; logger.warn("read exception:" + s.getProperties(), t); boolean connectionError = false; try { if (t instanceof InvocationTargetException) {// 解包异常 InvocationTargetException ite = (InvocationTargetException) t; UndeclaredThrowableException ute = (UndeclaredThrowableException) ite.getTargetException(); if (ute.getUndeclaredThrowable() instanceof TimeoutException) { connectionError = true; rr.onError(s); } else { ExecutionException ee = (ExecutionException) ute.getUndeclaredThrowable(); InvocationTargetException ite_1 = (InvocationTargetException) ee.getCause(); TedisException te = (TedisException) ite_1.getTargetException(); if (te.getCause() instanceof TedisConnectionException) { connectionError = true; rr.onError(s); } } } } catch (Throwable tt) { logger.warn("解包异常:", tt); // 可能会抛出转换异常,符合预期,如果碰到转换异常,直接在connection error // 过程中从新抛出 } if (!connectionError) { throw t; } } } throw new Exception("read RouteData is empty," + rr, exception); } else if (annotation.value() == Policy.WRITE) { Single[] ss = rr.getRouteData().group; if (ss == null || ss.length == 0) { throw new Exception("write RouteData is empty," + rr, exception); } Object result = null; int e = 0; for (Single s : ss) { try { result = method.invoke(s.getTedis(), args); } catch (Throwable t) { e++; statLog(name, false, time); logger.warn("write exception:" + s.getProperties(), t); exception = t; try { // 解包异常 InvocationTargetException ite = (InvocationTargetException) t; UndeclaredThrowableException ute = (UndeclaredThrowableException) ite.getTargetException(); if (ute.getUndeclaredThrowable() instanceof TimeoutException) { rr.onError(s); } else { ExecutionException ee = (ExecutionException) ute.getUndeclaredThrowable(); InvocationTargetException ite_1 = (InvocationTargetException) ee.getCause(); TedisException te = (TedisException) ite_1.getTargetException(); if (te.getCause() instanceof TedisConnectionException) { rr.onError(s); } } } catch (Throwable tt) { logger.warn("解包异常:", tt); } } } if (e >= 2) {// 全部都抛异常了,告知调用端 throw exception; } statLog(name, true, time); return result; } else if ("toString".equals(name)) { return ""; } else if ("hashCode".equals(name)) { Single s = rr.route(); if (s != null) { return s.hashCode(); } else { return 0; } } else if ("equals".equals(name)) { Single s = rr.route(); if (args.length == 1) { return s.equals(args[0]); } } statLog(name, false, time); throw new Exception("method don't match:" + name); } }
对于整体架构和其他源码的分析,我们将在第二篇进行深入分析,请大家关注。
转载地址:http://gmeoa.baihongyu.com/