JDK 11的响应式编程Flow核心主逻辑源代码分析

作者: jekkay 分类: java,默认 发布时间: 2019-07-10 20:34

# JDK 11的响应式编程Flow核心主逻辑源代码分析

// java.base\java\util\concurrent\SubmissionPublisher.java

public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
        int max = maxBufferCapacity; // allocate initial array
        Object[] array = new Object[max < INITIAL_CAPACITY ?
                                    max : INITIAL_CAPACITY];
        BufferedSubscription<T> subscription =
            new BufferedSubscription<T>(subscriber, executor, onNextHandler,
                                        array, max);
        synchronized (this) {
            if (!subscribed) {
                subscribed = true;
                owner = Thread.currentThread();
            }
            for (BufferedSubscription<T> b = clients, pred = null;;) {
                if (b == null) {
                    Throwable ex;
                    subscription.onSubscribe();
                    if ((ex = closedException) != null)
                        subscription.onError(ex);
                    else if (closed)
                        subscription.onComplete();
                    else if (pred == null)
                        clients = subscription;
                    else
                        pred.next = subscription;
                    break;
                }
                BufferedSubscription<T> next = b.next;
                if (b.isClosed()) {   // remove
                    b.next = null;    // detach
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else if (subscriber.equals(b.subscriber)) {
                    b.onError(new IllegalStateException("Duplicate subscribe"));
                    break;
                }
                else
                    pred = b;
                b = next;
            }
        }
    }

// Java\jdk-11.0.3\lib\src.zip!\java.base\java\util\concurrent\SubmissionPublisher.java

    // 投递信息,一直阻塞到投递完成
    public int submit(T item) {
        return doOffer(item, Long.MAX_VALUE, null);
    }

    // 投递信息,如果无法投递则忽略,直接返回
    public int offer(T item, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        return doOffer(item, 0L, onDrop);
    }

    // 投递信息,设定超时时间
    public int offer(T item, long timeout, TimeUnit unit,
                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        long nanos = unit.toNanos(timeout);
        // distinguishes from untimed (only wrt interrupt policy)
        if (nanos == Long.MAX_VALUE) --nanos;
        return doOffer(item, nanos, onDrop);
    }

    private int doOffer(T item, long nanos,
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        if (item == null) throw new NullPointerException();
        int lag = 0;
        boolean complete, unowned;
        synchronized (this) {
            Thread t = Thread.currentThread(), o;
            BufferedSubscription<T> b = clients;
            // 如果当前的推送的数据线程,不是首次订阅者,就置owner为空
            if ((unowned = ((o = owner) != t)) && o != null)
                owner = null;                     // disable bias
            if (b == null)
                complete = closed;
            else {
                complete = false;
                boolean cleanMe = false;
                BufferedSubscription<T> retries = null, rtail = null, next;
                do {
                    next = b.next;
                    int stat = b.offer(item, unowned);
                    if (stat == 0) {              // saturated; add to retry list
                    // 如果已经满了,则添加到一个尝试链表中
                        b.nextRetry = null;       // avoid garbage on exceptions
                        if (rtail == null)
                            retries = b;
                        else
                            rtail.nextRetry = b;
                        rtail = b;
                    }
                    else if (stat < 0) // 表示订阅消费者已经关闭,标记需要清除         
                        cleanMe = true;           
                    else if (stat > lag) // 如果返回值为正数消息积压,更新最大积压数
                        lag = stat;
                } while ((b = next) != null);

                if (retries != null || cleanMe)
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
            }
        }
        if (complete)
            throw new IllegalStateException("Closed");
        else
            return lag;
    }
// Java\jdk-11.0.3\lib\src.zip!\java.base\java\util\concurrent\SubmissionPublisher.java

// 尝试再次推送发送
private int retryOffer(T item, long nanos,
                           BiPredicate<Subscriber<? super T>, ? super T> onDrop,
                           BufferedSubscription<T> retries, int lag,
                           boolean cleanMe) {
        for (BufferedSubscription<T> r = retries; r != null;) {
            BufferedSubscription<T> nextRetry = r.nextRetry;
            r.nextRetry = null;
            if (nanos > 0L)
                // 等待有空闲空间
                r.awaitSpace(nanos);
            // 再次尝试发布
            int stat = r.retryOffer(item);
            if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
                // 如果还是已满,调用drop的lambda函数确定是否需要再重试一次
                stat = r.retryOffer(item);
            if (stat == 0) // 还是满了
                lag = (lag >= 0) ? -1 : lag - 1; //<0表示发送失败的client的数量,>0预估的最大积压数
            else if (stat < 0)
                cleanMe = true; // 已经关闭,标记下需要清理信标
            else if (lag >= 0 && stat > lag)
                lag = stat; // 更新一下预估最大积压量
            r = nextRetry;
        }
        if (cleanMe) // 清理已关闭的订阅者
            cleanAndCount();
        return lag;
    }
Java\jdk-11.0.3\lib\src.zip!\java.base\java\util\concurrent\SubmissionPublisher.java
//  清理已经关闭的订阅消费者
private int cleanAndCount() {
        int count = 0;
        BufferedSubscription<T> pred = null, next;
        for (BufferedSubscription<T> b = clients; b != null; b = next) {
            next = b.next;
            if (b.isClosed()) {
            // 如果已经关闭,则从链表中移除
                b.next = null;
                if (pred == null)
                    clients = next;
                else
                    pred.next = next;
            }
            else {
                pred = b;
                ++count; // 统计当前有效的订阅者数
            }
        }
        return count;
    }

Java\jdk-11.0.3\lib\src.zip!\java.base\java\util\concurrent\SubmissionPublisher.java

// 订阅者订阅后,发布者颁发给订阅者的类似证书&交流管道

static final class BufferedSubscription<T>
        implements Subscription, ForkJoinPool.ManagedBlocker {
        long timeout;                      // Long.MAX_VALUE if untimed wait
        int head;                          // 头部:消费位置
        int tail;                          // 尾部:存放数据位置
        final int maxCapacity;             // 最大的数据缓存空间
        volatile int ctl;                  // 控制标记位
        Object[] array;                    // 缓存空间
        final Subscriber<? super T> subscriber;
        final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
        Executor executor;                 // null on error
        Thread waiter;                     // blocked producer thread
        Throwable pendingError;            // holds until onError issued
        BufferedSubscription<T> next;      // used only by publisher
        BufferedSubscription<T> nextRetry; // used only by publisher

        @jdk.internal.vm.annotation.Contended("c") // segregate
        volatile long demand;              // # unfilled requests
        @jdk.internal.vm.annotation.Contended("c")
        volatile int waiting;              // nonzero if producer blocked

Java\jdk-11.0.3\lib\src.zip!\java.base\java\util\concurrent\SubmissionPublisher.java#BufferedSubscription<T>

    final void onSubscribe() {
        startOnSignal(RUN | ACTIVE);
    }

    final void startOnSignal(int bits) {
        if ((ctl & bits) != bits &&
            (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
            tryStart();
    }

    final void tryStart() {
        try {
            Executor e;
            ConsumerTask<T> task = new ConsumerTask<T>(this);
            if ((e = executor) != null)   // skip if disabled on error
                e.execute(task);
        } catch (RuntimeException | Error ex) {
            getAndBitwiseOrCtl(ERROR | CLOSED);
            throw ex;
        }
    }

    static final class ConsumerTask<T> extends ForkJoinTask<Void>
        implements Runnable, CompletableFuture.AsynchronousCompletionTask {
        final BufferedSubscription<T> consumer;
        ConsumerTask(BufferedSubscription<T> consumer) {
            this.consumer = consumer;
        }
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { consumer.consume(); return false; }
        public final void run() { consumer.consume(); }
    }

final void consume() {
    Subscriber<? super T> s;
    if ((s = subscriber) != null) {          // hoist checks
        subscribeOnOpen(s);
        long d = demand;
        for (int h = head, t = tail;;) {
            int c, taken; boolean empty;
            if (((c = ctl) & ERROR) != 0) {
                closeOnError(s, null);
                break;
            }
            else if ((taken = takeItems(s, d, h)) > 0) {
                head = h += taken;
                d = subtractDemand(taken);
            }
            else if ((d = demand) == 0L && (c & REQS) != 0)
                weakCasCtl(c, c & ~REQS);    // exhausted demand
            else if (d != 0L && (c & REQS) == 0)
                weakCasCtl(c, c | REQS);     // new demand
            else if (t == (t = tail)) {      // stability check
                if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                    closeOnComplete(s);      // end of stream
                    break;
                }
                else if (empty || d == 0L) {
                    int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                    if (weakCasCtl(c, c & ~bit) && bit == RUN)
                        break;               // un-keep-alive or exit
                }
            }
        }
    }
}

final int takeItems(Subscriber<? super T> s, long d, int h) {
    Object[] a;
    int k = 0, cap;
    if ((a = array) != null && (cap = a.length) > 0) {
        int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
        int n = (d < (long)b) ? (int)d : b;
        for (; k < n; ++h, ++k) {
            Object x = QA.getAndSet(a, h & m, null);
            if (waiting != 0)
                signalWaiter();
            if (x == null)
                break;
            else if (!consumeNext(s, x))
                break;
        }
    }
    return k;
}

final boolean consumeNext(Subscriber<? super T> s, Object x) {
    try {
        @SuppressWarnings("unchecked") T y = (T) x;
        if (s != null)
            s.onNext(y);
        return true;
    } catch (Throwable ex) {
        handleOnNext(s, ex);
        return false;
    }
}

final int offer(T item, boolean unowned) {
    Object[] a;
    int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
    int t = tail, i = t & (cap - 1), n = t + 1 - head; // i表示存放数据的位置, n表示所需空间
    if (cap > 0) {
        // 已经分配空间
        boolean added;
        if (n >= cap && cap < maxCapacity) // 需要扩展缓存队列
            added = growAndOffer(item, a, t);
        else if (n >= cap || unowned)      // 如果已大最大容量,或当前线程,需要CAS
            added = QA.compareAndSet(a, i, null, item);
        else {
           // 空间够的话,直接存放对应位置
            QA.setRelease(a, i, item);
            added = true;
        }
        if (added) {
            // 添加成功,尾索引位置往后移动
            tail = t + 1;
            stat = n;
        }
    }
    return startOnOffer(stat);
}
final int startOnOffer(int stat) {
    int c; // start or keep alive if requests exist and not active
    if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
        ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
        tryStart();
    else if ((c & CLOSED) != 0)
        stat = -1;
    return stat;
}


二·参考文档

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。