package com.mirth.connect.donkey.server.channel;

import com.mirth.connect.donkey.model.DonkeyException;
import com.mirth.connect.donkey.model.channel.DebugOptions;
import com.mirth.connect.donkey.model.channel.DeployedState;
import com.mirth.connect.donkey.model.channel.MetaDataColumn;
import com.mirth.connect.donkey.model.channel.SourceConnectorProperties;
import com.mirth.connect.donkey.model.channel.SourceConnectorPropertiesInterface;
import com.mirth.connect.donkey.model.event.DeployedStateEventType;
import com.mirth.connect.donkey.model.event.ErrorEventType;
import com.mirth.connect.donkey.model.message.ConnectorMessage;
import com.mirth.connect.donkey.model.message.ContentType;
import com.mirth.connect.donkey.model.message.Message;
import com.mirth.connect.donkey.model.message.MessageContent;
import com.mirth.connect.donkey.model.message.MessageSerializerException;
import com.mirth.connect.donkey.model.message.RawMessage;
import com.mirth.connect.donkey.model.message.Response;
import com.mirth.connect.donkey.model.message.Status;
import com.mirth.connect.donkey.model.message.attachment.Attachment;
import com.mirth.connect.donkey.model.message.attachment.AttachmentException;
import com.mirth.connect.donkey.model.message.attachment.AttachmentHandler;
import com.mirth.connect.donkey.model.message.attachment.AttachmentHandlerProvider;
import com.mirth.connect.donkey.server.ConnectorTaskException;
import com.mirth.connect.donkey.server.Constants;
import com.mirth.connect.donkey.server.DeployException;
import com.mirth.connect.donkey.server.Donkey;
import com.mirth.connect.donkey.server.HaltException;
import com.mirth.connect.donkey.server.PauseException;
import com.mirth.connect.donkey.server.ResumeException;
import com.mirth.connect.donkey.server.StartException;
import com.mirth.connect.donkey.server.StopException;
import com.mirth.connect.donkey.server.UndeployException;
import com.mirth.connect.donkey.server.channel.components.PostProcessor;
import com.mirth.connect.donkey.server.channel.components.PreProcessor;
import com.mirth.connect.donkey.server.controllers.ChannelController;
import com.mirth.connect.donkey.server.controllers.MessageController;
import com.mirth.connect.donkey.server.data.DonkeyDao;
import com.mirth.connect.donkey.server.data.DonkeyDaoFactory;
import com.mirth.connect.donkey.server.event.DeployedStateEvent;
import com.mirth.connect.donkey.server.event.ErrorEvent;
import com.mirth.connect.donkey.server.event.EventDispatcher;
import com.mirth.connect.donkey.server.message.batch.BatchAdaptorFactory;
import com.mirth.connect.donkey.server.queue.ConnectorMessageQueueDataSource;
import com.mirth.connect.donkey.server.queue.DestinationQueue;
import com.mirth.connect.donkey.server.queue.SourceQueue;
import com.mirth.connect.donkey.util.Base64Util;
import com.mirth.connect.donkey.util.MessageMaps;
import com.mirth.connect.donkey.util.Serializer;
import com.mirth.connect.donkey.util.ThreadUtils;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/mirth/connect/donkey/server/channel/Channel.class */
public class Channel implements Runnable {
    // Identity and deployment metadata; each is exposed through a getter/setter pair on this class.
    private String channelId;
    private long localChannelId;
    private String name;
    private String serverId;
    private int revision;
    private Calendar deployDate;
    private Set<String> resourceIds;
    private String contextFactoryId;
    private DeployedState initialState;
    private DonkeyDaoFactory daoFactory;
    private MessageMaps messageMaps;
    private AttachmentHandlerProvider attachmentHandlerProvider;
    private SourceConnector sourceConnector;
    // Number of source queue threads; recomputed from the source connector properties in start() (minimum 1).
    private int processingThreads;
    private SourceQueue sourceQueue;
    // Optional; when null, deploy() falls back to ConnectorMessageQueueDataSource for the queues.
    private QueueHandler queueHandler;
    private PreProcessor preProcessor;
    private PostProcessor postProcessor;
    // Created lazily in deploy() when not supplied through the setter.
    private ResponseSelector responseSelector;
    private DebugOptions debugOptions;
    // Fair semaphore with two permits. Its users are not visible in this part of the file.
    public static Semaphore DELETE_PERMIT = new Semaphore(2, true);
    // Created in start(); shut down in halt().
    private ExecutorService channelExecutor;
    private ChannelProcessLock processLock;
    // A channel begins in the STOPPED state; changed through updateCurrentState().
    private DeployedState currentState = DeployedState.STOPPED;
    private StorageSettings storageSettings = new StorageSettings();
    // Both collaborators are obtained from the Donkey singleton at construction time.
    private EventDispatcher eventDispatcher = Donkey.getInstance().getEventDispatcher();
    private Serializer serializer = Donkey.getInstance().getSerializer();
    private List<MetaDataColumn> metaDataColumns = new ArrayList();
    // Source queue threads keyed by thread ID (see startSourceQueue()).
    private Map<Long, Thread> queueThreads = new ConcurrentHashMap();
    private List<DestinationChainProvider> destinationChainProviders = new ArrayList();
    // add/removeDispatchThread() and halt() synchronize on this set itself; start() clears it without locking.
    private Set<Thread> dispatchThreads = new HashSet();
    // Set to true by halt() and reset to false by start().
    private volatile boolean shuttingDown = false;
    // Set to true by stopSourceQueue() and reset to false by startSourceQueue().
    private volatile boolean stopSourceQueue = false;
    // Fair lock; replaced with a fresh instance every time start() runs.
    private Lock removeContentLock = new ReentrantLock(true);
    private MessageController messageController = MessageController.getInstance();
    private Logger logger = LogManager.getLogger(getClass());

    /**
     * @return the debug options currently assigned to this channel (may be null)
     */
    public DebugOptions getDebugOptions() {
        return debugOptions;
    }

    /**
     * Stores the debug options for this channel.
     *
     * @param debugOptions the debug options to store
     */
    public void setDebugOptions(DebugOptions debugOptions) {
        this.debugOptions = debugOptions;
    }

    /**
     * @return the ID of this channel
     */
    public String getChannelId() {
        return channelId;
    }

    /**
     * Sets the ID of this channel.
     *
     * @param str the channel ID
     */
    public void setChannelId(String str) {
        channelId = str;
    }

    /**
     * @return the local (numeric) ID of this channel
     */
    public long getLocalChannelId() {
        return localChannelId;
    }

    /**
     * Sets the local (numeric) ID of this channel.
     *
     * @param j the local channel ID
     */
    public void setLocalChannelId(long j) {
        localChannelId = j;
    }

    /**
     * @return the name of this channel, as used in log and exception messages
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the name of this channel.
     *
     * @param str the channel name
     */
    public void setName(String str) {
        name = str;
    }

    /**
     * @return the server ID associated with this channel
     */
    public String getServerId() {
        return serverId;
    }

    /**
     * Sets the server ID associated with this channel.
     *
     * @param str the server ID
     */
    public void setServerId(String str) {
        serverId = str;
    }

    /**
     * @return the revision number stored for this channel
     */
    public int getRevision() {
        return revision;
    }

    /**
     * Sets the revision number of this channel.
     *
     * @param i the revision number
     */
    public void setRevision(int i) {
        revision = i;
    }

    /**
     * @return the deploy date stored for this channel (the same Calendar instance, not a copy)
     */
    public Calendar getDeployDate() {
        return deployDate;
    }

    /**
     * Sets the deploy date of this channel.
     *
     * @param calendar the deploy date; stored by reference
     */
    public void setDeployDate(Calendar calendar) {
        deployDate = calendar;
    }

    /**
     * @return the resource IDs stored for this channel (the same Set instance, not a copy)
     */
    public Set<String> getResourceIds() {
        return resourceIds;
    }

    /**
     * Sets the resource IDs for this channel.
     *
     * @param set the resource IDs; stored by reference
     */
    public void setResourceIds(Set<String> set) {
        resourceIds = set;
    }

    /**
     * @return the context factory ID stored for this channel
     */
    public String getContextFactoryId() {
        return contextFactoryId;
    }

    /**
     * Sets the context factory ID for this channel.
     *
     * @param str the context factory ID
     */
    public void setContextFactoryId(String str) {
        contextFactoryId = str;
    }

    /**
     * @return the initial state configured for this channel
     */
    public DeployedState getInitialState() {
        return initialState;
    }

    /**
     * Sets the initial state configured for this channel.
     *
     * @param deployedState the initial state
     */
    public void setInitialState(DeployedState deployedState) {
        initialState = deployedState;
    }

    /**
     * @return the state most recently recorded by {@link #updateCurrentState(DeployedState)};
     *         {@code STOPPED} until the first update
     */
    public DeployedState getCurrentState() {
        return currentState;
    }

    /**
     * Records the new channel state and then dispatches a {@link DeployedStateEvent} describing
     * the transition through the event dispatcher.
     *
     * @param deployedState the state the channel has just entered
     */
    public void updateCurrentState(DeployedState deployedState) {
        currentState = deployedState;
        DeployedStateEventType eventType = DeployedStateEventType.getTypeFromDeployedState(deployedState);
        DeployedStateEvent stateEvent = new DeployedStateEvent(channelId, name, null, null, eventType);
        eventDispatcher.dispatchEvent(stateEvent);
    }

    /**
     * @return the storage settings of this channel; a default {@code StorageSettings} instance
     *         unless replaced through the setter
     */
    public StorageSettings getStorageSettings() {
        return storageSettings;
    }

    /**
     * Replaces the storage settings of this channel.
     *
     * @param storageSettings the storage settings to use
     */
    public void setStorageSettings(StorageSettings storageSettings) {
        this.storageSettings = storageSettings;
    }

    /**
     * @return the DAO factory used by this channel
     */
    public DonkeyDaoFactory getDaoFactory() {
        return daoFactory;
    }

    /**
     * Sets the DAO factory used by this channel.
     *
     * @param donkeyDaoFactory the DAO factory
     */
    public void setDaoFactory(DonkeyDaoFactory donkeyDaoFactory) {
        daoFactory = donkeyDaoFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the event dispatcher this channel obtained from the Donkey instance
     */
    public EventDispatcher getEventDispatcher() {
        return eventDispatcher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the serializer this channel obtained from the Donkey instance
     */
    public Serializer getSerializer() {
        return serializer;
    }

    /**
     * @return the message maps helper assigned to this channel
     */
    public MessageMaps getMessageMaps() {
        return messageMaps;
    }

    /**
     * Assigns the message maps helper for this channel.
     *
     * @param messageMaps the message maps helper
     */
    public void setMessageMaps(MessageMaps messageMaps) {
        this.messageMaps = messageMaps;
    }

    /**
     * @return the attachment handler provider assigned to this channel
     */
    public AttachmentHandlerProvider getAttachmentHandlerProvider() {
        return attachmentHandlerProvider;
    }

    /**
     * Assigns the attachment handler provider for this channel.
     *
     * @param attachmentHandlerProvider the attachment handler provider
     */
    public void setAttachmentHandlerProvider(AttachmentHandlerProvider attachmentHandlerProvider) {
        this.attachmentHandlerProvider = attachmentHandlerProvider;
    }

    /**
     * @return the custom metadata columns of this channel (the live list, not a copy)
     */
    public List<MetaDataColumn> getMetaDataColumns() {
        return metaDataColumns;
    }

    /**
     * Replaces the custom metadata columns of this channel.
     *
     * @param list the metadata columns; stored by reference
     */
    public void setMetaDataColumns(List<MetaDataColumn> list) {
        metaDataColumns = list;
    }

    /**
     * @return the source connector of this channel
     */
    public SourceConnector getSourceConnector() {
        return sourceConnector;
    }

    /**
     * Assigns the source connector of this channel.
     *
     * @param sourceConnector the source connector
     */
    public void setSourceConnector(SourceConnector sourceConnector) {
        this.sourceConnector = sourceConnector;
    }

    /**
     * @return the number of source queue processing threads, as computed by the most recent
     *         call to {@code start}
     */
    public int getProcessingThreads() {
        return processingThreads;
    }

    /**
     * @return the source queue of this channel
     */
    public SourceQueue getSourceQueue() {
        return sourceQueue;
    }

    /**
     * Assigns the source queue of this channel.
     *
     * @param sourceQueue the source queue
     */
    public void setSourceQueue(SourceQueue sourceQueue) {
        this.sourceQueue = sourceQueue;
    }

    /**
     * @return the optional queue handler of this channel, or null when none was assigned
     */
    public QueueHandler getQueueHandler() {
        return queueHandler;
    }

    /**
     * Assigns the queue handler of this channel. When it is null, deploy() creates default
     * queue data sources and startSourceQueue() always proceeds.
     *
     * @param queueHandler the queue handler, or null
     */
    public void setQueueHandler(QueueHandler queueHandler) {
        this.queueHandler = queueHandler;
    }

    /**
     * @return the preprocessor assigned to this channel
     */
    public PreProcessor getPreProcessor() {
        return preProcessor;
    }

    /**
     * Assigns the preprocessor of this channel.
     *
     * @param preProcessor the preprocessor
     */
    public void setPreProcessor(PreProcessor preProcessor) {
        this.preProcessor = preProcessor;
    }

    /**
     * @return the postprocessor assigned to this channel
     */
    public PostProcessor getPostProcessor() {
        return postProcessor;
    }

    /**
     * Assigns the postprocessor of this channel.
     *
     * @param postProcessor the postprocessor
     */
    public void setPostProcessor(PostProcessor postProcessor) {
        this.postProcessor = postProcessor;
    }

    /**
     * Appends a destination chain provider and assigns it a chain ID equal to its 1-based
     * position in the provider list.
     *
     * @param destinationChainProvider the provider to append
     */
    public void addDestinationChainProvider(DestinationChainProvider destinationChainProvider) {
        List<DestinationChainProvider> providers = this.destinationChainProviders;
        providers.add(destinationChainProvider);
        // The size is read after the add, so the first provider receives chain ID 1.
        int chainId = providers.size();
        destinationChainProvider.setChainId(Integer.valueOf(chainId));
    }

    /**
     * @return the destination chain providers of this channel (the live list, not a copy)
     */
    public List<DestinationChainProvider> getDestinationChainProviders() {
        return destinationChainProviders;
    }

    /**
     * @return the response selector of this channel; may be null until one is set or the
     *         channel is deployed
     */
    public ResponseSelector getResponseSelector() {
        return responseSelector;
    }

    /**
     * Assigns the response selector of this channel. If none is assigned, deploy() creates one
     * from the source connector's inbound data type.
     *
     * @param responseSelector the response selector
     */
    public void setResponseSelector(ResponseSelector responseSelector) {
        this.responseSelector = responseSelector;
    }

    /**
     * @return the process lock assigned to this channel
     */
    public ChannelProcessLock getProcessLock() {
        return processLock;
    }

    /**
     * Assigns the process lock of this channel.
     *
     * @param channelProcessLock the process lock
     */
    public void setProcessLock(ChannelProcessLock channelProcessLock) {
        processLock = channelProcessLock;
    }

    /**
     * Registers a dispatch thread so that {@code halt()} can interrupt it.
     *
     * @param thread the thread to register
     */
    public void addDispatchThread(Thread thread) {
        final Set<Thread> threads = this.dispatchThreads;
        // The set itself is the monitor, matching removeDispatchThread() and halt().
        synchronized (threads) {
            threads.add(thread);
        }
    }

    /**
     * Unregisters a dispatch thread previously added with {@code addDispatchThread}.
     *
     * @param thread the thread to unregister
     */
    public void removeDispatchThread(Thread thread) {
        final Set<Thread> threads = this.dispatchThreads;
        // The set itself is the monitor, matching addDispatchThread() and halt().
        synchronized (threads) {
            threads.remove(thread);
        }
    }

    /**
     * @return true unless the channel is currently {@code STOPPED} or {@code STOPPING}
     */
    public boolean isActive() {
        return this.currentState != DeployedState.STOPPED && this.currentState != DeployedState.STOPPING;
    }

    /**
     * Checks that the channel has everything deploy() needs: a channel ID, a DAO factory, a
     * source connector with a filter/transformer executor, and a filter/transformer executor on
     * every destination connector.
     *
     * @return true when the configuration is complete, false otherwise
     */
    public boolean isConfigurationValid() {
        boolean coreMissing = this.channelId == null
                || this.daoFactory == null
                || this.sourceConnector == null
                || this.sourceConnector.getFilterTransformerExecutor() == null;
        if (coreMissing) {
            return false;
        }
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                DestinationConnector connector = chainProvider.getDestinationConnectors().get(metaDataId);
                if (connector.getFilterTransformerExecutor() == null) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * @return the total number of destination connectors across all destination chains
     */
    public int getDestinationCount() {
        int total = 0;
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            total += chainProvider.getDestinationConnectors().size();
        }
        return total;
    }

    /**
     * Looks up a destination connector by metadata ID, searching the chains in order.
     *
     * @param i the metadata ID of the destination connector
     * @return the first matching connector, or null when no chain contains that ID
     */
    public DestinationConnector getDestinationConnector(int i) {
        Integer metaDataId = Integer.valueOf(i);
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            DestinationConnector match = chainProvider.getDestinationConnectors().get(metaDataId);
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * @return a new list holding the source connector's metadata ID followed by the metadata
     *         IDs of every destination chain, in chain order
     */
    public List<Integer> getMetaDataIds() {
        List<Integer> metaDataIds = new ArrayList<>();
        metaDataIds.add(Integer.valueOf(getSourceConnector().getMetaDataId()));
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            metaDataIds.addAll(chainProvider.getMetaDataIds());
        }
        return metaDataIds;
    }

    /**
     * @return true if at least one destination connector has queueing enabled
     */
    public boolean isUsingDestinationQueues() {
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            for (DestinationConnector connector : chainProvider.getDestinationConnectors().values()) {
                if (connector.isQueueEnabled()) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Invalidates the source queue and then every destination queue. Each destination queue is
     * invalidated while holding that queue's invalidation lock, which is always released
     * afterwards, even if the invalidation throws.
     */
    public void invalidateQueues() {
        this.sourceQueue.invalidate(true, false);
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                DestinationQueue destinationQueue = chainProvider.getDestinationConnectors().get(metaDataId).getQueue();
                Lock lock = destinationQueue.getInvalidationLock();
                lock.lock();
                try {
                    destinationQueue.invalidate(true, false);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Deploys the channel without debug options.
     *
     * @throws DeployException if deployment fails
     */
    public synchronized void deploy() throws DeployException {
        // The cast selects the DebugOptions overload explicitly.
        deploy((DebugOptions) null);
    }

    /**
     * Deploys the channel with the given debug options; a thin alias for
     * {@code deploy(DebugOptions)}.
     *
     * @param debugOptions the debug options to deploy with
     * @throws DeployException if deployment fails
     */
    public synchronized void debugDeploy(DebugOptions debugOptions) throws DeployException {
        this.deploy(debugOptions);
    }

    /**
     * Validates the channel, prepares its queues, and deploys the source connector followed by
     * every destination connector.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>reject an incomplete configuration;</li>
     * <li>initialize channel storage;</li>
     * <li>reject storage settings that cannot support source or destination queueing;</li>
     * <li>update the custom metadata columns;</li>
     * <li>attach queue data sources, refresh queue sizes and deploy each connector;</li>
     * <li>dispatch a {@code DEPLOYED} event carrying per-connector statistics, with the
     * {@code QUEUED} count overridden by the current queue size.</li>
     * </ol>
     *
     * <p>If a step in (5) or (6) fails, every connector whose deployment was at least attempted
     * is undeployed again on a best-effort basis before the failure is reported.
     *
     * @param debugOptions debug options to deploy the connectors with, or null for a regular deploy
     * @throws DeployException if the configuration or storage settings are unsuitable, the
     *                         metadata columns cannot be updated, or any connector fails to deploy
     */
    public synchronized void deploy(DebugOptions debugOptions) throws DeployException {
        if (!isConfigurationValid()) {
            throw new DeployException("Failed to deploy channel. The channel configuration is incomplete.");
        }
        ChannelController.getInstance().initChannelStorage(this.channelId);
        if (!this.sourceConnector.isRespondAfterProcessing() && (!this.storageSettings.isEnabled() || !this.storageSettings.isStoreRaw() || (!this.storageSettings.isStoreMaps() && !this.storageSettings.isRawDurable()))) {
            throw new DeployException("Failed to deploy channel " + this.name + " (" + this.channelId + "): the source connector has queueing enabled, but the current storage settings do not support queueing on the source connector.");
        }
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                if (chainProvider.getDestinationConnectors().get(metaDataId).isQueueEnabled() && (!this.storageSettings.isEnabled() || !this.storageSettings.isStoreSourceEncoded() || !this.storageSettings.isStoreSent() || !this.storageSettings.isStoreMaps())) {
                    throw new DeployException("Failed to deploy channel " + this.name + " (" + this.channelId + "): one or more destination connectors have queueing enabled, but the current storage settings do not support queueing on destination connectors.");
                }
            }
        }

        try {
            updateMetaDataColumns();
        } catch (SQLException e) {
            // Keep the SQL failure as the cause so the root problem is not lost.
            throw new DeployException("Failed to deploy channel " + this.name + " (" + this.channelId + "): Unable to update custom metadata columns.", e);
        }

        // Metadata IDs of connectors whose deployment has been attempted; used for rollback.
        List<Integer> deployedMetaDataIds = new ArrayList<>();
        try {
            if (this.responseSelector == null) {
                this.responseSelector = new ResponseSelector(this.sourceConnector.getInboundDataType());
            }
            if (this.queueHandler != null) {
                this.sourceQueue.setDataSource(this.queueHandler.createSourceQueueDataSource(this.sourceConnector, this.daoFactory));
            } else {
                this.sourceQueue.setDataSource(new ConnectorMessageQueueDataSource(this.channelId, this.serverId, 0, Status.RECEIVED, false, this.daoFactory));
            }
            this.sourceQueue.updateSize();
            // Registered before onDeploy() so a failing deploy hook is still rolled back.
            deployedMetaDataIds.add(0);
            if (debugOptions != null) {
                this.sourceConnector.onDebugDeploy(debugOptions);
            } else {
                this.sourceConnector.onDeploy();
            }
            if (this.sourceConnector.getBatchAdaptorFactory() != null) {
                this.sourceConnector.getBatchAdaptorFactory().onDeploy();
            }

            for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
                chainProvider.setDaoFactory(this.daoFactory);
                chainProvider.setStorageSettings(this.storageSettings);
                for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                    DestinationConnector destinationConnector = chainProvider.getDestinationConnectors().get(metaDataId);
                    destinationConnector.setDaoFactory(this.daoFactory);
                    destinationConnector.setStorageSettings(this.storageSettings);
                    if (this.queueHandler != null) {
                        destinationConnector.getQueue().setDataSource(this.queueHandler.createDestinationQueueDataSource(destinationConnector, this.daoFactory));
                    } else {
                        destinationConnector.getQueue().setDataSource(new ConnectorMessageQueueDataSource(getChannelId(), getServerId(), destinationConnector.getMetaDataId(), Status.QUEUED, destinationConnector.isQueueRotate(), this.daoFactory));
                    }
                    destinationConnector.getQueue().updateSize();
                    // Registered before the deploy hook, for the same rollback reason as the source.
                    deployedMetaDataIds.add(metaDataId);
                    if (debugOptions != null) {
                        destinationConnector.onDebugDeploy(debugOptions);
                    } else {
                        destinationConnector.onDeploy();
                    }
                }
            }
            this.responseSelector.setNumDestinations(getDestinationCount());

            // Copy each connector's statistics and override QUEUED with the live queue size.
            Statistics statistics = ChannelController.getInstance().getStatistics();
            Map<Integer, Map<Status, Long>> connectorStatistics = new HashMap<>();
            Map<Status, Long> sourceStatistics = new HashMap<>(statistics.getConnectorStats(this.channelId, 0));
            sourceStatistics.put(Status.QUEUED, Long.valueOf(this.sourceQueue.size()));
            connectorStatistics.put(0, sourceStatistics);
            for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
                for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                    Map<Status, Long> destinationStatistics = new HashMap<>(statistics.getConnectorStats(this.channelId, metaDataId));
                    destinationStatistics.put(Status.QUEUED, Long.valueOf(chainProvider.getDestinationConnectors().get(metaDataId).getQueue().size()));
                    connectorStatistics.put(metaDataId, destinationStatistics);
                }
            }
            this.eventDispatcher.dispatchEvent(new DeployedStateEvent(this.channelId, this.name, null, null, DeployedStateEventType.DEPLOYED, connectorStatistics));
        } catch (Throwable t) {
            for (Integer metaDataId : deployedMetaDataIds) {
                try {
                    undeployConnector(metaDataId);
                } catch (Exception ignored) {
                    // Best-effort rollback: undeployConnector() has already logged this failure,
                    // and the original deployment error below is the one worth reporting.
                }
            }
            throw new DeployException("Failed to deploy channel " + this.name + " (" + this.channelId + ").", t);
        }
    }

    /**
     * Moves the channel to {@code UNDEPLOYING} and undeploys the source connector followed by
     * every destination connector. All connectors are attempted even when one fails; the first
     * failure is then reported. An {@code UNDEPLOYED} event is dispatched only when every
     * connector undeployed cleanly.
     *
     * @throws UndeployException if one or more connectors failed to undeploy
     */
    public synchronized void undeploy() throws UndeployException {
        updateCurrentState(DeployedState.UNDEPLOYING);

        List<Integer> metaDataIds = new ArrayList<>();
        metaDataIds.add(0);
        for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
            for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                metaDataIds.add(metaDataId);
            }
        }

        Throwable firstFailure = null;
        for (Integer metaDataId : metaDataIds) {
            try {
                undeployConnector(metaDataId);
            } catch (Throwable t) {
                // Remember only the first failure; keep undeploying the remaining connectors.
                if (firstFailure == null) {
                    firstFailure = t;
                }
            }
        }

        if (firstFailure != null) {
            throw new UndeployException("Failed to undeploy channel " + this.name + " (" + this.channelId + "): One or more connectors failed to undeploy.", firstFailure);
        }
        this.eventDispatcher.dispatchEvent(new DeployedStateEvent(this.channelId, this.name, null, null, DeployedStateEventType.UNDEPLOYED));
    }

    /**
     * Undeploys a single connector. Metadata ID 0 denotes the source connector, whose batch
     * adaptor factory (if any) is undeployed before the connector itself; any other ID is
     * looked up as a destination connector. A missing connector is silently skipped.
     *
     * @param num the metadata ID of the connector to undeploy
     * @throws Exception whatever the connector's undeploy hook threw, after it has been logged
     */
    private void undeployConnector(Integer num) throws Exception {
        try {
            if (num.intValue() == 0) {
                if (this.sourceConnector != null) {
                    BatchAdaptorFactory factory = this.sourceConnector.getBatchAdaptorFactory();
                    if (factory != null) {
                        factory.onUndeploy();
                    }
                    this.sourceConnector.onUndeploy();
                }
            } else {
                DestinationConnector connector = getDestinationConnector(num.intValue());
                if (connector != null) {
                    connector.onUndeploy();
                }
            }
        } catch (Exception e) {
            if (num.intValue() != 0) {
                this.logger.error("Error undeploying destination connector \"" + getDestinationConnector(num.intValue()).getDestinationName() + "\" for channel " + this.name + " (" + this.channelId + ").", e);
            } else {
                this.logger.error("Error undeploying Source connector for channel " + this.name + " (" + this.channelId + ").", e);
            }
            throw e;
        }
    }

    /**
     * Clears the stop flag and, unless a queue handler vetoes it, invalidates the source queue
     * and starts one source queue thread per processing thread. No threads are started when the
     * source connector responds after processing.
     */
    public void startSourceQueue() {
        this.stopSourceQueue = false;
        if (this.queueHandler != null && !this.queueHandler.canStartSourceQueue()) {
            return;
        }
        this.sourceQueue.invalidate(true, true);
        if (!this.sourceConnector.isRespondAfterProcessing()) {
            this.queueThreads.clear();
            int threadNumber = 1;
            while (threadNumber <= this.processingThreads) {
                Thread queueThread = new Thread(this);
                queueThread.setName("Source Queue Thread " + threadNumber + " on " + this.name + " (" + this.channelId + ")");
                queueThread.start();
                // Registered by thread ID so stopSourceQueue()/haltSourceQueue() can reach it.
                this.queueThreads.put(Long.valueOf(queueThread.getId()), queueThread);
                threadNumber++;
            }
        }
    }

    /**
     * Raises the stop flag, waits for every registered source queue thread to finish, and then
     * forgets them.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stopSourceQueue() throws InterruptedException {
        this.stopSourceQueue = true;
        if (MapUtils.isNotEmpty(this.queueThreads)) {
            for (Thread queueThread : this.queueThreads.values()) {
                queueThread.join();
            }
            this.queueThreads.clear();
        }
    }

    /**
     * Interrupts every registered source queue thread without waiting for it to finish.
     */
    public void haltSourceQueue() {
        if (MapUtils.isNotEmpty(this.queueThreads)) {
            for (Thread queueThread : this.queueThreads.values()) {
                queueThread.interrupt();
            }
        }
    }

    /**
     * Starts the channel: destination connectors first, then recovery of unfinished messages,
     * then the destination queues and the source queue, and finally the source connector.
     *
     * <p>Only proceeds when the current state is {@code DEPLOYING} or {@code STOPPED}; otherwise
     * a warning is logged and nothing happens. When the source connector (metadata ID 0) is not
     * selected, the channel ends in {@code PAUSED} instead of {@code STARTED}.
     *
     * <p>On failure, the connectors this call attempted to start are stopped again and the
     * channel returns to {@code STOPPED}. An interruption skips that rollback and is reported
     * immediately.
     *
     * @param set metadata IDs of the connectors to start, or null to start every connector
     * @throws StartException if startup fails or is interrupted by a halt
     */
    public synchronized void start(Set<Integer> set) throws StartException {
        if (this.currentState != DeployedState.DEPLOYING && this.currentState != DeployedState.STOPPED) {
            this.logger.warn("Failed to start channel " + this.name + " (" + this.channelId + "): The channel is already running.");
            return;
        }
        // Metadata IDs of the connectors this call has tried to start; used for rollback.
        ArrayList arrayList = new ArrayList();
        try {
            ThreadUtils.checkInterruptedStatus();
            updateCurrentState(DeployedState.STARTING);
            // Reset the per-run state left over from a previous start/stop cycle.
            this.processLock.reset();
            this.removeContentLock = new ReentrantLock(true);
            this.dispatchThreads.clear();
            this.shuttingDown = false;
            this.processingThreads = ((SourceConnectorPropertiesInterface) this.sourceConnector.getConnectorProperties()).getSourceConnectorProperties().getProcessingThreads();
            // Always run at least one processing thread.
            if (this.processingThreads < 1) {
                this.processingThreads = 1;
            }
            this.sourceQueue.invalidate(true, true);
            this.channelExecutor = Executors.newCachedThreadPool();
            // Start every selected destination connector that is currently stopped.
            for (DestinationChainProvider destinationChainProvider : this.destinationChainProviders) {
                for (Integer num : destinationChainProvider.getMetaDataIds()) {
                    DestinationConnector destinationConnector = destinationChainProvider.getDestinationConnectors().get(num);
                    if (destinationConnector.getCurrentState() == DeployedState.STOPPED && (set == null || set.contains(num))) {
                        // Recorded before start() so a failing start is still rolled back.
                        arrayList.add(num);
                        destinationConnector.start();
                    }
                }
            }
            ThreadUtils.checkInterruptedStatus();
            try {
                processUnfinishedMessages();
            } catch (InterruptedException e) {
                this.logger.error("Startup recovery interrupted for channel " + this.name + " (" + this.channelId + ")", e);
                throw e;
            } catch (Exception e2) {
                // A recovery failure is logged but does not abort the start.
                Throwable cause = e2 instanceof ExecutionException ? e2.getCause() : e2;
                this.logger.error("Startup recovery failed for channel " + this.name + " (" + this.channelId + "): " + cause.getMessage(), cause);
            }
            ThreadUtils.checkInterruptedStatus();
            // Start the queues of the destination connectors started above.
            Iterator<Integer> it = arrayList.iterator();
            while (it.hasNext()) {
                getDestinationConnector(it.next().intValue()).startQueue();
            }
            startSourceQueue();
            if (set == null || set.contains(0)) {
                ThreadUtils.checkInterruptedStatus();
                if (this.sourceConnector.getCurrentState() == DeployedState.STOPPED) {
                    arrayList.add(0);
                    this.sourceConnector.start();
                }
                updateCurrentState(DeployedState.STARTED);
            } else {
                // The source connector was not selected, so the channel is left paused.
                updateCurrentState(DeployedState.PAUSED);
            }
        } catch (Throwable th) {
            // An interruption is reported as-is, without attempting a rollback.
            if (th instanceof InterruptedException) {
                throw new StartException("Start channel task for " + this.name + " (" + this.channelId + ") terminated by halt notification.", th);
            }
            try {
                // Roll back: stop whatever this call attempted to start.
                updateCurrentState(DeployedState.STOPPING);
                stop(arrayList);
                updateCurrentState(DeployedState.STOPPED);
            } catch (Throwable th2) {
                // The original failure (th), not the rollback failure, is kept as the cause.
                if (th2 instanceof InterruptedException) {
                    throw new StartException("Start channel task for " + this.name + " (" + this.channelId + ") terminated by halt notification.", th);
                }
                updateCurrentState(DeployedState.STOPPED);
            }
            throw new StartException("Failed to start channel " + this.name + " (" + this.channelId + ").", th);
        }
    }

    /**
     * Stops the source connector and every destination connector, moving the channel through
     * {@code STOPPING} to {@code STOPPED}. Logs a warning and returns if the channel is already
     * stopped.
     *
     * @throws StopException if stopping fails or is interrupted by a halt
     */
    public synchronized void stop() throws StopException {
        if (this.currentState == DeployedState.STOPPED) {
            this.logger.warn("Failed to stop channel " + this.name + " (" + this.channelId + "): The channel is already stopped.");
            return;
        }
        try {
            updateCurrentState(DeployedState.STOPPING);
            List<Integer> metaDataIds = new ArrayList<>();
            metaDataIds.add(0);
            for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
                for (Integer metaDataId : chainProvider.getMetaDataIds()) {
                    metaDataIds.add(metaDataId);
                }
            }
            stop(metaDataIds);
            updateCurrentState(DeployedState.STOPPED);
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                throw new StopException("Stop channel task for " + this.name + " (" + this.channelId + ") terminated by halt notification.", t);
            }
            throw new StopException("Failed to stop channel " + this.name + " (" + this.channelId + ").", t);
        }
    }

    /**
     * Forcibly stops the channel by interrupting in-flight work rather than waiting for it.
     *
     * <p>The method itself is not synchronized; only the final state transition takes the
     * channel monitor. NOTE(review): presumably this lets halt() interrupt a task that currently
     * holds the monitor (e.g. start() or stop()) - confirm against callers.
     *
     * @throws HaltException if the final halt step fails or is itself interrupted
     */
    public void halt() throws HaltException {
        // Shut down the executor and cancel any tasks that never started running.
        if (this.channelExecutor != null) {
            for (Runnable runnable : this.channelExecutor.shutdownNow()) {
                if (runnable instanceof Future) {
                    ((Future) runnable).cancel(true);
                }
            }
        }
        haltSourceQueue();
        // Flag the shutdown and interrupt all registered dispatch threads under the set's lock.
        synchronized (this.dispatchThreads) {
            this.shuttingDown = true;
            Iterator<Thread> it = this.dispatchThreads.iterator();
            while (it.hasNext()) {
                it.next().interrupt();
            }
        }
        // Collect the source (0) and all destination metadata IDs.
        ArrayList arrayList = new ArrayList();
        arrayList.add(0);
        Iterator<DestinationChainProvider> it2 = this.destinationChainProviders.iterator();
        while (it2.hasNext()) {
            Iterator<Integer> it3 = it2.next().getMetaDataIds().iterator();
            while (it3.hasNext()) {
                arrayList.add(it3.next());
            }
        }
        Iterator<Integer> it4 = arrayList.iterator();
        while (it4.hasNext()) {
            try {
                haltConnector(it4.next());
            } catch (Throwable th) {
                // Deliberately ignored: a failure on one connector must not prevent halting the rest.
            }
        }
        synchronized (this) {
            if (this.currentState != DeployedState.STOPPED) {
                try {
                    updateCurrentState(DeployedState.STOPPING);
                    halt(arrayList);
                    updateCurrentState(DeployedState.STOPPED);
                } catch (Throwable th2) {
                    if (!(th2 instanceof InterruptedException)) {
                        throw new HaltException("Failed to halt channel " + this.name + " (" + this.channelId + ").", th2);
                    }
                    throw new HaltException("Halt channel task for " + this.name + " (" + this.channelId + ") terminated by another halt notification.", th2);
                }
            } else {
                this.logger.warn("Failed to stop channel " + this.name + " (" + this.channelId + "): The channel is already stopped.");
            }
        }
    }

    /**
     * Pauses the channel by stopping its source connector.
     *
     * <p>Only a STARTED channel can be paused; in any other state a warning is logged and the method
     * returns without changing anything.
     *
     * @throws PauseException if the state transition or the source connector stop fails, including when it
     *                        is interrupted
     */
    public synchronized void pause() throws PauseException {
        if (this.currentState == DeployedState.PAUSED) {
            this.logger.warn("Failed to pause channel " + this.name + " (" + this.channelId + "): The channel is already paused.");
            return;
        }
        if (this.currentState != DeployedState.STARTED) {
            this.logger.warn("Failed to pause channel " + this.name + " (" + this.channelId + "): The channel is currently " + this.currentState.toString().toLowerCase() + " and cannot be paused.");
            return;
        }

        try {
            updateCurrentState(DeployedState.PAUSING);
            this.sourceConnector.stop();
            updateCurrentState(DeployedState.PAUSED);
        } catch (Throwable failure) {
            String message;
            if (failure instanceof InterruptedException) {
                message = "Pause channel task for " + this.name + " (" + this.channelId + ") terminated by halt notification.";
            } else {
                message = "Failed to pause channel " + this.name + " (" + this.channelId + ").";
            }
            throw new PauseException(message, failure);
        }
    }

    /**
     * Resumes a paused channel by starting its source connector again.
     *
     * <p>If the channel is not PAUSED a warning is logged and nothing happens. If starting fails for a
     * reason other than interruption, the method tries to put the channel back into PAUSED before
     * reporting the failure.
     *
     * @throws ResumeException if the source connector could not be started or the task was interrupted
     */
    public synchronized void resume() throws ResumeException {
        if (this.currentState == DeployedState.PAUSED) {
            try {
                updateCurrentState(DeployedState.STARTING);
                this.sourceConnector.start();
                updateCurrentState(DeployedState.STARTED);
            } catch (Throwable failure) {
                if (!(failure instanceof InterruptedException)) {
                    try {
                        updateCurrentState(DeployedState.PAUSING);
                        this.sourceConnector.stop();
                        updateCurrentState(DeployedState.PAUSED);
                    } catch (Throwable ignored) {
                        // Best-effort fallback to PAUSED; the original start failure is the one reported.
                    }
                    throw new ResumeException("Failed to resume channel " + this.name + " (" + this.channelId + ").", failure);
                }
                throw new ResumeException("Resume channel task for " + this.name + " (" + this.channelId + ") terminated by halt notification.", failure);
            }
        } else {
            this.logger.warn("Failed to resume channel " + this.name + " (" + this.channelId + "): The source connector is not currently paused.");
        }
    }

    /**
     * Deletes every stored message for this channel and optionally resets its statistics.
     *
     * <p>Messages are only deleted while the channel is STOPPED. When {@code z} is true and the channel is
     * not stopped, it is stopped first and restarted afterwards with the connectors that were not
     * stopped beforehand. When {@code z} is false and the channel is not stopped, nothing is deleted.
     *
     * <p>The delete runs while holding {@code DELETE_PERMIT}. The permit is released and the DAO closed
     * exactly once on every path; queues are invalidated only after the permit has been returned.
     *
     * @param z  whether a running channel may be stopped (and later restarted) to perform the removal
     * @param z2 whether channel and connector statistics are reset as part of the same transaction
     * @throws InterruptedException if interrupted while waiting for {@code DELETE_PERMIT}
     */
    public synchronized void removeAllMessages(boolean z, boolean z2) throws InterruptedException {
        boolean restartAfterwards = false;
        // Connectors that were not stopped before we stop the channel; passed to start() afterwards.
        Set<Integer> runningMetaDataIds = new HashSet<>();

        if (this.currentState != DeployedState.STOPPED && z) {
            if (this.sourceConnector.getCurrentState() != DeployedState.STOPPED) {
                runningMetaDataIds.add(0);
            }
            for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
                for (DestinationConnector destinationConnector : chainProvider.getDestinationConnectors().values()) {
                    if (destinationConnector.getCurrentState() != DeployedState.STOPPED) {
                        runningMetaDataIds.add(Integer.valueOf(destinationConnector.getMetaDataId()));
                    }
                }
            }
            try {
                stop();
                restartAfterwards = true;
            } catch (StopException e) {
                this.logger.error("Failed to stop channel " + this.name + " (" + this.channelId + ") in order to remove all messages.", e);
                return;
            }
        }

        if (this.currentState == DeployedState.STOPPED) {
            DELETE_PERMIT.acquire();
            try {
                DonkeyDao dao = getDaoFactory().getDao();
                boolean committed = false;
                try {
                    this.logger.debug("Removing messages for channel " + this.name + " (" + this.channelId + ").");
                    dao.deleteAllMessages(this.channelId);
                    if (z2) {
                        this.logger.debug("Clearing statistics for channel " + this.name + " (" + this.channelId + ").");
                        Set<Status> trackedStatuses = Statistics.getTrackedStatuses();
                        // A null metadata id resets the channel-level statistics row.
                        dao.resetStatistics(this.channelId, null, trackedStatuses);
                        for (Integer metaDataId : getMetaDataIds()) {
                            dao.resetStatistics(this.channelId, metaDataId, trackedStatuses);
                        }
                    }
                    dao.commit();
                    committed = true;
                } finally {
                    if (dao != null) {
                        if (!committed) {
                            try {
                                dao.rollback();
                            } catch (Exception ignored) {
                                // The failure that prevented the commit is the one that propagates.
                            }
                        }
                        dao.close();
                    }
                }
            } finally {
                DELETE_PERMIT.release();
            }
            // Outside the permit's try/finally so a failure here cannot release the permit twice.
            invalidateQueues();
        }

        if (restartAfterwards) {
            try {
                this.logger.debug("Restarting channel " + this.name + " (" + this.channelId + ") after removing all messages");
                start(runningMetaDataIds);
            } catch (StartException e) {
                this.logger.error("Failed to start channel " + this.name + " (" + this.channelId + ") after removing all messages.", e);
            }
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:43:0x0174  */
    /* JADX WARN: Removed duplicated region for block: B:65:0x0203  */
    /* JADX WARN: Removed duplicated region for block: B:67:0x020c A[RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder emitted by the decompiler: the real body could not be reconstructed, so this method
     * unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): the parameter is presumably the list of connector metadata ids to stop; the
     * original logic must be restored from the bytecode or the upstream source before this is usable.
     *
     * @param r6 list of integers supplied by the caller; unused by this placeholder
     * @throws java.lang.Throwable always, as an {@link UnsupportedOperationException}
     */
    private void stop(java.util.List<java.lang.Integer> r6) throws java.lang.Throwable {
        /*
            Method dump skipped, instructions count: 525
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.mirth.connect.donkey.server.channel.Channel.stop(java.util.List):void");
    }

    /* JADX WARN: Removed duplicated region for block: B:68:0x012f  */
    /* JADX WARN: Removed duplicated region for block: B:70:0x0138 A[RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder emitted by the decompiler: the real body could not be reconstructed, so this method
     * unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>The public {@code halt()} calls this with the collected connector metadata ids and wraps any
     * throwable in a {@code HaltException}, so as written every halt of a non-stopped channel fails.
     * The original logic must be restored from the bytecode or the upstream source.
     *
     * @param r6 list of integers supplied by the caller; unused by this placeholder
     * @throws java.lang.Throwable always, as an {@link UnsupportedOperationException}
     */
    private void halt(java.util.List<java.lang.Integer> r6) throws java.lang.Throwable {
        /*
            Method dump skipped, instructions count: 313
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.mirth.connect.donkey.server.channel.Channel.halt(java.util.List):void");
    }

    /**
     * Starts a single connector of this channel.
     *
     * <p>Metadata id 0 designates the source connector and is delegated to {@link #resume()}. Any other
     * id is looked up as a destination connector, which is started (together with its queue) only if the
     * channel is STARTED or PAUSED and the connector itself is currently STOPPED.
     *
     * @param num metadata id of the connector to start
     * @throws StartException  if the destination connector fails to start or the task is interrupted
     * @throws ResumeException if resuming the source connector fails
     */
    public void startConnector(Integer num) throws StartException, ResumeException {
        final int metaDataId = num.intValue();
        if (metaDataId == 0) {
            resume();
            return;
        }

        DestinationConnector connector = getDestinationConnector(metaDataId);
        if (this.currentState != DeployedState.STARTED && this.currentState != DeployedState.PAUSED) {
            this.logger.error("Failed to start connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + "): The channel is not started or paused.");
            return;
        }
        if (connector.getCurrentState() != DeployedState.STOPPED) {
            return;
        }

        try {
            connector.start();
            connector.startQueue();
        } catch (Throwable failure) {
            if (!(failure instanceof InterruptedException)) {
                try {
                    connector.stop();
                } catch (Throwable ignored) {
                    // Best-effort cleanup; the start failure is what gets reported.
                }
                throw new StartException("Failed to start connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + "). ", failure);
            }
            throw new StartException("Start task for connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + ") terminated by halt notification.", failure);
        }
    }

    /**
     * Stops a single connector of this channel.
     *
     * <p>Metadata id 0 designates the source connector and is delegated to {@link #pause()}. A destination
     * connector is stopped only if the channel is STARTED or PAUSED, the connector is not already
     * STOPPED, and it has queueing enabled; it is switched to forced queueing before being stopped.
     *
     * @param num metadata id of the connector to stop
     * @throws StopException  if the destination connector fails to stop
     * @throws PauseException if pausing the source connector fails
     */
    public void stopConnector(Integer num) throws StopException, PauseException {
        final int metaDataId = num.intValue();
        if (metaDataId == 0) {
            pause();
            return;
        }

        DestinationConnector connector = getDestinationConnector(metaDataId);
        if (this.currentState != DeployedState.STARTED && this.currentState != DeployedState.PAUSED) {
            this.logger.error("Failed to stop connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + "): The channel is not started or paused.");
            return;
        }
        if (connector.getCurrentState() == DeployedState.STOPPED) {
            return;
        }
        if (!connector.isQueueEnabled()) {
            this.logger.error("Failed to stop connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + "): Destination connectors must have queueing enabled to be stopped individually.");
            return;
        }

        try {
            connector.setForceQueue(true);
            connector.stop();
        } catch (Throwable failure) {
            throw new StopException("Failed to stop connector " + connector.getDestinationName() + " for channel " + this.name + " (" + this.channelId + "). ", failure);
        }
    }

    /**
     * Halts one connector: the source connector for metadata id 0, otherwise the matching destination
     * connector. A {@link ConnectorTaskException} is logged with the connector's identity and rethrown.
     *
     * @param num metadata id of the connector to halt
     * @throws ConnectorTaskException if the connector reports a failure while halting
     * @throws InterruptedException   declared for callers; propagated if raised by the connector
     */
    private void haltConnector(Integer num) throws ConnectorTaskException, InterruptedException {
        final boolean isSource = num.intValue() == 0;
        try {
            if (isSource) {
                this.sourceConnector.halt();
            } else {
                getDestinationConnector(num.intValue()).halt();
            }
        } catch (ConnectorTaskException e) {
            String connectorLabel;
            if (isSource) {
                connectorLabel = "Source connector";
            } else {
                connectorLabel = "destination connector \"" + getDestinationConnector(num.intValue()).getDestinationName() + "\"";
            }
            this.logger.error("Error halting " + connectorLabel + " for channel " + this.name + " (" + this.channelId + ").", e);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Persists a raw message as a new source message and then either processes it on the calling thread
     * (when the source connector responds after processing) or places it on the source queue.
     *
     * <p>The calling thread is registered in {@code dispatchThreads} and renamed for the duration of the
     * call; both are undone in a finally block on every path.
     *
     * <p>Cleanup is performed with real try/finally blocks so that, when anything fails after the process
     * lock was obtained, the lock is released (unless the caller is expected to release it via the
     * returned result), an uncommitted DAO is rolled back, and the DAO is closed.
     *
     * <p>If a failure occurs after the source message has been committed, a {@link DispatchResult}
     * carrying the {@link ChannelException} is returned instead of thrown, so the caller still learns the
     * persisted message id and whether it must release the process lock.
     *
     * @param rawMessage the message to dispatch
     * @param z          when true, dispatching is still allowed while the channel is STOPPING
     * @return the dispatch result for the persisted message
     * @throws ChannelException if the channel is stopped or shutting down, or if dispatching fails before
     *                          the message was persisted
     */
    public DispatchResult dispatchRawMessage(RawMessage rawMessage, boolean z) throws ChannelException {
        if ((this.currentState == DeployedState.STOPPING && !z) || this.currentState == DeployedState.STOPPED) {
            throw new ChannelException(true);
        }

        Thread currentThread = Thread.currentThread();
        String originalThreadName = currentThread.getName();
        boolean lockAcquired = false;
        Long persistedMessageId = null;

        try {
            synchronized (this.dispatchThreads) {
                if (this.shuttingDown) {
                    throw new ChannelException(true);
                }
                this.dispatchThreads.add(currentThread);
            }

            if (StringUtils.contains(originalThreadName, this.channelId)) {
                currentThread.setName("Channel Dispatch Thread < " + originalThreadName);
            } else {
                currentThread.setName("Channel Dispatch Thread on " + this.name + " (" + this.channelId + ") < " + originalThreadName);
            }

            DonkeyDao dao = null;
            boolean committed = false;
            Message processedMessage = null;
            Response response = null;
            String responseError = null;
            DispatchResult dispatchResult = null;

            try {
                obtainProcessLock();
                lockAcquired = true;

                dao = this.daoFactory.getDao();
                ConnectorMessage sourceMessage = createAndStoreSourceMessage(dao, rawMessage);
                ThreadUtils.checkInterruptedStatus();

                if (this.sourceConnector.isRespondAfterProcessing()) {
                    dao.commit(this.storageSettings.isRawDurable());
                    committed = true;
                    persistedMessageId = Long.valueOf(sourceMessage.getMessageId());
                    dao.close();
                    markDeletedQueuedMessages(rawMessage, persistedMessageId);
                    processedMessage = process(sourceMessage, false);
                } else {
                    // Commit and enqueue under the queue's monitor so the two happen as one step.
                    synchronized (this.sourceQueue) {
                        dao.commit(this.storageSettings.isRawDurable());
                        committed = true;
                        persistedMessageId = Long.valueOf(sourceMessage.getMessageId());
                        dao.close();
                        queue(sourceMessage);
                    }
                    markDeletedQueuedMessages(rawMessage, persistedMessageId);
                }

                if (this.responseSelector.canRespond()) {
                    try {
                        response = this.responseSelector.getResponse(sourceMessage, processedMessage);
                    } catch (Exception e) {
                        // A response-selection failure is reported on the result rather than thrown.
                        responseError = ExceptionUtils.getStackTrace(e);
                    }
                }
            } catch (RuntimeException e) {
                throw new ChannelException(true, e);
            } finally {
                // Keep the lock only when the message was persisted, the connector responds after
                // processing, and the thread is not interrupted; the result then tells the caller that
                // the lock is still held.
                if (lockAcquired && (!this.sourceConnector.isRespondAfterProcessing() || persistedMessageId == null || Thread.currentThread().isInterrupted())) {
                    releaseProcessLock();
                    lockAcquired = false;
                }

                if (dao != null && !dao.isClosed()) {
                    if (!committed) {
                        try {
                            dao.rollback();
                        } catch (Exception ignored) {
                            // The failure that prevented the commit is the one that propagates.
                        }
                    }
                    dao.close();
                }

                // Built last because lockAcquired may have just changed.
                if (persistedMessageId != null) {
                    dispatchResult = new DispatchResult(persistedMessageId.longValue(), processedMessage, response, this.sourceConnector.isRespondAfterProcessing(), lockAcquired);
                    if (StringUtils.isNotBlank(responseError)) {
                        dispatchResult.setResponseError(responseError);
                    }
                }
            }

            return dispatchResult;
        } catch (InterruptedException e) {
            throw new ChannelException(true, e);
        } catch (Throwable t) {
            Throwable cause = t.getCause();
            ChannelException channelException;
            if (cause instanceof InterruptedException) {
                channelException = new ChannelException(true, cause);
            } else if (cause instanceof ChannelException) {
                this.logger.error("Runtime error in channel " + this.name + " (" + this.channelId + ").", cause);
                channelException = (ChannelException) cause;
            } else {
                this.logger.error("Error processing message in channel " + this.name + " (" + this.channelId + ").", t);
                channelException = new ChannelException(false, t);
            }

            if (persistedMessageId == null) {
                throw channelException;
            }
            // The message is already stored: report the failure on a result so the caller keeps the id.
            return new DispatchResult(persistedMessageId.longValue(), null, null, false, lockAcquired, channelException);
        } finally {
            synchronized (this.dispatchThreads) {
                this.dispatchThreads.remove(currentThread);
            }
            currentThread.setName(originalThreadName);
        }
    }

    /**
     * When a raw message overwrites an existing message, marks that message id as deleted in every
     * destination connector queue and then waits until no destination queue has it checked out.
     *
     * <p>Does nothing unless the raw message is flagged as an overwrite and carries an original message id.
     * The wait polls each queue every 100 ms.
     *
     * @param rawMessage the message being dispatched
     * @param l          the message id to mark as deleted in the destination queues
     * @throws InterruptedException if interrupted while waiting for a queue to release the message
     */
    private void markDeletedQueuedMessages(RawMessage rawMessage, Long l) throws InterruptedException {
        boolean overwritesExisting = rawMessage.isOverwrite() && rawMessage.getOriginalMessageId() != null;
        if (overwritesExisting) {
            // First pass: flag the message in every destination queue (id 0 is the source connector).
            for (Integer metaDataId : getMetaDataIds()) {
                if (metaDataId.equals(0)) {
                    continue;
                }
                getDestinationConnector(metaDataId.intValue()).getQueue().markAsDeleted(l);
            }

            // Second pass: wait for any queue that still has the message checked out.
            for (Integer metaDataId : getMetaDataIds()) {
                if (metaDataId.equals(0)) {
                    continue;
                }
                while (getDestinationConnector(metaDataId.intValue()).getQueue().isCheckedOut(l)) {
                    Thread.sleep(100L);
                }
            }
        }
    }

    /**
     * Builds the source connector message (metadata id 0, status RECEIVED) for a raw message and writes
     * it, together with its attachments and raw content, through the supplied DAO. Nothing is committed
     * here; the caller owns the transaction.
     *
     * <p>For an overwrite of an existing message the previous statistics (unless imported), attachments
     * and connector messages are deleted and the message is reset; otherwise a new message id is
     * allocated and a new message row inserted.
     *
     * <p>The raw message's payload and attachments are cleared once they have been transferred.
     *
     * @param donkeyDao  DAO used for all reads and writes
     * @param rawMessage the incoming raw message
     * @return the stored source connector message
     * @throws ChannelException     if attachment extraction or binary encoding fails
     * @throws InterruptedException if the thread is interrupted between steps
     */
    private ConnectorMessage createAndStoreSourceMessage(DonkeyDao donkeyDao, RawMessage rawMessage) throws ChannelException, InterruptedException {
        ThreadUtils.checkInterruptedStatus();

        final Long messageId;
        final Calendar receivedDate;
        boolean overwritesExisting = rawMessage.isOverwrite() && rawMessage.getOriginalMessageId() != null;
        if (overwritesExisting) {
            messageId = rawMessage.getOriginalMessageId();

            // Connectors whose previous data is wiped: the requested destinations (or all) plus the source.
            Set<Integer> affectedMetaDataIds = new HashSet<>();
            if (rawMessage.getDestinationMetaDataIds() == null) {
                affectedMetaDataIds.addAll(getMetaDataIds());
            } else {
                affectedMetaDataIds.addAll(rawMessage.getDestinationMetaDataIds());
            }
            affectedMetaDataIds.add(0);

            if (!rawMessage.isImported()) {
                donkeyDao.deleteMessageStatistics(this.channelId, messageId.longValue(), affectedMetaDataIds);
            }
            donkeyDao.deleteMessageAttachments(this.channelId, messageId.longValue());
            donkeyDao.deleteConnectorMessages(this.channelId, messageId.longValue(), affectedMetaDataIds);
            donkeyDao.resetMessage(this.channelId, messageId.longValue());
            receivedDate = Calendar.getInstance();
        } else {
            messageId = Long.valueOf(donkeyDao.getNextMessageId(this.channelId));
            receivedDate = Calendar.getInstance();

            Message newMessage = new Message();
            newMessage.setMessageId(messageId);
            newMessage.setChannelId(this.channelId);
            newMessage.setServerId(this.serverId);
            newMessage.setReceivedDate(receivedDate);
            newMessage.setOriginalId(rawMessage.getOriginalMessageId());
            donkeyDao.insertMessage(newMessage);
        }

        ConnectorMessage sourceMessage = new ConnectorMessage(this.channelId, this.name, messageId.longValue(), 0, this.serverId, receivedDate, Status.RECEIVED);
        sourceMessage.setConnectorName(this.sourceConnector.getSourceName());
        sourceMessage.setChainId(0);
        sourceMessage.setOrderId(0);
        sourceMessage.setRaw(new MessageContent(this.channelId, messageId.longValue(), 0, ContentType.RAW, null, this.sourceConnector.getInboundDataType().getType(), false));

        HashMap<String, Object> sourceMap = new HashMap<>();
        if (rawMessage.getSourceMap() != null) {
            sourceMap.putAll(rawMessage.getSourceMap());
            // The first message of a batch records its own id as the batch id.
            Object batchSequenceId = sourceMap.get(Constants.BATCH_SEQUENCE_ID_KEY);
            if (batchSequenceId instanceof Integer && ((Integer) batchSequenceId).intValue() == 1) {
                sourceMap.put(Constants.BATCH_ID_KEY, messageId);
            }
        }

        Collection<Integer> requestedDestinations = rawMessage.getDestinationMetaDataIds();
        Set<Integer> destinationSet = new LinkedHashSet<>();
        if (requestedDestinations == null) {
            for (DestinationChainProvider chainProvider : this.destinationChainProviders) {
                destinationSet.addAll(chainProvider.getMetaDataIds());
            }
        } else {
            destinationSet.addAll(requestedDestinations);
        }
        sourceMap.put(Constants.DESTINATION_SET_KEY, destinationSet);
        sourceMessage.setSourceMap(Collections.unmodifiableMap(sourceMap));

        if (CollectionUtils.isNotEmpty(rawMessage.getAttachments())) {
            for (Attachment suppliedAttachment : rawMessage.getAttachments()) {
                ThreadUtils.checkInterruptedStatus();
                if (this.storageSettings.isStoreAttachments()) {
                    donkeyDao.insertMessageAttachment(this.channelId, messageId.longValue(), suppliedAttachment);
                }
            }
            rawMessage.setAttachments(null);
        }

        boolean extractAttachments = this.attachmentHandlerProvider != null && this.attachmentHandlerProvider.canExtractAttachments();
        if (extractAttachments) {
            ThreadUtils.checkInterruptedStatus();
            AttachmentHandler handler = this.attachmentHandlerProvider.getHandler();
            try {
                handler.initialize(rawMessage, this);
                rawMessage.clearMessage();
                for (Attachment extracted = handler.nextAttachment(); extracted != null; extracted = handler.nextAttachment()) {
                    ThreadUtils.checkInterruptedStatus();
                    if (this.storageSettings.isStoreAttachments()) {
                        donkeyDao.insertMessageAttachment(this.channelId, messageId.longValue(), extracted);
                    }
                }
                // The handler's shutdown value becomes the stored raw content.
                sourceMessage.getRaw().setContent(handler.shutdown());
            } catch (AttachmentException e) {
                this.eventDispatcher.dispatchEvent(new ErrorEvent(this.channelId, null, messageId, ErrorEventType.ATTACHMENT_HANDLER, null, null, "Error processing attachments for channel " + this.channelId + ".", e));
                this.logger.error("Error processing attachments for channel " + this.name + " (" + this.channelId + ").", e);
                throw new ChannelException(false, e);
            }
        } else if (rawMessage.isBinary().booleanValue()) {
            ThreadUtils.checkInterruptedStatus();
            try {
                byte[] base64Bytes = Base64Util.encodeBase64(rawMessage.getRawBytes());
                rawMessage.clearMessage();
                sourceMessage.getRaw().setContent(org.apache.commons.codec.binary.StringUtils.newStringUsAscii(base64Bytes));
            } catch (IOException e) {
                this.logger.error("Error processing binary data for channel " + this.name + " (" + this.channelId + ").", e);
                throw new ChannelException(false, e);
            }
        } else {
            sourceMessage.getRaw().setContent(rawMessage.getRawData());
            rawMessage.clearMessage();
        }

        ThreadUtils.checkInterruptedStatus();
        boolean storeMaps = this.storageSettings.isStoreMaps() || this.storageSettings.isRawDurable();
        donkeyDao.insertConnectorMessage(sourceMessage, storeMaps, true);
        if (this.storageSettings.isStoreRaw()) {
            ThreadUtils.checkInterruptedStatus();
            donkeyDao.insertMessageContent(sourceMessage.getRaw());
        }
        return sourceMessage;
    }

    /**
     * Acquires the channel's process lock by delegating to {@code processLock.acquire()}.
     * Pair every successful call with {@link #releaseProcessLock()}.
     *
     * @throws InterruptedException if the acquire call is interrupted
     */
    public void obtainProcessLock() throws InterruptedException {
        this.processLock.acquire();
    }

    /**
     * Releases the channel's process lock by delegating to {@code processLock.release()}.
     */
    public void releaseProcessLock() {
        this.processLock.release();
    }

    /**
     * Acquires every permit of the process lock by delegating to {@code processLock.acquireAll()}.
     * NOTE(review): presumably used to exclude all concurrent message processing; confirm with callers.
     *
     * @throws InterruptedException if the acquire call is interrupted
     */
    public void obtainAllProcessLockPermits() throws InterruptedException {
        this.processLock.acquireAll();
    }

    /**
     * Releases every permit of the process lock by delegating to {@code processLock.releaseAll()}.
     */
    public void releaseAllProcessLockPermits() {
        this.processLock.releaseAll();
    }

    /**
     * Acquires the remove-content lock via {@code removeContentLock.lockInterruptibly()}.
     * Pair every successful call with {@link #releaseRemoveContentLock()}.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring the lock
     */
    public void obtainRemoveContentLock() throws InterruptedException {
        this.removeContentLock.lockInterruptibly();
    }

    /**
     * Releases the remove-content lock via {@code removeContentLock.unlock()}.
     */
    public void releaseRemoveContentLock() {
        this.removeContentLock.unlock();
    }

    /**
     * Adds a source connector message to the channel's source queue.
     *
     * @param connectorMessage the message to enqueue
     */
    protected void queue(ConnectorMessage connectorMessage) {
        this.sourceQueue.add(connectorMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message process(ConnectorMessage connectorMessage, boolean z) throws InterruptedException {
        ThreadUtils.checkInterruptedStatus();
        long messageId = connectorMessage.getMessageId();
        if (connectorMessage.getMetaDataId() != 0 || connectorMessage.getStatus() != Status.RECEIVED) {
            throw new RuntimeException("Received a source message with an invalid state");
        }
        Message message = new Message();
        message.setMessageId(Long.valueOf(messageId));
        message.setServerId(this.serverId);
        message.setChannelId(this.channelId);
        message.setReceivedDate(connectorMessage.getReceivedDate());
        message.getConnectorMessages().put(0, connectorMessage);
        String str = null;
        ThreadUtils.checkInterruptedStatus();
        try {
            str = this.preProcessor.doPreProcess(connectorMessage);
        } catch (DonkeyException e) {
            connectorMessage.setStatus(Status.ERROR);
            connectorMessage.setProcessingError(e.getFormattedError());
        }
        ThreadUtils.checkInterruptedStatus();
        DonkeyDao dao = this.daoFactory.getDao();
        boolean z2 = false;
        try {
            if (connectorMessage.getStatus() == Status.ERROR) {
                dao.updateStatus(connectorMessage, Status.RECEIVED);
                if (StringUtils.isNotBlank(connectorMessage.getProcessingError())) {
                    dao.updateErrors(connectorMessage);
                }
                ThreadUtils.checkInterruptedStatus();
                dao.commit(this.storageSettings.isDurable());
                dao.close();
                finishMessage(message, z);
                if (!dao.isClosed()) {
                    if (dao != null && 1 == 0) {
                        try {
                            dao.rollback();
                        } catch (Exception e2) {
                        }
                    }
                    dao.close();
                }
                return message;
            }
            if (str != null) {
                connectorMessage.setProcessedRaw(new MessageContent(this.channelId, messageId, 0, ContentType.PROCESSED_RAW, str, this.sourceConnector.getInboundDataType().getType(), false));
            }
            try {
                this.sourceConnector.getFilterTransformerExecutor().processConnectorMessage(connectorMessage);
            } catch (DonkeyException e3) {
                if (e3 instanceof MessageSerializerException) {
                    this.eventDispatcher.dispatchEvent(new ErrorEvent(this.channelId, 0, Long.valueOf(messageId), ErrorEventType.SERIALIZER, this.sourceConnector.getSourceName(), null, e3.getMessage(), e3));
                }
                connectorMessage.setStatus(Status.ERROR);
                connectorMessage.setProcessingError(e3.getFormattedError());
            }
            dao.updateStatus(connectorMessage, Status.RECEIVED);
            this.sourceConnector.getMetaDataReplacer().setMetaDataMap(connectorMessage, this.metaDataColumns);
            if (!connectorMessage.getMetaDataMap().isEmpty() && this.storageSettings.isStoreCustomMetaData()) {
                ThreadUtils.checkInterruptedStatus();
                dao.insertMetaData(connectorMessage, this.metaDataColumns);
            }
            if (this.storageSettings.isStoreMaps()) {
                ThreadUtils.checkInterruptedStatus();
                dao.updateMaps(connectorMessage);
                dao.updateSourceMap(connectorMessage);
            }
            if (connectorMessage.getStatus() != Status.TRANSFORMED) {
                if (this.storageSettings.isStoreProcessedRaw() && connectorMessage.getProcessedRaw() != null) {
                    ThreadUtils.checkInterruptedStatus();
                    dao.insertMessageContent(connectorMessage.getProcessedRaw());
                }
                if (this.storageSettings.isStoreTransformed() && connectorMessage.getTransformed() != null) {
                    dao.insertMessageContent(connectorMessage.getTransformed());
                }
                if (StringUtils.isNotBlank(connectorMessage.getProcessingError())) {
                    dao.updateErrors(connectorMessage);
                }
                ThreadUtils.checkInterruptedStatus();
                dao.commit();
                dao.close();
                finishMessage(message, z);
                if (!dao.isClosed()) {
                    if (dao != null && 1 == 0) {
                        try {
                            dao.rollback();
                        } catch (Exception e4) {
                        }
                    }
                    dao.close();
                }
                return message;
            }
            boolean z3 = false;
            ThreadUtils.checkInterruptedStatus();
            if (this.storageSettings.isStoreProcessedRaw() && connectorMessage.getProcessedRaw() != null) {
                dao.batchInsertMessageContent(connectorMessage.getProcessedRaw());
                z3 = true;
            }
            if (this.storageSettings.isStoreTransformed() && connectorMessage.getTransformed() != null) {
                dao.batchInsertMessageContent(connectorMessage.getTransformed());
                z3 = true;
            }
            if (this.storageSettings.isStoreSourceEncoded() && connectorMessage.getEncoded() != null) {
                dao.batchInsertMessageContent(connectorMessage.getEncoded());
                z3 = true;
            }
            if (z3) {
                dao.executeBatchInsertMessageContent(this.channelId);
            }
            HashMap hashMap = new HashMap();
            MessageContent encoded = connectorMessage.getEncoded();
            Collection collection = connectorMessage.getSourceMap().containsKey(Constants.DESTINATION_SET_KEY) ? (Collection) connectorMessage.getSourceMap().get(Constants.DESTINATION_SET_KEY) : null;
            ArrayList<DestinationChain> arrayList = new ArrayList();
            for (DestinationChainProvider destinationChainProvider : this.destinationChainProviders) {
                DestinationChain chain = destinationChainProvider.getChain();
                if (collection != null) {
                    ArrayList arrayList2 = new ArrayList();
                    for (Integer num : destinationChainProvider.getMetaDataIds()) {
                        if (collection.contains(num)) {
                            arrayList2.add(num);
                        }
                    }
                    chain.setEnabledMetaDataIds(arrayList2);
                }
                if (!chain.getEnabledMetaDataIds().isEmpty()) {
                    ThreadUtils.checkInterruptedStatus();
                    Integer num2 = chain.getEnabledMetaDataIds().get(0);
                    DestinationConnector destinationConnector = destinationChainProvider.getDestinationConnectors().get(num2);
                    MessageContent messageContent = new MessageContent(this.channelId, messageId, num2.intValue(), ContentType.RAW, encoded.getContent(), destinationConnector.getInboundDataType().getType(), encoded.isEncrypted());
                    ConnectorMessage connectorMessage2 = new ConnectorMessage(this.channelId, this.name, messageId, num2.intValue(), this.serverId, Calendar.getInstance(), Status.RECEIVED);
                    connectorMessage2.setConnectorName(destinationConnector.getDestinationName());
                    connectorMessage2.setChainId(destinationChainProvider.getChainId().intValue());
                    connectorMessage2.setOrderId(destinationConnector.getOrderId().intValue());
                    connectorMessage2.setSourceMap(connectorMessage.getSourceMap());
                    connectorMessage2.setChannelMap(new HashMap(connectorMessage.getChannelMap()));
                    connectorMessage2.setResponseMap(new HashMap(connectorMessage.getResponseMap()));
                    connectorMessage2.setRaw(messageContent);
                    dao.insertConnectorMessage(connectorMessage2, this.storageSettings.isStoreMaps(), true);
                    hashMap.put(num2, connectorMessage2);
                }
                arrayList.add(chain);
            }
            ThreadUtils.checkInterruptedStatus();
            dao.commit();
            z2 = true;
            dao.close();
            ArrayList arrayList3 = new ArrayList();
            for (DestinationChain destinationChain : arrayList) {
                if (!destinationChain.getEnabledMetaDataIds().isEmpty()) {
                    destinationChain.setMessage((ConnectorMessage) hashMap.get(destinationChain.getEnabledMetaDataIds().get(0)));
                    arrayList3.add(destinationChain);
                }
            }
            if (!arrayList3.isEmpty()) {
                ArrayList arrayList4 = new ArrayList();
                for (int i = 0; i <= arrayList3.size() - 2; i++) {
                    try {
                        DestinationChain destinationChain2 = (DestinationChain) arrayList3.get(i);
                        destinationChain2.setName("Destination Chain Thread " + (i + 1) + " on " + this.name + " (" + this.channelId + ")");
                        arrayList4.add(this.channelExecutor.submit(destinationChain2));
                    } catch (RejectedExecutionException e5) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedException();
                    }
                }
                List<ConnectorMessage> list = null;
                try {
                    DestinationChain destinationChain3 = (DestinationChain) arrayList3.get(arrayList3.size() - 1);
                    destinationChain3.setName("Destination Chain Thread " + arrayList3.size() + " on " + this.name + " (" + this.channelId + ")");
                    list = destinationChain3.call();
                } catch (Throwable th) {
                    handleDestinationChainThrowable(th);
                }
                addConnectorMessages(message, connectorMessage, list);
                Iterator it = arrayList4.iterator();
                while (it.hasNext()) {
                    List<ConnectorMessage> list2 = null;
                    try {
                        list2 = (List) ((Future) it.next()).get();
                    } catch (Exception e6) {
                        handleDestinationChainThrowable(e6);
                    }
                    addConnectorMessages(message, connectorMessage, list2);
                }
            }
            finishMessage(message, z);
            if (!dao.isClosed()) {
                if (dao != null && 1 == 0) {
                    try {
                        dao.rollback();
                    } catch (Exception e7) {
                    }
                }
                dao.close();
            }
            return message;
        } catch (Throwable th2) {
            if (!dao.isClosed()) {
                if (dao != null && !z2) {
                    try {
                        dao.rollback();
                    } catch (Exception e8) {
                    }
                }
                dao.close();
            }
            throw th2;
        }
    }

    /**
     * Folds the connector messages produced by one destination chain into the overall message.
     * Every entry is registered on {@code message} under its metadata id, and its response map
     * is copied into the response map of {@code connectorMessage}.
     *
     * @param message          message whose connector-message map receives the entries
     * @param connectorMessage connector message whose response map absorbs each entry's responses
     * @param list             connector messages returned by a chain; {@code null} is a no-op
     */
    private void addConnectorMessages(Message message, ConnectorMessage connectorMessage, List<ConnectorMessage> list) {
        if (list == null) {
            return;
        }
        for (ConnectorMessage chainResult : list) {
            int metaDataId = chainResult.getMetaDataId();
            message.getConnectorMessages().put(Integer.valueOf(metaDataId), chainResult);
            connectorMessage.getResponseMap().putAll(chainResult.getResponseMap());
        }
    }

    /**
     * Translates a failure raised while running a destination chain into the exception the
     * caller should see. This method never returns normally.
     * <ul>
     * <li>A failure whose message mentions "Java heap space" is logged and rethrown as an
     * {@link OutOfMemoryError} that keeps the original message and cause.</li>
     * <li>A {@link CancellationException} or {@link InterruptedException} re-asserts the
     * thread's interrupt flag and surfaces as an {@link InterruptedException}.</li>
     * <li>Anything else is wrapped in a {@link RuntimeException}.</li>
     * </ul>
     *
     * @param th the throwable caught from the chain; an {@link ExecutionException} is unwrapped
     *           to its cause when it has one
     * @throws OutOfMemoryError     if the failure reports heap exhaustion
     * @throws InterruptedException if the chain was cancelled or interrupted
     */
    private void handleDestinationChainThrowable(Throwable th) throws OutOfMemoryError, InterruptedException {
        // Unwrap the executor's wrapper, but fall back to the wrapper itself when it carries no
        // cause so the checks below never dereference null.
        Throwable cause = th;
        if (th instanceof ExecutionException && th.getCause() != null) {
            cause = th.getCause();
        }

        String detail = cause.getMessage();
        if (detail != null && detail.contains("Java heap space")) {
            this.logger.error(detail, cause);
            // Keep the message and the original throwable attached for diagnosis.
            OutOfMemoryError outOfMemory = new OutOfMemoryError(detail);
            outOfMemory.initCause(cause);
            throw outOfMemory;
        }

        if (cause instanceof CancellationException) {
            Thread.currentThread().interrupt();
            throw new InterruptedException();
        }
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
            throw (InterruptedException) cause;
        }
        throw new RuntimeException(cause);
    }

    /**
     * Submits a {@code RecoveryTask} for this channel to the channel executor and blocks until
     * the task has finished.
     *
     * @throws Exception if submitting the task or waiting for its result fails
     */
    public void processUnfinishedMessages() throws Exception {
        Future<?> recovery = this.channelExecutor.submit(new RecoveryTask(this));
        recovery.get();
    }

    /**
     * Source-queue worker loop: drains the source queue repeatedly (waiting up to 1000 ms for
     * the first message of each pass) for as long as the channel is active and the queue has
     * not been asked to stop. An interrupt ends the loop with the thread's interrupt flag set.
     */
    @Override
    public void run() {
        try {
            do {
                processSourceQueue(1000);
            } while (isActive() && !this.stopSourceQueue);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the source queue: waits up to {@code i} milliseconds for a first message, then
     * keeps processing and polling (without waiting) until the queue yields nothing or
     * {@code stopSourceQueue} is set.
     * <p>
     * A failure while processing one message is logged, dispatched as an error event, the
     * message is finished, the queue is invalidated, and the loop pauses for a second before
     * polling again. A failure elsewhere in the loop is logged and dispatched, followed by a
     * one-second pause, and then ends this pass. Whatever the exit path, a message still held
     * at that point is handed to {@code sourceQueue.finish} exactly once.
     *
     * @param i maximum time in milliseconds to wait for the first message
     * @throws InterruptedException if the thread is interrupted before or during processing
     */
    public void processSourceQueue(int i) throws InterruptedException {
        ThreadUtils.checkInterruptedStatus();
        ConnectorMessage sourceMessage = this.sourceQueue.poll(i, TimeUnit.MILLISECONDS);
        try {
            while (sourceMessage != null && !this.stopSourceQueue) {
                try {
                    process(sourceMessage, true);
                    this.sourceQueue.finish(sourceMessage);
                } catch (Throwable th) {
                    // Interrupts are passed to checkInterruptedException before any error handling.
                    ThreadUtils.checkInterruptedException(th);
                    this.logger.error("An error occurred in channel " + this.name + " (" + this.channelId + ") while processing message ID " + sourceMessage.getMessageId() + " from the source queue", th);
                    this.eventDispatcher.dispatchEvent(new ErrorEvent(this.channelId, 0, Long.valueOf(sourceMessage.getMessageId()), ErrorEventType.SOURCE_CONNECTOR, this.sourceConnector.getSourceName(), null, th.getMessage(), th));
                    this.sourceQueue.finish(sourceMessage);
                    this.sourceQueue.invalidate(false, false);
                    Thread.sleep(1000L);
                }
                sourceMessage = this.sourceQueue.poll();
            }
        } catch (Throwable th2) {
            ThreadUtils.checkInterruptedException(th2);
            this.logger.error("An error occurred in channel " + this.name + " (" + this.channelId + ") while polling from the source queue", th2);
            this.eventDispatcher.dispatchEvent(new ErrorEvent(this.channelId, 0, null, ErrorEventType.SOURCE_CONNECTOR, this.sourceConnector.getSourceName(), null, th2.getMessage(), th2));
            Thread.sleep(1000L);
        } finally {
            // Single cleanup point for every exit path (normal end, stop request, error, interrupt).
            if (sourceMessage != null) {
                this.sourceQueue.finish(sourceMessage);
            }
        }
    }

    /**
     * Completes processing of a message: runs the post-processor, merges the response maps
     * into the source connector message (metadata id 0), and persists the outcome.
     * <p>
     * A post-processor failure is recorded on the source connector message and stored through
     * the DAO. When {@code z} is set, the merged response map is stored (if the storage
     * settings ask for it), the message is marked as processed, and content removal is
     * attempted. All DAO work happens in one transaction: it is committed once, rolled back if
     * the commit was not reached, and the DAO is always closed.
     *
     * @param message the message being finished
     * @param z       whether to mark the message as processed
     * @throws InterruptedException if the thread has been interrupted
     */
    public void finishMessage(Message message, boolean z) throws InterruptedException {
        ThreadUtils.checkInterruptedStatus();
        Response response = null;
        boolean postProcessorFailed = false;
        ConnectorMessage sourceMessage = message.getConnectorMessages().get(0);
        try {
            response = this.postProcessor.doPostProcess(message);
        } catch (DonkeyException e) {
            sourceMessage.setPostProcessorError(e.getFormattedError());
            postProcessorFailed = true;
        }
        sourceMessage.getResponseMap().putAll(message.getMergedConnectorMessage().getResponseMap());
        if (response != null) {
            sourceMessage.getResponseMap().put(SourceConnectorProperties.RESPONSE_POST_PROCESSOR, response);
        }
        ThreadUtils.checkInterruptedStatus();

        DonkeyDao donkeyDao = null;
        boolean committed = false;
        try {
            if (postProcessorFailed) {
                donkeyDao = this.daoFactory.getDao();
                donkeyDao.updateErrors(sourceMessage);
            }
            if (z) {
                if (donkeyDao == null) {
                    donkeyDao = this.daoFactory.getDao();
                }
                if (this.storageSettings.isStoreMergedResponseMap()) {
                    ThreadUtils.checkInterruptedStatus();
                    donkeyDao.updateResponseMap(sourceMessage);
                }
                donkeyDao.markAsProcessed(this.channelId, message.getMessageId().longValue());
                message.setProcessed(true);
                if (!isUsingDestinationQueues()) {
                    removeContent(donkeyDao, message, message.getMessageId().longValue(), false, false);
                }
            }
            if (donkeyDao != null) {
                donkeyDao.commit(this.storageSettings.isDurable());
                committed = true;
            }
            // Runs after the commit and while the DAO is still open; removeContent uses the
            // DAO to read statuses and commits its own deletions.
            if (z && isUsingDestinationQueues()) {
                removeContent(donkeyDao, message, message.getMessageId().longValue(), false, true);
            }
        } finally {
            if (donkeyDao != null) {
                if (!committed) {
                    try {
                        donkeyDao.rollback();
                    } catch (Exception ignored) {
                        // Best-effort rollback; the original failure is the one that propagates.
                    }
                }
                donkeyDao.close();
            }
        }
    }

    /**
     * Deletes stored content and/or attachments of a completed message, as configured by the
     * storage settings. Nothing happens unless storage is enabled and at least one of
     * "remove content on completion" / "remove attachments on completion" is set.
     * <p>
     * With {@code z2 == false} completion and filtered connectors are determined from the
     * in-memory {@code message}; deletions are issued on {@code donkeyDao} without committing.
     * With {@code z2 == true} connector statuses are read through the DAO, deletions run
     * while holding the remove-content lock and are committed here; any exception in this
     * path is logged rather than propagated.
     *
     * @param donkeyDao DAO used for status lookups and deletions
     * @param message   the in-memory message; may be {@code null} when {@code z2} is true
     * @param j         id of the message whose content is to be removed
     * @param z         passed through to {@code getConnectorMessageStatuses}
     *                  (NOTE(review): exact meaning not visible here — confirm against the DAO)
     * @param z2        whether to read connector statuses through the DAO instead of from
     *                  {@code message}
     */
    public void removeContent(DonkeyDao donkeyDao, Message message, long j, boolean z, boolean z2) {
        if (!this.storageSettings.isEnabled()) {
            return;
        }
        if (!this.storageSettings.isRemoveContentOnCompletion() && !this.storageSettings.isRemoveAttachmentsOnCompletion()) {
            return;
        }

        if (!z2) {
            // In-memory path: rely on the statuses carried by the message itself.
            if (message == null || !this.messageController.isMessageCompleted(message)) {
                return;
            }
            if (this.storageSettings.isRemoveContentOnCompletion()) {
                if (this.storageSettings.isRemoveOnlyFilteredOnCompletion()) {
                    HashSet<Integer> filteredIds = new HashSet<Integer>();
                    for (ConnectorMessage candidate : message.getConnectorMessages().values()) {
                        if (candidate.getStatus().getStatusCode() == Status.FILTERED.getStatusCode()) {
                            filteredIds.add(Integer.valueOf(candidate.getMetaDataId()));
                        }
                    }
                    if (!filteredIds.isEmpty()) {
                        donkeyDao.deleteMessageContentByMetaDataIds(this.channelId, j, filteredIds);
                    }
                } else {
                    donkeyDao.deleteMessageContent(this.channelId, j);
                }
            }
            if (this.storageSettings.isRemoveAttachmentsOnCompletion()) {
                donkeyDao.deleteMessageAttachments(this.channelId, j);
            }
            return;
        }

        // DAO path: skip entirely if an errored connector is the source (id 0) or a
        // destination without queuing enabled.
        if (message != null) {
            for (ConnectorMessage candidate : message.getConnectorMessages().values()) {
                int metaDataId = candidate.getMetaDataId();
                if (candidate.getStatus() == Status.ERROR && (metaDataId == 0 || !getDestinationConnector(metaDataId).isQueueEnabled())) {
                    return;
                }
            }
        }

        try {
            Map<Integer, Status> statuses = donkeyDao.getConnectorMessageStatuses(this.channelId, j, z);
            if (!this.messageController.isMessageCompleted(new HashSet<Status>(statuses.values()))) {
                return;
            }

            // Always a real (possibly empty) set, so later checks never dereference null.
            HashSet<Integer> filteredIds = new HashSet<Integer>();
            if (this.storageSettings.isRemoveContentOnCompletion() && this.storageSettings.isRemoveOnlyFilteredOnCompletion()) {
                for (Map.Entry<Integer, Status> entry : statuses.entrySet()) {
                    if (entry.getValue().getStatusCode() == Status.FILTERED.getStatusCode()) {
                        filteredIds.add(entry.getKey());
                    }
                }
            }

            boolean somethingToRemove = this.storageSettings.isRemoveAttachmentsOnCompletion()
                    || !this.storageSettings.isRemoveOnlyFilteredOnCompletion()
                    || !filteredIds.isEmpty();
            if (!somethingToRemove) {
                return;
            }

            obtainRemoveContentLock();
            try {
                if (this.storageSettings.isRemoveContentOnCompletion()) {
                    if (!this.storageSettings.isRemoveOnlyFilteredOnCompletion()) {
                        donkeyDao.deleteMessageContent(this.channelId, j);
                    } else if (!filteredIds.isEmpty()) {
                        donkeyDao.deleteMessageContentByMetaDataIds(this.channelId, j, filteredIds);
                    }
                }
                if (this.storageSettings.isRemoveAttachmentsOnCompletion()) {
                    donkeyDao.deleteMessageAttachments(this.channelId, j);
                }
                donkeyDao.commit();
            } finally {
                // Released exactly once, whether or not the deletions succeeded.
                releaseRemoveContentLock();
            }
        } catch (Exception e) {
            this.logger.error("Error removing content for message " + j + " for channel " + this.name + " (" + this.channelId + ").", e);
        }
    }

    /**
     * Imports a message into this channel using a DAO obtained from the DAO factory. The
     * import is committed on success and rolled back if anything fails before the commit
     * completes; the DAO is closed in either case.
     *
     * @param message the message to import
     * @throws DonkeyException if the import fails
     */
    public void importMessage(Message message) throws DonkeyException {
        DonkeyDao donkeyDao = null;
        boolean committed = false;
        try {
            donkeyDao = this.daoFactory.getDao();
            importMessage(message, donkeyDao);
            donkeyDao.commit();
            committed = true;
        } finally {
            // The DAO is still null if the factory itself failed; skip cleanup in that case so
            // the factory's exception is the one that propagates.
            if (donkeyDao != null) {
                if (!committed) {
                    try {
                        donkeyDao.rollback();
                    } catch (Exception ignored) {
                        // Best-effort rollback; the original failure is the one that propagates.
                    }
                }
                donkeyDao.close();
            }
        }
    }

    /**
     * Imports a message into this channel through the supplied DAO, without committing.
     * <p>
     * The message keeps a reference to where it came from (import id, and import channel id
     * when it originated in a different channel), is given the next message id of this
     * channel, and is re-homed to this channel and server and flagged as processed. Each
     * connector message is re-homed likewise; any status other than FILTERED, TRANSFORMED,
     * SENT or ERROR is replaced by ERROR. Connector messages, their metadata, the content
     * types enabled in the storage settings, and the message attachments are then inserted.
     *
     * @param message   the message to import; modified in place
     * @param donkeyDao the DAO to write through; the caller owns commit/rollback/close
     * @throws DonkeyException if the import fails
     */
    public void importMessage(Message message, DonkeyDao donkeyDao) throws DonkeyException {
        if (message.getImportId() == null) {
            message.setImportId(message.getMessageId());
        }
        if (message.getImportChannelId() == null) {
            if (!message.getChannelId().equals(this.channelId)) {
                message.setImportChannelId(message.getChannelId());
            }
        }

        final long importedMessageId = donkeyDao.getNextMessageId(this.channelId);
        message.setMessageId(Long.valueOf(importedMessageId));
        message.setChannelId(this.channelId);
        message.setServerId(this.serverId);
        message.setProcessed(true);
        donkeyDao.insertMessage(message);

        for (ConnectorMessage imported : message.getConnectorMessages().values()) {
            imported.setMessageId(importedMessageId);
            imported.setChannelId(this.channelId);
            imported.setServerId(this.serverId);

            // Only final statuses are kept as-is; everything else is imported as ERROR.
            Status status = imported.getStatus();
            boolean finalStatus = status == Status.FILTERED || status == Status.TRANSFORMED || status == Status.SENT || status == Status.ERROR;
            if (!finalStatus) {
                imported.setStatus(Status.ERROR);
            }

            int metaDataId = imported.getMetaDataId();
            donkeyDao.insertConnectorMessage(imported, true, false);
            if (!imported.getMetaDataMap().isEmpty()) {
                donkeyDao.insertMetaData(imported, this.metaDataColumns);
            }

            // Point every piece of content at the new message id and this channel.
            for (ContentType contentType : ContentType.getMessageTypes()) {
                MessageContent content = imported.getMessageContent(contentType);
                if (content != null) {
                    content.setMessageId(Long.valueOf(importedMessageId));
                    content.setChannelId(this.channelId);
                }
            }

            if (this.storageSettings.isStoreRaw()) {
                MessageContent raw = imported.getRaw();
                if (raw != null) {
                    donkeyDao.insertMessageContent(raw);
                }
            }
            if (this.storageSettings.isStoreProcessedRaw()) {
                MessageContent processedRaw = imported.getProcessedRaw();
                if (processedRaw != null) {
                    donkeyDao.insertMessageContent(processedRaw);
                }
            }
            if (this.storageSettings.isStoreTransformed()) {
                MessageContent transformed = imported.getTransformed();
                if (transformed != null) {
                    donkeyDao.insertMessageContent(transformed);
                }
            }
            // Encoded content is governed by a different setting for the source (id 0) than
            // for destinations (id > 0).
            if (this.storageSettings.isStoreSourceEncoded() && metaDataId == 0) {
                MessageContent sourceEncoded = imported.getEncoded();
                if (sourceEncoded != null) {
                    donkeyDao.insertMessageContent(sourceEncoded);
                }
            }
            if (this.storageSettings.isStoreDestinationEncoded() && metaDataId > 0) {
                MessageContent destinationEncoded = imported.getEncoded();
                if (destinationEncoded != null) {
                    donkeyDao.insertMessageContent(destinationEncoded);
                }
            }
            if (this.storageSettings.isStoreSent()) {
                MessageContent sent = imported.getSent();
                if (sent != null) {
                    donkeyDao.insertMessageContent(sent);
                }
            }
            if (this.storageSettings.isStoreResponse()) {
                MessageContent responseContent = imported.getResponse();
                if (responseContent != null) {
                    donkeyDao.insertMessageContent(responseContent);
                }
            }
            if (this.storageSettings.isStoreResponseTransformed()) {
                MessageContent responseTransformed = imported.getResponseTransformed();
                if (responseTransformed != null) {
                    donkeyDao.insertMessageContent(responseTransformed);
                }
            }
            if (this.storageSettings.isStoreProcessedResponse()) {
                MessageContent processedResponse = imported.getProcessedResponse();
                if (processedResponse != null) {
                    donkeyDao.insertMessageContent(processedResponse);
                }
            }
        }

        List<Attachment> attachments = message.getAttachments();
        if (CollectionUtils.isNotEmpty(attachments)) {
            for (Attachment attachment : attachments) {
                donkeyDao.insertMessageAttachment(this.channelId, importedMessageId, attachment);
            }
        }
    }

    /**
     * Brings the metadata columns stored for this channel in line with the configured
     * {@code metaDataColumns}: configured columns that are missing are added, columns whose
     * stored type differs are dropped and re-added, and stored columns that are no longer
     * configured are removed. The changes are committed together; if the commit is not
     * reached they are rolled back. The DAO is closed in either case.
     *
     * @throws SQLException declared for callers; see the DAO operations for failure modes
     */
    private void updateMetaDataColumns() throws SQLException {
        DonkeyDao dao = this.daoFactory.getDao();
        boolean committed = false;
        try {
            HashMap<String, Object> existingTypes = new HashMap<String, Object>();
            // Starts as every stored column name; names still present after the reconciliation
            // loop are no longer configured and get removed.
            ArrayList<String> obsoleteColumns = new ArrayList<String>();
            for (MetaDataColumn existing : dao.getMetaDataColumns(this.channelId)) {
                existingTypes.put(existing.getName(), existing.getType());
                obsoleteColumns.add(existing.getName());
            }

            for (MetaDataColumn configured : this.metaDataColumns) {
                String name = configured.getName();
                if (!existingTypes.containsKey(name)) {
                    dao.addMetaDataColumn(this.channelId, configured);
                } else if (existingTypes.get(name) != configured.getType()) {
                    // Type changed: recreate the column with the new type.
                    dao.removeMetaDataColumn(this.channelId, name);
                    dao.addMetaDataColumn(this.channelId, configured);
                }
                obsoleteColumns.remove(name);
            }

            for (String obsolete : obsoleteColumns) {
                dao.removeMetaDataColumn(this.channelId, obsolete);
            }

            dao.commit();
            committed = true;
        } finally {
            if (!committed) {
                try {
                    dao.rollback();
                } catch (Exception ignored) {
                    // Best-effort rollback; the original failure is the one that propagates.
                }
            }
            dao.close();
        }
    }
}
