/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SharedSlot
implements SlotOwner,
PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);
    private final SlotRequestId physicalSlotRequestId;
    private final PhysicalSlot physicalSlot;
    private final Runnable externalReleaseCallback;
    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
    private final boolean slotWillBeOccupiedIndefinitely;
    private State state;

    public SharedSlot(SlotRequestId physicalSlotRequestId, PhysicalSlot physicalSlot, boolean slotWillBeOccupiedIndefinitely, Runnable externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlot = physicalSlot;
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
        this.allocatedLogicalSlots = new HashMap<SlotRequestId, LogicalSlot>();
        Preconditions.checkState(physicalSlot.tryAssignPayload(this), "The provided slot (%s) was not free.", physicalSlot.getAllocationId());
        this.state = State.ALLOCATED;
    }

    public LogicalSlot allocateLogicalSlot() {
        LOG.debug("Allocating logical slot from shared slot ({})", (Object)this.physicalSlotRequestId);
        Preconditions.checkState(this.state == State.ALLOCATED, "The shared slot has already been released.");
        SingleLogicalSlot slot = new SingleLogicalSlot(new SlotRequestId(), this.physicalSlot, Locality.UNKNOWN, this, this.slotWillBeOccupiedIndefinitely);
        this.allocatedLogicalSlots.put(slot.getSlotRequestId(), slot);
        return slot;
    }

    @Override
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        LOG.debug("Returning logical slot to shared slot ({})", (Object)this.physicalSlotRequestId);
        Preconditions.checkState(this.state != State.RELEASED, "The shared slot has already been released.");
        Preconditions.checkState(!logicalSlot.isAlive(), "Returned logic slot must not be alive.");
        Preconditions.checkState(this.allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null, "Trying to remove a logical slot request which has been either already removed or never created.");
        this.tryReleaseExternally();
    }

    private void tryReleaseExternally() {
        if (this.state == State.ALLOCATED && this.allocatedLogicalSlots.isEmpty()) {
            LOG.debug("Release shared slot externally ({})", (Object)this.physicalSlotRequestId);
            this.externalReleaseCallback.run();
        }
    }

    @Override
    public void release(Throwable cause) {
        LOG.debug("Release shared slot ({})", (Object)this.physicalSlotRequestId);
        Preconditions.checkState(this.state == State.ALLOCATED, "The shared slot has already been released.");
        this.state = State.RELEASING;
        ArrayList<LogicalSlot> logicalSlotsToRelease = new ArrayList<LogicalSlot>(this.allocatedLogicalSlots.values());
        for (LogicalSlot allocatedLogicalSlot : logicalSlotsToRelease) {
            allocatedLogicalSlot.releaseSlot(cause);
        }
        this.allocatedLogicalSlots.clear();
        this.state = State.RELEASED;
    }

    @Override
    public boolean willOccupySlotIndefinitely() {
        return this.slotWillBeOccupiedIndefinitely;
    }

    private static enum State {
        ALLOCATED,
        RELEASING,
        RELEASED;

    }
}

