Flink 1.14 Source Code Reading

Tags: flink, source-code reading

env.execute("test Flink")

env.execute("test Flink") is the entry point of job submission.
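
For orientation, here is a minimal job whose submission we will trace. The job itself is hypothetical; only the final execute() call matters for what follows.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TestFlinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Each API call below only records a Transformation; nothing runs yet.
            env.fromElements(1, 2, 3)
                    .map((MapFunction<Integer, Integer>) value -> value * 2)
                    .print();
            // This call triggers the StreamGraph -> JobGraph -> deployment pipeline
            // traced in the rest of this article.
            env.execute("test Flink");
        }
    }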

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        // Step 1: translate the recorded transformations into a StreamGraph
        final StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        return execute(streamGraph);
    }

getStreamGraph()

This is the first step of the graph translation: the user code (the recorded transformations) is converted into a StreamGraph.

    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }

    public StreamGraph getStreamGraph(boolean clearTransformations) {
        // StreamGraphGenerator.generate() is where the StreamGraph is actually built.
        final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }
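
A quick way to inspect the result of this translation without running the job is StreamExecutionEnvironment.getExecutionPlan(), which goes through the same getStreamGraph() path (with clearTransformations = false, so the job can still be executed afterwards) and renders the StreamGraph as JSON:

    // Assumes a pipeline has already been defined on env, e.g. the sample job above.
    // The JSON can be pasted into https://flink.apache.org/visualizer/ to draw the graph.
    System.out.println(env.getExecutionPlan());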

    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
                .setSavepointDir(defaultSavepointDirectory)
                .setChaining(isChainingEnabled)
                .setUserArtifacts(cacheFile)
                .setTimeCharacteristic(timeCharacteristic)
                .setDefaultBufferTimeout(bufferTimeout)
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }
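
This guard is easy to trigger: a program that never registers an operator (for example, a source with no sink attached) fails right here.

    // Minimal reproduction sketch: no operator is ever registered, so execute()
    // throws "No operators defined in streaming topology. Cannot execute."
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.execute("empty job");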

StreamGraphGenerator.generate()

    public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        streamGraph.setEnableCheckpointsAfterTasksFinish(
                configuration.get(
                        ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        configureStreamGraph(streamGraph);

        alreadyTransformed = new HashMap<>();

        // Walk every recorded Transformation (inputs are handled recursively).
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        // Disable unaligned checkpoints on all in-edges of a node if any of
        // them does not support unaligned checkpointing.
        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }
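
Note the shouldExecuteInBatchMode() call: since Flink 1.12 the same DataStream program can be translated for streaming or batch execution. The decision is driven by execution.runtime-mode (AUTOMATIC / BATCH / STREAMING) and by whether all sources are bounded; forcing it from code looks like this:

    // import org.apache.flink.api.common.RuntimeExecutionMode;
    // Forces batch-style translation of the DataStream program
    // (equivalent to -Dexecution.runtime-mode=BATCH on the command line).
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);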


transformations

This part explains where the transformations passed to the generator come from.

    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();

    // Declared as a field of StreamExecutionEnvironment.
    protected final List<Transformation<?>> transformations = new ArrayList<>();

    // This is the method operators call to register their transformation;
    // the classes that call it are what we look at next.
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }


addOperator

Transformation has many concrete implementations (OneInputTransformation, TwoInputTransformation, SourceTransformation, SinkTransformation, PartitionTransformation, and so on), and addOperator() is called from the DataStream API methods that create them.

Let's take the map operator as an example to see how a transformation is created and registered. The key code is as follows.

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType =
                TypeExtractor.getMapReturnTypes(
                        clean(mapper), getType(), Utils.getCallLocationName(), true);
        // delegates to the map overload below
        return map(mapper, outType);
    }

    public <R> SingleOutputStreamOperator<R> map(
            MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {
        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

    // This method finally creates the transformation and returns the new DataStream.
    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // Create the transformation, wired to this stream's own transformation as input.
        OneInputTransformation<T, R> resultTransform =
                new OneInputTransformation<>(
                        this.transformation,
                        operatorName,
                        operatorFactory,
                        outTypeInfo,
                        environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream =
                new SingleOutputStreamOperator(environment, resultTransform);

        // Register the new transformation in the environment's transformations list.
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
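
Putting it together with a hypothetical pipeline: every chained call runs through doTransform(), so each operator appends one transformation that points at its predecessor.

    // Illustration only. After these lines, the environment's transformations
    // list holds two OneInputTransformations (one per map), each wired to its
    // input transformation.
    DataStream<String> words = env.fromElements("flink", "yarn");
    DataStream<Integer> lengths =
            words.map((MapFunction<String, Integer>) String::length);   // registers #1
    DataStream<Integer> plusOne =
            lengths.map((MapFunction<Integer, Integer>) v -> v + 1);    // registers #2

Note that (for the legacy source API) the source itself is not in the list; StreamGraphGenerator.generate() discovers it by walking the input chain #2 -> #1 -> source.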

	

execute(StreamGraph streamGraph)

    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }
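
Whether execute() blocks until the job finishes is controlled by DeploymentOptions.ATTACHED (config key execution.attached, default true); submitting with flink run -d flips it to false.

    // Sketch: the same flag execute() reads above. In detached mode the result
    // future is never awaited and DetachedJobExecutionResult only carries the
    // JobID (accessing accumulators on it is unsupported).
    Configuration conf = new Configuration();
    boolean attached = conf.getBoolean(DeploymentOptions.ATTACHED);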


executeAsync(streamGraph)

In this method the key lines are:

    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)                            // obtain the executor
                    .execute(streamGraph, configuration, userClassloader); // and submit here

getExecutor()

Several executors ship with Flink; we will follow YarnJobClusterExecutorFactory, the one used for per-job deployments on YARN in production.

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");

        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);

        checkNotNull(
                executorFactory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                configuration.get(DeploymentOptions.TARGET));

        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);

        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));

            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }
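
The executorServiceLoader used above (DefaultExecutorServiceLoader) resolves the factory via plain Java SPI. A simplified sketch of the idea, not the actual implementation (which also reports ambiguous matches):

    // Scan all PipelineExecutorFactory implementations on the classpath and pick
    // the one whose isCompatibleWith() matches the configured execution.target.
    static PipelineExecutorFactory findExecutorFactory(Configuration configuration) {
        for (PipelineExecutorFactory factory :
                java.util.ServiceLoader.load(PipelineExecutorFactory.class)) {
            if (factory.isCompatibleWith(configuration)) {
                return factory;
            }
        }
        throw new IllegalStateException(
                "No PipelineExecutorFactory found for execution.target="
                        + configuration.get(DeploymentOptions.TARGET));
    }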


@Internal
public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {

    @Override
    public String getName() {
        return YarnJobClusterExecutor.NAME;
    }

    @Override
    public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
        return YarnJobClusterExecutor.NAME.equalsIgnoreCase(
                configuration.get(DeploymentOptions.TARGET));
    }

    @Override
    public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
        try {
            // Returns the YarnJobClusterExecutor.
            return new YarnJobClusterExecutor();
        } catch (NoClassDefFoundError e) {
            throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
        }
    }
}
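
For reference, YarnJobClusterExecutor.NAME is "yarn-per-job", i.e. the execution.target value that makes isCompatibleWith() above return true:

    // Equivalent to submitting with `flink run -t yarn-per-job ...`.
    Configuration configuration = new Configuration();
    configuration.set(DeploymentOptions.TARGET, "yarn-per-job");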


execute(streamGraph, configuration, userClassloader)

PipelineExecutor has several implementations (YARN, Kubernetes, etc.); we continue with AbstractJobClusterExecutor.

    @Override
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

        // createClusterDescriptor() also creates and starts the YarnClient.
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ExecutionConfigAccessor configAccessor =
                    ExecutionConfigAccessor.fromConfiguration(configuration);

            final ClusterSpecification clusterSpecification =
                    clusterClientFactory.getClusterSpecification(configuration);

            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.deployJobCluster(
                            clusterSpecification, jobGraph, configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(
                            clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
        }
    }
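
Note the first line: this is where the second graph translation happens, still on the client side. PipelineExecutorUtils.getJobGraph() turns the StreamGraph (a Pipeline) into the JobGraph that is actually shipped to the cluster; under the hood it delegates to FlinkPipelineTranslationUtil, which is where operator chaining is applied.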

deployJobCluster()

The implementation we are interested in is YarnClusterDescriptor.deployJobCluster(); next we look at the deployInternal(...) call inside it.


    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    getYarnJobClusterEntrypoint(), // the main class of the per-job ApplicationMaster
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }

getYarnJobClusterEntrypoint() returns the class name **YarnJobClusterEntrypoint.class.getName()**. This name is used later during submission: setupApplicationMasterContainer() puts it into the container launch command as the ApplicationMaster's main class.

deployInternal()

This method is fairly long and does a lot. The comments below annotate the main steps; let's go straight to the code.

    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {

        // Get the current (submitting) user.
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        // Check whether Kerberos authentication is enabled.
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }

            final boolean fetchToken =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            final boolean yarnAccessFSEnabled =
                    !CollectionUtil.isNullOrEmpty(
                            flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
            if (!fetchToken && yarnAccessFSEnabled) {
                throw new IllegalConfigurationException(
                        String.format(
                                "When %s is disabled, %s must be disabled as well.",
                                SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                                YarnConfigOptions.YARN_ACCESS.key()));
            }
        }

        // Validate the requested resources.
        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
        
        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(
                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
        
        // Start the ApplicationMaster.
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);

        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }



startAppMaster

startAppMaster() does the bulk of the submission work: it initializes the YARN application submission context, uploads the dependency jars, serializes the JobGraph to a temporary file (shipped as job.graph), writes out a temporary configuration file (appId-flink-conf.yaml), sets up the Kerberos information, builds the JobManager process spec and the ApplicationMaster container launch context, submits the application via yarnClient.submitApplication(appContext), then polls the submission state and returns the ApplicationReport.


    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // ------------------ Initialize the file systems -------------------------

        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
        // Get the Hadoop FileSystem.
        final FileSystem fs = FileSystem.get(yarnConfiguration);

        // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
        // method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }

        // Get the application submission context.
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        // Shared lib dirs provided via yarn.provided.lib.dirs, if any.
        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        fs,
                        getStagingDir(fs),
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }

        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application

        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
        setHAClusterIdIfNotSet(configuration, appId);

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }

            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML
        // and KRB5 configuration files. We are adding these files as container local resources for
        // the container
        // applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // Assume the keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        // setup security tokens
        if (UserGroupInformation.isSecurityEnabled()) {
            // set HDFS delegation tokens when security is enabled
            LOG.info("Adding delegation token to the AM container.");
            final List<Path> pathsToObtainToken = new ArrayList<>();
            boolean fetchToken =
                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            if (fetchToken) {
                List<Path> yarnAccessList =
                        ConfigUtils.decodeListFromConfig(
                                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
                pathsToObtainToken.addAll(yarnAccessList);
                pathsToObtainToken.addAll(fileUploader.getRemotePaths());
            }
            Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv = new HashMap<>();
        // set user specified app master environment variables
        appMasterEnv.putAll(
                ConfigurationUtils.getPrefixedKeyValuePairs(
                        ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
        // set Flink app class path
        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

        // set Flink on YARN internal configuration values
        appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
        appMasterEnv.put(
                YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                encodeYarnLocalResourceDescriptorListToString(
                        fileUploader.getEnvShipResourceList()));
        appMasterEnv.put(
                YarnConfigKeys.FLINK_YARN_FILES,
                fileUploader.getApplicationDir().toUri().toString());

        // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
        appMasterEnv.put(
                YarnConfigKeys.ENV_HADOOP_USER_NAME,
                UserGroupInformation.getCurrentUser().getUserName());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }


YarnJobClusterEntrypoint

Next we look at this entry-point class, which runs as the main class of the ApplicationMaster.

    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        Map<String, String> env = System.getenv();

        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
        Preconditions.checkArgument(
                workingDirectory != null,
                "Working directory variable (%s) not set",
                ApplicationConstants.Environment.PWD.key());

        try {
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }

        final Configuration dynamicParameters =
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new DynamicParametersConfigurationParserFactory(),
                        YarnJobClusterEntrypoint.class);
        final Configuration configuration =
                YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

        YarnJobClusterEntrypoint yarnJobClusterEntrypoint =
                new YarnJobClusterEntrypoint(configuration);

        // Start the cluster entry point (the JobManager process).
        ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }
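
Note where the configuration comes from: PWD is the YARN container's working directory, into which YARN has localized the resources registered in startAppMaster() (flink-conf.yaml, job.graph, the jars), so loadConfiguration() simply reads the flink-conf.yaml that the client wrote out earlier.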

ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }

        int returnCode;
        Throwable throwable = null;

        try {
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
        } catch (Throwable e) {
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        }

        LOG.info(
                "Terminating cluster entrypoint process {} with exit code {}.",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    }


clusterEntrypoint.startCluster();

    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());

        try {
            FlinkSecurityManager.setFromConfiguration(configuration);
            PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            configureFileSystems(configuration, pluginManager);

            SecurityContext securityContext = installSecurityContext(configuration);

            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                runCluster(configuration, pluginManager);

                                return null;
                            });
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

            try {
                // clean up any partial state
                shutDownAsync(
                                ApplicationStatus.FAILED,
                                ShutdownBehaviour.STOP_APPLICATION,
                                ExceptionUtils.stringifyException(strippedThrowable),
                                false)
                        .get(
                                INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }

            throw new ClusterEntrypointException(
                    String.format(
                            "Failed to initialize the cluster entrypoint %s.",
                            getClass().getSimpleName()),
                    strippedThrowable);
        }
    }

runCluster(configuration, pluginManager);

    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            initializeServices(configuration, pluginManager);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);

            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);

            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ShutdownBehaviour.STOP_APPLICATION,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was already triggered, this will do
                                    // nothing
                                    shutDownAsync(
                                            applicationStatus,
                                            ShutdownBehaviour.STOP_APPLICATION,
                                            null,
                                            true);
                                }
                            });
        }
    }

dispatcherResourceManagerComponentFactory.create()

    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ExecutionGraphInfoStore executionGraphInfoStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {

        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        WebMonitorEndpoint<?> webMonitorEndpoint = null;
        ResourceManagerService resourceManagerService = null;
        DispatcherRunner dispatcherRunner = null;

        try {
            dispatcherLeaderRetrievalService =
                    highAvailabilityServices.getDispatcherLeaderRetriever();

            resourceManagerRetrievalService =
                    highAvailabilityServices.getResourceManagerLeaderRetriever();

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));

            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            rpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    12, Duration.ofMillis(10), Duration.ofMillis(50)));

            final ScheduledExecutorService executor =
                    WebMonitorEndpoint.createExecutorService(
                            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                            "DispatcherRestEndpoint");

            final long updateInterval =
                    configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher =
                    updateInterval == 0
                            ? VoidMetricFetcher.INSTANCE
                            : MetricFetcherImpl.fromConfiguration(
                                    configuration,
                                    metricQueryServiceRetriever,
                                    dispatcherGatewayRetriever,
                                    executor);

            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                            fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();

            final String hostname = RpcUtils.getHostname(rpcService);

            resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);

            final HistoryServerArchivist historyServerArchivist =
                    HistoryServerArchivist.createHistoryServerArchivist(
                            configuration, webMonitorEndpoint, ioExecutor);

            final PartialDispatcherServices partialDispatcherServices =
                    new PartialDispatcherServices(
                            configuration,
                            highAvailabilityServices,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            heartbeatServices,
                            () ->
                                    JobManagerMetricGroup.createJobManagerMetricGroup(
                                            metricRegistry, hostname),
                            executionGraphInfoStore,
                            fatalErrorHandler,
                            historyServerArchivist,
                            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                            ioExecutor);

            log.debug("Starting Dispatcher.");
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

            log.debug("Starting ResourceManagerService.");
            resourceManagerService.start();

            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return new DispatcherResourceManagerComponent(
                    dispatcherRunner,
                    resourceManagerService,
                    dispatcherLeaderRetrievalService,
                    resourceManagerRetrievalService,
                    webMonitorEndpoint,
                    fatalErrorHandler);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManagerService != null) {
                terminationFutures.add(resourceManagerService.closeAsync());
            }

            if (dispatcherRunner != null) {
                terminationFutures.add(dispatcherRunner.closeAsync());
            }

            final FutureUtils.ConjunctFuture<Void> terminationFuture =
                    FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            throw new FlinkException(
                    "Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }
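
At this point the JobManager process hosts the three core services: the WebMonitorEndpoint (REST API and web UI), the ResourceManagerService (which negotiates TaskManager containers with YARN), and the DispatcherRunner. In per-job mode the dispatcher recovers the JobGraph from the job.graph file that startAppMaster() shipped into the container, and spawns a JobMaster for it, which is where the job actually starts running.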


To be continued…

Copyright notice: This is an original article by the blogger, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.
Original link: https://blog.csdn.net/yswhfd/article/details/124472868
