/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.shaded.org.apache.ignite.internal.streamer;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.shaded.org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerBatchSender;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerBuffer;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerMetricSink;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerOptions;
import org.apache.ignite.shaded.org.apache.ignite.internal.streamer.StreamerPartitionAwarenessProvider;
import org.apache.ignite.shaded.org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.shaded.org.jetbrains.annotations.Nullable;

public class StreamerSubscriber<T, E, V, R, P>
implements Flow.Subscriber<E> {
    private final StreamerBatchSender<V, P, R> batchSender;
    @Nullable
    private final Flow.Subscriber<R> resultSubscriber;
    private final Function<E, T> keyFunc;
    private final Function<E, V> payloadFunc;
    private final Function<E, Boolean> deleteFunc;
    private final StreamerPartitionAwarenessProvider<T, P> partitionAwarenessProvider;
    private final StreamerOptions options;
    private final CompletableFuture<Void> completionFut = new CompletableFuture();
    private final AtomicInteger pendingItemCount = new AtomicInteger();
    private final AtomicInteger inFlightItemCount = new AtomicInteger();
    private final ConcurrentHashMap<P, StreamerBuffer<E>> buffers = new ConcurrentHashMap();
    private final ConcurrentMap<P, CompletableFuture<Collection<R>>> pendingRequests = new ConcurrentHashMap<P, CompletableFuture<Collection<R>>>();
    private final IgniteLogger log;
    private final StreamerMetricSink metrics;
    private final ScheduledExecutorService flushExecutor;
    private final Set<E> failedItems = Collections.synchronizedSet(new HashSet());
    @Nullable
    private Flow.Subscription subscription;
    @Nullable
    private ResultSubscription resultSubscription;
    @Nullable
    private ScheduledFuture<?> flushTask;
    private boolean closed;

    public StreamerSubscriber(StreamerBatchSender<V, P, R> batchSender, @Nullable Flow.Subscriber<R> resultSubscriber, Function<E, T> keyFunc, Function<E, V> payloadFunc, Function<E, Boolean> deleteFunc, StreamerPartitionAwarenessProvider<T, P> partitionAwarenessProvider, StreamerOptions options, ScheduledExecutorService flushExecutor, IgniteLogger log, @Nullable StreamerMetricSink metrics) {
        assert (batchSender != null);
        assert (keyFunc != null);
        assert (payloadFunc != null);
        assert (partitionAwarenessProvider != null);
        assert (options != null);
        assert (flushExecutor != null);
        assert (log != null);
        this.batchSender = batchSender;
        this.resultSubscriber = resultSubscriber;
        this.keyFunc = keyFunc;
        this.payloadFunc = payloadFunc;
        this.deleteFunc = deleteFunc;
        this.partitionAwarenessProvider = partitionAwarenessProvider;
        this.options = options;
        this.flushExecutor = flushExecutor;
        this.log = log;
        this.metrics = StreamerSubscriber.getMetrics(metrics);
    }

    @Override
    public synchronized void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription != null) {
            throw new IllegalStateException("Subscription is already set.");
        }
        this.subscription = subscription;
        if (this.resultSubscriber != null) {
            this.resultSubscription = new ResultSubscription();
            this.resultSubscriber.onSubscribe(this.resultSubscription);
        }
        this.partitionAwarenessProvider.refreshAsync().whenComplete((res, err) -> {
            if (err != null) {
                this.log.error("Failed to refresh schemas and partition assignment: " + err.getMessage(), (Throwable)err);
                this.close((Throwable)err);
            } else {
                this.initFlushTimer();
                this.requestMore();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNext(E item) {
        this.pendingItemCount.decrementAndGet();
        T key = this.keyFunc.apply(item);
        P partition = this.partitionAwarenessProvider.partition(key);
        StreamerBuffer buf = this.buffers.computeIfAbsent(partition, p -> new StreamerBuffer(this.options.pageSize(), items -> this.enlistBatch((P)p, (List<E>)items)));
        StreamerSubscriber streamerSubscriber = this;
        synchronized (streamerSubscriber) {
            if (this.closed) {
                throw new IllegalStateException("Streamer is closed, can't add items.");
            }
            buf.add(item);
        }
        this.metrics.streamerItemsQueuedAdd(1L);
        this.requestMore();
    }

    @Override
    public void onError(Throwable throwable) {
        this.close(throwable);
    }

    @Override
    public void onComplete() {
        this.close(null);
    }

    public CompletableFuture<Void> completionFuture() {
        return this.completionFut;
    }

    private CompletableFuture<Collection<R>> enlistBatch(P partition, List<E> batch) {
        int batchSize = batch.size();
        assert (batchSize > 0) : "Batch size must be positive.";
        assert (partition != null) : "Partition must not be null.";
        this.inFlightItemCount.addAndGet(batchSize);
        this.metrics.streamerBatchesActiveAdd(1L);
        return this.pendingRequests.compute(partition, (part, fut) -> fut == null ? this.sendBatch(part, batch) : ((CompletableFuture)fut.whenComplete((res, err) -> {
            if (err != null) {
                this.failedItems.addAll(batch);
            }
        })).thenCompose(v -> this.sendBatch(part, batch)));
    }

    private CompletableFuture<Collection<R>> sendBatch(P partition, List<E> batch) {
        try {
            ArrayList<V> items = new ArrayList<V>();
            BitSet deleted = new BitSet(batch.size());
            for (E e : batch) {
                items.add(this.payloadFunc.apply(e));
                deleted.set(items.size() - 1, this.deleteFunc.apply(e));
            }
            return this.batchSender.sendAsync(partition, items, deleted).whenComplete((res, err) -> {
                if (err != null) {
                    this.log.error("Failed to send batch to partition " + String.valueOf(partition) + ": " + err.getMessage(), (Throwable)err);
                    this.failedItems.addAll(batch);
                    this.close((Throwable)err);
                } else {
                    int batchSize = batch.size();
                    this.metrics.streamerBatchesSentAdd(1L);
                    this.metrics.streamerBatchesActiveAdd(-1L);
                    this.metrics.streamerItemsSentAdd(batchSize);
                    this.metrics.streamerItemsQueuedAdd(-batchSize);
                    this.inFlightItemCount.addAndGet(-batchSize);
                    this.requestMore();
                    this.partitionAwarenessProvider.refreshAsync().exceptionally(refreshErr -> {
                        this.log.error("Failed to refresh schemas and partition assignment: " + refreshErr.getMessage(), (Throwable)refreshErr);
                        this.close((Throwable)refreshErr);
                        return null;
                    });
                    this.invokeResultSubscriber((Collection<R>)res);
                }
            });
        }
        catch (Throwable e) {
            this.log.error("Failed to send batch to partition " + String.valueOf(partition) + ": " + e.getMessage(), e);
            this.failedItems.addAll(batch);
            this.close(e);
            return CompletableFuture.failedFuture(e);
        }
    }

    private void invokeResultSubscriber(Collection<R> res) {
        if (res == null || this.resultSubscriber == null) {
            return;
        }
        ResultSubscription sub = this.resultSubscription();
        if (sub == null) {
            return;
        }
        for (R r : res) {
            if (sub.cancelled.get()) {
                return;
            }
            this.resultSubscriber.onNext(r);
        }
    }

    @Nullable
    private synchronized ResultSubscription resultSubscription() {
        return this.resultSubscription;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void close(@Nullable Throwable throwable) {
        CompletableFuture[] futs;
        ScheduledFuture<?> flushTask0;
        Flow.Subscription subscription0;
        StreamerSubscriber streamerSubscriber = this;
        synchronized (streamerSubscriber) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            subscription0 = this.subscription;
            flushTask0 = this.flushTask;
        }
        if (flushTask0 != null) {
            flushTask0.cancel(false);
        }
        if (subscription0 != null) {
            try {
                subscription0.cancel();
            }
            catch (Throwable e2) {
                this.log.error("Failed to cancel subscription: " + e2.getMessage(), e2);
            }
        }
        if (throwable == null) {
            this.buffers.values().forEach(StreamerBuffer::flushAndClose);
            futs = this.pendingRequests.values().toArray(new CompletableFuture[0]);
            CompletableFuture.allOf(futs).whenCompleteAsync((v, e) -> {
                if (e != null) {
                    this.completeWithError((Throwable)e);
                } else {
                    if (this.resultSubscriber != null) {
                        this.resultSubscriber.onComplete();
                    }
                    this.completionFut.complete(null);
                }
            }, (Executor)this.flushExecutor);
        } else {
            futs = this.pendingRequests.values().toArray(new CompletableFuture[0]);
            CompletableFuture.allOf(futs).whenCompleteAsync((v, e) -> this.completeWithError(throwable), (Executor)this.flushExecutor);
        }
    }

    private void completeWithError(Throwable throwable) {
        this.buffers.values().forEach(buf -> buf.forEach(this.failedItems::add));
        DataStreamerException streamerErr = new DataStreamerException(this.failedItems, throwable);
        this.completionFut.completeExceptionally(streamerErr);
        if (this.resultSubscriber != null) {
            this.resultSubscriber.onError(streamerErr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestMore() {
        Flow.Subscription subscription0;
        int toRequest;
        StreamerSubscriber streamerSubscriber = this;
        synchronized (streamerSubscriber) {
            int inFlight;
            if (this.closed || this.subscription == null) {
                return;
            }
            int pending = this.pendingItemCount.get();
            int desiredInFlight = Math.max(1, this.buffers.size()) * this.options.pageSize() * this.options.perPartitionParallelOperations();
            toRequest = desiredInFlight - (inFlight = this.inFlightItemCount.get()) - pending;
            if (toRequest <= 0) {
                return;
            }
            this.pendingItemCount.addAndGet(toRequest);
            subscription0 = this.subscription;
        }
        try {
            subscription0.request(toRequest);
        }
        catch (Throwable e) {
            this.log.error("Failed to request more items: " + e.getMessage(), e);
            this.close(e);
        }
    }

    private synchronized void initFlushTimer() {
        if (this.closed) {
            return;
        }
        int interval = this.options.autoFlushInterval();
        if (interval <= 0) {
            return;
        }
        this.flushTask = this.flushExecutor.scheduleWithFixedDelay(this::onAutoFlushInterval, interval, interval, TimeUnit.MILLISECONDS);
    }

    private void onAutoFlushInterval() {
        long intervalNanos = TimeUnit.MILLISECONDS.toNanos(this.options.autoFlushInterval());
        for (StreamerBuffer<E> buf : this.buffers.values()) {
            buf.autoFlush(intervalNanos);
        }
    }

    private static StreamerMetricSink getMetrics(@Nullable StreamerMetricSink metrics) {
        return metrics != null ? metrics : new StreamerMetricSink(){

            @Override
            public void streamerBatchesSentAdd(long batches) {
            }

            @Override
            public void streamerItemsSentAdd(long items) {
            }

            @Override
            public void streamerBatchesActiveAdd(long batches) {
            }

            @Override
            public void streamerItemsQueuedAdd(long items) {
            }
        };
    }

    private static class ResultSubscription
    implements Flow.Subscription {
        AtomicBoolean cancelled = new AtomicBoolean();

        private ResultSubscription() {
        }

        @Override
        public void request(long n) {
        }

        @Override
        public void cancel() {
            this.cancelled.set(true);
        }
    }
}

