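【Background】
I previously wrote up token authentication in Hadoop and the tokens used while a YARN job runs, and both posts touched on delegation tokens. Recently I ran into a related problem: a Flink job that had been running for more than seven days failed because of a host fault, which triggered retries. Several consecutive retries all failed, and the job's logs were never aggregated, so there was nothing to analyze the failure with. The cause turned out to be the delegation token, and this post summarizes the mechanics behind it.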
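【How it works】
1. What is a delegation token
First, a brief note on why delegation tokens are needed. Once Kerberos is enabled, services must authenticate with the KDC and obtain a ticket before they talk to each other. A single YARN job may spawn many task containers, and each of them may access HDFS. If every container had to obtain a ticket before each access, the KDC would quickly become a performance bottleneck. Delegation tokens exist to cut out this unnecessary authentication work.
2. How delegation tokens are used during job submission and execution
The delegation-token flow during job submission and execution is shown in the figure below and consists of four steps:
1) After the RM (ResourceManager) starts, it creates a dedicated service thread that handles token renewal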
// ResourceManager.java
protected void serviceInit(Configuration configuration) throws Exception {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
    }
    ....
}

protected DelegationTokenRenewer createDelegationTokenRenewer() {
    return new DelegationTokenRenewer();
}
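2) The client requests a delegation token
Before submitting a job, the client usually uploads resource files to HDFS first (including the jars needed at runtime). During this step it requests a delegation token from the NN (NameNode), places it in the job's launch context, and then sends the submission request to the RM (the request carries the launch context).
Below is the relevant code from a Flink on YARN submission: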
// flink YarnClusterDescriptor.java
private ApplicationReport startAppMaster(...) {
    // When Kerberos is enabled, obtain the tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
                ConfigUtils.decodeListFromConfig(
                        configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
                amContainer,
                ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
                yarnConfiguration);
    }
}

public static void setTokensFor(
        ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
        throws IOException {
    Credentials credentials = new Credentials();
    // for HDFS
    TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
    // for HBase
    obtainTokenForHBase(credentials, conf);
    // for user
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

    // Put the obtained tokens into the launch context
    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        final Text id = new Text(token.getIdentifier());
        LOG.info("Adding user token " + id + " with " + token);
        credentials.addToken(id, token);
    }
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
        credentials.writeTokenStorageToStream(dob);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
        }

        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}

// TokenCache.java
// Calls the Hadoop API to request tokens from the NN
public static void obtainTokensForNamenodes(
        Credentials credentials, Path[] ps, Configuration conf) throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
        return;
    }
    obtainTokensForNamenodesInternal(credentials, ps, conf);
}

static void obtainTokensForNamenodesInternal(
        Credentials credentials, Path[] ps, Configuration conf) throws IOException {
    Set<FileSystem> fsSet = new HashSet<FileSystem>();
    for (Path p : ps) {
        fsSet.add(p.getFileSystem(conf));
    }
    String masterPrincipal = Master.getMasterPrincipal(conf);
    for (FileSystem fs : fsSet) {
        obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
    }
}

static void obtainTokensForNamenodesInternal(
        FileSystem fs, Credentials credentials, Configuration conf, String renewer)
        throws IOException {
    ...
    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, credentials);
    ...
}

// FileSystem.java
public Token<?>[] addDelegationTokens(
        final String renewer, Credentials credentials) throws IOException {
    if (credentials == null) {
        credentials = new Credentials();
    }
    final List<Token<?>> tokens = new ArrayList<>();
    collectDelegationTokens(renewer, credentials, tokens);
    return tokens.toArray(new Token<?>[tokens.size()]);
}

private void collectDelegationTokens(
        final String renewer, final Credentials credentials, final List<Token<?>> tokens)
        throws IOException {
    final String serviceName = getCanonicalServiceName();
    // Collect token of the this filesystem and then of its embedded children
    if (serviceName != null) { // fs has token, grab it
        final Text service = new Text(serviceName);
        Token<?> token = credentials.getToken(service);
        if (token == null) {
            // Request a delegation token from the NN
            token = getDelegationToken(renewer);
            if (token != null) {
                tokens.add(token);
                credentials.addToken(service, token);
            }
        }
    }
    ...
}
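3) The RM adds the token to the delegation token renewal service
When the RM handles the client's submission request, it checks whether Kerberos authentication is enabled. If it is, the RM parses the delegation token out of the job's launch context and adds it to the delegation token renewal service, which starts threads that renew the token on a timer. The RM then goes on to ask the NM (NodeManager) to start a container, and the delegation token travels to the NM inside the launch context.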
// RMAppManager.java
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime, String user)
        throws YarnException {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
                applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
    }
    ...
}
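4) The NM uses the delegation token
When the NM receives the request to start a container, it parses the delegation token out of the request (the launch context), creates an instance object for the container, and stores the token in that object. It then localizes resources for the container, that is, downloads the required resource files from HDFS, and this is where the delegation token passed along is used. When the job ends and log aggregation is needed, the same delegation token is used again to upload the job's logs to the configured HDFS path.
The delegation token is also written to a persistent file. This serves two purposes: recovery after an NM failure, and handing the token to the task container process for its own use.
3. Delegation token renewal and lifecycle
1) The maximum lifetime is already fixed when the token is requested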
// FSNamesystem.java
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
    ...
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
    token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    ...
    return token;
}

// Token.java
public Token(T id, SecretManager<T> mgr) {
    password = mgr.createPassword(id);
    identifier = id.getBytes();
    kind = id.getKind();
    service = new Text();
}

// AbstractDelegationTokenSecretManager
protected synchronized byte[] createPassword(TokenIdent identifier) {
    long now = Time.now();
    identifier.setMaxDate(now + tokenMaxLifetime);
    ...
}
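2) After receiving the submission request, the RM first renews the token once to learn its next expiration time, then sets a timer based on that expiration time to trigger the next renewal.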
// DelegationTokenRenewer.java
public void addApplicationSync(
        ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, String user)
        throws IOException, InterruptedException {
    handleAppSubmitEvent(
            new DelegationTokenRenewerAppSubmitEvent(
                    applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}

private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
        throws IOException, InterruptedException {
    ...
    Credentials ts = evt.getCredentials();
    Collection<Token<?>> tokens = ts.getAllTokens();
    for (Token<?> token : tokens) {
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
            dttr = new DelegationTokenToRenew(
                    Arrays.asList(applicationId), token, tokenConf, now,
                    shouldCancelAtEnd, evt.getUser());
            try {
                // Renew once first
                renewToken(dttr);
            } catch (IOException ioe) {
                ...
            }
        }
        tokenList.add(dttr);
    }

    if (!tokenList.isEmpty()) {
        for (DelegationTokenToRenew dtr : tokenList) {
            DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
            if (currentDtr != null) {
                // another job beat us
                currentDtr.referringAppIds.add(applicationId);
                appTokens.get(applicationId).add(currentDtr);
            } else {
                appTokens.get(applicationId).add(dtr);
                setTimerForTokenRenewal(dtr);
            }
        }
    }
}

protected void renewToken(final DelegationTokenToRenew dttr) throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
        // Renew the delegation token and get its next expiration time
        dttr.expirationDate =
                UserGroupInformation.getLoginUser().doAs(
                        new PrivilegedExceptionAction<Long>() {
                            @Override
                            public Long run() throws Exception {
                                return dttr.token.renew(dttr.conf);
                            }
                        });
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
}

protected void setTimerForTokenRenewal(DelegationTokenToRenew token) throws IOException {
    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    if (expiresIn <= 0) {
        LOG.info("Will not renew token " + token);
        return;
    }
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
    // need to create new task every time
    RenewalTimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer

    renewalTimer.schedule(token.timerTask, new Date(renewIn));
    LOG.info(
            "Renew " + token + " in " + expiresIn + " ms, appId = " + token.referringAppIds);
}
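To make the timer arithmetic in setTimerForTokenRenewal concrete, here is a minimal sketch that repeats only the `expirationDate - expiresIn / 10` calculation as a pure function. The class and method names (`RenewalTimerSketch`, `renewDelayMillis`) are made up for illustration and are not part of Hadoop; the clock is passed in as a parameter instead of being read from System.currentTimeMillis().

```java
/** Illustrative only: mirrors the arithmetic of setTimerForTokenRenewal, not the Hadoop class itself. */
class RenewalTimerSketch {

    /**
     * Returns how many milliseconds from "now" the renewal timer would fire,
     * or -1 when the token has already expired (the "Will not renew token" branch).
     */
    static long renewDelayMillis(long expirationDate, long now) {
        long expiresIn = expirationDate - now;
        if (expiresIn <= 0) {
            return -1;
        }
        // Fire a little before expiration: after 90% of the remaining validity.
        long renewAt = expirationDate - expiresIn / 10;
        return renewAt - now;
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000;
        // A token that is valid for another 24 hours is renewed after 21.6 hours.
        System.out.println(renewDelayMillis(oneDay, 0L) / 3_600_000.0 + " hours");
        // A token with 6032 ms left is renewed again after roughly 5.4 seconds.
        System.out.println(renewDelayMillis(6032L, 0L) + " ms");
    }
}
```

With a renew interval of one day this gives a renewal every 21 hours 36 minutes, which is the spacing of the "Renewed delegation-token" entries in the RM log in the problem analysis section (17:11:13, then 14:47:13 the next day, and so on).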
Next, the details of how a renew request is sent and handled:
// Client side: send the renew request
public long renew(Token<?> token, Configuration conf) throws IOException {
    Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
    ClientProtocol nn = getNNProxy(delToken, conf);
    try {
        return nn.renewDelegationToken(delToken);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class, AccessControlException.class);
    }
}

// Server side: handle the renew request
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
        throws InvalidToken, IOException {
    try {
        ...
        expiryTime = dtSecretManager.renewToken(token, renewer);
    } catch (AccessControlException ace) {
        ...
    }
    return expiryTime;
}

public synchronized long renewToken(
        Token<TokenIdent> token, String renewer) throws InvalidToken, IOException {
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    TokenIdent id = createIdentifier();
    id.readFields(in);
    LOG.info(
            "Token renewal for identifier: " + formatTokenId(id)
                    + "; total currentTokens " + currentTokens.size());

    long now = Time.now();
    if (id.getMaxDate() < now) {
        throw new InvalidToken(
                renewer + " tried to renew an expired token " + formatTokenId(id)
                        + " max expiration date: " + Time.formatTime(id.getMaxDate())
                        + " currentTime: " + Time.formatTime(now));
    }
    if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
        throw new AccessControlException(
                renewer + " tried to renew a token " + formatTokenId(id)
                        + " without a renewer");
    }
    if (!id.getRenewer().toString().equals(renewer)) {
        throw new AccessControlException(
                renewer + " tries to renew a token " + formatTokenId(id)
                        + " with non-matching renewer " + id.getRenewer());
    }
    DelegationKey key = getDelegationKey(id.getMasterKeyId());
    if (key == null) {
        throw new InvalidToken(
                "Unable to find master key for keyId=" + id.getMasterKeyId()
                        + " from cache. Failed to renew an unexpired token " + formatTokenId(id)
                        + " with sequenceNumber=" + id.getSequenceNumber());
    }
    byte[] password = createPassword(token.getIdentifier(), key.getKey());
    if (!MessageDigest.isEqual(password, token.getPassword())) {
        throw new AccessControlException(
                renewer + " is trying to renew a token " + formatTokenId(id)
                        + " with wrong password");
    }
    long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
    String trackingId = getTrackingIdIfEnabled(id);
    DelegationTokenInformation info =
            new DelegationTokenInformation(renewTime, password, trackingId);

    if (getTokenInfo(id) == null) {
        throw new InvalidToken(
                "Renewal request for unknown token " + formatTokenId(id));
    }
    updateToken(id, info);
    return renewTime;
}
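The interplay between the server-side cap `Math.min(id.getMaxDate(), now + tokenRenewInterval)` and the RM's "renew at 90% of the remaining time" timer is easier to see in a small simulation. The sketch below is a simplified model, not Hadoop code: the class and method names are hypothetical, time starts at 0 when the token is issued, and renewals are assumed to take zero time (no RPC latency).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the renew loop; names and the zero-latency assumption are illustrative. */
class RenewalTimelineSketch {

    /** Server side: reject tokens past their max date, otherwise cap the new expiry at the max date. */
    static long renewedExpiry(long now, long maxDate, long renewInterval) {
        if (maxDate < now) {
            throw new IllegalStateException("tried to renew an expired token");
        }
        return Math.min(maxDate, now + renewInterval);
    }

    /** Returns the times (ms since the token was issued) at which the renewer would renew. */
    static List<Long> renewalTimes(long renewInterval, long maxLifetime) {
        List<Long> times = new ArrayList<>();
        long now = 0;
        long maxDate = maxLifetime; // the token is issued at time 0
        while (true) {
            long expiration = renewedExpiry(now, maxDate, renewInterval);
            times.add(now);
            long expiresIn = expiration - now;
            if (expiresIn <= 0) {
                break; // corresponds to "Will not renew token"
            }
            now = expiration - expiresIn / 10; // the timer fires here
        }
        return times;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        for (long t : renewalTimes(day, 7 * day)) {
            System.out.println(t / 3_600_000.0 + " h");
        }
    }
}
```

With a one-day renew interval and a seven-day maximum lifetime, the first eight renewals in this model are 21.6 hours apart, and from the eighth one on the expiry is pinned to the max date, so the remaining renewals crowd closer and closer to that moment. In a real cluster the last attempts are not instantaneous, so one of them eventually arrives after the max date and is rejected with InvalidToken.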
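3) What happens when the token reaches its maximum lifetime
The timer task catches the exception thrown by the renewal and simply removes the invalid token.
Note, however, that before each renewal the RM requests a new delegation token if needed (explained in more detail later).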
public void run() {
    if (cancelled.get()) {
        return;
    }

    Token<?> token = dttr.token;
    try {
        // First check whether a new token needs to be requested
        requestNewHdfsDelegationTokenIfNeeded(dttr);
        // if the token is not replaced by a new token, renew the token
        if (!dttr.isTimerCancelled()) {
            renewToken(dttr);
            setTimerForTokenRenewal(dttr); // set the next one
        } else {
            LOG.info("The token was removed already. Token = [" + dttr + "]");
        }
    } catch (Exception e) {
        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
        removeFailedDelegationToken(dttr);
    }
}
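【Problem analysis】
Now back to the logs from the failure described at the start, to retrace what happened.
The NM log shows that on retry the job could not be started because its resources could not be downloaded (localized), and the download failed because of an invalid token.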
2022-07-18 13:44:18,665 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache 2022-07-18 13:44:18,669 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: { hdfs://hdfsHACluster/user/hncscwc/.flink/application_1637733238080_3800/application_1637733238080_38002636034628721129021.tmp, 1656925873322, FILE, null } failed: token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486) at org.apache.hadoop.ipc.Client.call(Client.java:1432) at org.apache.hadoop.ipc.Client.call(Client.java:1342) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796) at sun.reflect.GeneratedMethodAccessor172.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348) at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1649) at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1440) at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1452) at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253) at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63) at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361) at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922) at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359) at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Why was the token invalid? The RM log answers that.
2022-07-04 17:11:13,400 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler: Application 'application_1637733238080_3800' is submitted without priority hence considering default queue/cluster priority: 0 2022-07-04 17:11:13,424 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657012273422; apps=[application_1637733238080_3800]] 2022-07-05 14:47:13,462 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657090033446; apps=[application_1637733238080_3800]] 2022-07-06 12:23:13,467 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657167793465; apps=[application_1637733238080_3800]] 2022-07-07 09:59:13,487 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657245553484; apps=[application_1637733238080_3800]] 2022-07-08 07:35:13,532 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657323313511; apps=[application_1637733238080_3800]] 2022-07-09 05:11:13,551 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: 
(HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657401073532; apps=[application_1637733238080_3800]] 2022-07-10 02:47:13,564 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657478833547; apps=[application_1637733238080_3800]] 2022-07-11 00:23:13,591 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]] 2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]] 2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 6032 ms, appId = [application_1637733238080_3800] 2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]] 2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 600 ms, appId = 
[application_1637733238080_3800] 2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]] 2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 56 ms, appId = [application_1637733238080_3800] 2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]] 2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 2 ms, appId = [application_1637733238080_3800] 2022-07-11 17:11:13,398 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Exception renewing tokenKind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc). 
Not rescheduled org.apache.hadoop.security.token.SecretManager$InvalidToken: hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800 at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88) at 
org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:761) at org.apache.hadoop.security.token.Token.renew(Token.java:458) at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:601) at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:598) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922) at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.renewToken(DelegationTokenRenewer.java:597) at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$RenewalTimerTask.run(DelegationTokenRenewer.java:531) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800 at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447) at 
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489) at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486) at org.apache.hadoop.ipc.Client.call(Client.java:1432) at org.apache.hadoop.ipc.Client.call(Client.java:1342) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) at com.sun.proxy.$Proxy94.renewDelegationToken(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:964) at sun.reflect.GeneratedMethodAccessor277.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348) at com.sun.proxy.$Proxy95.renewDelegationToken(Unknown Source) at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:759) ... 
10 more 2022-07-11 17:11:13,399 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: removing failed delegation token for appid=[application_1637733238080_3800];t=ha-hdfs:hdfsHACluster
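As the logs above show, after the job was submitted the delegation token was renewed roughly once a day (every 21 hours 36 minutes, going by the timestamps). On the seventh day the token reached its maximum lifetime, the renewal failed, and the token became invalid. Once that happens the NN removes the invalid token internally. From then on, a retry after a failure, or log aggregation when the job ends, still uses the invalid token to access HDFS, so the NN cannot find the token, throws an exception, and the operation fails.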
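The `exp=` fields in the RM log are epoch milliseconds, so the seven-day limit can be checked directly. The sketch below is only a convenience for reading the log; the class and method names are made up, and the +08:00 offset is taken from the `+0800` suffix in the InvalidToken message.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Helper for reading the exp= values in the RM log; names are illustrative. */
class TokenLogTimes {

    // Same layout as the timestamps in the Hadoop logs, rendered at UTC+8.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")
                    .withZone(ZoneOffset.ofHours(8));

    static String format(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long firstExpiry = 1657012273422L; // exp= after the first renewal
        long maxDate = 1657530673393L;     // exp= once renewals are capped
        System.out.println("first expiry:  " + format(firstExpiry));
        System.out.println("max date:      " + format(maxDate));
        // Subtracting the 7-day max lifetime gives the moment the NN issued the token.
        System.out.println("token issued:  " + format(maxDate - Duration.ofDays(7).toMillis()));
    }
}
```

The capped value 1657530673393 is 2022-07-11 17:11:13,393, the same "max expiration date" that the InvalidToken message reports, and seven days before it is 2022-07-04 17:11:13,393, a few milliseconds before the RM's first "Renewed delegation-token" entry for this application.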
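【Solution】
The simplest and most direct fix is to increase the delegation token's maximum lifetime.
At first this felt like a crude fix, especially for long-running Flink streaming jobs: there is no way to know how long such a job will run, so there is no way to pick a suitable maximum lifetime.
So I went back to the source and found that, for a delegation token that is about to expire (about to exceed its maximum lifetime), the RM requests a new token if needed. This is the requestNewHdfsDelegationTokenIfNeeded method that the timer thread calls before renewing the token.
Here is the implementation: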
private void requestNewHdfsDelegationTokenIfNeeded(
        final DelegationTokenToRenew dttr) throws IOException, InterruptedException {

    // Has proxy-user privileges, AND the token is an HDFS delegation token,
    // AND the token is close to its maximum lifetime
    if (hasProxyUserPrivileges
            && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
            && dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {

        final Collection<ApplicationId> applicationIds;
        synchronized (dttr.referringAppIds) {
            applicationIds = new HashSet<>(dttr.referringAppIds);
            dttr.referringAppIds.clear();
        }
        // remove all old expiring hdfs tokens for this application.
        for (ApplicationId appId : applicationIds) {
            Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
            if (tokenSet == null || tokenSet.isEmpty()) {
                continue;
            }
            Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
            synchronized (tokenSet) {
                while (iter.hasNext()) {
                    DelegationTokenToRenew t = iter.next();
                    if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
                        iter.remove();
                        allTokens.remove(t.token);
                        t.cancelTimer();
                        LOG.info("Removed expiring token " + t);
                    }
                }
            }
        }
        LOG.info("Token= (" + dttr + ") is expiring, request new token.");
        requestNewHdfsDelegationTokenAsProxyUser(
                applicationIds, dttr.user, dttr.shouldCancelAtEnd);
    }
}
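The guard at the top of requestNewHdfsDelegationTokenIfNeeded can be read as a three-part predicate. The following sketch restates it as a standalone function so that concrete numbers can be plugged in. The class and method names and the boolean `isHdfsDelegationToken` parameter are hypothetical simplifications, and the sample values assume the default settings (1-day renew interval, 7-day max lifetime, 10800000 ms valid-time-remaining), with times measured in ms since the token was issued.

```java
/** Standalone restatement of the guard condition; names and sample values are illustrative. */
class NewTokenGuardSketch {

    static boolean shouldRequestNewToken(
            boolean hasProxyUserPrivileges,
            long maxDate,
            long expirationDate,
            long credentialsValidTimeRemaining,
            boolean isHdfsDelegationToken) {
        return hasProxyUserPrivileges
                && maxDate - expirationDate < credentialsValidTimeRemaining
                && isHdfsDelegationToken;
    }

    public static void main(String[] args) {
        long maxDate = 604_800_000L;           // 7 days
        long validTimeRemaining = 10_800_000L; // 3 hours

        // Expiry still 14.4 hours short of the max date: keep renewing the old token.
        System.out.println(
                shouldRequestNewToken(true, maxDate, 552_960_000L, validTimeRemaining, true));
        // Expiry already capped at the max date: time to request a new token.
        System.out.println(
                shouldRequestNewToken(true, maxDate, maxDate, validTimeRemaining, true));
        // Without proxy-user privileges the RM never requests a new token.
        System.out.println(
                shouldRequestNewToken(false, maxDate, maxDate, validTimeRemaining, true));
    }
}
```

The third case matters for the incident above: yarn.resourcemanager.proxy-user-privileges.enabled defaults to false, and in that case this code path never replaces the token, no matter how close it is to its maximum lifetime.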
Once the new token has been obtained, the RM updates it internally and then syncs it to the NMs through the NM heartbeat response.
private void requestNewHdfsDelegationTokenAsProxyUser(
    ...
    // Get new hdfs tokens for this user
    Credentials credentials = new Credentials();
    Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);

    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    for (ApplicationId applicationId : referringAppIds) {
        // Update the app's delegation token;
        // it is synced to the NM on the next heartbeat
        rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
    }
}

public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
        throws YarnException, IOException {
    ...
    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
            rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
        nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }
    ...
}
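The NM parses the token out of the heartbeat response and keeps the updated copy in memory. It is used later when a retried job starts resource localization and when log aggregation is triggered at the end of the job.
Note that only resource localization and log aggregation were mentioned as users of the updated token. Does a task that is already running pick up the updated token?
The answer is no (at least not in 2.X). The token has already been written to the persistent file, and the task reads that file at startup to get the token it uses. The renewed delegation token is not written back to that file, and even if it were, some mechanism would still be needed to tell the task process to re-read it. So a running task that keeps accessing HDFS after its token has expired will still get an exception.
In the latest 3.X releases I noticed changes to the related code that appear to notify running containers, but I have not studied the details yet and will look into them when I have time.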
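【Related configuration】
The configuration items related to delegation tokens are:
Configuration item | Default | Description |
dfs.namenode.delegation.key.update-interval | 1 day (86400000 ms) | Interval at which the master key used for tokens is updated |
dfs.namenode.delegation.token.renew-interval | 1 day (86400000 ms) | Token renew interval |
dfs.namenode.delegation.token.max-lifetime | 7 days (604800000 ms) | Maximum lifetime of a token |
yarn.resourcemanager.delegation-token.always-cancel | false | Whether the RM always cancels the token when the application completes |
yarn.resourcemanager.proxy-user-privileges.enabled | false | Whether the RM has proxy-user privileges, so that it can request a new token when a delegation token is about to expire |
yarn.resourcemanager.system-credentials.valid-time-remaining | 10800000 | How long before the maximum lifetime a new token is requested, in milliseconds |
yarn.resourcemanager.delegation-token-renewer.thread-count | 50 | Number of delegation token renewer threads in the RM |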
【Summary】
This post used a real incident, together with the source code, to explain how Hadoop delegation tokens work.
If anything here is wrong, corrections are welcome.
This article was originally shared on the WeChat official account hncscwc (gh_383bc7486c1a).