nifi简介
nifi背景 NiFi之前是在美国国家安全局(NSA)开发和使用了8年的一个可视化、可定制的数据集成产品。2014年NSA将其贡献给了Apache开源社区,2015年7月成功成为Apache顶级项目。NiFi概念
Apache NiFi 是一个易于使用、功能强大而且可靠的数据处理和分发系统。Apache NiFi 是为数据流设计,它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。简单地说,NiFi是为自动化系统之间的数据流而生。 这里的数据流表示系统之间的自动化和受管理的信息流。 基于WEB图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集、处理等功能。数据流面临的挑战
1、系统故障网络故障
硬盘故障 软件宕机 人员操作失误 2、数据接入超出处理能力数据源的输出超出系统所能处理的能力
传递链出现问题,比如某一个弱连接处出现问题。 3、很少对边界做出界定数据量太大、太小、太快、太慢
数据损坏、错误、格式不对等 4、系统的调整系统地某一部分经常需要改动
需要快速的使用一个新的数据流,或快速调整现有的一个流。 5、系统以不同的速度发展给定系统使用的协议和格式可以随时改变,而不管其周围的系统如何。数据流存在的连接本质上是大量分散的组件系统,这些组件松散地或者不是全部设计在一起工作。
6、合规性和安全性法律法规和政策变化。企业对企业协议的变化。系统对系统和系统与用户的交互必须是安全,可靠,负责任的。
7、生产中持续改善 实验室通常不可能接近复制生产环境。 多年以来,数据流已经成为一种架构中必不可少的恶性循环之一。现在虽然有一些积极和快速发展的运动,使得数据流更有趣,对于给定企业的成功更为重要。这些包括像 面向服务的架构SOA ,API的兴起,物联网IOT ,和大数据bigdata 。此外,合规性,隐私性和安全性所需的严谨程度不断增加。即使仍然存在所有这些新概念,数据流的模式和需求仍然基本相同。主要的差异在于复杂性的范围,适应需要的变化率,并且在规模上,边缘情况变得普遍存在。NiFi旨在帮助解决这些现代数据流挑战。NiFi核心组件
FlowFile一个FlowFile表示一个对象,这个对象可以在系统之间进行移动,被传递。NIFI保存一个FlowFile的key-value的属性列表和这个FlowFile的字节内容。
FlowFileProcessor
Proccessor对FlowFile进行各种操作,它可以获取FlowFile的属性和内容,同时操作多个FlowFile,可以将操作结果提交给下一个Processor或者回滚。
Connection
它提供Processor之间连接。它的作用就像一个queue,各种processor和它进行交互。
FlowController
控制器,它记录processor之间的连接关系,管理线程,管理processor和线程之间的使用关系。是一个类似broker的角色。
Processgroup
它表示一组processor和它们之间的连接。对于这个group而言,它可以通过input端口和output端口,进行数据的接收和发送。
上述这些组件为nifi的工作带来了些许好处
适用于视觉创建和管理处理器的有向图 本质上是异步的,即使在处理和流量波动时也允许非常高的吞吐量和自然缓冲 提供高度并发的模型,而开发人员不必担心并发性的典型复杂性 促进发展粘性和松散耦合的部件,然后可以在其他情况下重复使用,并促进可测试的部件 资源受限的连接使关键功能(如背压和压力释放)非常自然和直观 错误处理变得与基本逻辑一样自然,而不是粗粒度的一网打尽 数据进出系统的点以及流程如何被很好的理解和易于跟踪 NiFi架构NiFi在主机操作系统上的JVM内执行。JVM上的NiFi的主要组件如下:
网络服务器
Web服务器的目的是托管NiFi的基于HTTP的命令和控制API。流控制器
流控制器是操作的大脑。它提供用于扩展程序运行的线程,并管理扩展程序接收资源以执行的时间表。扩展
有各种类型的NiFi扩展在其他文档中描述。这里的关键是扩展在JVM中运行和执行。FlowFile存储库
FlowFile存储库是NiFi跟踪目前在流程中活动的给定FlowFile的知识状态。存储库的实现是可插拔的。默认方法是位于指定磁盘分区上的持久写入前端日志。内容存储库
Content Repository是给定FlowFile的实际内容字节。存储库的实现是可插拔的。默认方法是一个相当简单的机制,它将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便获得不同的物理分区,以减少任何单个卷上的争用。源头存储库
Provenance Repository是存储所有来源的事件数据的地方。存储库构造是可插入的,默认实现是使用一个或多个物理磁盘卷。在每个位置内,事件数据被索引和可搜索。NiFi还能够在集群内运行。
从NiFi 1.0版本开始,采用零主分类范例。NiFi集群中的每个节点对数据执行相同的任务,但是每个节点都在不同的数据集上进行操作。Apache ZooKeeper选择单个节点作为群集协调器,故障转移由ZooKeeper自动处理。所有群集节点向群集协调器报告心跳和状态信息。集群协调器负责断开连接节点。此外,每个群集都有一个主节点,也由ZooKeeper选择。作为DataFlow管理器,您可以通过任何节点的用户界面(UI)与NiFi集群进行交互。您所做的任何更改都会复制到群集中的所有节点,从而允许多个入口点。NiFi功能的高级概述
流量管理 保证交货NiFi的核心理念是,即使在非常高的规模,保证交付是必须的。这是通过有效使用专用的持久预写日志和内容存储库来实现的。它们一起设计成允许非常高的事务速率,有效的负载传播,写时复制以及发挥传统磁盘读/写功能的优势。
数据缓冲带背压和压力释放
NiFi支持对所有排队的数据进行缓冲,以及当队列达到指定限制时提供背压的能力,或者在数据达到指定年龄时使其老化(其值已经消失)的能力。
优先排队
NiFi允许设置一个或多个优先级排序方案来了解如何从队列中检索数据。默认值是最早的,但有时候数据应该被拉到最新,最大的第一个或其他一些自定义方案。
流特定QoS(延迟v吞吐量,丢失容限等)
数据流的一些点数据绝对关键,并且是不容忍的。还有一段时间,它必须在几秒钟内被处理和交付成为任何价值。NiFi使得细粒度流特定配置这些问题。
使用方便
视觉指挥与控制数据流可能变得相当复杂。能够可视化这些流程并在视觉上表达它们可以大大减少复杂性并确定需要简化的领域。NiFi不仅可以直观地建立数据流,而且可以实时地实现。而不是设计和部署它更像是成型粘土。如果对更改的数据流进行更改立即生效。更改是细粒度的,并且与受影响的组件隔离。您不需要停止整个流程或流程只是为了进行一些具体的修改。
流模板
数据流往往是高度模式化的,而通常有许多不同的方式来解决问题,它可以大大地分享这些最佳实践。模板允许主题专家构建和发布他们的流程设计,并为其他人创造和合作。
资料来源
NiFi自动记录,索引并提供可用的来源数据,因为对象即使在扇入,扇出,转换等过程中也可以流经系统。该信息在支持合规性,故障排除,优化和其他场景方面变得非常重要。
恢复/记录细粒历史的滚动缓冲区
NiFi的内容存储库旨在作为历史的滚动缓冲区。只有当数据从内容存储库中老化或者需要空间时才会被删除。这与数据来源功能相结合,使得在对象的生命周期中甚至跨越世代的特定点上实现点击内容,内容下载和重放非常有用的基础。
安全
系统到系统数据流只是安全的一样好。数据流中每一点的NiFi都可以通过使用诸如双向SSL等加密协议提供安全交换。此外,NiFi使得流可以加密和解密内容,并使用发件人/收件人方程的任一侧上的共享密钥或其他机制。
用户到系统
NiFi支持双向SSL身份验证,并提供可插拔授权,从而可以正确控制用户的访问和特定级别(只读,数据流管理器,管理员)。如果用户在流程中输入密码等敏感属性,则立即加密服务器端,即使在加密形式下也不会再次暴露在客户端。
多租户授权
给定数据流的权限级别适用于每个组件,允许管理员用户具有细粒度的访问控制。这意味着每个NiFi集群都能够处理一个或多个组织的要求。与独立拓扑相比,多租户授权可实现数据流管理的自助服务模式,从而允许每个团队或组织对流程进行管理,同时充分了解流程的其他部分,无法访问。
可扩展架构
延期NiFi的核心是扩展的核心,因此它是数据流处理可以以可预测和可重复的方式执行和交互的平台。扩展点包括:处理器,控制器服务,报告任务,优先级和客户用户界面。
分类器隔离
对于任何基于组件的系统,可能会迅速发生依赖问题。NiFi通过提供自定义类加载器模型来解决这个问题,确保每个扩展捆绑包都暴露在非常有限的依赖关系中。因此,可以构建扩展,而不用担心它们是否可能与另一个扩展冲突。这些扩展束的概念称为NiFi Archives,并在开发人员指南中有更详细的讨论。
站点到站点通信协议
NiFi实例之间的首选通信协议是NiFi站点到站点(S2S)协议。S2S可以方便,高效,安全地将数据从一个NiFi实例传输到另一个。NiFi客户端库可以轻松构建并捆绑到其他应用程序或设备中,以通过S2S与NiFi通信。S2S中都支持基于套接字的协议和HTTP(S)协议作为底层传输协议,从而可以将代理服务器嵌入到S2S通信中。
灵活的缩放模型
横向扩展(聚类)NiFi旨在通过如上所述将多个节点聚类在一起使用来展开。如果单个节点被配置并配置为每秒处理数百MB,则可以配置适度的集群来处理每秒的GB数。这将带来NiFi与获取数据的系统之间的负载平衡和故障转移的有趣挑战。使用基于异步排队的协议(如消息传递服务,Kafka等)可以帮助您。使用NiFi的站点到站点功能也非常有效,因为它是允许NiFi和客户端(包括另一个NiFi集群)相互通话,共享关于加载的信息以及在特定授权端口上交换数据的协议。
放大和缩小
NiFi也被设计成以非常灵活的方式进行放大和缩小。在从NiFi框架的角度增加吞吐量方面,可以在配置时增加“计划”选项卡下的处理器上的并发任务数量。这允许更多的进程同时执行,提供更大的吞吐量。另一方面,您可以将NiFi完美地缩放到适合于在硬件资源有限的边缘设备上运行,因为需要较小的占用空间。为了专门解决第一个英里数据收集挑战和边缘用例。
nifi RecordPath 的使用
nifi提供了对各种数据源的集成、处理、路由、转换以及传送,而且数据来源多种多样。我们熟悉的数据格式有json、csv、avro等,也有一些数据可以看作是”records” 或 “messages,针对这些记录型的数据nifi提供了RecordReader API来处理这些数据,这些功能在工作中我并未用到,这里是读到了文档故而记录在这里。 RecordPath可处理的数据类型如下: String、Boolean、Byte、Character、Short、Integer、Long、BigInt、Float、Double Date - Represents a Date without a Time component Time - Represents a Time of Day without a Date component Timestamp - Represents a Date and Time Embedded Record - Hierarchical data, such as JSON, can be represented by allowing a field to be of Type Record itself. Choice - A field may be any one of several types. Array - All elements of an array have the same type. Map - All Map Keys are of type String. The Values are of the same type. 其中数据以序列化成流的方式在不同的系统中进行传送。RecordPath 的的结构
外部record(ancestor)包含内部record(child),例如一个json结构的record:{
"name": "John Doe", "workAddress": { "number": "123", "street": "5th Avenue", "city": "New York", "state": "NY", "zip": "10020" }, "homeAddress": { "number": "456", "street": "116th Avenue", "city": "New York", "state": "NY", "zip": "11697" } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 子操作符(//) 取zip:/workAddress/zip
1 Descendant Operator(//) 取zip://zip
1 此操作将会取出所有的zip值过滤器( [ ] )
/person[ isEmpty('name') ]/id 1 此语句将会检索出名字为空的人的id取数组的值(Arrays)
{ "name": "John Doe", "addresses": [ "work": { "number": "123", "street": "5th Avenue", "city": "New York", "state": "NY", "zip": "10020" }, "home": { "number": "456", "street": "116th Avenue", "city": "New York", "state": "NY", "zip": "11697" } ] } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /addresses[1] - 取第一个元素的值 /addresses[-1] - 取倒数第一个元素的值 /addresses[0, 1, 2, 3] - 取指定元算的值 /addresses[0..8] - 取一定范围内元素的值 /addresses[0, 1, 4, 6..-1] - 混合用法 1 2 3 4 5 取不同的值(Maps) /details/address['zip'] /details/address['city', 'state', 'zip'] /details/address[*] 1 2 3 比较操作 Equals (=) Not Equal (!=) Greater Than (>) Greater Than or Equal To (>=) Less Than (<) Less Than or Equal To (<=) /*[./state != 'NY'] /*[./state != 'NY']/zip /*[./state = /details/preferredState] /*/city[../state = 'NJ'] 1 2 3 4 函数的应用 函数用法/name[ substringAfter(., ' ') = 'Doe']
substringAfter( /name, ' ' ) ubstringAfterLast( /name, 'o' ) substringBefore( /name, 'xyz' ) ---------------------