package com.mirth.connect.donkey.server.channel;

import com.mirth.connect.donkey.model.DonkeyException;
import com.mirth.connect.donkey.model.channel.ConnectorProperties;
import com.mirth.connect.donkey.model.channel.DeployedState;
import com.mirth.connect.donkey.model.channel.DestinationConnectorProperties;
import com.mirth.connect.donkey.model.channel.DestinationConnectorPropertiesInterface;
import com.mirth.connect.donkey.model.channel.MetaDataColumn;
import com.mirth.connect.donkey.model.channel.SourceConnectorPropertiesInterface;
import com.mirth.connect.donkey.model.event.ConnectionStatusEventType;
import com.mirth.connect.donkey.model.event.DeployedStateEventType;
import com.mirth.connect.donkey.model.event.ErrorEventType;
import com.mirth.connect.donkey.model.message.ConnectorMessage;
import com.mirth.connect.donkey.model.message.ContentType;
import com.mirth.connect.donkey.model.message.MessageContent;
import com.mirth.connect.donkey.model.message.MessageSerializerException;
import com.mirth.connect.donkey.model.message.Response;
import com.mirth.connect.donkey.model.message.Status;
import com.mirth.connect.donkey.model.message.attachment.AttachmentHandlerProvider;
import com.mirth.connect.donkey.server.ConnectorTaskException;
import com.mirth.connect.donkey.server.Constants;
import com.mirth.connect.donkey.server.Donkey;
import com.mirth.connect.donkey.server.data.DonkeyDao;
import com.mirth.connect.donkey.server.data.DonkeyDaoFactory;
import com.mirth.connect.donkey.server.event.ConnectionStatusEvent;
import com.mirth.connect.donkey.server.event.DeployedStateEvent;
import com.mirth.connect.donkey.server.event.ErrorEvent;
import com.mirth.connect.donkey.server.message.ResponseValidator;
import com.mirth.connect.donkey.server.queue.DestinationQueue;
import com.mirth.connect.donkey.util.MessageMaps;
import com.mirth.connect.donkey.util.Serializer;
import com.mirth.connect.donkey.util.ThreadUtils;
import java.util.Calendar;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/mirth/connect/donkey/server/channel/DestinationConnector.class */
public abstract class DestinationConnector extends Connector implements Runnable {
    private static final String QUEUED_RESPONSE = "Message queued successfully";
    private Integer orderId;
    private Deque<Long> processingThreadIdStack;
    private DestinationConnectorProperties destinationConnectorProperties;
    private DestinationQueue queue;
    private String destinationName;
    private boolean enabled;
    private MetaDataReplacer metaDataReplacer;
    private List<MetaDataColumn> metaDataColumns;
    private ResponseValidator responseValidator;
    private ResponseTransformerExecutor responseTransformerExecutor;
    private DonkeyDaoFactory daoFactory;
    private Map<Long, DestinationQueueThread> queueThreads = new ConcurrentHashMap();
    private int queueEmptySleepTime = Constants.DESTINATION_QUEUE_EMPTY_SLEEP_TIME;
    private AtomicBoolean forceQueue = new AtomicBoolean(false);
    private AtomicBoolean stopQueue = new AtomicBoolean(false);
    private StorageSettings storageSettings = new StorageSettings();
    private Logger logger = LogManager.getLogger(getClass());

    /* loaded from: input_file:com/mirth/connect/donkey/server/channel/DestinationConnector$DestinationQueueThread.class */
    public static class DestinationQueueThread extends Thread {
        private AtomicBoolean waitingRetryInterval;

        public DestinationQueueThread(Runnable runnable) {
            super(runnable);
            this.waitingRetryInterval = new AtomicBoolean(false);
        }

        public AtomicBoolean getWaitingRetryInterval() {
            return this.waitingRetryInterval;
        }

        public void interruptIfWaitingRetryInterval() {
            synchronized (this.waitingRetryInterval) {
                if (this.waitingRetryInterval.get()) {
                    interrupt();
                }
            }
        }
    }

    public abstract void replaceConnectorProperties(ConnectorProperties connectorProperties, ConnectorMessage connectorMessage);

    public abstract Response send(ConnectorProperties connectorProperties, ConnectorMessage connectorMessage) throws InterruptedException;

    public DestinationQueue getQueue() {
        return this.queue;
    }

    public void setQueue(DestinationQueue destinationQueue) {
        this.queue = destinationQueue;
    }

    public void setQueueEmptySleepTime(int i) {
        this.queueEmptySleepTime = i;
    }

    public int getPotentialThreadCount() {
        int processingThreads = ((SourceConnectorPropertiesInterface) getChannel().getSourceConnector().getConnectorProperties()).getSourceConnectorProperties().getProcessingThreads();
        return Math.max(this.destinationConnectorProperties.isQueueEnabled() ? !this.destinationConnectorProperties.isSendFirst() ? this.destinationConnectorProperties.getThreadCount() : Math.max(this.destinationConnectorProperties.getThreadCount(), processingThreads) : processingThreads, 1);
    }

    private long getDispatcherId() {
        long id = Thread.currentThread().getId();
        if (isQueueEnabled() && this.queueThreads.containsKey(Long.valueOf(id))) {
            if (this.queueThreads.size() <= 1) {
                return 0L;
            }
            return id;
        }
        if (getChannel().getProcessingThreads() <= 1) {
            return 0L;
        }
        return this.processingThreadIdStack.pop().longValue();
    }

    private void returnProcessingThreadId(long j) {
        this.processingThreadIdStack.push(Long.valueOf(j));
    }

    public String getDestinationName() {
        return this.destinationName;
    }

    public void setDestinationName(String str) {
        this.destinationName = str;
    }

    public boolean isEnabled() {
        return this.enabled;
    }

    public void setEnabled(boolean z) {
        this.enabled = z;
    }

    public boolean isForceQueue() {
        return this.forceQueue.get();
    }

    public void setForceQueue(boolean z) {
        this.forceQueue.set(z);
    }

    public Integer getOrderId() {
        return this.orderId;
    }

    public void setOrderId(Integer num) {
        this.orderId = num;
    }

    public Serializer getSerializer() {
        return this.channel.getSerializer();
    }

    public MessageMaps getMessageMaps() {
        return this.channel.getMessageMaps();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.mirth.connect.donkey.server.channel.Connector
    public void setConnectorProperties(ConnectorProperties connectorProperties) {
        super.setConnectorProperties(connectorProperties);
        if (connectorProperties instanceof DestinationConnectorPropertiesInterface) {
            this.destinationConnectorProperties = ((DestinationConnectorPropertiesInterface) connectorProperties).getDestinationConnectorProperties();
        }
    }

    public void setMetaDataReplacer(MetaDataReplacer metaDataReplacer) {
        this.metaDataReplacer = metaDataReplacer;
    }

    public void setMetaDataColumns(List<MetaDataColumn> list) {
        this.metaDataColumns = list;
    }

    public ResponseValidator getResponseValidator() {
        return this.responseValidator;
    }

    public void setResponseValidator(ResponseValidator responseValidator) {
        this.responseValidator = responseValidator;
    }

    public ResponseTransformerExecutor getResponseTransformerExecutor() {
        return this.responseTransformerExecutor;
    }

    public void setResponseTransformerExecutor(ResponseTransformerExecutor responseTransformerExecutor) {
        this.responseTransformerExecutor = responseTransformerExecutor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setStorageSettings(StorageSettings storageSettings) {
        this.storageSettings = storageSettings;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setDaoFactory(DonkeyDaoFactory donkeyDaoFactory) {
        this.daoFactory = donkeyDaoFactory;
    }

    public boolean isQueueEnabled() {
        return this.destinationConnectorProperties != null && this.destinationConnectorProperties.isQueueEnabled();
    }

    public boolean isQueueRotate() {
        return this.destinationConnectorProperties != null && this.destinationConnectorProperties.isRotate();
    }

    public boolean willAttemptSend() {
        return !isQueueEnabled() || (this.destinationConnectorProperties.isSendFirst() && this.queue.size() == 0 && !isForceQueue() && (this.channel.getQueueHandler() == null || this.channel.getQueueHandler().allowSendFirst(this)));
    }

    public boolean includeFilterTransformerInQueue() {
        return isQueueEnabled() && this.destinationConnectorProperties.isRegenerateTemplate() && this.destinationConnectorProperties.isIncludeFilterTransformer();
    }

    protected AttachmentHandlerProvider getAttachmentHandlerProvider() {
        return this.channel.getAttachmentHandlerProvider();
    }

    public void updateCurrentState(DeployedState deployedState) {
        setCurrentState(deployedState);
        this.channel.getEventDispatcher().dispatchEvent(new DeployedStateEvent(getChannelId(), this.channel.getName(), Integer.valueOf(getMetaDataId()), this.destinationName, DeployedStateEventType.getTypeFromDeployedState(deployedState)));
    }

    @Override // com.mirth.connect.donkey.server.channel.Connector
    public void start() throws ConnectorTaskException, InterruptedException {
        updateCurrentState(DeployedState.STARTING);
        int processingThreads = getChannel().getProcessingThreads();
        if (processingThreads > 1) {
            this.processingThreadIdStack = new LinkedBlockingDeque();
            for (int i = processingThreads; i >= 1; i--) {
                this.processingThreadIdStack.push(Long.valueOf(-i));
            }
        }
        onStart();
        this.forceQueue.set(false);
        updateCurrentState(DeployedState.STARTED);
    }

    public void startQueue() {
        this.stopQueue.set(false);
        if (isQueueEnabled()) {
            if (this.channel.getQueueHandler() == null || this.channel.getQueueHandler().canStartDestinationQueue(this)) {
                this.queue.invalidate(true, true);
                for (int i = 1; i <= this.destinationConnectorProperties.getThreadCount(); i++) {
                    DestinationQueueThread destinationQueueThread = new DestinationQueueThread(this);
                    destinationQueueThread.setName("Destination Queue Thread " + i + " on " + this.channel.getName() + " (" + getChannelId() + "), " + this.destinationName + " (" + getMetaDataId() + ")");
                    destinationQueueThread.start();
                    this.queueThreads.put(Long.valueOf(destinationQueueThread.getId()), destinationQueueThread);
                }
            }
        }
    }

    public void stopQueue() throws InterruptedException {
        this.stopQueue.set(true);
        if (MapUtils.isNotEmpty(this.queueThreads)) {
            try {
                Iterator<DestinationQueueThread> it = this.queueThreads.values().iterator();
                while (it.hasNext()) {
                    it.next().interruptIfWaitingRetryInterval();
                }
                Iterator<DestinationQueueThread> it2 = this.queueThreads.values().iterator();
                while (it2.hasNext()) {
                    it2.next().join();
                }
                this.queueThreads.clear();
            } finally {
                this.queue.invalidate(false, true);
            }
        }
    }

    @Override // com.mirth.connect.donkey.server.channel.Connector
    public void stop() throws ConnectorTaskException, InterruptedException {
        updateCurrentState(DeployedState.STOPPING);
        this.stopQueue.set(true);
        stopQueue();
        try {
            onStop();
            updateCurrentState(DeployedState.STOPPED);
        } catch (Throwable th) {
            Throwable th2 = th;
            if (th2 instanceof ConnectorTaskException) {
                th2 = th2.getCause();
            }
            if (th2 instanceof ExecutionException) {
                th2 = th2.getCause();
            }
            if (th2 instanceof InterruptedException) {
                throw ((InterruptedException) th2);
            }
            updateCurrentState(DeployedState.STOPPED);
            if (!(th instanceof ConnectorTaskException)) {
                throw new ConnectorTaskException(th);
            }
            throw ((ConnectorTaskException) th);
        }
    }

    @Override // com.mirth.connect.donkey.server.channel.Connector
    public void halt() throws ConnectorTaskException, InterruptedException {
        updateCurrentState(DeployedState.STOPPING);
        this.stopQueue.set(true);
        if (MapUtils.isNotEmpty(this.queueThreads)) {
            Iterator<DestinationQueueThread> it = this.queueThreads.values().iterator();
            while (it.hasNext()) {
                it.next().interrupt();
            }
        }
        try {
            onHalt();
            if (MapUtils.isNotEmpty(this.queueThreads)) {
                try {
                    Iterator<DestinationQueueThread> it2 = this.queueThreads.values().iterator();
                    while (it2.hasNext()) {
                        it2.next().join();
                    }
                    this.queueThreads.clear();
                } finally {
                }
            }
            this.channel.getEventDispatcher().dispatchEvent(new ConnectionStatusEvent(getChannelId(), Integer.valueOf(getMetaDataId()), getDestinationName(), ConnectionStatusEventType.IDLE));
            updateCurrentState(DeployedState.STOPPED);
        } catch (Throwable th) {
            if (MapUtils.isNotEmpty(this.queueThreads)) {
                try {
                    Iterator<DestinationQueueThread> it3 = this.queueThreads.values().iterator();
                    while (it3.hasNext()) {
                        it3.next().join();
                    }
                    this.queueThreads.clear();
                } finally {
                }
            }
            this.channel.getEventDispatcher().dispatchEvent(new ConnectionStatusEvent(getChannelId(), Integer.valueOf(getMetaDataId()), getDestinationName(), ConnectionStatusEventType.IDLE));
            updateCurrentState(DeployedState.STOPPED);
            throw th;
        }
    }

    private MessageContent getSentContent(ConnectorMessage connectorMessage, ConnectorProperties connectorProperties) {
        return new MessageContent(connectorMessage.getChannelId(), connectorMessage.getMessageId(), connectorMessage.getMetaDataId(), ContentType.SENT, this.channel.getSerializer().serialize(connectorProperties), null, false);
    }

    public void transform(DonkeyDao donkeyDao, ConnectorMessage connectorMessage, Status status, boolean z) throws InterruptedException {
        try {
            getFilterTransformerExecutor().processConnectorMessage(connectorMessage);
        } catch (DonkeyException e) {
            if (e instanceof MessageSerializerException) {
                Donkey.getInstance().getEventDispatcher().dispatchEvent(new ErrorEvent(getChannelId(), Integer.valueOf(getMetaDataId()), Long.valueOf(connectorMessage.getMessageId()), ErrorEventType.SERIALIZER, this.destinationName, null, e.getMessage(), e));
            }
            connectorMessage.setStatus(Status.ERROR);
            connectorMessage.setProcessingError(e.getFormattedError());
        }
        if (connectorMessage.getStatus() == Status.ERROR && StringUtils.isNotBlank(connectorMessage.getProcessingError())) {
            donkeyDao.updateErrors(connectorMessage);
        }
        this.metaDataReplacer.setMetaDataMap(connectorMessage, this.metaDataColumns);
        if (this.storageSettings.isStoreCustomMetaData() && !connectorMessage.getMetaDataMap().isEmpty()) {
            ThreadUtils.checkInterruptedStatus();
            if (z) {
                donkeyDao.insertMetaData(connectorMessage, this.metaDataColumns);
            } else {
                donkeyDao.storeMetaData(connectorMessage, this.metaDataColumns);
            }
        }
        if (this.storageSettings.isStoreTransformed() && connectorMessage.getTransformed() != null) {
            ThreadUtils.checkInterruptedStatus();
            if (z) {
                donkeyDao.insertMessageContent(connectorMessage.getTransformed());
            } else {
                donkeyDao.storeMessageContent(connectorMessage.getTransformed());
            }
        }
        if (connectorMessage.getStatus() != Status.TRANSFORMED) {
            if (connectorMessage.getStatus() == Status.FILTERED) {
                connectorMessage.getResponseMap().put("d" + String.valueOf(getMetaDataId()), new Response(Status.FILTERED, "", "Message has been filtered"));
            } else if (connectorMessage.getStatus() == Status.ERROR) {
                connectorMessage.getResponseMap().put("d" + String.valueOf(getMetaDataId()), new Response(Status.ERROR, "", "Error converting message or evaluating filter/transformer"));
            }
            donkeyDao.updateStatus(connectorMessage, status);
            if (this.storageSettings.isStoreMaps()) {
                donkeyDao.updateMaps(connectorMessage);
                return;
            }
            return;
        }
        connectorMessage.setStatus(Status.QUEUED);
        if (this.storageSettings.isStoreDestinationEncoded() && connectorMessage.getEncoded() != null) {
            ThreadUtils.checkInterruptedStatus();
            if (z) {
                donkeyDao.insertMessageContent(connectorMessage.getEncoded());
            } else {
                donkeyDao.storeMessageContent(connectorMessage.getEncoded());
            }
        }
        if (this.storageSettings.isStoreMaps()) {
            donkeyDao.updateMaps(connectorMessage);
        }
    }

    public void process(DonkeyDao donkeyDao, ConnectorMessage connectorMessage, Status status) throws InterruptedException {
        Response handleSend;
        ThreadUtils.checkInterruptedStatus();
        ConnectorProperties clone = ((DestinationConnectorPropertiesInterface) getConnectorProperties()).clone();
        replaceConnectorProperties(clone, connectorMessage);
        connectorMessage.setSentProperties(clone);
        if (this.storageSettings.isStoreSent()) {
            ThreadUtils.checkInterruptedStatus();
            MessageContent sentContent = getSentContent(connectorMessage, clone);
            connectorMessage.setSent(sentContent);
            if (sentContent != null) {
                ThreadUtils.checkInterruptedStatus();
                donkeyDao.insertMessageContent(sentContent);
            }
        }
        if (!willAttemptSend()) {
            updateQueuedStatus(donkeyDao, connectorMessage, status);
            return;
        }
        int retryCount = this.destinationConnectorProperties == null ? 0 : this.destinationConnectorProperties.getRetryCount();
        int i = 0;
        do {
            ThreadUtils.checkInterruptedStatus();
            if (i > 0) {
                Thread.sleep(this.destinationConnectorProperties.getRetryIntervalMillis());
            }
            handleSend = handleSend(clone, connectorMessage);
            i++;
            connectorMessage.setSendAttempts(i);
            handleSend.fixStatus(isQueueEnabled());
            Status status2 = handleSend.getStatus();
            if (status2 != Status.ERROR && status2 != Status.QUEUED) {
                break;
            }
        } while (i - 1 < retryCount);
        afterSend(donkeyDao, connectorMessage, handleSend, status);
        if (connectorMessage.getStatus() == Status.QUEUED) {
            connectorMessage.setAttemptedFirst(true);
        }
    }

    public void updateQueuedStatus(DonkeyDao donkeyDao, ConnectorMessage connectorMessage, Status status) throws InterruptedException {
        connectorMessage.setStatus(Status.QUEUED);
        connectorMessage.getResponseMap().put("d" + String.valueOf(getMetaDataId()), new Response(Status.QUEUED, "", QUEUED_RESPONSE));
        if (this.storageSettings.isStoreResponseMap()) {
            donkeyDao.updateResponseMap(connectorMessage);
            ThreadUtils.checkInterruptedStatus();
        }
        donkeyDao.updateStatus(connectorMessage, status);
    }

    public void processPendingConnectorMessage(DonkeyDao donkeyDao, ConnectorMessage connectorMessage) throws InterruptedException {
        Serializer serializer = this.channel.getSerializer();
        Response response = (Response) serializer.deserialize(connectorMessage.getResponse().getContent(), Response.class);
        if (this.responseTransformerExecutor != null) {
            try {
                this.responseTransformerExecutor.runResponseTransformer(donkeyDao, connectorMessage, response, isQueueEnabled(), this.storageSettings, serializer);
                String str = null;
                if (StringUtils.isNotBlank(response.getError())) {
                    str = response.getError();
                }
                connectorMessage.setProcessingError(str);
                if (connectorMessage.getErrorCode() > 0) {
                    donkeyDao.updateErrors(connectorMessage);
                }
                connectorMessage.getResponseMap().put("d" + String.valueOf(getMetaDataId()), response);
                boolean isEmpty = connectorMessage.getMetaDataMap().isEmpty();
                this.channel.getSourceConnector().getMetaDataReplacer().setMetaDataMap(connectorMessage, this.channel.getMetaDataColumns());
                if (this.storageSettings.isStoreCustomMetaData() && !connectorMessage.getMetaDataMap().isEmpty()) {
                    ThreadUtils.checkInterruptedStatus();
                    if (isEmpty) {
                        donkeyDao.insertMetaData(connectorMessage, this.channel.getMetaDataColumns());
                    } else {
                        donkeyDao.storeMetaData(connectorMessage, this.channel.getMetaDataColumns());
                    }
                }
                if (this.storageSettings.isStoreMaps()) {
                    donkeyDao.updateMaps(connectorMessage);
                }
            } catch (DonkeyException e) {
                this.logger.error("Error executing response transformer for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ".", e);
                response.setStatus(Status.ERROR);
                response.setError(e.getFormattedError());
                connectorMessage.setProcessingError(connectorMessage.getProcessingError() != null ? connectorMessage.getProcessingError() + System.getProperty("line.separator") + System.getProperty("line.separator") + e.getFormattedError() : e.getFormattedError());
                donkeyDao.updateErrors(connectorMessage);
                return;
            }
        }
        afterResponse(donkeyDao, connectorMessage, response, connectorMessage.getStatus());
    }

    @Override // java.lang.Runnable
    public void run() {
        ConnectorProperties clone;
        DonkeyDao donkeyDao = null;
        boolean z = false;
        Serializer serializer = this.channel.getSerializer();
        ConnectorMessage connectorMessage = null;
        int retryIntervalMillis = this.destinationConnectorProperties.getRetryIntervalMillis();
        AtomicBoolean waitingRetryInterval = ((DestinationQueueThread) Thread.currentThread()).getWaitingRetryInterval();
        Long l = null;
        boolean z2 = true;
        Lock lock = null;
        this.queue.registerThreadId();
        do {
            if (z2) {
                try {
                    try {
                        connectorMessage = this.queue.acquire();
                    } catch (Throwable th) {
                        if (lock != null) {
                            lock.unlock();
                        }
                        throw th;
                    }
                } catch (InterruptedException e) {
                    if (lock != null) {
                        lock.unlock();
                        return;
                    }
                    return;
                } catch (Throwable th2) {
                    if (lock != null) {
                        lock.unlock();
                        lock = null;
                    }
                    this.logger.error("Error in queue thread for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ".\n" + ExceptionUtils.getStackTrace(th2));
                    getChannel().getEventDispatcher().dispatchEvent(new ErrorEvent(getChannelId(), Integer.valueOf(getMetaDataId()), null, ErrorEventType.DESTINATION_CONNECTOR, getDestinationName(), getConnectorProperties().getName(), th2.getMessage(), th2));
                    try {
                        try {
                            waitingRetryInterval.set(true);
                            Thread.sleep(retryIntervalMillis);
                            l = null;
                            synchronized (waitingRetryInterval) {
                                waitingRetryInterval.set(false);
                                if (lock != null) {
                                    lock.unlock();
                                    lock = null;
                                }
                            }
                        } catch (InterruptedException e2) {
                            synchronized (waitingRetryInterval) {
                                waitingRetryInterval.set(false);
                                if (lock != null) {
                                    lock.unlock();
                                    return;
                                }
                                return;
                            }
                        }
                    } catch (Throwable th3) {
                        synchronized (waitingRetryInterval) {
                            waitingRetryInterval.set(false);
                            throw th3;
                        }
                    }
                }
            }
            if (connectorMessage != null) {
                try {
                    try {
                        if (connectorMessage.isAttemptedFirst() || (l != null && (l.longValue() == connectorMessage.getMessageId() || (this.queue.isRotate() && l.longValue() > connectorMessage.getMessageId() && this.queue.hasBeenRotated())))) {
                            try {
                                waitingRetryInterval.set(true);
                                Thread.sleep(retryIntervalMillis);
                                synchronized (waitingRetryInterval) {
                                    waitingRetryInterval.set(false);
                                }
                                connectorMessage.setAttemptedFirst(false);
                            } catch (Throwable th4) {
                                synchronized (waitingRetryInterval) {
                                    waitingRetryInterval.set(false);
                                    throw th4;
                                }
                            }
                        }
                        l = Long.valueOf(connectorMessage.getMessageId());
                        donkeyDao = this.daoFactory.getDao();
                        Status status = connectorMessage.getStatus();
                        Class<?> cls = getConnectorProperties().getClass();
                        Class<?> cls2 = null;
                        if (this.destinationConnectorProperties.isRegenerateTemplate() || connectorMessage.getSent() == null) {
                            clone = ((DestinationConnectorPropertiesInterface) getConnectorProperties()).clone();
                        } else {
                            clone = connectorMessage.getSentProperties();
                            if (clone == null) {
                                clone = (ConnectorProperties) serializer.deserialize(connectorMessage.getSent().getContent(), ConnectorProperties.class);
                                connectorMessage.setSentProperties(clone);
                            }
                            cls2 = clone.getClass();
                        }
                        if (connectorMessage.getSent() == null || this.destinationConnectorProperties.isRegenerateTemplate() || cls2 == cls) {
                            ThreadUtils.checkInterruptedStatus();
                            if (connectorMessage.getSent() != null || includeFilterTransformerInQueue()) {
                                if (includeFilterTransformerInQueue()) {
                                    transform(donkeyDao, connectorMessage, status, connectorMessage.getSent() == null);
                                }
                                if (connectorMessage.getStatus() == Status.QUEUED) {
                                    if (connectorMessage.getSent() == null || this.destinationConnectorProperties.isRegenerateTemplate()) {
                                        replaceConnectorProperties(clone, connectorMessage);
                                        MessageContent sentContent = getSentContent(connectorMessage, clone);
                                        connectorMessage.setSent(sentContent);
                                        if (sentContent != null && this.storageSettings.isStoreSent()) {
                                            ThreadUtils.checkInterruptedStatus();
                                            donkeyDao.storeMessageContent(sentContent);
                                        }
                                    }
                                    Response handleSend = handleSend(clone, connectorMessage);
                                    connectorMessage.setSendAttempts(connectorMessage.getSendAttempts() + 1);
                                    if (handleSend == null) {
                                        throw new RuntimeException("Received null response from destination " + this.destinationName + ".");
                                    }
                                    handleSend.fixStatus(isQueueEnabled());
                                    afterSend(donkeyDao, connectorMessage, handleSend, status);
                                }
                            } else {
                                connectorMessage.setStatus(Status.ERROR);
                                connectorMessage.setProcessingError("Queued message has not yet been transformed, and Include Filter/Transformer is currently disabled.");
                                donkeyDao.updateStatus(connectorMessage, status);
                                donkeyDao.updateErrors(connectorMessage);
                            }
                        } else {
                            connectorMessage.setStatus(Status.ERROR);
                            connectorMessage.setProcessingError("Mismatched connector properties detected in queued message. The connector type may have changed since the message was queued.\nFOUND: " + cls2.getSimpleName() + "\nEXPECTED: " + cls.getSimpleName());
                            donkeyDao.updateStatus(connectorMessage, status);
                            donkeyDao.updateErrors(connectorMessage);
                        }
                        if (connectorMessage.getStatus() != Status.QUEUED) {
                            Lock statusUpdateLock = this.queue.getStatusUpdateLock();
                            statusUpdateLock.lock();
                            lock = statusUpdateLock;
                        }
                        ThreadUtils.checkInterruptedStatus();
                        donkeyDao.commit(this.storageSettings.isDurable());
                        z = true;
                        if (connectorMessage.getStatus().isCompleted()) {
                            try {
                                this.channel.removeContent(donkeyDao, null, l.longValue(), true, true);
                            } catch (RuntimeException e3) {
                                this.logger.error("Error removing content for message " + l + " for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ". This error is expected if the message was manually removed from the queue.", e3);
                            }
                        }
                        if (donkeyDao != null) {
                            if (1 == 0) {
                                try {
                                    donkeyDao.rollback();
                                } catch (Exception e4) {
                                }
                            }
                            donkeyDao.close();
                        }
                        if (0 != 0) {
                            z2 = true;
                            synchronized (this.queue) {
                                this.queue.release(connectorMessage, true);
                                if (lock != null) {
                                    lock.unlock();
                                    lock = null;
                                }
                                this.queue.invalidate(true, false);
                            }
                        } else if (connectorMessage.getStatus() != Status.QUEUED) {
                            z2 = true;
                            this.queue.release(connectorMessage, true);
                        } else if (this.destinationConnectorProperties.isRotate()) {
                            z2 = true;
                            this.queue.release(connectorMessage, false);
                        } else {
                            z2 = this.queue.releaseIfDeleted(connectorMessage);
                        }
                        if (lock != null) {
                            lock.unlock();
                            lock = null;
                        }
                    } catch (Throwable th5) {
                        if (donkeyDao != null) {
                            if (!z) {
                                try {
                                    donkeyDao.rollback();
                                } catch (Exception e5) {
                                }
                            }
                            donkeyDao.close();
                        }
                        if (0 != 0) {
                            synchronized (this.queue) {
                                this.queue.release(connectorMessage, true);
                                if (lock != null) {
                                    lock.unlock();
                                    lock = null;
                                }
                                this.queue.invalidate(true, false);
                            }
                        } else if (connectorMessage.getStatus() != Status.QUEUED) {
                            this.queue.release(connectorMessage, true);
                        } else if (this.destinationConnectorProperties.isRotate()) {
                            this.queue.release(connectorMessage, false);
                        } else {
                            this.queue.releaseIfDeleted(connectorMessage);
                        }
                        if (lock != null) {
                            lock.unlock();
                        }
                        throw th5;
                    }
                } catch (InterruptedException e6) {
                    if (donkeyDao != null) {
                        if (!z) {
                            try {
                                donkeyDao.rollback();
                            } catch (Exception e7) {
                            }
                        }
                        donkeyDao.close();
                    }
                    if (0 != 0) {
                        synchronized (this.queue) {
                            this.queue.release(connectorMessage, true);
                            if (lock != null) {
                                lock.unlock();
                                lock = null;
                            }
                            this.queue.invalidate(true, false);
                        }
                    } else if (connectorMessage.getStatus() != Status.QUEUED) {
                        this.queue.release(connectorMessage, true);
                    } else if (this.destinationConnectorProperties.isRotate()) {
                        this.queue.release(connectorMessage, false);
                    } else {
                        this.queue.releaseIfDeleted(connectorMessage);
                    }
                    if (lock != null) {
                        lock.unlock();
                        lock = null;
                    }
                    if (lock != null) {
                        lock.unlock();
                        return;
                    }
                    return;
                } catch (RuntimeException e8) {
                    this.logger.error("Error processing queued " + (connectorMessage != null ? connectorMessage.toString() : "message (null)") + " for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ". This error is expected if the message was manually removed from the queue.", e8);
                    if (donkeyDao != null) {
                        if (!z) {
                            try {
                                donkeyDao.rollback();
                            } catch (Exception e9) {
                            }
                        }
                        donkeyDao.close();
                    }
                    if (1 != 0) {
                        z2 = true;
                        synchronized (this.queue) {
                            this.queue.release(connectorMessage, true);
                            if (lock != null) {
                                lock.unlock();
                                lock = null;
                            }
                            this.queue.invalidate(true, false);
                        }
                    } else if (connectorMessage.getStatus() != Status.QUEUED) {
                        z2 = true;
                        this.queue.release(connectorMessage, true);
                    } else if (this.destinationConnectorProperties.isRotate()) {
                        z2 = true;
                        this.queue.release(connectorMessage, false);
                    } else {
                        z2 = this.queue.releaseIfDeleted(connectorMessage);
                    }
                    if (lock != null) {
                        lock.unlock();
                        lock = null;
                    }
                } catch (Throwable th6) {
                    this.logger.error("Error processing queued " + (connectorMessage != null ? connectorMessage.toString() : "message (null)") + " for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ".", th6);
                    getChannel().getEventDispatcher().dispatchEvent(new ErrorEvent(getChannelId(), Integer.valueOf(getMetaDataId()), connectorMessage != null ? Long.valueOf(connectorMessage.getMessageId()) : null, ErrorEventType.DESTINATION_CONNECTOR, getDestinationName(), getConnectorProperties().getName(), th6.getMessage(), th6));
                    if (donkeyDao != null) {
                        if (!z) {
                            try {
                                donkeyDao.rollback();
                            } catch (Exception e10) {
                            }
                        }
                        donkeyDao.close();
                    }
                    if (1 != 0) {
                        z2 = true;
                        synchronized (this.queue) {
                            this.queue.release(connectorMessage, true);
                            if (lock != null) {
                                lock.unlock();
                                lock = null;
                            }
                            this.queue.invalidate(true, false);
                        }
                    } else if (connectorMessage.getStatus() != Status.QUEUED) {
                        z2 = true;
                        this.queue.release(connectorMessage, true);
                    } else if (this.destinationConnectorProperties.isRotate()) {
                        z2 = true;
                        this.queue.release(connectorMessage, false);
                    } else {
                        z2 = this.queue.releaseIfDeleted(connectorMessage);
                    }
                    if (lock != null) {
                        lock.unlock();
                        lock = null;
                    }
                }
            } else {
                Thread.sleep(this.queueEmptySleepTime);
            }
            if (lock != null) {
                lock.unlock();
                lock = null;
            }
            if (getCurrentState() != DeployedState.STARTED && getCurrentState() != DeployedState.STARTING) {
                return;
            }
        } while (!this.stopQueue.get());
    }

    private Response handleSend(ConnectorProperties connectorProperties, ConnectorMessage connectorMessage) throws InterruptedException {
        connectorMessage.setSendDate(Calendar.getInstance());
        long dispatcherId = getDispatcherId();
        try {
            connectorMessage.setDispatcherId(dispatcherId);
            Response send = send(connectorProperties, connectorMessage);
            if (dispatcherId < 0) {
                returnProcessingThreadId(dispatcherId);
            }
            if (send.isValidate() && send.getStatus() == Status.SENT) {
                send = this.responseValidator.validate(send, connectorMessage);
                if (send.getStatus() != Status.SENT) {
                    this.channel.getEventDispatcher().dispatchEvent(new ErrorEvent(getChannelId(), Integer.valueOf(getMetaDataId()), Long.valueOf(connectorMessage.getMessageId()), ErrorEventType.RESPONSE_VALIDATION, getDestinationName(), connectorProperties.getName(), send.getStatusMessage(), null));
                }
            }
            connectorMessage.setResponseDate(Calendar.getInstance());
            return send;
        } catch (Throwable th) {
            if (dispatcherId < 0) {
                returnProcessingThreadId(dispatcherId);
            }
            throw th;
        }
    }

    private void afterSend(DonkeyDao donkeyDao, ConnectorMessage connectorMessage, Response response, Status status) throws InterruptedException {
        Serializer serializer = this.channel.getSerializer();
        donkeyDao.updateSendAttempts(connectorMessage);
        if (this.storageSettings.isStoreResponse()) {
            MessageContent messageContent = new MessageContent(connectorMessage.getChannelId(), connectorMessage.getMessageId(), connectorMessage.getMetaDataId(), ContentType.RESPONSE, serializer.serialize(response), this.responseTransformerExecutor.getInbound().getType(), false);
            ThreadUtils.checkInterruptedStatus();
            if (connectorMessage.getResponse() != null) {
                donkeyDao.storeMessageContent(messageContent);
            } else {
                donkeyDao.insertMessageContent(messageContent);
            }
            connectorMessage.setResponse(messageContent);
        }
        ThreadUtils.checkInterruptedStatus();
        if (this.responseTransformerExecutor.isActive(response)) {
            connectorMessage.setStatus(Status.PENDING);
            donkeyDao.updateStatus(connectorMessage, status);
            donkeyDao.commit(this.storageSettings.isDurable());
            status = connectorMessage.getStatus();
        }
        try {
            this.responseTransformerExecutor.runResponseTransformer(donkeyDao, connectorMessage, response, isQueueEnabled(), this.storageSettings, serializer);
            String str = null;
            if (StringUtils.isNotBlank(response.getError())) {
                str = response.getError();
            }
            connectorMessage.setProcessingError(str);
            if (connectorMessage.getErrorCode() > 0) {
                donkeyDao.updateErrors(connectorMessage);
            }
            connectorMessage.getResponseMap().put("d" + String.valueOf(getMetaDataId()), response);
            boolean isEmpty = connectorMessage.getMetaDataMap().isEmpty();
            this.channel.getSourceConnector().getMetaDataReplacer().setMetaDataMap(connectorMessage, this.channel.getMetaDataColumns());
            if (this.storageSettings.isStoreCustomMetaData() && !connectorMessage.getMetaDataMap().isEmpty()) {
                ThreadUtils.checkInterruptedStatus();
                if (isEmpty) {
                    donkeyDao.insertMetaData(connectorMessage, this.channel.getMetaDataColumns());
                } else {
                    donkeyDao.storeMetaData(connectorMessage, this.channel.getMetaDataColumns());
                }
            }
            if (this.storageSettings.isStoreMaps()) {
                donkeyDao.updateMaps(connectorMessage);
            }
            ThreadUtils.checkInterruptedStatus();
            afterResponse(donkeyDao, connectorMessage, response, status);
        } catch (DonkeyException e) {
            this.logger.error("Error executing response transformer for channel " + this.channel.getName() + " (" + this.channel.getChannelId() + ") on destination " + this.destinationName + ".", e);
            response.setStatus(Status.ERROR);
            response.setError(e.getFormattedError());
            connectorMessage.setStatus(response.getStatus());
            connectorMessage.setProcessingError(connectorMessage.getProcessingError() != null ? connectorMessage.getProcessingError() + System.getProperty("line.separator") + System.getProperty("line.separator") + e.getFormattedError() : e.getFormattedError());
            donkeyDao.updateStatus(connectorMessage, status);
            donkeyDao.updateErrors(connectorMessage);
        }
    }

    private void afterResponse(DonkeyDao donkeyDao, ConnectorMessage connectorMessage, Response response, Status status) {
        connectorMessage.setStatus(response.getStatus());
        donkeyDao.updateStatus(connectorMessage, status);
        connectorMessage.getStatus();
    }
}
