Now that there is an agent 'package' that can be sent from process to process, we need to provide the code that can play the role of sender and receiver.
At a conceptual level, this means creating a class or classes that provide four required functions (assuming 'Agent' represents the class of agents being exchanged):
void providePackage(Agent * agent, std::vector<AgentPackage>& out);
void provideContent(repast::AgentRequest req, std::vector<AgentPackage>& out);
Agent * createAgent(AgentPackage package);
void updateAgent(AgentPackage package);
The first two facilitate 'sending'. The 'providePackage' function is expected to take a pointer to an Agent, create an agent 'package' from it, and add that package to the vector of agent packages. The second is very similar, except that it is given an 'AgentRequest'. An AgentRequest contains a collection of AgentIds, and the assumption is that the code will loop through this collection, look up each listed agent in the context, and retrieve its package (possibly by calling the 'providePackage' function).
The last two functions act to facilitate 'receiving'. The 'createAgent' function must take an agent 'package' and make an agent out of it, returning a pointer to it. This is used when an agent is being sent from one process to another; it will be destroyed on the original process and recreated on the new process. The 'updateAgent' function is used when a copy of an agent is already on the receiving process, and it needs only to be updated with the information that is being received from the sending process.
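To make the division of labor concrete, here is a minimal sketch of the three operations on a single agent, written without Repast HPC. The 'Agent' and 'AgentPackage' types below are hypothetical stand-ins (the real ones carry a repast::AgentId), and 'updateAgent' is handed the local copy directly instead of looking it up in a context, which is a simplification for illustration only.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for the package: plain data that can be copied
// between processes. The tutorial's real package also carries the full AgentId.
struct AgentPackage {
    int id;
    int currentRank;
    double value;
};

// Hypothetical stand-in for the agent class being exchanged.
class Agent {
    int id_;
    int currentRank_;
    double value_;
public:
    Agent(int id, int currentRank, double value)
        : id_(id), currentRank_(currentRank), value_(value) {}
    int id() const { return id_; }
    int currentRank() const { return currentRank_; }
    double value() const { return value_; }
    void set(int currentRank, double value) {
        currentRank_ = currentRank;
        value_ = value;
    }
};

// Sending: copy the agent's state into a package and append it to 'out'.
void providePackage(const Agent* agent, std::vector<AgentPackage>& out) {
    assert(agent != 0);
    AgentPackage package = { agent->id(), agent->currentRank(), agent->value() };
    out.push_back(package);
}

// Receiving (agent is moving here): build a brand-new agent from the package.
// The caller takes ownership of the returned pointer.
Agent* createAgent(const AgentPackage& package) {
    return new Agent(package.id, package.currentRank, package.value);
}

// Receiving (a copy already exists here): overwrite its state in place.
void updateAgent(Agent& localCopy, const AgentPackage& package) {
    localCopy.set(package.currentRank, package.value);
}
```

In this sketch, 'createAgent' allocates and 'updateAgent' only mutates, which mirrors the distinction drawn above between an agent arriving on a process and an existing copy being refreshed.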
Any class or classes that provide these functions can be used. One variation, sometimes employed, is for the model class itself to provide these functions. However, for this demo, we will create classes whose only purpose is to play the 'sender' and 'receiver' roles.
The code we will use is added to the Model.h and Model.cpp files:
/* Demo_01_Model.h */
#ifndef DEMO_01_MODEL
#define DEMO_01_MODEL
#include <string>
#include <vector>
#include <boost/mpi.hpp>
#include "repast_hpc/Schedule.h"
#include "repast_hpc/Properties.h"
#include "repast_hpc/SharedContext.h"
#include "repast_hpc/AgentRequest.h"
#include "Demo_01_Agent.h"
/* Agent Package Provider */
class RepastHPCDemoAgentPackageProvider {
private:
repast::SharedContext<RepastHPCDemoAgent>* agents;
public:
RepastHPCDemoAgentPackageProvider(repast::SharedContext<RepastHPCDemoAgent>* agentPtr);
void providePackage(RepastHPCDemoAgent * agent, std::vector<RepastHPCDemoAgentPackage>& out);
void provideContent(repast::AgentRequest req, std::vector<RepastHPCDemoAgentPackage>& out);
};
/* Agent Package Receiver */
class RepastHPCDemoAgentPackageReceiver {
private:
repast::SharedContext<RepastHPCDemoAgent>* agents;
public:
RepastHPCDemoAgentPackageReceiver(repast::SharedContext<RepastHPCDemoAgent>* agentPtr);
RepastHPCDemoAgent * createAgent(RepastHPCDemoAgentPackage package);
void updateAgent(RepastHPCDemoAgentPackage package);
};
class RepastHPCDemoModel{
int stopAt;
int countOfAgents;
repast::Properties* props;
repast::SharedContext<RepastHPCDemoAgent> context;
RepastHPCDemoAgentPackageProvider* provider;
RepastHPCDemoAgentPackageReceiver* receiver;
public:
RepastHPCDemoModel(std::string propsFile, int argc, char** argv, boost::mpi::communicator* comm);
~RepastHPCDemoModel();
void init();
void doSomething();
void initSchedule(repast::ScheduleRunner& runner);
void recordResults();
};
#endif
And
/* Demo_01_Model.cpp */
#include <stdio.h>
#include <vector>
#include <boost/mpi.hpp>
#include "repast_hpc/AgentId.h"
#include "repast_hpc/RepastProcess.h"
#include "repast_hpc/Utilities.h"
#include "repast_hpc/Properties.h"
#include "repast_hpc/initialize_random.h"
#include "Demo_01_Model.h"
RepastHPCDemoAgentPackageProvider::RepastHPCDemoAgentPackageProvider(repast::SharedContext<RepastHPCDemoAgent>* agentPtr): agents(agentPtr){ }
void RepastHPCDemoAgentPackageProvider::providePackage(RepastHPCDemoAgent * agent, std::vector<RepastHPCDemoAgentPackage>& out){
repast::AgentId id = agent->getId();
RepastHPCDemoAgentPackage package(id.id(), id.startingRank(), id.agentType(), id.currentRank(), agent->getC(), agent->getTotal());
out.push_back(package);
}
void RepastHPCDemoAgentPackageProvider::provideContent(repast::AgentRequest req, std::vector<RepastHPCDemoAgentPackage>& out){
std::vector<repast::AgentId> ids = req.requestedAgents();
for(size_t i = 0; i < ids.size(); i++){
providePackage(agents->getAgent(ids[i]), out);
}
}
RepastHPCDemoAgentPackageReceiver::RepastHPCDemoAgentPackageReceiver(repast::SharedContext<RepastHPCDemoAgent>* agentPtr): agents(agentPtr){}
RepastHPCDemoAgent * RepastHPCDemoAgentPackageReceiver::createAgent(RepastHPCDemoAgentPackage package){
repast::AgentId id(package.id, package.rank, package.type, package.currentRank);
return new RepastHPCDemoAgent(id, package.c, package.total);
}
void RepastHPCDemoAgentPackageReceiver::updateAgent(RepastHPCDemoAgentPackage package){
repast::AgentId id(package.id, package.rank, package.type);
RepastHPCDemoAgent * agent = agents->getAgent(id);
agent->set(package.currentRank, package.c, package.total);
}
RepastHPCDemoModel::RepastHPCDemoModel(std::string propsFile, int argc, char** argv, boost::mpi::communicator* comm): context(comm){
props = new repast::Properties(propsFile, argc, argv, comm);
stopAt = repast::strToInt(props->getProperty("stop.at"));
countOfAgents = repast::strToInt(props->getProperty("count.of.agents"));
initializeRandom(*props, comm);
if(repast::RepastProcess::instance()->rank() == 0) props->writeToSVFile("./output/record.csv");
provider = new RepastHPCDemoAgentPackageProvider(&context);
receiver = new RepastHPCDemoAgentPackageReceiver(&context);
}
RepastHPCDemoModel::~RepastHPCDemoModel(){
delete props;
delete provider;
delete receiver;
}
void RepastHPCDemoModel::init(){
int rank = repast::RepastProcess::instance()->rank();
for(int i = 0; i < countOfAgents; i++){
repast::AgentId id(i, rank, 0);
id.currentRank(rank);
RepastHPCDemoAgent* agent = new RepastHPCDemoAgent(id);
context.addAgent(agent);
}
}
void RepastHPCDemoModel::doSomething(){
std::vector<RepastHPCDemoAgent*> agents;
context.selectAgents(countOfAgents, agents);
std::vector<RepastHPCDemoAgent*>::iterator it = agents.begin();
while(it != agents.end()){
(*it)->play(&context);
it++;
}
}
void RepastHPCDemoModel::initSchedule(repast::ScheduleRunner& runner){
runner.scheduleEvent(1, 1, repast::Schedule::FunctorPtr(new repast::MethodFunctor<RepastHPCDemoModel> (this, &RepastHPCDemoModel::doSomething)));
runner.scheduleEndEvent(repast::Schedule::FunctorPtr(new repast::MethodFunctor<RepastHPCDemoModel> (this, &RepastHPCDemoModel::recordResults)));
runner.scheduleStop(stopAt);
}
void RepastHPCDemoModel::recordResults(){
if(repast::RepastProcess::instance()->rank() == 0){
props->putProperty("Result","Passed");
std::vector<std::string> keyOrder;
keyOrder.push_back("RunNumber");
keyOrder.push_back("stop.at");
keyOrder.push_back("Result");
props->writeToSVFile("./output/results.csv", keyOrder);
}
}
A few notes are in order to clarify this new code. First, notice that the model class has a pointer to a provider and receiver object; just like the 'Properties' object, the actual instance is created using the 'new' keyword, and thus has to be destroyed in the destructor using 'delete.'
Next, notice that the 'providePackage' and 'provideContent' functions work together: the first simply takes a pointer to an agent and makes a package out of it, and the second takes a list of agent IDs (from the 'agent request' object), gets pointers to the corresponding agents from the context, and uses 'providePackage' to generate and package the content to be sent.
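The delegation from 'provideContent' to 'providePackage' can be sketched in isolation as follows. Everything here is a hypothetical stand-in: 'DemoContext' is a std::map playing the role of repast::SharedContext, a vector of integer IDs plays the role of the AgentRequest, and 'DemoAgent' and 'DemoPackage' are simplified structs. Unlike the tutorial code, this sketch silently skips IDs that are not present locally; that is an assumption made to keep the example safe to run, not a description of what Repast HPC does.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical simplified agent and package types.
struct DemoAgent   { int id; double c; double total; };
struct DemoPackage { int id; double c; double total; };

// Hypothetical stand-in for the shared context: id -> non-owning agent pointer.
typedef std::map<int, DemoAgent*> DemoContext;

class DemoProvider {
    DemoContext* agents;
public:
    explicit DemoProvider(DemoContext* ctx) : agents(ctx) { assert(ctx != 0); }

    // Package a single agent and append the package to 'out'.
    void providePackage(DemoAgent* agent, std::vector<DemoPackage>& out) {
        DemoPackage package = { agent->id, agent->c, agent->total };
        out.push_back(package);
    }

    // Package every requested agent, in request order, by looking each one
    // up in the context and delegating to providePackage.
    void provideContent(const std::vector<int>& requestedIds,
                        std::vector<DemoPackage>& out) {
        for (std::size_t i = 0; i < requestedIds.size(); i++) {
            DemoContext::iterator found = agents->find(requestedIds[i]);
            if (found != agents->end()) {
                providePackage(found->second, out);
            }
        }
    }
};
```

Keeping the packaging logic in one function means that any change to the package layout only has to be made in 'providePackage'.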
Notice also that the 'createAgent' function creates a new AgentId instance for the agent, matching its original ID (except for the 'current rank' variable, which will be reset automatically to the current rank). If, in the 'updateAgent' function, the specified agent is not found, 'getAgent' returns a null pointer and the program will crash when that pointer is dereferenced (we omit the error trap for this here; it should never happen).
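For readers who would rather have the error trap than omit it, one possible shape is sketched below. It uses hypothetical stand-in types again (a std::map in place of repast::SharedContext, plain structs for the agent and package), and it returns a bool so the outcome is visible to the caller; the function in the tutorial returns void, so in real code the guard would more likely log or simply return early.

```cpp
#include <cassert>
#include <map>

// Hypothetical simplified agent with the same kind of 'set' used in the demo.
struct DemoAgent {
    int id;
    int currentRank;
    double c;
    double total;
    void set(int newRank, double newC, double newTotal) {
        currentRank = newRank;
        c = newC;
        total = newTotal;
    }
};

struct DemoPackage { int id; int currentRank; double c; double total; };

// Hypothetical stand-in for the shared context: id -> non-owning agent pointer.
typedef std::map<int, DemoAgent*> DemoContext;

class DemoReceiver {
    DemoContext* agents;
public:
    explicit DemoReceiver(DemoContext* ctx) : agents(ctx) { assert(ctx != 0); }

    // Update the local copy named by the package.
    // Returns true if a copy was found and updated, false if none exists here.
    bool updateAgent(const DemoPackage& package) {
        DemoContext::iterator found = agents->find(package.id);
        if (found == agents->end() || found->second == 0) {
            return false;  // nothing to update; avoid dereferencing a null pointer
        }
        found->second->set(package.currentRank, package.c, package.total);
        return true;
    }
};
```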
One final note is in order: the 'serialize()' method in the AgentPackage class does not need to be called explicitly. Repast HPC handles the transfer of agents behind the scenes, so normal user code will not need to call any of these methods directly; they are called as part of Repast HPC's internal parallelization management. The 'serialize()' method is actually called by the Boost library when the AgentPackage is added to a collection of data that is being sent from process to process. We only need to provide the method; calling it is done automatically.