1. 源码分析---SOFARPC可扩展的机制SPI

1. 源码分析---SOFARPC可扩展的机制SPI

这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了。

SOFARPC是蚂蚁金服开源的一个RPC框架,相比DUBBO它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码。

那么为什么要去重写原生的SPI呢?官方给出了如下解释:

  1. 按需加载
  2. 可以有别名
  3. 可以有优先级进行排序和覆盖
  4. 可以控制是否单例
  5. 可以在某些场景下使用编码
  6. 可以指定扩展配置位置
  7. 可以排斥其他扩展点

整个流程如下:

我们以ConsumerBootstrap为例:

先要有一个抽象类:

@Extensible(singleton = false)public abstract class ConsumerBootstrap<T> {    ....}

指定扩展实现类:

@Extension("sofa")public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T> {    ...}

扩展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap

sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap

当这些准备完成后,直接调用即可。

ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");

接下来我们看看ExtensionLoaderFactory的源码

    /**     * All extension loader {Class : ExtensionLoader}     * 这个map里面装的是所有ExtensionLoader     */    private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {        ExtensionLoader<T> loader = LOADER_MAP.get(clazz);        if (loader == null) {            //get不到则加上锁            synchronized (ExtensionLoaderFactory.class) {                //防止其他线程操作再get一次                loader = LOADER_MAP.get(clazz);                if (loader == null) {                    loader = new ExtensionLoader<T>(clazz, listener);                    LOADER_MAP.put(clazz, loader);                }            }        }        return loader;    }

然后我们看一下ExtensionLoader这个类的构造器

    protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {        //如果正在执行关闭,则将属性置空后直接返回        if (RpcRunningState.isShuttingDown()) {            this.interfaceClass = null;            this.interfaceName = null;            this.listener = null;            this.factory = null;            this.extensible = null;            this.all = null;            return;        }        // 接口为空,既不是接口,也不是抽象类        if (interfaceClass == null ||                !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {            throw new IllegalArgumentException("Extensible class must be interface or abstract class!");        }        //当前加载的接口类名        this.interfaceClass = interfaceClass;        //接口名字        this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);        this.listener = listener;        //接口上必须要有Extensible注解        Extensible extensible = interfaceClass.getAnnotation(Extensible.class);        if (extensible == null) {            throw new IllegalArgumentException(                    "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");        } else {            this.extensible = extensible;        }        // 如果是单例,那么factory不为空        this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;        //这个属性里面是这个接口的所有实现类        this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();        if (autoLoad) {            //获取到扩展点加载的路径            List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);            for (String path : paths) {                //根据路径加载文件                loadFromFile(path);            }        }    }

拿到所有的扩展点加载的路径后进入到loadFromFile中进行文件的加载

    protected synchronized void loadFromFile(String path) {        if (LOGGER.isDebugEnabled()) {            LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path);        }        // 默认如果不指定文件名字,就是接口名        String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();        String fullFileName = path + file;        try {            ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());            loadFromClassLoader(classLoader, fullFileName);        } catch (Throwable t) {            if (LOGGER.isErrorEnabled()) {                LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName,                    t);            }        }    }            protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {        Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName)            : ClassLoader.getSystemResources(fullFileName);        // 可能存在多个文件。        if (urls != null) {            while (urls.hasMoreElements()) {                // 读取一个文件                URL url = urls.nextElement();                if (LOGGER.isDebugEnabled()) {                    LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}",                        interfaceName, classLoader, url);                }                BufferedReader reader = null;                try {                    reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));                    String line;                    while ((line = reader.readLine()) != null) {                        readLine(url, line);                    }                } catch (Throwable t) {                    if (LOGGER.isWarnEnabled()) {                        LOGGER.warn("Failed to load extension of extensible " + interfaceName                            + " from classloader: " + classLoader + " and file:" + url, t);                    }                } finally {                    if (reader != null) {                        reader.close();                    }                }            }        }    }

接下来进入到readLine,这个方法主要是读取prop文件里面的每一行记录,并加载该实现类的类文件校验完后将文件添加到all属性中

    protected void readLine(URL url, String line) {        //读取文件里面的一行记录,并将这行记录用=号分割        String[] aliasAndClassName = parseAliasAndClassName(line);        if (aliasAndClassName == null || aliasAndClassName.length != 2) {            return;        }        //别名        String alias = aliasAndClassName[0];        //包名        String className = aliasAndClassName[1];        // 读取配置的实现类        Class tmp;        try {            tmp = ClassUtils.forName(className, false);        } catch (Throwable e) {            if (LOGGER.isWarnEnabled()) {                LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}",                    className, interfaceName, ExceptionUtils.toShortString(e, 2));            }            if (LOGGER.isDebugEnabled()) {                LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e);            }            return;        }        if (!interfaceClass.isAssignableFrom(tmp)) {            throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +                " from file:" + url + ", " + className + " is not subtype of interface.");        }        Class<? extends T> implClass = (Class<? extends T>) tmp;        // 检查是否有可扩展标识        Extension extension = implClass.getAnnotation(Extension.class);        if (extension == null) {            throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +                " from file:" + url + ", " + className + " must add annotation @Extension.");        } else {            String aliasInCode = extension.value();            if (StringUtils.isBlank(aliasInCode)) {                // 扩展实现类未配置@Extension 标签                throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass +                    " from file:" + url + ", " + className + "'s alias of @Extension is blank");            }            if (alias == null) {                // spi文件里没配置,用代码里的                alias = aliasInCode;            } else {                // spi文件里配置的和代码里的不一致                if (!aliasInCode.equals(alias)) {                    throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +                        " from file:" + url + ", aliases of " + className + " are " +                        "not equal between " + aliasInCode + "(code) and " + alias + "(file).");                }            }            // 接口需要编号,实现类没设置            if (extensible.coded() && extension.code() < 0) {                throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +                    " from file:" + url + ", code of @Extension must >=0 at " + className + ".");            }        }        // 不可以是default和*        if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {            throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +                " from file:" + url + ", alias of @Extension must not \"default\" and \"*\" at " + className + ".");        }        // 检查是否有存在同名的        ExtensionClass old = all.get(alias);        ExtensionClass<T> extensionClass = null;        if (old != null) {            // 如果当前扩展可以覆盖其它同名扩展            if (extension.override()) {                // 如果优先级还没有旧的高,则忽略                if (extension.order() < old.getOrder()) {                    if (LOGGER.isDebugEnabled()) {                        LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure, " +                            "cause by: order of old extension is higher",                            interfaceName, alias, old.getClazz(), implClass);                    }                } else {                    if (LOGGER.isInfoEnabled()) {                        LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}",                            interfaceName, alias, old.getClazz(), implClass);                    }                    // 如果当前扩展可以覆盖其它同名扩展                    extensionClass = buildClass(extension, implClass, alias);                }            }            // 如果旧扩展是可覆盖的            else {                if (old.isOverride() && old.getOrder() >= extension.order()) {                    // 如果已加载覆盖扩展,再加载到原始扩展                    if (LOGGER.isInfoEnabled()) {                        LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}",                            interfaceName, alias, old.getClazz(), implClass);                    }                } else {                    // 如果不能被覆盖,抛出已存在异常                    throw new IllegalStateException(                        "Error when load extension of extensible " + interfaceClass + " from file:" + url +                            ", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass);                }            }        } else {            extensionClass = buildClass(extension, implClass, alias);        }        if (extensionClass != null) {            // 检查是否有互斥的扩展点            for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {                ExtensionClass existed = entry.getValue();                if (extensionClass.getOrder() >= existed.getOrder()) {                    // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展                    String[] rejection = extensionClass.getRejection();                    if (CommonUtils.isNotEmpty(rejection)) {                        for (String rej : rejection) {                            existed = all.get(rej);                            if (existed == null || extensionClass.getOrder() < existed.getOrder()) {                                continue;                            }                            ExtensionClass removed = all.remove(rej);                            if (removed != null) {                                if (LOGGER.isInfoEnabled()) {                                    LOGGER.info(                                        "Extension of extensible {} with alias {}: {} has been reject by new {}",                                        interfaceName, removed.getAlias(), removed.getClazz(), implClass);                                }                            }                        }                    }                } else {                    String[] rejection = existed.getRejection();                    if (CommonUtils.isNotEmpty(rejection)) {                        for (String rej : rejection) {                            if (rej.equals(extensionClass.getAlias())) {                                // 被其它扩展排掉                                if (LOGGER.isInfoEnabled()) {                                    LOGGER.info(                                        "Extension of extensible {} with alias {}: {} has been reject by old {}",                                        interfaceName, alias, implClass, existed.getClazz());                                    return;                                }                            }                        }                    }                }            }            loadSuccess(alias, extensionClass);        }    }

加载完文件后我们再回到

ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");

进入到getExtension方法中

    public ExtensionClass<T> getExtensionClass(String alias) {        return all == null ? null : all.get(alias);    }    public T getExtension(String alias) {        //从all属性中拿到加载的class        ExtensionClass<T> extensionClass = getExtensionClass(alias);        if (extensionClass == null) {            throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!");        } else {            //在加载class的时候,校验了是否是单例,如果是单例,那么factory不为null            if (extensible.singleton() && factory != null) {                T t = factory.get(alias);                if (t == null) {                    synchronized (this) {                        t = factory.get(alias);                        if (t == null) {                            //实例化                            t = extensionClass.getExtInstance();                            //放入到factory,单例的class下次直接拿就好了,不需要重新创建                            factory.put(alias, t);                        }                    }                }                return t;            } else {                //实例化                return extensionClass.getExtInstance();            }        }    }

我们进入到ExtensionClass看看getExtInstance方法

    /**     * 服务端实例对象(只在是单例的时候保留)     * 用volatile修饰,保证了可见性     */    private volatile transient T       instance;        /**     * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象     *     * @param argTypes 构造函数参数类型     * @param args     构造函数参数值     * @return 扩展点对象实例 ext instance     */    public T getExtInstance(Class[] argTypes, Object[] args) {        if (clazz != null) {            try {                if (singleton) { // 如果是单例                    if (instance == null) {                        synchronized (this) {                            if (instance == null) {                                //通过反射创建实例                                instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);                            }                        }                    }                    return instance; // 保留单例                } else {                    //通过反射创建实例                    return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);                }            } catch (Exception e) {                throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e);            }        }        throw new SofaRpcRuntimeException("Class of ExtensionClass is null");    }

看完了SOFARPC的扩展类实现后感觉代码写的非常的整洁,逻辑非常的清晰,里面有很多可以学习的地方,比如线程安全用到了双重检查锁和volatile保证可见性。

原文链接:www.cnblogs.com/luozhiyun/p…

免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部