subDesTagesMitExtraKaese 5 жил өмнө
parent
commit
25dcac2123

BIN
build/op_lib.so


+ 4 - 4
lib/mlfpga/include/commFPGA.hpp

@@ -58,9 +58,9 @@ class commFPGA {
 
     //called by worker thread
     
-    int assignJob(Job *job);
+    int assignJob(std::shared_ptr<Job> &job);
     int fillBuffer(JobData *sendBuf);
-    int unassignJob(Job *job);
+    int unassignJob(std::shared_ptr<Job> &job);
 
     uint_least32_t jobCount();
     
@@ -73,7 +73,7 @@ class commFPGA {
     void recvUDP();
     int parseRaw(uint32_t *buf, size_t bufLen);
     
-    std::unordered_map<uint32_t,Job*>::iterator currentJob;
+    std::unordered_map<uint32_t,std::shared_ptr<Job>>::iterator currentJob;
     RecvState recvState = RecvState::checkPreamble;
     size_t recvPayloadIndex = 0;
 
@@ -88,7 +88,7 @@ class commFPGA {
     uint_least32_t sendBufferWriteIndex = 0;
 
     //list of pending responses
-    std::unordered_map<uint32_t,Job*> jobList;
+    std::unordered_map<uint32_t,std::shared_ptr<Job>> jobList;
     uint_least32_t jobsActive = 0;
     std::mutex jobLock;
 

+ 7 - 12
lib/mlfpga/include/connectionManager.hpp

@@ -19,19 +19,14 @@
     queue response
     cb on overwrite + delete old resp
     fills send buffer
+    retransmit job
 
   send thread:
-    cycle fd
-    send 1 packet if available
+    send 1 packet per fpga if available
 
   recv thread:
-    select(readFD)
     recv data into response
-    cb on success + delete response
-
-  response thread:
-    search old responses
-    cb on timeout + delete response
+    cb on success
     
 
 */
@@ -46,14 +41,14 @@ class ConnectionManager {
     void start();
 
     //send many Jobs and wait for all responses
-    int sendJobListSync(JobList &jobList);
+    int sendJobListSync(std::shared_ptr<JobList> &jobList);
 
     //send many Jobs and call back
-    int sendJobListAsync(JobList &jobList);
+    int sendJobListAsync(std::shared_ptr<JobList> &jobList);
 
   private:
-    std::vector<std::reference_wrapper<commFPGA>> fpgas;
-    std::vector<std::reference_wrapper<Worker>> workers;
+    std::vector<std::unique_ptr<commFPGA>> fpgas;
+    std::vector<std::unique_ptr<Worker>> workers;
     
     void sendThread();
     std::future<void> sendResult;

+ 3 - 3
lib/mlfpga/include/jobList.hpp

@@ -17,11 +17,11 @@ class JobList {
 
     size_t getPendingJobCount() const {return pendingJobCount;}
 
-    Job& getJob(size_t i);
+    std::shared_ptr<Job>& getJob(size_t i);
 
-    Job* getNextJob();
+    std::shared_ptr<Job> getNextJob();
   private:
-    std::vector<std::reference_wrapper<Job>> jobs;
+    std::vector<std::shared_ptr<Job>> jobs;
     DoneCallback doneCb;
 
     size_t jobCount;

+ 4 - 4
lib/mlfpga/include/worker.hpp

@@ -11,17 +11,17 @@
 
 class Worker {
   public:
-    Worker(std::vector<std::reference_wrapper<commFPGA>> &fpgas);
+    Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas);
     ~Worker();
 
     void start();
     
-    int assignJobList(JobList &jobList);
+    int assignJobList(std::shared_ptr<JobList> &jobList);
 
   private:
     std::mutex currentJobList_m;
-    JobList *currentJobList = NULL;
-    std::vector<std::reference_wrapper<commFPGA>> *fpgaVector;
+    std::shared_ptr<JobList> currentJobList = NULL;
+    std::vector<std::unique_ptr<commFPGA>> *fpgaVector;
 
     commFPGA* findAvailableFPGA();
     

+ 2 - 2
lib/mlfpga/src/commFPGA.cpp

@@ -194,12 +194,12 @@ int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
   return byteIndex;
 }
 
-int commFPGA::assignJob(Job *job) {
+int commFPGA::assignJob(std::shared_ptr<Job> &job) {
   jobLock.lock();
   if(jobList.size() >= JOB_COUNT)
     return -1;
   
-  jobList.insert(std::pair<uint32_t,Job*>(job->getJobId(), job));
+  jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job));
   
   jobsActive++;
   jobLock.unlock();

+ 8 - 11
lib/mlfpga/src/connectionManager.cpp

@@ -9,17 +9,14 @@ ConnectionManager::~ConnectionManager() {
 }
 
 void ConnectionManager::addFPGA(const char* ip, const uint port) {
-  commFPGA fpga(ip, port);
-  fpga.start();
-
-  fpgas.push_back(fpga);
+  fpgas.emplace_back(new commFPGA(ip, port));
+  fpgas.back()->start();
 }
 
-int ConnectionManager::sendJobListAsync(JobList &jobList) {
-  Worker worker(fpgas);
-  worker.assignJobList(jobList);
-  worker.start();
-  workers.push_back(worker);
+int ConnectionManager::sendJobListAsync(std::shared_ptr<JobList> &jobList) {
+  workers.emplace_back(new Worker(&fpgas));
+  workers.back()->assignJobList(jobList);
+  workers.back()->start();
   return 0;
 }
 
@@ -30,8 +27,8 @@ void ConnectionManager::start() {
 void ConnectionManager::sendThread() {
   while(running) {
     Clock::time_point start = Clock::now();
-    for(std::vector<std::reference_wrapper<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
-      it->get().sendFromBuffer();
+    for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
+      it->get()->sendFromBuffer();
     }
     //printf("%8d %8d\n", fpgas[0].sendBufferWriteIndex, fpgas[0].sendBufferReadIndex);
     uint us = std::chrono::duration_cast<microseconds>(Clock::now() - start).count();

+ 6 - 7
lib/mlfpga/src/jobList.cpp

@@ -4,9 +4,8 @@ JobList::JobList(Module mod, size_t numberOfJobs) {
   jobCount = numberOfJobs;
   pendingJobCount = numberOfJobs;
   for(size_t i=0; i<numberOfJobs; i++) {
-    //jobs.emplace_back(mod);
-    Job job(mod);
-    job.setDoneCallback([this]{
+    std::shared_ptr<Job> job(new Job(mod));
+    job->setDoneCallback([this]{
       finishJob();
     });
     jobs.push_back(job);
@@ -24,16 +23,16 @@ void JobList::finishJob() {
   jobListDone.notify_all();
 }
 
-Job& JobList::getJob(size_t i) {
+std::shared_ptr<Job>& JobList::getJob(size_t i) {
   return jobs.at(i);
 }
 
-Job* JobList::getNextJob() {
+std::shared_ptr<Job> JobList::getNextJob() {
   for(size_t i=0; i<jobCount; i++) {
     size_t rotated_i = (i+nextJobIndex+1) % jobCount;
-    if(jobs.at(rotated_i).get().getState() == JobState::ready) {
+    if(jobs.at(rotated_i)->getState() == JobState::ready) {
       nextJobIndex = rotated_i;
-      return &jobs.at(rotated_i).get();
+      return jobs.at(rotated_i);
     }
   }
   return NULL;

+ 8 - 8
lib/mlfpga/src/worker.cpp

@@ -1,7 +1,7 @@
 #include "worker.hpp"
 
-Worker::Worker(std::vector<std::reference_wrapper<commFPGA>> &fpgas) {
-  fpgaVector = &fpgas;
+Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas) {
+  fpgaVector = fpgas;
 }
 Worker::~Worker() {
   hasJobList.notify_all();
@@ -11,12 +11,12 @@ void Worker::start() {
   result = std::async(std::launch::async, &Worker::threadMain, this);
 }
 
-int Worker::assignJobList(JobList &jobList) {
+int Worker::assignJobList(std::shared_ptr<JobList> &jobList) {
   std::lock_guard<std::mutex> lk(currentJobList_m);
   if(currentJobList != NULL)
     return -1;
   
-  currentJobList = &jobList;
+  currentJobList = jobList;
   hasJobList.notify_all();
 
   return 0;
@@ -27,7 +27,7 @@ int Worker::threadMain() {
     return -1;
 
   while(currentJobList->getPendingJobCount() > 0) {
-    Job *job = currentJobList->getNextJob();
+    std::shared_ptr<Job> job = currentJobList->getNextJob();
     if(job == NULL) {
       break;
     }
@@ -43,11 +43,11 @@ int Worker::threadMain() {
 commFPGA* Worker::findAvailableFPGA() {
   uint_least32_t minCnt = JOB_COUNT-1;
   commFPGA *fpga = NULL;
-  for(std::vector<std::reference_wrapper<commFPGA>>::iterator it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
-    uint_least32_t cnt = it->get().jobCount();
+  for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
+    uint_least32_t cnt = it->get()->jobCount();
     if(cnt < minCnt) {
       minCnt = cnt;
-      fpga = &(it->get());
+      fpga = it->get();
     }
   }
   return fpga;

+ 1 - 1
makefile

@@ -5,7 +5,7 @@ INC_DIR=include
 BUILD_DIR=build
 FPGA_LIB_DIR=lib/mlfpga
 
-CFLAGS=-g -Wall -pthread -std=c++11
+CFLAGS=-g -Wall -std=c++11
 LFLAGS=-shared -Wl,--no-as-needed,-Map=$(BUILD_DIR)/project.map
 
 

+ 5 - 5
src/conv2D.cpp

@@ -54,22 +54,22 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 4>();
     auto output_tensor = output->tensor<int32, 4>();
 
-    JobList jobs(Module::dummyModule, batchSize * channels * filters);
+    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, batchSize * channels * filters));
 
     for(int sample=0; sample<batchSize; sample++) {
       for(int channel=0; channel<channels; channel++) {
         for(int filter=0; filter<filters; filter++) {
-          Job &job = jobs.getJob(sample * channels * filters + channel * filters + filter);
+          std::shared_ptr<Job> &job = jobs->getJob(sample * channels * filters + channel * filters + filter);
           for(int x=0; x<outputSize; x++) {
             for(int y=0; y<outputSize; y++) {
-              job.setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
+              job->setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
             }
           }
         }
       }
     }
-    jobs.setDoneCallback([output_tensor, &jobs, done]{
-      output_tensor(0) = jobs.getJob(0).getResponsePayload(0);
+    jobs->setDoneCallback([output_tensor, &jobs, done]{
+      output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
       done();
     });
 

+ 4 - 4
src/dummyOp.cpp

@@ -27,11 +27,11 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 1>();
     auto output_tensor = output->tensor<int32, 1>();
 
-    JobList jobs(Module::dummyModule, 1);
-    jobs.getJob(0).setPayload(0, input_tensor(0));
+    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, 1));
+    jobs->getJob(0)->setPayload(0, input_tensor(0));
 
-    jobs.setDoneCallback([output_tensor, &jobs, done]{
-      output_tensor(0) = jobs.getJob(0).getResponsePayload(0);
+    jobs->setDoneCallback([output_tensor, &jobs, done]{
+      output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
       done();
     });