博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm的数据流组
阅读量:4180 次
发布时间:2019-05-26

本文共 3927 字,大约阅读时间需要 13 分钟。

数据流组

设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。

NOTE:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个。

数据流组在定义拓扑时设置

···    builder.setBolt("word-normalizer", new WordNormalizer())           .shuffleGrouping("word-reader");···

在前面的代码块里,一个boltTopologyBuilder对象设定, 然后使用随机数据流组指定数据源。数据流组通常将数据源组件的ID作为参数,取决于数据流组的类型不同还有其它可选参数。

NOTE:每个InputDeclarer可以有一个以上的数据源,而且每个数据源可以分到不同的组。

随机数据流组

随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

随机数据流组用于数学计算这样的原子操作。然而,如果操作不能被随机分配,就像前面单词计数的例子,你就要考虑其它分组方式了。

域数据流组

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,如果你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。

···    builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGrouping("word-normalizer", new Fields("word"));···

NOTE: 在域数据流组中的所有域集合必须存在于数据源的域声明中。

全部数据流组

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能(见)

public void execute(Tuple input) {        String str = null;        try{            if(input.getSourceStreamId().equals("signals")){                str = input.getStringByField("action");                if("refreshCache".equals(str))                    counters.clear();            }        }catch (IllegalArgumentException e){            //什么也不做        }        ···    }

我们添加了一个if分支,用来检查源数据流。Storm允许我们声明具名数据流(如果你不把元组发送到一个具名数据流,默认发送到名为”default“的数据流)。这是一个识别元组的极好的方式,就像这个例子中,我们想识别signals一样。 在拓扑定义中,你要向word-counter bolt添加第二个数据流,用来接收从signals-spout数据流发送到所有bolt实例的每一个元组。

builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGroupint("word-normalizer",new Fields("word"))           .allGrouping("signals-spout","signals");

signals-spout的实现请参考。

自定义数据流组

你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。

让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。

public class ModuleGrouping mplents CustormStreamGrouping, Serializable{        int numTasks = 0;        @Override        public List
chooseTasks(List
values) { List
boltIds = new ArrayList
(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List
targetTasks) { numTasks = targetTasks.size(); } }

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt

按下述方式word-normalizer修改即可使用这个自定义数据流组。

builder.setBolt("word-normalizer", new WordNormalizer())           .customGrouping("word-reader", new ModuleGrouping());

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit

public void execute(Tuple input) {        ...        for(String word : words){            if(!word.isEmpty()){                ...                collector.emitDirect(getWordCountIndex(word),new Values(word));            }        }        //对元组做出应答        collector.ack(input);    }    public Integer getWordCountIndex(String word) {        word = word.trim().toUpperCase();        if(word.isEmpty()){            return 0;        }else{            return word.charAt(0) % numCounterTasks;        }    }

prepare方法中计算任务数

public void prepare(Map stormConf, TopologyContext context,                 OutputCollector collector) {        this.collector = collector;        this.numCounterTasks = context.getComponentTasks("word-counter");    }

在拓扑定义中指定数据流将被直接分组:

builder.setBolt("word-counter", new WordCounter(),2)           .directGrouping("word-normalizer");

全局数据流组

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

不分组

(Stom0.7.1版),这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

转载地址:http://gjhai.baihongyu.com/

你可能感兴趣的文章
(转载)linux命令之十八locate 命令
查看>>
Linux发行光盘(红旗 5.0 SP2发行版,已不使用仅参考)
查看>>
linux下如何将文件打包、压缩并分割成制定大小
查看>>
CentOS6.5升级内核到3.10.28
查看>>
linux内核补丁安装和编译安装
查看>>
java.lang.IllegalArgumentException: FacesContext must not be null 错误分析及解决
查看>>
Spring框架事务管理之四:Spring编程式事务
查看>>
JOSSO入门指南及其与WildFly AS 10的集成
查看>>
为WildFly AS 10中的Liferay Portal 6.2配置JOSSO Agent
查看>>
Spring ORM与Hibernate的集成开发详解
查看>>
WildFly AS 10中基于PicketLink的SAML SSO实现
查看>>
Spring AOP概述
查看>>
Apache Maven入门指南
查看>>
Apache Maven的插件概述
查看>>
Apache Maven项目提供的Archetype插件详解
查看>>
Apache Maven项目提供的Compiler插件详解
查看>>
Apache Maven项目提供的Ant插件详解
查看>>
Apache Maven项目提供的AntRun插件详解
查看>>
Apache Maven项目提供的EJB插件详解
查看>>
Hibernate中持久化上下文的flush操作之一COMMIT
查看>>