/*
 * Decompiled with CFR 0.152.
 */
package au.com.ordermate.persistence.cache.remote;

import au.com.ordermate.OrderMateLog;
import au.com.ordermate.persistence.PersistentObjectI;
import au.com.ordermate.persistence.cache.remote.InvalidateableRemoteCache;
import au.com.ordermate.persistence.cache.remote.RemoteCacheSubscriptionServiceImpl;
import au.com.ordermate.persistence.cache.remote.Subscribeable;
import au.com.ordermate.simplermi.SimpleRMI;
import java.net.InetAddress;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class RemoteCachePublisher
implements Subscribeable {
    private final Map cachePublisherMap = Collections.synchronizedMap(new HashMap());
    private static final int MAX_FAILURE_COUNT = 30;
    private static final int MAX_FAILURE_QUEUE_SIZE = 1000;

    public RemoteCacheSubscriptionServiceImpl getSubscriptionService() throws RemoteException {
        return new RemoteCacheSubscriptionServiceImpl(this);
    }

    @Override
    public boolean subscribe(String cacheName, InetAddress address, int rmiPort) {
        if (cacheName == null) {
            return false;
        }
        try {
            InvalidateableRemoteCache cache = (InvalidateableRemoteCache)SimpleRMI.getObject(address, cacheName, rmiPort);
            if (this.cachePublisherMap.containsKey(cache) && ((Publisher)this.cachePublisherMap.get(cache)).isRunning()) {
                return true;
            }
            OrderMateLog.LOG.info("RemoteCachePublisher: Subscribing remote cache :" + cacheName + " on IP Address : " + address + " over rmi port  : " + rmiPort);
            cache.invalidateEntireCache();
            Publisher publisher = new Publisher(cache);
            this.cachePublisherMap.put(cache, publisher);
        }
        catch (RemoteException ex) {
            OrderMateLog.LOG.warn("Could not subscribe to cache " + cacheName, (Throwable)ex);
            return false;
        }
        catch (NotBoundException ex) {
            OrderMateLog.LOG.error("Could not subscribe cache the Cache has not been shared " + cacheName + " : " + address, (Throwable)ex);
            return false;
        }
        catch (Exception ex) {
            OrderMateLog.LOG.error("Unknown Exception when subscribing to cache", (Throwable)ex);
        }
        return true;
    }

    @Override
    public void unsubscribe(String cacheName, InetAddress address, int rmiPort) throws RemoteException {
        try {
            InvalidateableRemoteCache cache = (InvalidateableRemoteCache)SimpleRMI.getObject(address, cacheName, rmiPort);
            if (this.cachePublisherMap.remove(cache) == null) {
                OrderMateLog.LOG.warn("Could not find cache in cachePublisher map to unsubscripe : " + cacheName + " from host:" + address);
            }
        }
        catch (RemoteException ex) {
            OrderMateLog.LOG.warn("Cound not unsubscribe cache : " + cacheName);
            throw ex;
        }
        catch (NotBoundException ex) {
            OrderMateLog.LOG.error("Could not subscribe cache the Cache has not been shared " + cacheName + " : " + address, (Throwable)ex);
        }
    }

    public void invalidateObject(PersistentObjectI obj) {
        if (!obj.isPersistent()) {
            throw new IllegalArgumentException("Cannot invalidate object that has not been persisted");
        }
        this.enqueInvalidations(Collections.singletonList(obj));
    }

    public void invalidateObjects(Collection persistentObjectIs) {
        this.enqueInvalidations(persistentObjectIs);
    }

    private void enqueInvalidations(Collection objsToInvalidate) {
        Iterator it = this.cachePublisherMap.keySet().iterator();
        while (it.hasNext()) {
            Publisher publisher = (Publisher)this.cachePublisherMap.get(it.next());
            Queue queue = publisher.getQueue();
            if (queue == null) continue;
            queue.addAll(objsToInvalidate);
        }
    }

    private static class Publisher
    implements Runnable {
        private BlockingDeque deque = new LinkedBlockingDeque();
        private InvalidateableRemoteCache cache;
        int failureCount = 0;

        public Publisher(InvalidateableRemoteCache theCache) {
            this.cache = theCache;
            new Thread(this).start();
        }

        public boolean isRunning() {
            return this.deque != null;
        }

        public synchronized Queue getQueue() {
            return this.deque;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        @Override
        public void run() {
            OrderMateLog.LOG.info("Publisher thread is started.");
            invalidate = new HashSet<E>();
            while (true) {
                try {
                    while (true) lbl-1000:
                    // 4 sources

                    {
                        obj = null;
                        try {
                            obj = this.deque.take();
                            if (this.failureCount != 0 && this.failureCount % 10 == 0) {
                                Thread.sleep(2000L);
                            } else {
                                Thread.sleep(100L);
                            }
                        }
                        catch (InterruptedException ex) {
                            if (obj != null) {
                                this.deque.add(obj);
                            }
                            Thread.currentThread().interrupt();
                        }
                        while (obj != null) {
                            invalidate.add(obj);
                            obj = this.deque.poll();
                        }
                        try {
                            this.cache.invalidateObjects(invalidate);
                            this.failureCount = 0;
                        }
                        catch (RemoteException ex) {
                            OrderMateLog.LOG.error("Error while invalidating remote cache", (Throwable)ex);
                            ++this.failureCount;
                            it = invalidate.iterator();
                            while (it.hasNext()) {
                                this.deque.addFirst(it.next());
                            }
                            queueSize = this.deque.size();
                            if (this.failureCount <= 30 && queueSize <= 1000) ** GOTO lbl-1000
                            OrderMateLog.LOG.warn("Publisher thread is terminated - Max failure count(" + this.failureCount + ") or Max queue size(" + queueSize + ") exceeded for remote cache: " + this.cache, (Throwable)ex);
                            this.deque = null;
                            return;
                        }
                        finally {
                            invalidate.clear();
                            continue;
                        }
                        break;
                    }
                }
                catch (Exception e) {
                    OrderMateLog.LOG.error((Object)e, (Throwable)e);
                    continue;
                }
                ** GOTO lbl-1000
                break;
            }
        }
    }
}

