问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

kafka发送消息的时候报超时,有人遇到过吗

发布网友 发布时间:2022-03-29 21:28

我来回答

1个回答

热心网友 时间:2022-03-29 22:57

Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中SparkStreaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。Kafka架构与安全首先,我们来了解下有关Kafka的几个基本概念:Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的TopicName标识。Procer:向Topic发布消息的进程称为Procer。Consumer:从Topic订阅消息的进程称为Consumer。Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Procer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Procer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在ConsumerGroup发生变化时进行relalance.Broker接收和发送消息是被动的:由Procer主动发送消息,Consumer主动拉取消息。然而,分析Kafka框架,我们会发现以下严重的安全问题:1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Procer的消息,能够篡改消息并发送给Consumer。2.网络中的任何一台主机,都可以启动恶意的Procer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Procer)都能对任意Topic读取(或发送)消息。随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时*,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。Kafka安全设计基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。基于Kerberos的身份机制如下图所示:Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。Procer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:1.Procer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出2.Procer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Procer返回SessionKey(会话密钥)和ServiceTicket(服务票证)3.Procer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Procer通信的SessionKey,然后使用SessionKey验证Procer的身份,通过则建立连接,否则拒绝连接。ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl//,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。构建安全的Kafka服务首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:其中,authentication参数表示认证模式,可选配置项为simple,kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.认证模式为ipaddress时,Procer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:publicclassSecureProcerextendsThread{privatefinalkafka.javaapi.procer.Procerprocer;privatefinalStringtopic;privatefinalPropertiesprops=newProperties();publicSecureProcer(Stringtopic){AuthenticationManager.setAuthMethod(“kerberos”);AuthenticationManager.login(“procer1″,“/etc/procer1.keytab”);props.put(“serializer.class”,“kafka.serializer.StringEncoder”);props.put(“metadata.broker.list”,“172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);//Userandompartitioner.Don’tneedthekeytype.JustsetittoInteger.//ThemessageisoftypeString.procer=newkafka.javaapi.procer.Procer(newProcerConfig(props));this.topic=topic;}...Topic权限管理Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:其中,resetPermission(user,Permissions,topic)为重置user对topic的权限。grant(user,Permissions,topic)为赋予user对topic权限。revoke(user,Permissions,topic)为取消user对topic权限。isPermitted(user,Permissions,topic)为检查user对topic是否具有指定权限。调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeperKerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:publicclassAuthzTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.setProperty(“authentication”,“kerberos”);props.setProperty(“zookeeper.connect”,“172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181″);props.setProperty(“principal”,“kafka/host1@TDH”);props.setProperty(“keytab”,“/usr/lib/kafka/config/kafka.keytab”);ZKConfigconfig=newZKConfig(props);AuthenticationManager.setAuthMethod(config.authentication());AuthenticationManager.login(config.principal(),config.keytab());AuthorizationManagerauthzManager=newAuthorizationManager(config);//resetpermissionREADandWRITEtoip172.16.1.87ontopictestauthzManager.resetPermission(“172.16.1.87″,newPermissions(Permissions.READ,Permissions.WRITE),“test”);//grantpermissionWRITEtoip172.16.1.87ontopictestauthzManager.grant(“172.16.1.87″,newPermissions(Permissions.CREATE),“test”);//revokepermissionREADfromip172.16.1.87ontopictestauthzManager.revoke(“172.16.1.87″,newPermissions(Permissions.READ),“test”);//committhepermissionsettingsauthzManager.commit();authzManager.close();}}ipaddress认证模式下,取消和赋予权限的操作如下所示:publicclassAuthzTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.setProperty(“authentication”,“ipaddress”);props.setProperty(“zookeeper.connect”,“172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181″);ZKConfigconfig=newZKConfig(props);//newauthorizationmanagerAuthorizationManagerauthzManager=newAuthorizationManager(config);//resetpermissionREADandWRITEtoip172.16.1.87ontopictestauthzManager.resetPermission(“172.16.1.87″,newPermissions(Permissions.READ,Permissions.WRITE),“test”);//grantpermissionWRITEtoip172.16.1.87ontopictestauthzManager.grant(“172.16.1.87″,newPermissions(Permissions.CREATE),“test”);//revokepermissionREADfromip172.16.1.87ontopictestauthzManager.revoke(“172.16.1.87″,newPermissions(Permissions.READ),“test”);//committhepermissionsettingsauthzManager.commit();authzManager.close();}}总结与展望本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop&Spark生态系统,安全功能还存在很多问题,各组件的权限系*立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能*具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
大伙说说洗衣机要不要带烘干好 热烘干洗衣机怎么样 ef英语哪个好 EF英孚英语培训怎么样? 英孚英语好不好 EF英孚教育到底好不好 大佬们,麦芒7和荣耀10那个值得入手?2500以下的机子还有啥好推荐的么... 介绍几款2500元以前的手机 像素一定要高 其他的不做要求 近期想入手一部安卓手机,价格2200到2500左右…买HTC desire Z还是 三星... 笔记本忘记开机密码怎么办急死了 海底捞大学生69折优惠券领了为什么没有到账? 海底捞大学生优惠是可以截图付款码然后发给朋友帮... 海底捞学生证69折使用方法是什么? 海底捞69折需要本人付款吗 海底捞学生优惠券领了之后放在哪里 大学生领取海底捞6.9折优惠该怎样操作? 海底捞大学生69折是买单时间还是点餐? 海底捞大学生69折怎么用 海底捞大学生69折怎么用? 海底捞研究生打折规定 怎么折海底捞大学生69折? 研究生吃海底捞有69折吗? 海底捞学生69折怎么用? 支付宝海底捞69折券领了以后在哪里 梦见到男朋友家和男朋友睡被他妈妈撞到了。说明什么? 梦见带前男友回家睡觉被家人发现,连去世好几年的... 我和男朋友分手了,但今天做梦梦到他去我家和我睡... 经常梦见男朋友睡在一起,被父母发现,准备订婚,彩礼... 梦见去男朋友家 我昨天晚上梦见我去我男朋友家了,在他家睡了一夜... kafka 问题 如何解决啊,求助 微软的word怎么使用 word怎么设计 word怎么打开,在哪里打开啊???? word怎么快速编辑格式 word怎么输入→ word怎么输入∠ word怎么打开格式 2021快手如何隐藏快手号 怎么样把快手主页的账号弄没? 你好我手机快手号在桌面显示不了怎么回事? 快手主页不要显示直播怎么弄 快手用户设置不可见是什么 别人还能搜到我的快手号 但是主页介绍那些不见了 ... 快手主页没有昵称只是显示 快手用户1644984426131... 个人主页不想显示开播时间 快手账号出现异常请到个人主页自主解除 快手显示当前用户己注消,去首页看看吧是怎么回事? 我的快手是最新版本了但是还是不显示主页访客怎么办 为什么有些快手号有访客记录有些快手号没有为什么...