软件简介
FastFlow 是一个多核编程框架，实现了无锁的 MPMC FIFO 队列规范，用于支持多核平台上的高级应用开发。其速度比 TBB、OpenMP 都要快。
主页:
https://github.com/fastflow/fastflow
Farm模式的编写:
1.基本Farm模式编写
#include <vector>
#include <ff/farm.hpp>
#include <iostream>
using namespace ff;
// Farm worker for the basic example: every svc() call prints a greeting
// tagged with the value of get_my_id() and hands the task back unchanged.
struct Worker : ff_node {
    void *svc(void *task) {
        const auto id = get_my_id();
        std::cout << "Hello I am the worker " << id << "\n";
        return task;
    }
};
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
std::vector<ff_node *> Workers;
for (int i = 0; i < nworkers; ++i) Workers.push_back(new Worker);
ff_farm<> myFarm(Workers);
if (myFarm.run_and_wait_end() < 0) error("running myFarm");
return 0;
}
运行结果:
2. Pipeline+Farm模式
#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>
using namespace ff;
// Farm worker operating on long tasks.
// svc_init() prints a greeting tagged with get_my_id() and reports success
// (0); svc() forwards every task it receives without touching it.
struct Worker : ff_node_t<long> {
    int svc_init() {
        const auto id = get_my_id();
        std::cout << "Hello I am the worker " << id << "\n";
        return 0;
    }

    long *svc(long *task) {
        return task;
    }
};
// First pipeline stage: generates the input stream.
// svc() ignores its argument, sends out `size` heap-allocated longs holding
// 0 .. size-1 through ff_send_out(), then returns EOS.
struct firstStage : ff_node_t<long> {
    long size = 10;  // number of tasks to generate

    long *svc(long *) {
        long next = 0;
        while (next < size) {
            ff_send_out(new long(next));
            ++next;
        }
        return EOS;
    }
} streamGenerator;
// Last pipeline stage: drains the stream.
// Each received task is printed, then freed (it was allocated with new by
// the generator stage); GO_ON is returned so no output item is produced.
struct lastStage : ff_node_t<long> {
    long *svc(long *item) {
        std::cout << "Last stage received " << *item << "\n";
        delete item;
        return GO_ON;
    }
} streamDrainer;
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
std::vector<std::unique_ptr<ff_node> > Workers;
for (int i = 0; i < nworkers; ++i) Workers.push_back(make_unique<Worker>());
ff_Farm<long> myFarm(std::move(Workers));
ff_Pipe<> pipe(streamGenerator, myFarm, streamDrainer);
if (pipe.run_and_wait_end() < 0) error("running pipe");
return 0;
}
运行结果:
3. 用单个函数而不是ff_node来创建task-farm
例子如下:
#include <ff/farm.hpp>
using namespace ff;
// Placeholder task type for this sketch; its members are elided.
struct myTask{....};
// Plain function used in place of an ff_node subclass. It takes a pointer to
// the input task and a pointer to a node, and returns a task pointer.
// The body is elided in this sketch.
myTask * F(myTask *in, ff_node* const node) {...}
ff_Farm<> farm(F, 3); // create a farm executing 3 replicas of F
4. 在Farm中重定义Emitter与Collector
#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>
using namespace ff;
// Farm worker operating on long tasks.
// svc_init() prints a greeting tagged with get_my_id() and reports success
// (0); svc() forwards every task it receives without touching it.
struct Worker : ff_node_t<long> {
    int svc_init() {
        const auto id = get_my_id();
        std::cout << "Hello I am the worker " << id << "\n";
        return 0;
    }

    long *svc(long *task) {
        return task;
    }
};
// Node passed to the farm as its emitter in this example.
// svc() ignores its argument, sends out `size` heap-allocated longs holding
// 0 .. size-1 through ff_send_out(), then returns EOS.
struct firstStage : ff_node_t<long> {
    long size = 10;  // number of tasks to generate

    long *svc(long *) {
        long next = 0;
        while (next < size) {
            ff_send_out(new long(next));
            ++next;
        }
        return EOS;
    }
} Emitter;
// Node passed to the farm as its collector in this example.
// Each received task is printed, then freed (it was allocated with new by
// the emitter); GO_ON is returned so no output item is produced.
struct lastStage : ff_node_t<long> {
    long *svc(long *item) {
        std::cout << "Last stage received " << *item << "\n";
        delete item;
        return GO_ON;
    }
} Collector;
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
ff_Farm<long> farm([nworkers]() {
std::vector<std::unique_ptr<ff_node> > Workers;
for (int i = 0; i < nworkers; ++i)
Workers.push_back(make_unique<Worker>());
return Workers;
} (), Emitter, Collector);
if (farm.run_and_wait_end() < 0) error("running farm");
return 0;
}
运行结果:
5. 没有Collector的Farm
调用remove_collector将Farm中的收集器(Collector)删除
#include <vector>
#include <ff/farm.hpp>
#include <ff/pipeline.hpp>
#include <iostream>
using namespace ff;
// Farm worker operating on long tasks.
// svc_init() prints a greeting tagged with get_my_id() and reports success
// (0); svc() forwards every task it receives without touching it.
struct Worker : ff_node_t<long> {
    int svc_init() {
        const auto id = get_my_id();
        std::cout << "Hello I am the worker " << id << "\n";
        return 0;
    }

    long *svc(long *task) {
        return task;
    }
};
// Node passed to the farm as its emitter in this example.
// svc() ignores its argument, sends out `size` heap-allocated longs holding
// 0 .. size-1 through ff_send_out(), then returns EOS.
struct firstStage : ff_node_t<long> {
    long size = 10;  // number of tasks to generate

    long *svc(long *) {
        long next = 0;
        while (next < size) {
            ff_send_out(new long(next));
            ++next;
        }
        return EOS;
    }
} Emitter;
// Stage placed after the collector-less farm. It derives from ff_minode_t
// (multi-input node). Each received task is printed together with the value
// of get_channel_id(), then freed; GO_ON is returned so no output item is
// produced.
struct lastStage : ff_minode_t<long> {  // Note: multi-input node
    long *svc(long *item) {
        std::cout << "Last stage received " << *item
                  << " from " << get_channel_id() << "\n";
        delete item;
        return GO_ON;
    }
} LastStage;
int main(int argc, char* argv[]) {
assert(argc > 1);
int nworkers = atoi(argv[1]);
ff_Farm<long> farm([nworkers]() {
std::vector<std::unique_ptr<ff_node> > Workers;
for (int i = 0; i < nworkers; ++i)
Workers.push_back(make_unique<Worker>());
return Workers;
} (), Emitter);
farm.remove_collector();
ff_Pipe<> pipe(farm, LastStage);
if (pipe.run_and_wait_end() < 0) error("running pipe");
return 0;
}
运行结果: