• Spark连接ES实现kerberos认证


    1、jar包

    Maven 依赖(groupId : artifactId : version):

    1. org.apache.spark : spark-core_${scala.version} : ${spark.version}
    2. org.apache.spark : spark-sql_${scala.version} : ${spark.version}
    3. org.elasticsearch : elasticsearch-hadoop : 6.8.21
    4. org.apache.hadoop : hadoop-common : 2.8.3

    2、代码

    package study

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.elasticsearch.spark.sql.DefaultSource15

    import scala.collection.mutable

    object SparkToEs {

      def main(args: Array[String]): Unit = {
        initEsProperties(false)
      }

      /**
       * Builds the elasticsearch-hadoop connection settings and writes a small demo
       * DataFrame into ES through `DefaultSource15.createRelation`.
       *
       * @param useKerberos when true, adds the Kerberos-related es-hadoop options and
       *                    ships the keytab to every executor via `sparkContext.addFile`
       *                    (the custom `OHadoopUserProvider` logs in on the executor side).
       */
      def initEsProperties(useKerberos: Boolean): Unit = {
        // Local Spark context for the demo.
        val conf = new SparkConf().setAppName("df").setMaster("local[2]")
        val spark = SparkSession.builder()
          .config(conf)
          .getOrCreate()

        val DOC_ID = "a1"
        val esConfig = new mutable.HashMap[String, String]
        var host = ""
        esConfig += ("es.resource" -> "cool_es_test/cool_es_test_tb")
        esConfig += ("es.write.operation" -> "index")

        val httpUrls = "http://192.168.1.151:9200"
        if (httpUrls.contains(",")) {
          // Multiple nodes: pass the whole URL list; host is taken from the first entry
          // (limitation: the SPNEGO principal below can only carry one host).
          esConfig += ("es.nodes.wan.only" -> "true")
          esConfig += ("es.nodes" -> httpUrls)
          host = httpUrls.split(":")(1).substring(2)
        } else {
          // Single node: split "http://ip:port" into host and port.
          esConfig += ("es.nodes.wan.only" -> "true")
          esConfig += ("es.nodes" -> httpUrls.split(":")(1).substring(2))
          esConfig += ("es.port" -> httpUrls.split(":")(2).replace("/", ""))
          host = httpUrls.split(":")(1).substring(2)
        }
        esConfig += ("es.mapping.id" -> DOC_ID)

        if (useKerberos) {
          val KEYTAB = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\XXXXX.keytab"
          val PRINCIPAL = "XXXXX"
          val KRB5CONF = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\krb5.conf"
          val esPrincipal = "HTTP/" + host + "@EXAMPLE.COM"
          // BUGFIX: the option key is "es.security.authentication"
          // (the original article had the typo "es.security.authencation",
          // which es-hadoop silently ignores, so Kerberos was never enabled).
          esConfig += ("es.security.authentication" -> "kerberos")
          esConfig += ("es.net.spnego.auth.elasticsearch.principal" -> esPrincipal)
          // Custom provider that performs the keytab login on each executor.
          esConfig += ("es.security.user.provider.class" -> "com.cool.kerberos.OHadoopUserProvider")
          esConfig += ("es.spark.principal" -> PRINCIPAL)
          esConfig += ("es.spark.keytab" -> KEYTAB)
          // Distribute the keytab so executors can reach it via SparkFiles.get.
          spark.sparkContext.addFile(KEYTAB)
          System.setProperty("http.auth.preference", "Kerberos")
          System.setProperty("java.security.krb5.conf", KRB5CONF)
          System.setProperty("sun.security.krb5.debug", "false")
          System.setProperty("sun.security.spnego.debug", "false")
        }

        // Demo data: one row matching the six-column schema below.
        val ds: DefaultSource15 = new DefaultSource15
        val someData = Seq(Row("2022-06-23 11:", 6, 10.6, "2022-10-27", "13:11:22", "2021-10-27T14:33:33"))
        val someSchema = List(
          StructField("a1", StringType, true),
          StructField("a2", IntegerType, true),
          StructField("a3", DoubleType, true),
          StructField("a4", StringType, true),
          StructField("a5", StringType, true),
          StructField("a6", StringType, true)
        )
        val df = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(someSchema))
        df.show()

        val writeModeMap = Map("append" -> SaveMode.Append, "overwrite" -> SaveMode.Overwrite)
        ds.createRelation(spark.sqlContext, writeModeMap.getOrElse("append", SaveMode.Append), esConfig.toMap, df)
      }
    }

    3、spark连接es,需要保证每个executor都有对应凭证连接ES 

    package com.cool.kerberos;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.spark.SparkFiles;
    import org.elasticsearch.hadoop.EsHadoopException;
    import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
    import org.elasticsearch.hadoop.cfg.Settings;
    import org.elasticsearch.hadoop.mr.security.HadoopUser;
    import org.elasticsearch.hadoop.security.User;
    import org.elasticsearch.hadoop.security.UserProvider;

    import java.io.File;
    import java.io.IOException;

    /**
     * Re-implementation of es-hadoop's HadoopUserProvider.
     *
     * Change: when fetching the UGI, check whether a Kerberos login already exists;
     * if not, log in from the keytab automatically and return that UGI. This makes
     * every executor node log in on its own.
     *
     * Usage: set the provider class in the ES config, e.g.
     *   esConfig.put(ConfigurationOptions.ES_SECURITY_USER_PROVIDER_CLASS,
     *                "com.cool.kerberos.OHadoopUserProvider")
     */
    public class OHadoopUserProvider extends UserProvider {

        /** Config key for the Spark-side Kerberos principal. */
        public static final String SPARK_ES_PRINCIPAL = "es.spark.principal";
        /** Config key for the keytab file name shipped via sparkContext.addFile. */
        public static final String SPARK_ES_KEYTAB = "es.spark.keytab";

        // BUGFIX: the original article named the constructor and factory
        // "KHadoopUserProvider" inside a class declared "OHadoopUserProvider",
        // which does not compile. Names are now consistent.
        public OHadoopUserProvider() {
            super();
        }

        public static OHadoopUserProvider create(Settings settings) {
            OHadoopUserProvider provider = new OHadoopUserProvider();
            provider.setSettings(settings);
            return provider;
        }

        /**
         * Returns the current Hadoop user, performing a keytab login first if the
         * JVM has no Kerberos credentials yet (the executor-side case).
         */
        @Override
        public User getUser() {
            try {
                UserGroupInformation ugi = UserGroupInformation.getLoginUser();
                if (ugi == null || !ugi.hasKerberosCredentials()) {
                    // BUGFIX: use the accessor instead of a field named "settings"
                    // that is not accessible here (getUser already relies on getSettings()).
                    ugi = doLogin(getSettings());
                }
                return new HadoopUser(ugi, getSettings());
            } catch (IOException e) {
                throw new EsHadoopException("Could not retrieve the current user", e);
            }
        }

        /**
         * Performs the Kerberos keytab login on the executor. The keytab must have
         * been distributed with sparkContext.addFile and /etc/krb5.conf must exist
         * on the node.
         */
        private UserGroupInformation doLogin(Settings settings) throws IOException {
            Configuration krbConf = new Configuration();
            krbConf.set("hadoop.security.authentication", "kerberos");
            krbConf.set(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos");

            String keytab = settings.getProperty(SPARK_ES_KEYTAB);
            String principal = settings.getProperty(SPARK_ES_PRINCIPAL);
            System.out.println("keytab: " + keytab + "; principal: " + principal);
            if (principal == null || keytab == null) {
                throw new RuntimeException("principal or keytabPath 参数不存在, 请配置 "
                        + SPARK_ES_KEYTAB + " and " + SPARK_ES_PRINCIPAL);
            }

            // Resolve the local copy of the keytab shipped by addFile.
            String keytabPath = SparkFiles.get(keytab);
            if (!new File(keytabPath).exists()) {
                throw new RuntimeException("executor端登陆失败, keytab文件(" + keytabPath
                        + ")不存在, 请通过sparkContext.addFile将文件添加入executor节点");
            }
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            if (!new File("/etc/krb5.conf").exists()) {
                throw new RuntimeException("executor登录失败, /etc/krb5.conf 文件不存在");
            }

            UserGroupInformation.setConfiguration(krbConf);
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
            UserGroupInformation.setLoginUser(ugi);
            return ugi;
        }
    }

    4、kerberos认证过程中出现的问题以及处理方案

    问题1:

    Caused by: GSSException: No valid credentials provided (Mechanism level: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)

    解决1:_HOST 写死成ES 主机名,局限是:host 只能配置一台ip

    问题2:

    1. org.elasticsearch.hadoop.rest.EsHadoopTransportException: Missing required negotiation token
    2. org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings) - all nodes failed; tried[[hostname]]

    解决2: 使用官方推荐配置,HTTP/_HOST@REALM.NAME.HERE

     问题3:时钟同步问题;Java连接ES有时候也会出现同样错误

    1. Caused by: org.elasticsearch.client.ResponseException: method [POST],host[http://test.com:9200],URI[/_bulk?timeout=1m],status line[HTTP/1.1 401 Unauthorized]
    2. Caused by: ElasticsearchException[Elasticsearch exception [type=g_s_s_exception,reason=Failure unspecified at GSS-API level (Mechanism level: Request is a replay (34))]]; nested: ElasticsearchException[Elasticsearch exception [type=krb_ap_err_exception, reason=Request is a replay (34)]]

    解决3: es的JVM 增加一个参数 -Dsun.security.krb5.rcache=none

    5、参考文章

    Kerberos | Elasticsearch for Apache Hadoop [master] | Elastic

    https://community.cloudera.com/t5/Support-Questions/Solr-quot-Request-is-a-replay-quot-Ambari-Infra-Solr-2-5/m-p/212870

    https://web.mit.edu/kerberos/krb5-devel/doc/basic/rcache_def.html

    http://www.srcmini.com/20576.html

  • 相关阅读:
    Java开发学习----Spring事务简介与事务角色解析
    【数据结构】Java对象的比较
    2034. 股票价格波动
    CSAPP实验记录(1)--------- DataLab
    2022年0902Maven的继承和利用Idea创建Maven工程的内容<第五课>
    RK3588+FPGA高速图像处理通信处理机解决方案
    常用数据库之sqlite的使用
    Deno 命令行界面
    线程/进程/协程的区别以及多线程状态/多线程的统一异常处理
    Qt中的线程同步:确保多线程程序的安全性
  • 原文地址:https://blog.csdn.net/dkjhl/article/details/126871880