• RxJava + Retrofit源码解析


    RxJava + Retrofit怎么请求网络,具体的用法这里就不讲了,本文只讲一些重点源码。

    版本如下:

    复制代码
    okhttp                         : "com.squareup.okhttp3:okhttp:3.10.0",
    okhttp3_integration            : "com.github.bumptech.glide:okhttp3-integration:1.4.0@aar",
    retrofit                       : "com.squareup.retrofit2:retrofit:2.4.0",
    converter_gson                 : "com.squareup.retrofit2:converter-gson:2.3.0",
    converter_scalars              : "com.squareup.retrofit2:converter-scalars:2.3.0",
    converter_protobuf             : "com.squareup.retrofit2:converter-protobuf:2.3.0",
    adapter_rxjava2                : "com.squareup.retrofit2:adapter-rxjava2:2.2.0",
    logging_interceptor            : "com.squareup.okhttp3:logging-interceptor:3.10.0",
    rxjava                         : "io.reactivex.rxjava2:rxjava:2.1.12",
    rxandroid                      : "io.reactivex.rxjava2:rxandroid:2.0.2",
    复制代码

    一、首先关于Retrofit的初始化:

    复制代码
    private void initRetrofit() {
        ExtensionRegistry extensionRegistry = ExtensionRegistry.newInstance();
        retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl) //设置地址
                .client(client.build()) //设置自定义的OkHttpClient
                .addConverterFactory(ProtoConverterFactory.createWithRegistry(extensionRegistry))
                .addConverterFactory(StringConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create(buildGson()))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        service = retrofit.create(ApiService.class);
    }
    复制代码
    .addConverterFactory(ProtoConverterFactory.createWithRegistry(extensionRegistry))
    .addConverterFactory(StringConverterFactory.create())
    .addConverterFactory(GsonConverterFactory.create(buildGson()))
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())

    添加了数据转换器与请求适配器。

    Retrofit的初始化采用了Builder模式。

    Retrofit.Builder()这一步,获取了一个平台,肯定就是Android()了,后面有地方会用到。

    复制代码
    Builder(Platform platform) {
      this.platform = platform;
    }
    public Builder() {
      this(Platform.get());
    }
    class Platform {
      private static final Platform PLATFORM = findPlatform();
      static Platform get() {
        return PLATFORM;
      }
      private static Platform findPlatform() {
        try {
          Class.forName("android.os.Build");
          if (Build.VERSION.SDK_INT != 0) {
            return new Android();
          }
        } catch (ClassNotFoundException ignored) {
        }
        try {
          Class.forName("java.util.Optional");
          return new Java8();
        } catch (ClassNotFoundException ignored) {
        }
        return new Platform();
      }
    }
    复制代码

    在看最后的build();方法:

    复制代码
    public Retrofit build() {
      if (baseUrl == null) {
        throw new IllegalStateException("Base URL required.");
      }
      okhttp3.Call.Factory callFactory = this.callFactory;
      if (callFactory == null) {
        callFactory = new OkHttpClient();
      }
      Executor callbackExecutor = this.callbackExecutor;
      if (callbackExecutor == null) {
        callbackExecutor = platform.defaultCallbackExecutor();
      }
      // Make a defensive copy of the adapters and add the default Call adapter.
      List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
      callAdapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));
      // Make a defensive copy of the converters.
      List<Converter.Factory> converterFactories =
          new ArrayList<>(1 + this.converterFactories.size());
      // Add the built-in converter factory first. This prevents overriding its behavior but also
      // ensures correct behavior when using converters that consume all types.
      converterFactories.add(new BuiltInConverters());
      converterFactories.addAll(this.converterFactories);
      return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
          unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
    }
    复制代码

    1、如果没有传入我们自定义的OkHttpClient,那么便会使用默认的。

    2、如果没有设置自定义的回调执行器,那么便会是用默认的platform.defaultCallbackExecutor();点进入可以发现回调是默认在主线程中的:

    复制代码
    static class Android extends Platform {
      @Override public Executor defaultCallbackExecutor() {
        return new MainThreadExecutor();
      }
      @Override CallAdapter.Factory defaultCallAdapterFactory(@Nullable Executor callbackExecutor) {
        if (callbackExecutor == null) throw new AssertionError();
        return new ExecutorCallAdapterFactory(callbackExecutor);
      }
      static class MainThreadExecutor implements Executor {
        private final Handler handler = new Handler(Looper.getMainLooper());
        @Override public void execute(Runnable r) {
          handler.post(r);
        }
      }
    }
    复制代码

    3、把我们设置的请求适配器添加进入,然后再添加一个默认的请求适配器。

    4、添加进入一个默认的数据转换器,然后再被我们设置的数据转换器添加进去。

     

    二、初始化好Retrofit后,再来看这一句:

    service = retrofit.create(ApiService.class);

    ApiService是一个接口,里面方法如下:

    @GET
    Observable<ResponseBody> doGet(@Url String url, @HeaderMap Map<String, String> headers, @QueryMap Map<String, String> map);

    这个create方法可以说是核心,它运用的是动态代理。

    复制代码
    @SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety.
    public <T> T create(final Class<T> service) {
      Utils.validateServiceInterface(service);
      if (validateEagerly) {
        eagerlyValidateMethods(service);
      }
      return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
          new InvocationHandler() {
            private final Platform platform = Platform.get();
            @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
                throws Throwable {
              // If the method is a method from Object then defer to normal invocation.
              if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
              }
              if (platform.isDefaultMethod(method)) {
                return platform.invokeDefaultMethod(method, service, proxy, args);
              }
              ServiceMethod<Object, Object> serviceMethod =
                  (ServiceMethod<Object, Object>) loadServiceMethod(method);
              OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
              return serviceMethod.adapt(okHttpCall);
            }
          });
    }
    复制代码

    1、首先检测这是否是一个接口,只有接口才能对它进行动态代理。

    2、是否需要对接口里面的方法进行初始化预加载,是的话便进行,这个与下面的有点重复,直接讲下面的。

    3、return后面的语句便是动态代理的地方,它会代理接口的所有方法,也就是说,当我们调用ApiService的方法的时候,会被拦截,然后走到inoke这个方法做我们自己的操作。

    关于动态代理,后面会单独讲。

    4、接下来边看invoke方法:

    (1)、首先判断该方法是否为Object这个类的方法,如果是,不拦截它,让他走原来的方法。

    (2)、platform为Android,platform.isDefaultMethod(method)返回false,不用管它。

    (3)、ServiceMethod<Object, Object> serviceMethod =(ServiceMethod<Object, Object>) loadServiceMethod(method);拿到接口的方法,对接口的方法进行解析,比如获取注解,参数之类,构造自己的serviceMethod 

    (4)、初始化OkHttpCall

    (5)、调用serviceMethod.adapt(okHttpCall)进行请求(因为采用的是RxJava,所以这里并不会立即请求,只有被订阅的时候才会,等会会讲)

     

    三、loadServiceMethod(method)方法:

    构造自己的serviceMethod 也采用了Builder模式。

    进入这个方法后,重点的一句:

    result = new ServiceMethod.Builder<>(this, method).build();

    先看:

    复制代码
    Builder(Retrofit retrofit, Method method) {
      this.retrofit = retrofit;
      this.method = method;
      this.methodAnnotations = method.getAnnotations();
      this.parameterTypes = method.getGenericParameterTypes();
      this.parameterAnnotationsArray = method.getParameterAnnotations();
    }
    复制代码

    注:我们这里以前面定义的方法来讲解:

    @GET
    Observable<ResponseBody> doGet(@Url String url, @HeaderMap Map<String, String> headers, @QueryMap Map<String, String> map);

    1、持有retrofit与原始的method对象。

    2、获取方法上的注解,获取到的为:

     

    3、获取参数类型,获取到的为:

     

     

     4、获取参数上面的的注解,获取到的为:

     

     

    再看build()方法:

    复制代码
    public ServiceMethod build() {
      callAdapter = createCallAdapter();
      responseType = callAdapter.responseType();
      if (responseType == Response.class || responseType == okhttp3.Response.class) {
        throw methodError("'"
            + Utils.getRawType(responseType).getName()
            + "' is not a valid response body type. Did you mean ResponseBody?");
      }
      responseConverter = createResponseConverter();
      for (Annotation annotation : methodAnnotations) {
        parseMethodAnnotation(annotation);
      }
      if (httpMethod == null) {
        throw methodError("HTTP method annotation is required (e.g., @GET, @POST, etc.).");
      }
      if (!hasBody) {
        if (isMultipart) {
          throw methodError(
              "Multipart can only be specified on HTTP methods with request body (e.g., @POST).");
        }
        if (isFormEncoded) {
          throw methodError("FormUrlEncoded can only be specified on HTTP methods with "
              + "request body (e.g., @POST).");
        }
      }
      int parameterCount = parameterAnnotationsArray.length;
      parameterHandlers = new ParameterHandler<?>[parameterCount];
      for (int p = 0; p < parameterCount; p++) {
        Type parameterType = parameterTypes[p];
        if (Utils.hasUnresolvableType(parameterType)) {
          throw parameterError(p, "Parameter type must not include a type variable or wildcard: %s",
              parameterType);
        }
        Annotation[] parameterAnnotations = parameterAnnotationsArray[p];
        if (parameterAnnotations == null) {
          throw parameterError(p, "No Retrofit annotation found.");
        }
        parameterHandlers[p] = parseParameter(p, parameterType, parameterAnnotations);
      }
      if (relativeUrl == null && !gotUrl) {
        throw methodError("Missing either @%s URL or @Url parameter.", httpMethod);
      }
      if (!isFormEncoded && !isMultipart && !hasBody && gotBody) {
        throw methodError("Non-body HTTP method cannot contain @Body.");
      }
      if (isFormEncoded && !gotField) {
        throw methodError("Form-encoded method must contain at least one @Field.");
      }
      if (isMultipart && !gotPart) {
        throw methodError("Multipart method must contain at least one @Part.");
      }
      return new ServiceMethod<>(this);
    }
    复制代码

    1、首先获取请求适配器。

    2、创建请求结果的转换器。

    3、对方法上的注解进行解析。

    4、构造ParameterHandler数组。

    5、对一些异常的判断。

     

    四、我们接下来对每一步进行讲解。

    1、首先获取请求适配器:

    复制代码
    private CallAdapter<T, R> createCallAdapter() {
      Type returnType = method.getGenericReturnType();
      if (Utils.hasUnresolvableType(returnType)) {
        throw methodError(
            "Method return type must not include a type variable or wildcard: %s", returnType);
      }
      if (returnType == void.class) {
        throw methodError("Service methods cannot return void.");
      }
      Annotation[] annotations = method.getAnnotations();
      try {
        //noinspection unchecked
        return (CallAdapter<T, R>) retrofit.callAdapter(returnType, annotations);
      } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(e, "Unable to create call adapter for %s", returnType);
      }
    }
    复制代码

    (1)、获取方法的返回类型,返回类型不能是void

    (2)、获取方法上的注解。

    (3)、调用retrofit.callAdapter(returnType, annotations)方法获取请求的适配器。(我们之前设置的请求适配器都在retrofit对象中)

    里面关键的一步为:

    复制代码
    int start = callAdapterFactories.indexOf(skipPast) + 1;
    for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
      CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
      if (adapter != null) {
        return adapter;
      }
    }
    复制代码

    skipPast为null,所以start为0;
    遍历我们之前设置给它的请求适配器,根据返回类型与方法上的注解去找,找到了便返回。(我们这里获取到的callAdapter为RxJava2CallAdapter

     

    2、创建请求结果的转换器:

    responseConverter = createResponseConverter()

    这个与获取请求的适配器的过程是类似的,因此这里就略过了。

     

    3、解析方法上的注解:parseMethodAnnotation(annotation),我们用的是GET,所以下面会调用:

    parseHttpMethodAndPath("GET", ((GET) annotation).value(), false);

    我们这里value是空的,所以它只走了下面这些就返回了。

    复制代码
    if (this.httpMethod != null) {
            throw methodError("Only one HTTP method is allowed. Found: %s and %s.",
                this.httpMethod, httpMethod);
          }
          this.httpMethod = httpMethod;
          this.hasBody = hasBody;
    
          if (value.isEmpty()) {
            return;
          }
    复制代码

     

    4、构造ParameterHandler数组

    复制代码
    int parameterCount = parameterAnnotationsArray.length;
    parameterHandlers = new ParameterHandler<?>[parameterCount];
    for (int p = 0; p < parameterCount; p++) {
      Type parameterType = parameterTypes[p];
      if (Utils.hasUnresolvableType(parameterType)) {
        throw parameterError(p, "Parameter type must not include a type variable or wildcard: %s",
            parameterType);
      }
      Annotation[] parameterAnnotations = parameterAnnotationsArray[p];
      if (parameterAnnotations == null) {
        throw parameterError(p, "No Retrofit annotation found.");
      }
      parameterHandlers[p] = parseParameter(p, parameterType, parameterAnnotations);
    }
    复制代码

     

    主要是这一个方法:

    parameterHandlers[p] = parseParameter(p, parameterType, parameterAnnotations);

    p为序号,parameterType为方法的参数类型,parameterAnnotations为参数的注解。

    里面就不细讲了,这里最终得到的是:

     

     对于一些异常的判断就不多讲了,比如:

    不能有多个带@Url注解的参数。

    不能同时使用@Path与@Url注解。

    被@QueryMap标注的参数类型必须是Map

    @QueryMap注解的参数的key必须是String

    至此,我们的ServiceMethod便构造完了。

     

     

    五、我们回到代理的那个方法里面,还差两句没有解析:

    OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
    return serviceMethod.adapt(okHttpCall);

    主要看serviceMethod.adapt(okHttpCall)

    T adapt(Call<R> call) {
      return callAdapter.adapt(call);
    }

    这里的callAdapter是RxJava2CallAdapter。

    于是我们来到它的adapter方法:

    复制代码
    @Override public Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable = isAsync
          ? new CallEnqueueObservable<>(call)
          : new CallExecuteObservable<>(call);
      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return observable;
    }
    复制代码

    首先我们看isAsync,这里为false,为什么呢?我们创建adapter的时候是这样的:

    RxJava2CallAdapterFactory.create()

    public static RxJava2CallAdapterFactory create() {
      return new RxJava2CallAdapterFactory(null, false);
    }

    第二个参数便是isAsync

    1、所以我们创建的responseObservable为CallExecuteObservable<>(call),(同步执行的类)

    2、我们创建一个Observable<?> observable,这里创建的是BodyObservable<>(responseObservable),将刚刚创建的responseObservable

    传进去。

    3、最终将该observable传出去。

    复制代码
    service = retrofit.create(ApiService.class);
    public interface ApiService {
        @GET
        Observable<ResponseBody> doGet(@Url String url, @HeaderMap Map<String, String> headers, @QueryMap Map<String, String> map);
    }
    service.doGet(url, header, params?.params)
    复制代码

    也就是说,当我们调用service.doGet的时候,会走到代理的invoke方法,然后返回一个Observable

    而该Observable只有在被订阅的时候才会执行,而且我们用的是同步,所以还需要在外面自己切换到子线程执行。

    当被订阅的时候,该BodyObservable会调用subscribeActual:

     

    复制代码
    BodyObservable(Observable<Response<T>> upstream) {
      this.upstream = upstream;
    }
    @Override protected void subscribeActual(Observer<? super T> observer) {
      upstream.subscribe(new BodyObserver<T>(observer));
    }
    复制代码

     

    而这个upstream便是刚刚传进去的responseObservable,调用subscribe方法,最终会执行到responseObservable的subscribeActual方法。

    复制代码
    @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
      // Since Call is a one-shot type, clone it for each new observer.
      Call<T> call = originalCall.clone();
      observer.onSubscribe(new CallDisposable(call));
      boolean terminated = false;
      try {
        Response<T> response = call.execute();
        if (!call.isCanceled()) {
          observer.onNext(response);
        }
        if (!call.isCanceled()) {
          terminated = true;
          observer.onComplete();
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (terminated) {
          RxJavaPlugins.onError(t);
        } else if (!call.isCanceled()) {
          try {
            observer.onError(t);
          } catch (Throwable inner) {
            Exceptions.throwIfFatal(inner);
            RxJavaPlugins.onError(new CompositeException(t, inner));
          }
        }
      }
    }
    复制代码

    我们主要看Response<T> response = call.execute();call便是我们传进来的自定义的OkHttpCall

    在call.execute()里面:

    复制代码
    .
    .
    .
    call = rawCall;
    if (call == null) {
      try {
        call = rawCall = createRawCall();
      } catch (IOException | RuntimeException | Error e) {
        throwIfFatal(e); //  Do not assign a fatal error to creationFailure.
        creationFailure = e;
        throw e;
      }
    }
    .
    .
    .
    return parseResponse(call.execute());
    复制代码

    createRawCall()获取okhttp3.Call,call.execute()便是okhttp的网络请求了。

    我们主要看怎么获取okhttp3.Call,以及对请求结果的解析parseResponse方法。

    复制代码
    private okhttp3.Call createRawCall() throws IOException {
      okhttp3.Call call = serviceMethod.toCall(args);
      if (call == null) {
        throw new NullPointerException("Call.Factory returned null.");
      }
      return call;
    }
    复制代码

    ServiceMethod里面:

    复制代码
    /** Builds an HTTP request from method arguments. */
    okhttp3.Call toCall(@Nullable Object... args) throws IOException {
      RequestBuilder requestBuilder = new RequestBuilder(httpMethod, baseUrl, relativeUrl, headers,
          contentType, hasBody, isFormEncoded, isMultipart);
      @SuppressWarnings("unchecked") // It is an error to invoke a method with the wrong arg types.
      ParameterHandler<Object>[] handlers = (ParameterHandler<Object>[]) parameterHandlers;
      int argumentCount = args != null ? args.length : 0;
      if (argumentCount != handlers.length) {
        throw new IllegalArgumentException("Argument count (" + argumentCount
            + ") doesn't match expected count (" + handlers.length + ")");
      }
      for (int p = 0; p < argumentCount; p++) {
        handlers[p].apply(requestBuilder, args[p]);
      }
      return callFactory.newCall(requestBuilder.build());
    }
    复制代码

    方法主要是构造了request然后使用okhttp3.Call.Factory创建okhttp3.Call,而我们之前在构建ServiceMothod的构造的ParameterHandler<Object>[] handlers便参与了request的构建,主要是将之前解析到的参数,比如路径,头部信息等添加到request里面。

     

    再看一下请求结果的解析parseResponse方法:

    重点语句:

    T body = serviceMethod.toResponse(catchingBody);

    在看serviceMethod里面的toResponse方法:

    /** Builds a method return value from an HTTP response body. */
    R toResponse(ResponseBody body) throws IOException {
      return responseConverter.convert(body);
    }

    这里便用到了我们之前设置的数据转换器,对结果进行转换。

     

    以上便是大概的过程了。

     

     转载请标明:https://www.cnblogs.com/tangZH/p/13723480.html

     

  • 相关阅读:
    二手车商城交易网站平台(Java+SSM+MySQL)
    node-sass报错,node16运行node14的项目
    【Redis】Redis 的共享 session 应用(短信登录)
    HashMap实现原理, 扩容机制,面试题和总结
    把微信好友不小心删了,有什么办法找回?
    数据结构(Java)下压堆栈
    HBuilder创建uniapp默认项目导入uview(胎教)
    MERLIN-AToolfor Multi-party Privacy-preserving Record Linkage论文总结
    CVPR2023新作:源数据集对迁移学习性能的影响以及相应的解决方案
    Spring的事务
  • 原文地址:https://www.cnblogs.com/tangZH/p/13723480.html