package org.apache.hadoop.hive.llap.registry.impl;

import com.shadedgoogle.common.base.Preconditions;
import com.shadedgoogle.common.collect.Lists;
import com.shadedgoogle.common.collect.Sets;
import com.shadedgoogle.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.http.cookie.ClientCookie;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.mortbay.io.Portable;
import org.mortbay.util.URIUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zetarisapache.hive.jdbc.Utils;

/* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.class */
public class LlapZookeeperRegistryImpl implements ServiceRegistry {
    private static final Logger LOG;
    private static final String IPC_SERVICES = "services";
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";
    private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
    private static final String SASL_NAMESPACE = "llap-sasl";
    private static final String UNSECURE_NAMESPACE = "llap-unsecure";
    private static final String USER_SCOPE_PATH_PREFIX = "user-";
    private static final String DISABLE_MESSAGE;
    private static final String WORKER_PREFIX = "worker-";
    private static final String SLOT_PREFIX = "slot-";
    private final Configuration conf;
    private final CuratorFramework zooKeeperClient;
    private final String userPathPrefix;
    private final String workersPath;
    private String userNameFromPrincipal;
    private PersistentEphemeralNode znode;
    private SlotZnode slotZnode;
    private String znodePath;
    private final RegistryUtils.ServiceRecordMarshal encoder;
    private DynamicServiceInstanceSet instances;
    private PathChildrenCache instancesCache;
    private static final UUID uniq;
    private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
    private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
    private final Map<String, Set<ServiceInstance>> pathToInstanceCache;
    private final Map<String, Set<ServiceInstance>> nodeToInstanceCache;
    private final Lock instanceCacheLock = new ReentrantLock();
    private static final String hostname;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$DynamicServiceInstance.class */
    public class DynamicServiceInstance implements ServiceInstance {
        private final ServiceRecord srv;
        private final String host;
        private final int rpcPort;
        private final int mngPort;
        private final int shufflePort;
        private final int outputFormatPort;
        private final String serviceAddress;
        private final Resource resource;

        public DynamicServiceInstance(ServiceRecord serviceRecord) throws IOException {
            this.srv = serviceRecord;
            if (LlapZookeeperRegistryImpl.LOG.isTraceEnabled()) {
                LlapZookeeperRegistryImpl.LOG.trace("Working with ServiceRecord: {}", serviceRecord);
            }
            Endpoint internalEndpoint = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_SHUFFLE);
            Endpoint internalEndpoint2 = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_LLAP);
            Endpoint internalEndpoint3 = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_MNG);
            Endpoint internalEndpoint4 = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_OUTPUTFORMAT);
            Endpoint externalEndpoint = serviceRecord.getExternalEndpoint(LlapZookeeperRegistryImpl.IPC_SERVICES);
            this.host = RegistryTypeUtils.getAddressField((Map) internalEndpoint2.addresses.get(0), "host");
            this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map) internalEndpoint2.addresses.get(0), ClientCookie.PORT_ATTR));
            this.mngPort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map) internalEndpoint3.addresses.get(0), ClientCookie.PORT_ATTR));
            this.shufflePort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map) internalEndpoint.addresses.get(0), ClientCookie.PORT_ATTR));
            this.outputFormatPort = Integer.valueOf(RegistryTypeUtils.getAddressField((Map) internalEndpoint4.addresses.get(0), ClientCookie.PORT_ATTR)).intValue();
            this.serviceAddress = RegistryTypeUtils.getAddressField((Map) externalEndpoint.addresses.get(0), "uri");
            String str = serviceRecord.get(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, "");
            String str2 = serviceRecord.get(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, "");
            try {
                this.resource = Resource.newInstance(Integer.parseInt(str), Integer.parseInt(str2));
            } catch (NumberFormatException e) {
                throw new IOException("Invalid resource configuration for a LLAP node: memory " + str + ", vcores " + str2);
            }
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return getWorkerIdentity().equals(((DynamicServiceInstance) obj).getWorkerIdentity());
        }

        public int hashCode() {
            return getWorkerIdentity().hashCode();
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public String getWorkerIdentity() {
            return this.srv.get(LlapZookeeperRegistryImpl.UNIQUE_IDENTIFIER);
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public String getHost() {
            return this.host;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public int getRpcPort() {
            return this.rpcPort;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public int getShufflePort() {
            return this.shufflePort;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public String getServicesAddress() {
            return this.serviceAddress;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public Map<String, String> getProperties() {
            return this.srv.attributes();
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public Resource getResource() {
            return this.resource;
        }

        public String toString() {
            return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + this.host + ParameterizedMessage.ERROR_MSG_SEPARATOR + this.rpcPort + " with resources=" + getResource() + ", shufflePort=" + getShufflePort() + ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + SerDeUtils.RBRACKET;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public int getManagementPort() {
            return this.mngPort;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstance
        public int getOutputFormatPort() {
            return this.outputFormatPort;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$DynamicServiceInstanceSet.class */
    public class DynamicServiceInstanceSet implements ServiceInstanceSet {
        private final PathChildrenCache instancesCache;
        static final /* synthetic */ boolean $assertionsDisabled;

        public DynamicServiceInstanceSet(PathChildrenCache pathChildrenCache) {
            this.instancesCache = pathChildrenCache;
            populateCache();
        }

        private void populateCache() {
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                byte[] workerData = getWorkerData(childData);
                if (workerData != null) {
                    try {
                        DynamicServiceInstance dynamicServiceInstance = new DynamicServiceInstance((ServiceRecord) LlapZookeeperRegistryImpl.this.encoder.fromBytes(childData.getPath(), workerData));
                        LlapZookeeperRegistryImpl.this.addToCache(childData.getPath(), dynamicServiceInstance.getHost(), dynamicServiceInstance);
                    } catch (IOException e) {
                        LlapZookeeperRegistryImpl.LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", childData.getPath());
                    }
                }
            }
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstanceSet
        public Collection<ServiceInstance> getAll() {
            HashSet hashSet = new HashSet();
            Iterator it = LlapZookeeperRegistryImpl.this.pathToInstanceCache.values().iterator();
            while (it.hasNext()) {
                hashSet.addAll((Set) it.next());
            }
            return hashSet;
        }

        public ApplicationId getApplicationId() {
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                byte[] workerData = getWorkerData(childData);
                if (workerData != null) {
                    try {
                        String str = ((ServiceRecord) LlapZookeeperRegistryImpl.this.encoder.fromBytes(childData.getPath(), workerData)).get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
                        if (str != null && !str.isEmpty()) {
                            return ContainerId.fromString(str).getApplicationAttemptId().getApplicationId();
                        }
                    } catch (IOException e) {
                        LlapZookeeperRegistryImpl.LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", childData.getPath());
                    }
                }
            }
            return null;
        }

        private byte[] getWorkerData(ChildData childData) {
            byte[] data;
            if (childData == null || (data = childData.getData()) == null || !LlapZookeeperRegistryImpl.extractNodeName(childData).startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX)) {
                return null;
            }
            return data;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstanceSet
        public Collection<ServiceInstance> getAllInstancesOrdered(boolean z) {
            HashMap hashMap = new HashMap();
            HashSet<ServiceInstance> newHashSet = Sets.newHashSet();
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                if (childData != null && childData.getData() != null) {
                    String extractNodeName = LlapZookeeperRegistryImpl.extractNodeName(childData);
                    if (extractNodeName.startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX)) {
                        Set set = (Set) LlapZookeeperRegistryImpl.this.pathToInstanceCache.get(childData.getPath());
                        if (set != null) {
                            newHashSet.addAll(set);
                        }
                    } else if (extractNodeName.startsWith(LlapZookeeperRegistryImpl.SLOT_PREFIX)) {
                        hashMap.put(LlapZookeeperRegistryImpl.extractWorkerIdFromSlot(childData), Long.valueOf(Long.parseLong(extractNodeName.substring(LlapZookeeperRegistryImpl.SLOT_PREFIX.length()))));
                    } else {
                        LlapZookeeperRegistryImpl.LOG.info("Ignoring unknown node {}", childData.getPath());
                    }
                }
            }
            TreeMap treeMap = new TreeMap();
            long j = Long.MIN_VALUE;
            for (ServiceInstance serviceInstance : newHashSet) {
                Long l = (Long) hashMap.get(serviceInstance.getWorkerIdentity());
                if (l == null) {
                    LlapZookeeperRegistryImpl.LOG.info("Unknown slot for {}", serviceInstance.getWorkerIdentity());
                } else {
                    j = Math.max(j, l.longValue());
                    treeMap.put(l, serviceInstance);
                }
            }
            if (z) {
                TreeMap treeMap2 = new TreeMap();
                long j2 = 0;
                Long l2 = null;
                for (Long l3 : treeMap.keySet()) {
                    if (!$assertionsDisabled && l3.longValue() < j2) {
                        throw new AssertionError();
                    }
                    while (l3.longValue() > j2) {
                        if (l2 == null) {
                            l2 = Long.valueOf(System.nanoTime());
                        }
                        treeMap2.put(Long.valueOf(j2), new InactiveServiceInstance("inactive-" + j2 + "-" + l2));
                        j2++;
                    }
                    j2++;
                }
                treeMap.putAll(treeMap2);
            }
            return treeMap.values();
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstanceSet
        public ServiceInstance getInstance(String str) {
            for (ServiceInstance serviceInstance : getAll()) {
                if (serviceInstance.getWorkerIdentity().equals(str)) {
                    return serviceInstance;
                }
            }
            return null;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstanceSet
        public Set<ServiceInstance> getByHost(String str) {
            Set<ServiceInstance> set = (Set) LlapZookeeperRegistryImpl.this.nodeToInstanceCache.get(str);
            Set<ServiceInstance> newHashSet = set == null ? Sets.newHashSet() : set;
            if (LlapZookeeperRegistryImpl.LOG.isDebugEnabled()) {
                LlapZookeeperRegistryImpl.LOG.debug("Returning " + newHashSet.size() + " hosts for locality allocation on " + str);
            }
            return newHashSet;
        }

        @Override // org.apache.hadoop.hive.llap.registry.ServiceInstanceSet
        public int size() {
            return LlapZookeeperRegistryImpl.this.nodeToInstanceCache.size();
        }

        static {
            $assertionsDisabled = !LlapZookeeperRegistryImpl.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$InstanceStateChangeListener.class */
    public class InstanceStateChangeListener implements PathChildrenCacheListener {
        private final Logger LOG;

        private InstanceStateChangeListener() {
            this.LOG = LoggerFactory.getLogger((Class<?>) InstanceStateChangeListener.class);
        }

        @Override // org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            Preconditions.checkArgument(curatorFramework != null && curatorFramework.getState() == CuratorFrameworkState.STARTED, "client is not started");
            synchronized (this) {
                ChildData data = pathChildrenCacheEvent.getData();
                if (data == null) {
                    return;
                }
                if (LlapZookeeperRegistryImpl.extractNodeName(data).startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX)) {
                    this.LOG.info("{} for zknode {} in llap namespace", pathChildrenCacheEvent.getType(), data.getPath());
                    ServiceInstance extractServiceInstance = LlapZookeeperRegistryImpl.this.extractServiceInstance(pathChildrenCacheEvent, data);
                    switch (pathChildrenCacheEvent.getType()) {
                        case CHILD_ADDED:
                            LlapZookeeperRegistryImpl.this.addToCache(data.getPath(), extractServiceInstance.getHost(), extractServiceInstance);
                            Iterator it = LlapZookeeperRegistryImpl.this.stateChangeListeners.iterator();
                            while (it.hasNext()) {
                                ((ServiceInstanceStateChangeListener) it.next()).onCreate(extractServiceInstance);
                            }
                            break;
                        case CHILD_UPDATED:
                            LlapZookeeperRegistryImpl.this.addToCache(data.getPath(), extractServiceInstance.getHost(), extractServiceInstance);
                            Iterator it2 = LlapZookeeperRegistryImpl.this.stateChangeListeners.iterator();
                            while (it2.hasNext()) {
                                ((ServiceInstanceStateChangeListener) it2.next()).onUpdate(extractServiceInstance);
                            }
                            break;
                        case CHILD_REMOVED:
                            LlapZookeeperRegistryImpl.this.removeFromCache(data.getPath(), extractServiceInstance.getHost());
                            Iterator it3 = LlapZookeeperRegistryImpl.this.stateChangeListeners.iterator();
                            while (it3.hasNext()) {
                                ((ServiceInstanceStateChangeListener) it3.next()).onRemove(extractServiceInstance);
                            }
                            break;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$JaasConfiguration.class */
    public static class JaasConfiguration extends javax.security.auth.login.Configuration {
        private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration.getConfiguration();
        private final String loginContextName;
        private final String principal;
        private final String keyTabFile;

        public JaasConfiguration(String str, String str2, String str3) {
            this.loginContextName = str;
            this.principal = str2;
            this.keyTabFile = str3;
        }

        public AppConfigurationEntry[] getAppConfigurationEntry(String str) {
            if (!this.loginContextName.equals(str)) {
                if (this.baseConfig != null) {
                    return this.baseConfig.getAppConfigurationEntry(str);
                }
                return null;
            }
            HashMap hashMap = new HashMap();
            hashMap.put("doNotPrompt", "true");
            hashMap.put("storeKey", "true");
            hashMap.put("useKeyTab", "true");
            hashMap.put(Utils.JdbcConnectionParams.AUTH_PRINCIPAL, this.principal);
            hashMap.put("keyTab", this.keyTabFile);
            hashMap.put("refreshKrb5Config", "true");
            return new AppConfigurationEntry[]{new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, hashMap)};
        }
    }

    public LlapZookeeperRegistryImpl(String str, Configuration configuration) {
        this.conf = new Configuration(configuration);
        this.conf.addResource("yarn-site.xml");
        String quorumServers = getQuorumServers(this.conf);
        this.encoder = new RegistryUtils.ServiceRecordMarshal();
        int timeVar = (int) HiveConf.getTimeVar(configuration, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
        int timeVar2 = (int) HiveConf.getTimeVar(configuration, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
        int intVar = HiveConf.getIntVar(configuration, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
        this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
        this.workersPath = "/" + this.userPathPrefix + "/" + str + "/workers";
        this.instancesCache = null;
        this.instances = null;
        this.stateChangeListeners = new HashSet();
        this.pathToInstanceCache = new ConcurrentHashMap();
        this.nodeToInstanceCache = new ConcurrentHashMap();
        final boolean isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
        ACLProvider aCLProvider = new ACLProvider() { // from class: org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.1
            @Override // org.apache.curator.framework.api.ACLProvider, org.apache.curator.utils.InternalACLProvider
            public List<ACL> getDefaultAcl() {
                LlapZookeeperRegistryImpl.LOG.warn("getDefaultAcl was called");
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }

            @Override // org.apache.curator.framework.api.ACLProvider, org.apache.curator.utils.InternalACLProvider
            public List<ACL> getAclForPath(String str2) {
                return (isSecurityEnabled && str2 != null && str2.contains(LlapZookeeperRegistryImpl.this.userPathPrefix)) ? LlapZookeeperRegistryImpl.access$200() : Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }
        };
        String var = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_ZK_REGISTRY_NAMESPACE);
        if (var == null) {
            var = isSecurityEnabled ? SASL_NAMESPACE : UNSECURE_NAMESPACE;
        }
        this.zooKeeperClient = CuratorFrameworkFactory.builder().connectString(quorumServers).sessionTimeoutMs(timeVar).aclProvider(aCLProvider).namespace(var).retryPolicy(new ExponentialBackoffRetry(timeVar2, intVar)).build();
        LOG.info("Llap Zookeeper Registry is enabled with registryid: " + str);
    }

    private static List<ACL> createSecureAcls() {
        ArrayList arrayList = new ArrayList(ZooDefs.Ids.READ_ACL_UNSAFE);
        arrayList.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
        return arrayList;
    }

    private String getQuorumServers(Configuration configuration) {
        String[] trimmedStrings = configuration.getTrimmedStrings(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
        String str = configuration.get(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < trimmedStrings.length; i++) {
            sb.append(trimmedStrings[i].trim());
            if (!trimmedStrings[i].contains(ParameterizedMessage.ERROR_MSG_SEPARATOR)) {
                sb.append(ParameterizedMessage.ERROR_MSG_SEPARATOR);
                sb.append(str);
            }
            if (i != trimmedStrings.length - 1) {
                sb.append(",");
            }
        }
        return sb.toString();
    }

    private String getZkPathUser(Configuration configuration) {
        return HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
    }

    public Endpoint getRpcEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT)));
    }

    public Endpoint getShuffleEndpoint() {
        return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, "tcp", hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT));
    }

    public Endpoint getServicesEndpoint() {
        try {
            return RegistryTypeUtils.webEndpoint(IPC_SERVICES, new URI[]{new URL(HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_SSL) ? URIUtil.HTTPS : "http", hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT), "").toURI()});
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        } catch (URISyntaxException e2) {
            throw new RuntimeException("llap service URI for " + hostname + " is invalid", e2);
        }
    }

    public Endpoint getMngEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
    }

    public Endpoint getOutputFormatEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_OUTPUTFORMAT, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)));
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public String register() throws IOException {
        ServiceRecord serviceRecord = new ServiceRecord();
        Endpoint rpcEndpoint = getRpcEndpoint();
        serviceRecord.addInternalEndpoint(rpcEndpoint);
        serviceRecord.addInternalEndpoint(getMngEndpoint());
        serviceRecord.addInternalEndpoint(getShuffleEndpoint());
        serviceRecord.addExternalEndpoint(getServicesEndpoint());
        serviceRecord.addInternalEndpoint(getOutputFormatEndpoint());
        Iterator it = this.conf.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            if (((String) entry.getKey()).startsWith(HiveConf.PREFIX_LLAP) || ((String) entry.getKey()).startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
                serviceRecord.set((String) entry.getKey(), entry.getValue());
            }
        }
        serviceRecord.set(UNIQUE_IDENTIFIER, uniq.toString());
        try {
            this.znode = new PersistentEphemeralNode(this.zooKeeperClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, this.workersPath + "/" + WORKER_PREFIX, this.encoder.toBytes(serviceRecord));
            this.znode.start();
            if (!this.znode.waitForInitialCreate(120L, TimeUnit.SECONDS)) {
                throw new Exception("Max znode creation wait time: 120s exhausted");
            }
            this.znodePath = this.znode.getActualPath();
            this.slotZnode = new SlotZnode(this.zooKeeperClient, this.workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString());
            if (!this.slotZnode.start(120L, TimeUnit.SECONDS)) {
                throw new Exception("Max znode creation wait time: 120s exhausted");
            }
            if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_VALIDATE_ACLS)) {
                try {
                    checkAndSetAcls();
                } catch (Exception e) {
                    throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, e);
                }
            }
            if (this.zooKeeperClient.checkExists().forPath(this.znodePath) == null) {
                throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
            }
            LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}, webui: {}, mgmt: {}, znodePath: {} ", rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), this.znodePath);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created zknode with path: {} service record: {}", this.znodePath, serviceRecord);
            }
            return uniq.toString();
        } catch (Exception e2) {
            LOG.error("Unable to create a znode for this server instance", (Throwable) e2);
            CloseableUtils.closeQuietly(this.znode);
            CloseableUtils.closeQuietly(this.slotZnode);
            if (e2 instanceof IOException) {
                throw ((IOException) e2);
            }
            throw new IOException(e2);
        }
    }

    private void checkAndSetAcls() throws Exception {
        if (UserGroupInformation.isSecurityEnabled()) {
            String str = this.workersPath;
            List<ACL> forPath = this.zooKeeperClient.getACL().forPath(str);
            if (forPath == null || forPath.isEmpty()) {
                LOG.warn("No ACLs on " + str + "; setting up ACLs. " + DISABLE_MESSAGE);
                setUpAcls(str);
                return;
            }
            if (!$assertionsDisabled && this.userNameFromPrincipal == null) {
                throw new AssertionError();
            }
            Id id = new Id("sasl", this.userNameFromPrincipal);
            for (ACL acl : forPath) {
                if ((acl.getPerms() & (-2)) != 0 && !id.equals(acl.getId())) {
                    LOG.warn("The ACL " + acl + " is unnacceptable for " + str + "; setting up ACLs. " + DISABLE_MESSAGE);
                    setUpAcls(str);
                    return;
                }
            }
        }
    }

    private void setUpAcls(String str) throws Exception {
        List<ACL> createSecureAcls = createSecureAcls();
        LinkedList linkedList = new LinkedList();
        linkedList.add(str);
        while (!linkedList.isEmpty()) {
            String str2 = (String) linkedList.poll();
            List<String> forPath = this.zooKeeperClient.getChildren().forPath(str2);
            if (forPath != null) {
                Iterator<String> it = forPath.iterator();
                while (it.hasNext()) {
                    linkedList.add(str2 + "/" + it.next());
                }
            }
            this.zooKeeperClient.setACL().withACL(createSecureAcls).forPath(str2);
        }
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public void unregister() throws IOException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addToCache(String str, String str2, ServiceInstance serviceInstance) {
        this.instanceCacheLock.lock();
        try {
            putInCache(str, this.pathToInstanceCache, serviceInstance);
            putInCache(str2, this.nodeToInstanceCache, serviceInstance);
            this.instanceCacheLock.unlock();
            LOG.debug("Added path={}, host={} instance={} to cache. pathToInstanceCache:size={}, nodeToInstanceCache:size={}", str, str2, serviceInstance, Integer.valueOf(this.pathToInstanceCache.size()), Integer.valueOf(this.nodeToInstanceCache.size()));
        } catch (Throwable th) {
            this.instanceCacheLock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeFromCache(String str, String str2) {
        this.instanceCacheLock.lock();
        try {
            this.pathToInstanceCache.remove(str);
            this.nodeToInstanceCache.remove(str2);
            LOG.debug("Removed path={}, host={} from cache. pathToInstanceCache:size={}, nodeToInstanceCache:size={}", str, str2, Integer.valueOf(this.pathToInstanceCache.size()), Integer.valueOf(this.nodeToInstanceCache.size()));
        } finally {
            this.instanceCacheLock.unlock();
        }
    }

    private void putInCache(String str, Map<String, Set<ServiceInstance>> map, ServiceInstance serviceInstance) {
        Set<ServiceInstance> set = map.get(str);
        if (set == null) {
            set = Sets.newHashSet();
            map.put(str, set);
        }
        set.add(serviceInstance);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String extractWorkerIdFromSlot(ChildData childData) {
        return new String(childData.getData(), SlotZnode.CHARSET);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String extractNodeName(ChildData childData) {
        String path = childData.getPath();
        int lastIndexOf = path.lastIndexOf("/");
        if (lastIndexOf >= 0) {
            path = path.substring(lastIndexOf + 1);
        }
        return path;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ServiceInstance extractServiceInstance(PathChildrenCacheEvent pathChildrenCacheEvent, ChildData childData) {
        byte[] data = childData.getData();
        if (data == null) {
            return null;
        }
        try {
            return new DynamicServiceInstance((ServiceRecord) this.encoder.fromBytes(pathChildrenCacheEvent.getData().getPath(), data));
        } catch (IOException e) {
            LOG.error("Unable to decode data for zknode: {}. Dropping notification of type: {}", childData.getPath(), pathChildrenCacheEvent.getType());
            return null;
        }
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public ServiceInstanceSet getInstances(String str, long j) throws IOException {
        checkPathChildrenCache(j);
        if (this.instances == null) {
            this.instances = new DynamicServiceInstanceSet(this.instancesCache);
        }
        return this.instances;
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public ApplicationId getApplicationId() throws IOException {
        getInstances("LLAP", 0L);
        return this.instances.getApplicationId();
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public synchronized void registerStateChangeListener(ServiceInstanceStateChangeListener serviceInstanceStateChangeListener) throws IOException {
        checkPathChildrenCache(0L);
        this.stateChangeListeners.add(serviceInstanceStateChangeListener);
    }

    private synchronized void checkPathChildrenCache(long j) throws IOException {
        Preconditions.checkArgument(this.zooKeeperClient != null && this.zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
        if (this.instancesCache != null) {
            return;
        }
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
        long nanoTime = System.nanoTime();
        long j2 = j * 1000000;
        long min = Math.min(16L, j);
        while (true) {
            long j3 = min;
            PathChildrenCache pathChildrenCache = new PathChildrenCache(this.zooKeeperClient, this.workersPath, true);
            pathChildrenCache.getListenable().addListener(new InstanceStateChangeListener(), newFixedThreadPool);
            try {
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                this.instancesCache = pathChildrenCache;
                return;
            } catch (KeeperException.InvalidACLException e) {
                CloseableUtils.closeQuietly(pathChildrenCache);
                long nanoTime2 = System.nanoTime() - nanoTime;
                if (j2 == 0 || j2 <= nanoTime2) {
                    LOG.error("Unable to start curator PathChildrenCache", (Throwable) e);
                    throw new IOException(e);
                }
                LOG.warn("The cluster is not started yet (InvalidACL); will retry");
                try {
                    Thread.sleep(Math.min(j3, (j2 - nanoTime2) / 1000000));
                    min = j3 << 1;
                } catch (InterruptedException e2) {
                    LOG.error("Interrupted while retrying the PathChildrenCache startup");
                    throw new IOException(e2);
                }
            } catch (Exception e3) {
                CloseableUtils.closeQuietly(pathChildrenCache);
                LOG.error("Unable to start curator PathChildrenCache", (Throwable) e3);
                throw new IOException(e3);
            }
        }
        LOG.error("Unable to start curator PathChildrenCache", (Throwable) e);
        throw new IOException(e);
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public void start() throws IOException {
        if (this.zooKeeperClient != null) {
            setupZookeeperAuth(this.conf);
            this.zooKeeperClient.start();
        }
        CloseableUtils.class.getName();
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public void stop() throws IOException {
        CloseableUtils.closeQuietly(this.znode);
        CloseableUtils.closeQuietly(this.slotZnode);
        CloseableUtils.closeQuietly(this.instancesCache);
        CloseableUtils.closeQuietly(this.zooKeeperClient);
    }

    private void setupZookeeperAuth(Configuration configuration) throws IOException {
        if (!UserGroupInformation.isSecurityEnabled() || !LlapProxy.isDaemon()) {
            LOG.info("UGI security is not enabled, or non-daemon environment. Skipping setting up ZK auth.");
            return;
        }
        LOG.info("UGI security is enabled. Setting up ZK auth.");
        String var = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL);
        if (var == null || var.isEmpty()) {
            throw new IOException("Llap Kerberos principal is empty");
        }
        String var2 = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
        if (var2 == null || var2.isEmpty()) {
            throw new IOException("Llap Kerberos keytab is empty");
        }
        setZookeeperClientKerberosJaasConfig(var, var2);
    }

    private void setZookeeperClientKerberosJaasConfig(String str, String str2) throws IOException {
        System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "LlapZooKeeperClient");
        String serverPrincipal = SecurityUtil.getServerPrincipal(str, Portable.ALL_INTERFACES);
        this.userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(serverPrincipal);
        javax.security.auth.login.Configuration.setConfiguration(new JaasConfiguration("LlapZooKeeperClient", serverPrincipal, str2));
    }

    static /* synthetic */ List access$200() {
        return createSecureAcls();
    }

    static {
        $assertionsDisabled = !LlapZookeeperRegistryImpl.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) LlapZookeeperRegistryImpl.class);
        DISABLE_MESSAGE = "Set " + HiveConf.ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
        uniq = UUID.randomUUID();
        String str = "localhost";
        try {
            str = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
        }
        hostname = str;
    }
}
