
lib/mlfpga: oop and ownership rework

subDesTagesMitExtraKaese 5 years ago
parent commit 7a92075c35

BIN
build/op_lib.so


+ 0 - 6
configure

@@ -1,6 +0,0 @@
-
-#include <iostream>
-#include <string>
-#include <chrono>
-#include <thread>
-#include <future>

+ 32 - 15
include/conv2D.hpp

@@ -1,3 +1,6 @@
+#ifndef CONV2D_FPGA_H
+#define CONV2D_FPGA_H
+
 #include "tensorflow/core/framework/op_kernel.h"
 #include "tensorflow/core/framework/function.h"
 #include <stdlib.h>
@@ -12,26 +15,40 @@
 #include "../lib/mlfpga/include/connectionManager.hpp"
 #include "../lib/mlfpga/include/modules.hpp"
 
-using namespace tensorflow;
-using namespace std::chrono;
-typedef FunctionDefHelper FDH;
+#include "entrypoint.hpp"
+
+namespace tf_lib {
+
+  using namespace tensorflow;
+  using namespace std::chrono;
+  typedef FunctionDefHelper FDH;
+
+
+  class Conv2DOp : public AsyncOpKernel {
+    public:
+      explicit Conv2DOp(OpKernelConstruction* context);
 
+      void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
 
-class Conv2DOp : public AsyncOpKernel {
-  public:
-    explicit Conv2DOp(OpKernelConstruction* context);
+    private:
 
-    void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
+      int instance = -1;
+      int delay = 1000;
 
-  private:
+      int outputSize = 28;
+      int tagCounter = 0;
 
-    int instance = -1;
-    int delay = 1000;
+      int width = 224;
+      int kernel = 5;
+      int border = kernel/2;
+      int sizeWithBorder = width + 2*border;
+      int pixels = sizeWithBorder * sizeWithBorder;
 
-    int outputSize = 28;
+      void fpgaCall(const Tensor *input, const Tensor *kernel, Tensor *output, int sample, int channel, int filter);
+      void delayThread(DoneCallback done);
 
-    void fpgaCall(const Tensor *input, const Tensor *kernel, Tensor *output, int sample, int channel, int filter);
-    void delayThread(DoneCallback done);
+    //TF_DISALLOW_COPY_AND_ASSIGN(Conv2DOp);
+  };
+}
 
-  //TF_DISALLOW_COPY_AND_ASSIGN(Conv2DOp);
-};
+#endif

+ 38 - 0
include/dummyOp.hpp

@@ -0,0 +1,38 @@
+#ifndef DUMMY_OP_FPGA_H
+#define DUMMY_OP_FPGA_H
+
+#include "tensorflow/core/framework/op_kernel.h"
+#include "tensorflow/core/framework/function.h"
+#include <stdlib.h>
+
+#include <iostream>
+#include <string>
+#include <chrono>
+#include <thread>
+#include <future>
+#include <mutex>
+#include <condition_variable>
+
+#include "../lib/mlfpga/include/connectionManager.hpp"
+#include "../lib/mlfpga/include/modules.hpp"
+
+#include "entrypoint.hpp"
+
+namespace tf_lib {
+  using namespace tensorflow;
+  using namespace std::chrono;
+
+  class DummyOp : public AsyncOpKernel {
+    public:
+      explicit DummyOp(OpKernelConstruction* context);
+      void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
+
+    private:
+      void fpgaCall(const Tensor *input, Tensor *output, DoneCallback done);
+
+      const int dataLength = 4;
+      int tagCounter = 0;
+
+  };
+}
+#endif

+ 9 - 1
include/entrypoint.hpp

@@ -1,3 +1,5 @@
+#ifndef ENTRY_FPGA_H
+#define ENTRY_FPGA_H
 
 #include "tensorflow/core/framework/op.h"
 #include "tensorflow/core/framework/shape_inference.h"
@@ -7,6 +9,12 @@
 #include "tensorflow/core/lib/math/math_util.h"
 
 #include "conv2D.hpp"
+#include "dummyOp.hpp"
 #include "../lib/mlfpga/include/connectionManager.hpp"
 
-void __attribute__ ((constructor)) init(void);
+namespace tf_lib {
+  void __attribute__ ((constructor)) init(void);
+
+  extern ConnectionManager connectionManager;
+}
+#endif
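
The __attribute__((constructor)) on init() makes the runtime linker call it as soon as the shared object is loaded, e.g. by TensorFlow's tf.load_op_library, so the ConnectionManager can be wired up before any kernel runs. A minimal sketch of what such an init() could look like, consistent with the declarations above but not taken from this commit (the FPGA address reuses one from the removed connectionManager.cpp and is only a placeholder):

    namespace tf_lib {
      ConnectionManager connectionManager;

      void init(void) {
        //runs once at load time, before any op kernel executes
        connectionManager.addFPGA("192.168.88.32", 1234);  //placeholder address
        connectionManager.start();
      }
    }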

+ 21 - 41
lib/mlfpga/include/udp.hpp → lib/mlfpga/include/commFPGA.hpp

@@ -5,12 +5,10 @@
 #include <stdio.h>
 #include <assert.h> 
 #include <mutex> 
-#include <map>
+#include <unordered_map>
 #include <vector>
-#include <random>
 #include <chrono>
 
-
 #include <sys/types.h>
 #include <unistd.h>
 #include <sys/socket.h>
@@ -20,10 +18,12 @@
 #include <arpa/inet.h>
 #include <netinet/in.h>
 
-#include <pthread.h>
+#include <thread>
+#include <future>
 #include <string.h>
 
 #include "job.hpp"
+#include "jobList.hpp"
 #include "modules.hpp"
 
 
@@ -38,13 +38,12 @@ typedef std::chrono::high_resolution_clock Clock;
 typedef std::chrono::milliseconds milliseconds;
 typedef std::chrono::microseconds microseconds;
 
-typedef enum {
+enum class RecvState {
   checkPreamble,
   checkJobId,
   checkModuleId,
-  writePayload,
-  checkCRC
-} recvState_t;
+  writePayload
+};
 
 //using jobCb_t = void(*)(commFPGA *, jobResponse *);
 
@@ -59,8 +58,9 @@ class commFPGA {
 
     //called by worker thread
     
-    int queueJob(jobResponse *job);
-    int bufferJob(jobData *job, uint recvPayloadLength);
+    int assignJob(Job *job);
+    int fillBuffer(JobData *sendBuf);
+    int unassignJob(Job *job);
 
     uint_least32_t jobCount();
     
@@ -68,56 +68,36 @@ class commFPGA {
     int sendRaw(uint8_t *buf, uint bufLen);
     int sendFromBuffer();
 
+    void start();
     //called by recv thread
     void recvUDP();
-    int parseRaw(uint8_t *buf, uint bufLen);
+    int parseRaw(uint32_t *buf, size_t bufLen);
     
-    //jobQueue[] being worked on, search for next job begins at currentJobIndex+1
-    uint_least32_t currentJobIndex = 0;
-    recvState_t recvState = checkPreamble;
-
+    std::unordered_map<uint32_t,Job*>::iterator currentJob;
+    RecvState recvState = RecvState::checkPreamble;
+    size_t recvPayloadIndex = 0;
 
     uint_least64_t successCounter = 0;
     uint_least64_t failedCounter = 0;
     float latency = 0;
 
-    //called by resp thread
-
-    int deleteOldJobs(int64_t micros=50000);
-    
-    void (*jobSuccessCb) (commFPGA *, jobResponse *);
-    void setSuccessCb(void (*cb) (commFPGA *, jobResponse *));
-
-    void (*jobFailedCb) (commFPGA *, jobResponse *);
-    void setFailedCb(void (*cb) (commFPGA *, jobResponse *));
-
-  protected:
+  private:
     //tx buffer for buffered send function
     uint32_t sendBuffer[MAX_JOB_LEN];
     uint_least32_t sendBufferReadIndex = 0;
     uint_least32_t sendBufferWriteIndex = 0;
 
     //list of pending responses
-    jobResponse* *jobQueue;
-    //list of associated jobIds as search index
-    uint32_t *jobQueueJobIds;
-
-    uint_least32_t jobQueueIndex = 0;
+    std::unordered_map<uint32_t,Job*> jobList;
     uint_least32_t jobsActive = 0;
     std::mutex jobLock;
 
     //listener for a single FPGA
     
     sockaddr_storage addrDest = {};
-    //pthread_t tRecv;
-    //pthread_attr_t attr;
-    //volatile bool running = 1;
-
-    //rng for the jobIds  
-    uint32_t jobIdCounter = 0;
-    static std::random_device seed_generator;
-    static unsigned seed;
-    static std::mt19937 mersenne_generator;
-    static std::uniform_int_distribution<uint32_t> distribution;
+    
+    std::future<void> recvResult;
+    bool running = true;
+
 };
 #endif
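
The reworked commFPGA interface splits a job's life across threads: the caller registers it with assignJob so the receive loop can match responses by jobId, fillBuffer copies the byte-swapped frame into the tx ring, and sendFromBuffer (normally driven by the ConnectionManager send thread) drains the ring onto the socket. A sketch of that calling pattern under the assumption of a fully prepared Job, not a committed call site:

    commFPGA fpga("192.168.88.32", 1234);
    fpga.start();                    //spawns the UDP receive loop

    Job *job = new Job(Module::dummyModule);
    job->setPayload(0, 42);
    job->calcCRC();
    job->setState(JobState::sent);   //checkJobId only matches jobs in this state

    fpga.assignJob(job);             //register jobId -> Job for response matching
    fpga.fillBuffer(job);            //enqueue byte-swapped words into the tx ring
    while(fpga.sendFromBuffer() > 0) {}  //normally done by the send thread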

+ 29 - 7
lib/mlfpga/include/connectionManager.hpp

@@ -4,8 +4,13 @@
 #include <iostream>
 #include <stdio.h>
 #include <sys/types.h>
-#include <pthread.h>
-#include "udp.hpp"
+#include <thread>
+#include <future>
+#include <mutex>
+#include <condition_variable>
+
+#include "commFPGA.hpp"
+#include "worker.hpp"
 
 /*
   worker thread:
@@ -31,12 +36,29 @@
 
 */
 
-extern commFPGA fpgas[];
-extern const uint fpgaCount;
+class ConnectionManager {
+  public:
+    ConnectionManager();
+    ~ConnectionManager();
+
+    void addFPGA(const char* ip, const uint port);
+
+    void start();
 
-int sendJob(jobData *job, uint recvPayloadLength);
-void connection_init();
-void connection_close();
+    //send many Jobs and wait for all responses
+    int sendJobListSync(JobList &jobList);
+
+    //send many Jobs and call back
+    int sendJobListAsync(JobList &jobList);
+
+  private:
+    std::vector<std::reference_wrapper<commFPGA>> fpgas;
+    std::vector<std::reference_wrapper<Worker>> workers;
+    
+    void sendThread();
+    std::future<void> sendResult;
 
+    bool running = true;
+};
 
 #endif
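
Taken together, the new classes replace the fixed fpgas[] array and free functions with an owning manager. A sketch of the intended call sequence from client code (addresses are placeholders; sendJobListSync is declared here but its definition is not part of this commit):

    ConnectionManager cm;
    cm.addFPGA("192.168.88.32", 1234);
    cm.addFPGA("192.168.88.33", 1234);
    cm.start();                      //launches the shared send thread

    JobList jobs(Module::dummyModule, 16);  //16 jobs for the "Dummy 4" module
    //...fill payloads, compute CRCs...
    cm.sendJobListSync(jobs);        //blocks until every response has arrived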

+ 0 - 24
lib/mlfpga/include/dummy.hpp

@@ -1,24 +0,0 @@
-#ifndef myDUMMY_H
-#define myDUMMY_H
-
-#include "connectionManager.hpp"
-#include <iostream>
-#include <csignal>
-
-class dummyJob : public jobData {
-  public:
-    dummyJob(void);
-    const uint payloadLen = 4;
-};
-
-class dummyBigJob : public jobData {
-  public:
-    dummyBigJob(void);
-    const uint payloadLen = 1024;
-};
-
-void perfTest();
-void fillTest();
-
-
-#endif

+ 91 - 31
lib/mlfpga/include/job.hpp

@@ -5,59 +5,119 @@
 #include <stdio.h>
 #include <assert.h>
 #include <sys/types.h>
+#include <functional>
+#include <cstring>
+#include <mutex>
+#include <condition_variable>
 
 #include <chrono>
 
+#include "modules.hpp"
+#include "rng.hpp"
+
 typedef std::chrono::high_resolution_clock Clock;
 typedef std::chrono::milliseconds milliseconds;
 typedef std::chrono::microseconds microseconds;
 
+typedef std::function<void()> DoneCallback;
+
 #define PREAMBLE (0xE1E4C312)
 
 #define MAX_JOB_LEN (256*256)
 
-typedef enum {
-  waiting,
-  receiving,
-  finished
-} jobState_t;
+enum class JobState {
+  initialized,  //Job was created
+  ready,        //Job is ready to be sent
+  sent,         //sendBuf has been copied to sendQueue
+  receiving,    //first part of response has been received
+  finished,     //Job has been successfully received
+  failed,       //Job failed too many times, throw exception
+};
 
-//data structure of a job
-class jobData {
+//generic uint32 storage
+class WordBuffer {
   public:
-    jobData(uint payloadLength);
-    ~jobData();
+    WordBuffer(size_t length);
+    ~WordBuffer();
 
-    union {
-      uint8_t *bytes;
-      uint32_t *words;
-      //pointer to words[0]
-      uint32_t *preamble;
-    };
+    uint8_t* getByteAddr() const {return bytes;}
+    uint32_t* getWordAddr() const {return words;}
+
+    uint8_t getByte(size_t i) const {return bytes[i];}
+    uint32_t getWord(size_t i) const {return words[i];}
+
+    size_t getWordCount() const {return wordCount;}
+    size_t getByteCount() const {return wordCount*4;}
+
+  protected:
+      size_t wordCount;
+      union {
+        uint8_t *bytes;
+        uint32_t *words;
+      };
+};
+
+//data structure that is sent over the network
+class JobData : public WordBuffer {
+  public:
+    JobData(uint payloadLength);
+
+    uint32_t getPreamble() const {return words[0];}
+    void setPreamble(uint32_t v) const {words[0] = v;}
+
+    uint32_t getJobId() const {return words[1];}
+    void setJobId(uint32_t v) const {words[1] = v;}
+
+    uint32_t getModuleId() const {return words[2];}
+    void setModuleId(uint32_t v) const {words[2] = v;}
+
+    uint32_t getPayload(size_t i) const {return words[i+3];}
+    void setPayload(size_t i, uint32_t v) const {words[i+3] = v;}
+
+    uint32_t getCRC() const {return words[wordCount-1];}
+    void setCRC(uint32_t v) const {words[wordCount-1] = v;}
+
+};
+
+//entity to track a single Job
+class Job : public JobData {
+  public:
+    Job(Module mod);
 
-    uint32_t wordCount;
-    Clock::time_point created = Clock::now();
     uint32_t tag = 0;
 
-    //pointer to words[1]
-    uint32_t *jobId;
-    //pointer to words[2]
-    uint32_t *moduleId; 
-    //array at &words[3]
-    uint32_t *payload;
-    //pointer to words[wordCount-1]
-    uint32_t *crc;
+    //locks state
+    std::mutex stateMutex;
+    //locks sendBuf
+    std::mutex sendMutex;
+    //locks recvBuf, recvWordIndex
+    std::mutex recvMutex;
+
+    uint32_t getResponsePayload(size_t i) const {return recvBuf.getWord(i);}
+    void setResponsePayload(size_t i, uint32_t v) const {recvBuf.getWordAddr()[i] = v;}
+    uint32_t* getResponseAddr() const {return recvBuf.getWordAddr();}
+    size_t getResponseBufferWordCount() const {return recvBuf.getWordCount();}
 
     void calcCRC();
     bool checkCRC();
-};
 
-class jobResponse : public jobData {
-  public:
-    using jobData::jobData;
-    jobState_t state = waiting;
-    uint recvWordCounter = 0;
+    JobState getState() const {return state;}
+    void setState(JobState s) {state = s;}
+
+    void isComplete();
+    void setDoneCallback(DoneCallback cb);
+
+  private:
+    //only payload and CRC of response
+    WordBuffer recvBuf;
+    DoneCallback doneCb = NULL;
+
+    JobState state = JobState::initialized;
+    Clock::time_point created = Clock::now();
     Clock::time_point received;
+
+    
+
 };
 
 #endif
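
JobData fixes the on-wire frame as five regions: words[0] preamble, words[1] jobId, words[2] moduleId, words[3..n-2] payload, and words[n-1] CRC, which is why the constructor adds 4 words to the payload length. A short sketch of building a frame through the accessors (the module choice is illustrative):

    Job job(Module::dummyModule);   //sendLen 4 -> 8 words; preamble and random jobId set
    for(size_t i = 0; i < 4; i++)
      job.setPayload(i, i + 1);     //payload occupies words[3..6]
    job.calcCRC();                  //words[7] = -(words[1] + ... + words[6])
    //note: checkCRC() validates the response buffer (recvBuf), not the frame above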

+ 35 - 0
lib/mlfpga/include/jobList.hpp

@@ -0,0 +1,35 @@
+#ifndef myJOBLIST_H
+#define myJOBLIST_H
+
+#include <functional>
+#include <vector>
+#include "job.hpp"
+#include "modules.hpp"
+
+//entity to track an array of similar jobs
+class JobList {
+  public:
+    JobList(Module mod, size_t numberOfJobs);
+    void waitAll();
+    void finishJob();
+
+    void setDoneCallback(DoneCallback cb);
+
+    size_t getPendingJobCount() const {return pendingJobCount;}
+
+    Job& getJob(size_t i);
+
+    Job* getNextJob();
+  private:
+    std::vector<std::reference_wrapper<Job>> jobs;
+    DoneCallback doneCb;
+
+    size_t jobCount;
+    size_t pendingJobCount;
+    std::condition_variable jobListDone;
+    std::mutex pendingJobCount_m;
+
+    size_t nextJobIndex = 0;
+};
+
+#endif
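
The pendingJobCount plus condition_variable combination is a countdown latch: every per-job done callback decrements the count under pendingJobCount_m and notifies, while waitAll sleeps until the predicate sees zero. The same pattern in isolation, as a self-contained sketch:

    #include <condition_variable>
    #include <mutex>

    struct CountdownLatch {
      size_t pending;
      std::mutex m;
      std::condition_variable done;

      explicit CountdownLatch(size_t n) : pending(n) {}

      void finishOne() {
        std::lock_guard<std::mutex> lk(m);
        pending--;
        done.notify_all();          //wake waiters so they re-check the predicate
      }
      void waitAll() {
        std::unique_lock<std::mutex> lk(m);
        done.wait(lk, [this]{ return pending == 0; });
      }
    };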

+ 9 - 9
lib/mlfpga/include/modules.hpp

@@ -5,19 +5,19 @@
 #include <sys/types.h>
 
 #define MODS_DEF \
-  MOD_DEF( dummyModule,  0xf218e0a2, "Dummy 4" ),  \
-  MOD_DEF( conv2D_2x11_Module, 0x9323eb24, "2D Konvolution 2x11" ),   \
-  MOD_DEF( neuronModule, 0x03b30000, "Neuron" ), \
-  MOD_DEF( dummyBigModule, 0x2cb31e7c, "Dummy 1024"), \
-  MOD_DEF( conv2D_5x5_Module, 0x4cd2e19c, "2D Konvolution 5x5")
+  MOD_DEF( dummyModule,  0xf218e0a2, "Dummy 4", 4, 4 ),  \
+  MOD_DEF( conv2D_2x11_Module, 0x9323eb24, "2D Konvolution 2x11", 224*224, 224*224 ),   \
+  MOD_DEF( neuronModule, 0x03b30000, "Neuron", 21, 1 ), \
+  MOD_DEF( dummyBigModule, 0x2cb31e7c, "Dummy 1024", 1024, 1024), \
+  MOD_DEF( conv2D_5x5_Module, 0x4cd2e19c, "2D Konvolution 5x5", 224*224, 224*224)
 
-#define MOD_DEF( identifier, id, name )  identifier
-enum { MODS_DEF };
+#define MOD_DEF( identifier, id, name, sendLen, recvLen )  identifier
+enum Module { MODS_DEF };
 #undef MOD_DEF
 
 extern const uint32_t moduleIds[];
 extern const char *moduleNames[];
-int main_udp();
-
+extern const size_t moduleSendPayloadLength[];
+extern const size_t moduleRecvPayloadLength[];
 
 #endif
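
Extending MOD_DEF with sendLen and recvLen keeps every per-module constant in one table: each #define MOD_DEF(...) projection picks out one column before re-expanding MODS_DEF. After preprocessing, the first projections are equivalent to (abbreviated to two modules):

    enum Module { dummyModule, conv2D_2x11_Module /*, ...*/ };
    const uint32_t moduleIds[] = { 0xf218e0a2, 0x9323eb24 /*, ...*/ };
    const size_t moduleSendPayloadLength[] = { 4, 224*224 /*, ...*/ };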

+ 10 - 0
lib/mlfpga/include/rng.hpp

@@ -0,0 +1,10 @@
+#ifndef myRNG_H
+#define myRNG_H
+
+#include <sys/types.h>
+#include <random>
+#include <mutex>
+
+uint32_t getRandomNumber();
+
+#endif

+ 36 - 0
lib/mlfpga/include/worker.hpp

@@ -0,0 +1,36 @@
+#ifndef myWORKER_H
+#define myWORKER_H
+
+#include <vector>
+#include <condition_variable>
+#include <mutex>
+#include <future>
+
+#include "jobList.hpp"
+#include "commFPGA.hpp"
+
+class Worker {
+  public:
+    Worker(std::vector<std::reference_wrapper<commFPGA>> &fpgas);
+    ~Worker();
+
+    void start();
+    
+    int assignJobList(JobList &jobList);
+
+  private:
+    std::mutex currentJobList_m;
+    JobList *currentJobList = NULL;
+    std::vector<std::reference_wrapper<commFPGA>> *fpgaVector;
+
+    commFPGA* findAvailableFPGA();
+    
+
+    std::future<int> result;
+    int threadMain();
+
+    std::condition_variable hasJobList;
+    void waitForJobList();
+};
+
+#endif

+ 253 - 0
lib/mlfpga/src/commFPGA.cpp

@@ -0,0 +1,253 @@
+
+#include "../include/commFPGA.hpp"
+
+int resolvehelper(const char* hostname, int family, const char* service, sockaddr_storage* pAddr)
+{
+    int result;
+    addrinfo* result_list = NULL;
+    addrinfo hints = {};
+    hints.ai_family = family;
+    hints.ai_socktype = SOCK_DGRAM; // without this flag, getaddrinfo will return 3x the number of addresses (one for each socket type).
+    result = getaddrinfo(hostname, service, &hints, &result_list);
+    if (result == 0)
+    {
+        //ASSERT(result_list->ai_addrlen <= sizeof(sockaddr_in));
+        memcpy(pAddr, result_list->ai_addr, result_list->ai_addrlen);
+        freeaddrinfo(result_list);
+    }
+
+    return result;
+}
+
+
+// commFPGA class members
+
+void commFPGA::start() {
+  recvResult = std::async(std::launch::async, &commFPGA::recvUDP, this);
+}
+
+void commFPGA::recvUDP() {
+  while(running) {
+    int result = 0;
+
+    uint32_t buf[UDP_MTU/4];
+
+    uint slen = sizeof(addrDest);
+    result = recvfrom(sock, (uint8_t*)buf, UDP_MTU, 0, (sockaddr*)&addrDest, &slen);
+    if(result == -1)
+      continue;  //timeout from SO_RCVTIMEO, keep listening while running
+
+    result /= 4;
+
+    for(int_least32_t i=0; i < result; i++) {
+      buf[i] = __builtin_bswap32(buf[i]);
+    }
+
+    parseRaw(buf, result);
+  }
+}
+
+int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
+  jobLock.lock();
+
+  for(size_t i=0; i < bufLen; i++) {
+    
+    switch(recvState) {
+      case RecvState::checkPreamble:
+        if(buf[i] == PREAMBLE) {
+          recvState = RecvState::checkJobId;
+        }
+        break;
+
+      case RecvState::checkJobId:
+        currentJob = jobList.find(buf[i]);
+        if(currentJob == jobList.end()) {
+          #ifdef DEBUG_JOB_RESP
+            printf("job %08X jobId not found\n", buf[i]);
+          #endif
+          i -= 1;
+          recvState = RecvState::checkPreamble;
+        } else if(currentJob->second->getState() != JobState::sent) {
+          #ifdef DEBUG_JOB_RESP
+            printf("job %08X wasn't sent\n", buf[i]);
+          #endif
+          i -= 1;
+          recvState = RecvState::checkPreamble;
+        } else {
+          recvState = RecvState::checkModuleId;
+        }
+        break;
+      
+      case RecvState::checkModuleId:
+        if(currentJob->second->getModuleId() == buf[i]) {
+          recvState = RecvState::writePayload;
+          recvPayloadIndex = 0;
+          currentJob->second->setState(JobState::sent);
+        } else {
+          #ifdef DEBUG_JOB_RESP
+            printf("job %08X wrong moduleId %08X\n", currentJob->second->getJobId(), buf[i]);
+          #endif
+          //i is unsigned, so guard the two-word rewind against underflow
+          i = i < 2 ? (size_t)-1 : i - 2;
+          recvState = RecvState::checkPreamble;
+        }
+        break;
+      case RecvState::writePayload:
+        currentJob->second->setResponsePayload(recvPayloadIndex++, buf[i]);
+        if(recvPayloadIndex >= currentJob->second->getResponseBufferWordCount()) {
+          if(currentJob->second->checkCRC()) {
+            currentJob->second->setState(JobState::finished);
+            currentJob->second->isComplete();
+            jobList.erase(currentJob->second->getJobId());
+          } else {
+            currentJob->second->setState(JobState::sent);
+            #ifdef DEBUG_JOB_RESP
+              printf("job %08X wrong crc %08X, %4d, %4d\n", *currentJobResp->jobId, word, bufLen, i);
+              for(uint_least32_t k=0; k<currentJobResp->wordCount; k++) {
+                printf(" %4d %08X", k, currentJobResp->words[k]);
+              }
+              cout << endl;
+            #endif
+          }
+          recvState = RecvState::checkPreamble;
+        }
+        break;
+    }
+  }
+
+  jobLock.unlock();
+
+  return 0;
+}
+
+commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
+  port = _port;
+  strcpy(ip, host);
+
+  int err = 0;
+
+  struct addrinfo hints, *res;
+  
+  //UDP host
+  memset(&hints, 0, sizeof hints);
+  hints.ai_family = AF_INET;  // use IPv4
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
+
+  getaddrinfo(NULL, std::to_string(port).c_str(), &hints, &res);
+  sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if(bindSelf)
+    err = bind(sock, res->ai_addr, res->ai_addrlen);
+  
+  if(err != 0) {
+    printf("%15s sock: %2d, err: %2d, port: %5d\n", ip, sock, err, port);
+    exit(1);
+  }
+
+  //set receive timeout
+  struct timeval tv;
+  tv.tv_sec = 0;
+  tv.tv_usec = 100000;
+  setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
+
+  //set recv buffer size
+
+  int rcvbufsize = MAX_JOB_LEN * 4 * 2;
+  setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&rcvbufsize,sizeof(rcvbufsize));
+  
+  //UDP client
+  resolvehelper(host, AF_INET, std::to_string(port).c_str(), &addrDest);
+
+  //send a packet to fpga to update its response port
+
+  sendRaw((uint8_t*)"0000", 4);
+
+}
+commFPGA::~commFPGA() {
+  //printf("%15s deleting job queue...\n", ip);
+  
+  
+}
+
+int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
+  int result = 0;
+
+  uint_least32_t byteIndex = 0;
+
+  while(byteIndex < bufLen) {
+    uint payloadLen = bufLen - byteIndex;
+    if(payloadLen > UDP_LEN/4*4)
+      payloadLen = UDP_LEN/4*4;
+
+    //printf("sending %d bytes at offset %d\n", payloadLen, byteIndex);
+
+    result = sendto(sock, &buf[byteIndex], payloadLen, 0, (sockaddr*)&addrDest, sizeof(addrDest));
+    if(result == -1) {
+      int err = errno;
+      std::cout << "error sending packet " << err << std::endl;
+      break;
+    } 
+    //usleep(50);
+    //printf("%d bytes sent\n", result);
+    //mark n * 4 bytes as sent
+    byteIndex += payloadLen;
+  }
+  return byteIndex;
+}
+
+int commFPGA::assignJob(Job *job) {
+  //lock_guard releases jobLock on every return path, including the
+  //early exit when the job list is full
+  std::lock_guard<std::mutex> lk(jobLock);
+  if(jobList.size() >= JOB_COUNT)
+    return -1;
+  
+  jobList.insert(std::pair<uint32_t,Job*>(job->getJobId(), job));
+  
+  jobsActive++;
+  return 0;
+}
+
+int commFPGA::sendFromBuffer() {
+  uint_least32_t avail = (sendBufferWriteIndex - sendBufferReadIndex) % MAX_JOB_LEN;
+  
+  if(avail <= 0)
+    return -1;
+    
+  uint_least32_t readEnd;
+
+  if(avail*4 > UDP_LEN)
+    readEnd = sendBufferReadIndex + UDP_LEN / 4;
+  else
+    readEnd = sendBufferReadIndex + avail;
+
+  if(readEnd >= MAX_JOB_LEN)
+    readEnd = MAX_JOB_LEN;
+
+  //printf("avail %5d read %5d write %5d len %5d\n", avail, sendBufferReadIndex, sendBufferWriteIndex, (readEnd - sendBufferReadIndex));
+
+  int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], (readEnd - sendBufferReadIndex) * 4);
+
+  
+  if(readEnd < MAX_JOB_LEN)
+    sendBufferReadIndex = readEnd;
+  else
+    sendBufferReadIndex = 0;
+
+  return rc;
+}
+
+int commFPGA::fillBuffer(JobData *jobData) {
+  uint_least32_t free = (sendBufferReadIndex - sendBufferWriteIndex) % MAX_JOB_LEN;
+  //printf("free %8d %8d %8d\n", free, sendBufferReadIndex, sendBufferWriteIndex);
+  if(free < jobData->getWordCount() && free != 0)
+    return -1;
+
+  for(uint_least32_t i=0; i<jobData->getWordCount(); i++) {
+    sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(jobData->getWord(i));
+  }
+  sendBufferWriteIndex = (sendBufferWriteIndex + jobData->getWordCount()) % MAX_JOB_LEN;
+  return 0;
+}
+
+uint_least32_t commFPGA::jobCount() {
+  return jobsActive;
+}
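
sendBuffer is a single-producer/single-consumer ring of MAX_JOB_LEN words: fillBuffer advances sendBufferWriteIndex, sendFromBuffer advances sendBufferReadIndex, and both wrap with % MAX_JOB_LEN, so occupancy and free space fall out of unsigned modular subtraction. A worked sketch of the index math (values are illustrative):

    //occupancy seen by the consumer (sendFromBuffer):
    //  avail = (writeIndex - readIndex) % MAX_JOB_LEN
    //space seen by the producer (fillBuffer):
    //  free  = (readIndex - writeIndex) % MAX_JOB_LEN
    //example with MAX_JOB_LEN = 65536: write = 10, read = 65530
    //  avail = (10 - 65530) % 65536 = 16     words queued
    //  free  = (65530 - 10) % 65536 = 65520  words writable
    //read == write yields 0 for both, which is ambiguous between "empty"
    //and "full"; the free != 0 test in fillBuffer resolves it as "empty"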

+ 24 - 116
lib/mlfpga/src/connectionManager.cpp

@@ -1,133 +1,41 @@
-#include "../include/connectionManager.hpp"
+#include "connectionManager.hpp"
 
-
-const uint fpgaCount = 4;
-commFPGA fpgas[fpgaCount] = {
-  /*
-  {"localhost", 1234},
-  {"localhost", 1234},
-  {"localhost", 1234},
-  {"localhost", 1234},
-  {"localhost", 1234},
-  */
-  {"192.168.88.32", 1234},
-  {"192.168.88.33", 1234},
-  {"192.168.88.34", 1234},
-  {"192.168.88.35", 1234},
-  //{"192.168.88.36", 1234},
-
-  //{"192.168.88.37", 1234},
-  //{"192.168.88.38", 1234},
-  //{"192.168.88.39", 1234},
-  //{"192.168.88.40", 1234},
-  //{"192.168.88.41", 1234},
-};
-
-int findAvailableFPGA() {
-  uint_least32_t minCnt = JOB_COUNT-1;
-  uint_least32_t index = -1;
-  for(uint_least32_t i=0; i<fpgaCount; i++) {
-    uint_least32_t cnt = fpgas[i].jobCount();
-    if(cnt < minCnt) {
-      minCnt = cnt;
-      index = i;
-    }
-  }
-  return index;
+ConnectionManager::ConnectionManager() {
+  
+}
+ConnectionManager::~ConnectionManager() {
+  running = false;
+  //get() on a default-constructed future throws, so only join the
+  //send thread if start() was actually called
+  if(sendResult.valid())
+    sendResult.get();
+}
 
-int sendJob(jobData *job, uint recvPayloadLength) {
+void ConnectionManager::addFPGA(const char* ip, const uint port) {
+  //commFPGA owns a mutex and is neither copyable nor movable, so it is
+  //allocated on the heap; the stored reference stays valid for the
+  //lifetime of the process
+  commFPGA *fpga = new commFPGA(ip, port);
+  fpga->start();
+
+  fpgas.push_back(*fpga);
+}
 
+int ConnectionManager::sendJobListAsync(JobList &jobList) {
+  //same ownership caveat as in addFPGA: Worker holds a mutex, so keep
+  //it on the heap instead of binding a reference to a stack local
+  Worker *worker = new Worker(fpgas);
+  worker->assignJobList(jobList);
+  worker->start();
+  workers.push_back(*worker);
+  return 0;
+}
 
+void ConnectionManager::start() {
+  sendResult = std::async(std::launch::async, &ConnectionManager::sendThread, this);
+}
 
-uint_least32_t currentSendFPGA = 0;
-bool running = true;
-void *sendThread(void *ref) {
+void ConnectionManager::sendThread() {
   while(running) {
     Clock::time_point start = Clock::now();
-    for(uint_least32_t i=0; i<fpgaCount; i++) {
-      fpgas[i].sendFromBuffer();
+    for(std::vector<std::reference_wrapper<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
+      it->get().sendFromBuffer();
     }
     //printf("%8d %8d\n", fpgas[0].sendBufferWriteIndex, fpgas[0].sendBufferReadIndex);
     uint us = std::chrono::duration_cast<microseconds>(Clock::now() - start).count();
     if(us < 50)
       usleep(50 - us);
   }
-  pthread_exit(NULL);
-}
-
-void *recvThread(void *ref) {
-  commFPGA *fpga = (commFPGA*)ref;
-  while(running) {
-    fpga->recvUDP();
-  }
-  pthread_exit(NULL);
-}
-
-void *respThread(void *ref) {
-  const uint_least32_t timeout = 1000*1000;
-  while(running) {
-    usleep(timeout/2);
-    for(uint_least32_t i=0; i<fpgaCount; i++) {
-      fpgas[i].deleteOldJobs(timeout);
-    }
-  }
-  pthread_exit(NULL);
-}
-
-void jobSuccess(commFPGA *fpga, jobResponse *res) {
-  fpga->successCounter++;
-  res->received = Clock::now();
-
-  microseconds us = std::chrono::duration_cast<microseconds>(res->received - res->created);
-
-  fpga->latency = us.count();
-}
-void jobFailed(commFPGA *fpga, jobResponse *res) {
-  fpga->failedCounter++;
-  //printf("%08X\n", *res->jobId);
-}
-
-
-pthread_t tSend, tRecv[fpgaCount], tResp;
-pthread_attr_t attr;
-
-void connection_init() {
-
-  pthread_attr_init(&attr);
-  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
-  for(uint i=0; i<fpgaCount; i++) {
-    fpgas[i].setSuccessCb(jobSuccess);
-    fpgas[i].setFailedCb(jobFailed);
-
-    pthread_create(&tRecv[i], &attr, recvThread, (void*)&fpgas[i]);
-    pthread_setname_np(tRecv[i], fpgas[i].ip);
-  }
-
-  pthread_create(&tSend, &attr, sendThread, 0);
-  pthread_setname_np(tSend, "tSend");
-  
-  pthread_create(&tResp, &attr, respThread, 0);
-  pthread_setname_np(tResp, "tResp");
-}
-
-void connection_close() {
-  running = false;
-  void *status;
-  pthread_join(tSend, &status);
-  for(uint i=0; i<fpgaCount; i++) {
-    pthread_join(tRecv[i], &status);
-  }
-  pthread_join(tResp, &status);
-  for(uint i=0; i<fpgaCount; i++)
-    delete &fpgas[i];
 }

+ 0 - 128
lib/mlfpga/src/dummy.cpp

@@ -1,128 +0,0 @@
-#include "../include/dummy.hpp"
-
-
-dummyJob::dummyJob(void) : jobData(4) {
-  *moduleId = moduleIds[dummyModule];
-  *crc = 0xAAAAAAAA;
-
-  for(uint_least32_t i=0; i<payloadLen; i++) {
-    payload[i] = i+1;
-  }
-}
-
-dummyBigJob::dummyBigJob(void) : jobData(1024) {
-  *moduleId = moduleIds[dummyBigModule];
-  *crc = 0xAAAAAAAA;
-
-  for(uint_least32_t i=0; i<payloadLen; i++) {
-    payload[i] = i+1;
-  }
-}
-
-void perfTest() {
-  connection_init();
-  usleep(5000);
-
-  Clock::time_point start = Clock::now();
-  uint_least64_t *cnt = (uint_least64_t*)malloc(sizeof(uint_least64_t) * fpgaCount);
-  assert(cnt != NULL);
-
-  dummyJob job;
-  uint_least32_t rejected = 0, successful = 0;
-
-  for(uint i=0; i<job.payloadLen; i++) {
-    job.payload[i] = i+1;
-  }
-  
-  while(1) {
-    
-    for(uint_least32_t i=0; i<MAX_JOB_LEN / job.wordCount / 2; i++) {
-      int ret = sendJob(&job, job.payloadLen);
-      if(ret < 0)
-        rejected++;
-      else
-        successful++;
-      
-    }
-    usleep(0);
-    Clock::time_point now = Clock::now();
-
-    if(std::chrono::duration_cast<milliseconds>(now - start).count() > 200) {
-      printf("rejected: %8d, sent: %8d\n", rejected, successful);
-      for(uint_least32_t k=0; k<fpgaCount; k++) {
-        printf(
-          "%15s  succ: %9ld fail: %9ld (%8.5f%%) %6.3f ms queue:%5d, %6ld jobs/s \n", 
-          fpgas[k].ip,
-          fpgas[k].successCounter, 
-          fpgas[k].failedCounter, 
-          100.0 * fpgas[k].failedCounter / (1+fpgas[k].successCounter + fpgas[k].failedCounter), 
-          fpgas[k].latency/1000,
-          fpgas[k].jobCount(),
-          std::chrono::duration_cast<microseconds>(now - start).count() * (fpgas[k].successCounter-cnt[k]) *5 / 1000000
-        );
-        cnt[k] = fpgas[k].successCounter;
-      }
-
-      printf("\n");
-      start = now;
-    }
-  }
-}
-void printJobResp(commFPGA *fpga, jobResponse *res) {
-  printf("%s:\n", fpga->ip);
-  uint_least16_t i = 0;
-
-  for(; i<res->wordCount; i++) {
-
-    if(i%8 == 0) {
-      printf("\n %4d:", i/8);
-    }
-    //if(res->words[i] == 0)
-    //  break;
-    printf(" %08X", res->words[i]);
-  }
-  printf("\n");
-  
-}
-
-void fillTest() {
-
-  connection_init();
-  usleep(50);
-
-  jobData job(4);
-  *job.jobId = 0x12345678;
-
-  for(uint i=0; i<job.wordCount-4; i++) {
-    job.payload[i] = i+1;
-  }
-  job.calcCRC();
-
-  for(uint i=0; i<fpgaCount; i++) {
-
-    jobResponse *response = new jobResponse(4);
-    *response->moduleId = *job.moduleId;
-    *response->jobId = *job.jobId;
-
-    uint32_t buf[UDP_LEN];
-    uint n = 1;
-
-    for(uint k=0; k<n; k++) {
-      for(uint_least32_t i=0; i<job.wordCount; i++) {
-        buf[i+k*job.wordCount] = __builtin_bswap32(job.words[i]);
-      }
-    }
-
-    //fpgas[i].setFailedCb(&printJobResp);
-    fpgas[i].setSuccessCb(&printJobResp);
-
-    fpgas[i].queueJob(response);
-
-    fpgas[i].sendRaw((uint8_t*)buf, n * job.wordCount * 4);
-
-  }
-  usleep(100000);
-
-  connection_close();
-
-}

+ 33 - 23
lib/mlfpga/src/job.cpp

@@ -1,38 +1,48 @@
 #include "../include/job.hpp"
 
-// jobData members
-
-
-jobData::jobData(uint payloadLength) {
-  wordCount = payloadLength + 4;
+WordBuffer::WordBuffer(size_t length) {
+  wordCount = length;
   bytes = (uint8_t*)malloc(wordCount * 4);
   assert(bytes != NULL);
+}
 
-  //set all the pointers
-  jobId = &words[1];
-  moduleId = &words[2];
-  payload = &words[3];
-  crc = &words[wordCount-1];
+WordBuffer::~WordBuffer() {
+  free(bytes);
+}
 
-  //insert constant values
-  *preamble = PREAMBLE;
+JobData::JobData(uint payloadLength) : WordBuffer(payloadLength + 4) {
+  setPreamble(PREAMBLE);
 }
-jobData::~jobData() {
-  free(bytes);
+
+Job::Job(Module mod) : JobData(moduleSendPayloadLength[mod]), recvBuf(moduleRecvPayloadLength[mod] + 1) {
+  setModuleId(moduleIds[mod]);
+  setJobId(getRandomNumber());
 }
 
-void jobData::calcCRC() {
+//sets CRC of sendBuf
+void Job::calcCRC() {
   uint32_t sum = 0;
-  for(uint_least32_t i=1; i<wordCount-1; i++) {
-    sum += words[i];
+  for(uint_least32_t i=1; i<getWordCount()-1; i++) {
+    sum += getWord(i);
   }
-  *crc = -sum;
+  setCRC(-sum);
 }
 
-bool jobData::checkCRC() {
-  uint32_t sum = 0;
-  for(uint_least32_t i=1; i<wordCount-1; i++) {
-    sum += words[i];
+//checks CRC of recvBuf
+bool Job::checkCRC() {
+  uint32_t sum = getPreamble() + getJobId() + getModuleId();
+  for(uint_least32_t i=1; i<recvBuf.getWordCount()-1; i++) {
+    sum += recvBuf.getWord(i);
   }
-  return *crc == -sum;
+  return recvBuf.getWord(recvBuf.getWordCount()-1) == -sum;
+}
+
+void Job::setDoneCallback(DoneCallback cb) {
+  doneCb = cb;
+}
+
+void Job::isComplete() {
+  received = Clock::now();
+  if(doneCb)
+    doneCb();
 }

+ 44 - 0
lib/mlfpga/src/jobList.cpp

@@ -0,0 +1,44 @@
+#include "jobList.hpp"
+
+JobList::JobList(Module mod, size_t numberOfJobs) {
+  jobCount = numberOfJobs;
+  pendingJobCount = numberOfJobs;
+  for(size_t i=0; i<numberOfJobs; i++) {
+    //Job owns mutexes, so it cannot live in the vector by value;
+    //heap-allocate it and store a reference to it
+    Job *job = new Job(mod);
+    job->setDoneCallback([this]{
+      finishJob();
+    });
+    jobs.push_back(*job);
+  }
+}
+
+void JobList::waitAll() {
+  std::unique_lock<std::mutex> lk(pendingJobCount_m);
+  jobListDone.wait(lk, [this]{return pendingJobCount <= 0;});
+}
+
+void JobList::finishJob() {
+  std::lock_guard<std::mutex> lk(pendingJobCount_m);
+  pendingJobCount--;
+  jobListDone.notify_all();
+}
+
+Job& JobList::getJob(size_t i) {
+  return jobs.at(i);
+}
+
+Job* JobList::getNextJob() {
+  for(size_t i=0; i<jobCount; i++) {
+    size_t rotated_i = (i+nextJobIndex+1) % jobCount;
+    if(jobs.at(rotated_i).get().getState() == JobState::ready) {
+      nextJobIndex = rotated_i;
+      return &jobs.at(rotated_i).get();
+    }
+  }
+  return NULL;
+}
+
+void JobList::setDoneCallback(DoneCallback cb) {
+  doneCb = cb;
+}
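
Because Job carries three mutexes it is pinned in memory, and a vector of reference_wrappers can only point at heap allocations that are never freed. One way to keep the same external interface while letting JobList own its jobs would be a vector of unique_ptr; this is a sketch of an alternative, not what the commit does:

    #include <memory>
    #include <vector>

    class JobList {
      //...same public interface as above...
      std::vector<std::unique_ptr<Job>> jobs;
    };

    JobList::JobList(Module mod, size_t numberOfJobs) {
      jobCount = numberOfJobs;
      pendingJobCount = numberOfJobs;
      for(size_t i = 0; i < numberOfJobs; i++) {
        jobs.push_back(std::make_unique<Job>(mod));  //JobList owns the Job
        jobs.back()->setDoneCallback([this]{ finishJob(); });
      }
    }

    Job& JobList::getJob(size_t i) {
      return *jobs.at(i);           //dereference instead of .get()
    }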

+ 10 - 2
lib/mlfpga/src/modules.cpp

@@ -1,9 +1,17 @@
 #include "../include/modules.hpp"
 
-#define MOD_DEF( identifier, id, name )  id
+#define MOD_DEF( identifier, id, name, sendLen, recvLen )  id
 const uint32_t moduleIds[] = { MODS_DEF };
 #undef MOD_DEF
 
-#define MOD_DEF( identifier, id, name )  name
+#define MOD_DEF( identifier, id, name, sendLen, recvLen )  name
 const char *moduleNames[] = { MODS_DEF };
 #undef MOD_DEF
+
+#define MOD_DEF( identifier, id, name, sendLen, recvLen )  sendLen
+const size_t moduleSendPayloadLength[] = { MODS_DEF };
+#undef MOD_DEF
+
+#define MOD_DEF( identifier, id, name, sendLen, recvLen )  recvLen
+const size_t moduleRecvPayloadLength[] = { MODS_DEF };
+#undef MOD_DEF

+ 15 - 0
lib/mlfpga/src/rng.cpp

@@ -0,0 +1,15 @@
+#include "rng.hpp"
+
+namespace RNG {
+  std::random_device seed_generator;
+  unsigned seed = seed_generator();
+  std::uniform_int_distribution<uint32_t> distribution(0, UINT32_MAX);
+  std::mt19937 mersenne_generator(seed);
+
+  std::mutex lock;
+}
+
+uint32_t getRandomNumber() {
+  std::lock_guard<std::mutex> lk(RNG::lock);
+  return RNG::distribution(RNG::mersenne_generator);
+}

+ 0 - 383
lib/mlfpga/src/udp.cpp

@@ -1,383 +0,0 @@
-
-#include "../include/udp.hpp"
-
-int resolvehelper(const char* hostname, int family, const char* service, sockaddr_storage* pAddr)
-{
-    int result;
-    addrinfo* result_list = NULL;
-    addrinfo hints = {};
-    hints.ai_family = family;
-    hints.ai_socktype = SOCK_DGRAM; // without this flag, getaddrinfo will return 3x the number of addresses (one for each socket type).
-    result = getaddrinfo(hostname, service, &hints, &result_list);
-    if (result == 0)
-    {
-        //ASSERT(result_list->ai_addrlen <= sizeof(sockaddr_in));
-        memcpy(pAddr, result_list->ai_addr, result_list->ai_addrlen);
-        freeaddrinfo(result_list);
-    }
-
-    return result;
-}
-
-void _jobSuccess(commFPGA *fpga, jobResponse *res) {
-  //printf("job %08X successful\n", *res->jobId);
-  fpga->successCounter++;
-  res->received = Clock::now();
-
-  microseconds us = std::chrono::duration_cast<microseconds>(res->received - res->created);
-
-  fpga->latency = us.count();
-}
-void _jobFailed(commFPGA *fpga, jobResponse *res) {
-  //printf("job %08X failed %d\n", *res->jobId, res->state);
-  fpga->failedCounter++;
-}
-
-// commFPGA class members
-
-
-void commFPGA::recvUDP() {
-  int result = 0;
-
-  uint8_t buf[UDP_MTU];
-
-  uint slen = sizeof(addrDest);
-  result = recvfrom(sock, buf, UDP_MTU, 0, (sockaddr*)&addrDest, &slen);
-  if(result == -1)
-    return;
-
-  parseRaw(buf, result);
-  
-}
-
-int commFPGA::parseRaw(uint8_t *buf, uint bufLen) {
-  
-  jobLock.lock();
-  jobResponse *currentJobResp = jobQueue[currentJobIndex];
-
-  //check if job got deleted mid transmission
-
-  if(currentJobResp == NULL && (recvState != checkPreamble || recvState != checkModuleId)) {
-    recvState = checkPreamble;
-  }
-  /*if(currentJobResp != NULL) {
-    printf("%15s: %4d %08d\n", ip, bufLen, currentJobResp->recvWordCounter);
-  }*/
-
-  for(int_least32_t i=0; i < (int_least32_t)bufLen/4; i++) {
-    uint32_t word = __builtin_bswap32(((uint32_t*)buf)[i]);
-
-    //printf("%15s: %4d %08X\n", ip, i, word);
-    
-    switch(recvState) {
-      case checkPreamble:
-        if(word == PREAMBLE) {
-          recvState = checkJobId;
-        }
-        break;
-
-      case checkJobId: 
-        for(uint_least32_t k=0; k<JOB_COUNT; k++) {
-          uint_least32_t kWrapped = (currentJobIndex + k) % JOB_COUNT;
-          if(jobQueueJobIds[kWrapped] != word)
-            continue;
-
-          currentJobResp = jobQueue[kWrapped];
-          if(currentJobResp == NULL)
-            continue;
-
-          if(*currentJobResp->jobId == word && currentJobResp->state == waiting) {
-            recvState = checkModuleId;
-            currentJobIndex = kWrapped;
-            break;
-          }
-        }
-        if(recvState == checkModuleId) {
-          break;
-        } else {
-          i = i - 1 < 0 ? -1 : i - 1;
-          recvState = checkPreamble;
-          #ifdef DEBUG_JOB_RESP
-            printf("job %08X jobId not found\n", word);
-          #endif
-        }
-        break;
-      
-      case checkModuleId:
-        if(*currentJobResp->moduleId == word) {
-          recvState = writePayload;
-          currentJobResp->recvWordCounter = 3;
-          currentJobResp->state = receiving;
-        } else {
-          i = i - 2 < 0 ? -1 : i - 2;
-          recvState = checkPreamble;
-          #ifdef DEBUG_JOB_RESP
-            printf("job %08X wrong moduleId %08X\n", *currentJobResp->jobId, word);
-          #endif
-        }
-        break;
-      case writePayload:
-        currentJobResp->words[currentJobResp->recvWordCounter++] = word;
-        if(currentJobResp->recvWordCounter >= currentJobResp->wordCount - 1) {
-          recvState = checkCRC;
-        }
-        break;
-      case checkCRC:
-        *currentJobResp->crc = word;
-
-        if(currentJobResp->checkCRC()) {
-          //success
-          currentJobResp->state = finished;
-          jobSuccessCb(this, currentJobResp);
-          jobsActive--;
-          delete currentJobResp;
-          jobQueue[currentJobIndex] = NULL;
-          currentJobResp = NULL;
-
-          recvState = checkPreamble;
-
-        } else {
-          //crc fail
-          #ifdef DEBUG_JOB_RESP
-            printf("job %08X wrong crc %08X, %4d, %4d\n", *currentJobResp->jobId, word, bufLen, i);
-            for(uint_least32_t k=0; k<currentJobResp->wordCount; k++) {
-              printf(" %4d %08X", k, currentJobResp->words[k]);
-            }
-            cout << endl;
-          #endif
-          jobFailedCb(this, currentJobResp);
-          currentJobResp->state = waiting;
-          i = i - currentJobResp->wordCount + 1 < 0 ? -1 : i - currentJobResp->wordCount + 1;
-          recvState = checkPreamble;
-        }
-        
-        break;
-    }
-  }
-
-  jobLock.unlock();
-
-  return 0;
-}
-
-commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
-  port = _port;
-  strcpy(ip, host);
-
-  jobQueue =   (jobResponse**)malloc(JOB_COUNT * sizeof(jobResponse*));
-  jobQueueJobIds = (uint32_t*)malloc(JOB_COUNT * sizeof(uint32_t));
-
-  memset(jobQueue,       0, JOB_COUNT * sizeof(jobQueue[0]));
-  memset(jobQueueJobIds, 0, JOB_COUNT * sizeof(jobQueueJobIds[0]));
-
-  jobSuccessCb = _jobSuccess;
-  jobFailedCb = _jobFailed;
-
-  int err = 0;
-
-  struct addrinfo hints, *res;
-  
-  //UDP host
-  memset(&hints, 0, sizeof hints);
-  hints.ai_family = AF_INET;  // use IPv4
-  hints.ai_socktype = SOCK_DGRAM;
-  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
-
-  getaddrinfo(NULL, std::to_string(port).c_str(), &hints, &res);
-  sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if(bindSelf)
-    err = bind(sock, res->ai_addr, res->ai_addrlen);
-  
-  if(err != 0) {
-    printf("%15s sock: %2d, err: %2d, port: %5d\n", ip, sock, err, port);
-    exit(1);
-  }
-
-  //set recieve timeout
-  struct timeval tv;
-  tv.tv_sec = 0;
-  tv.tv_usec = 100000;
-  setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
-
-  //set recv buffer size
-
-  int rcvbufsize = MAX_JOB_LEN * 4 * 2;
-  setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&rcvbufsize,sizeof(rcvbufsize));
-  
-  //UDP client
-  resolvehelper(host, AF_INET, std::to_string(port).c_str(), &addrDest);
-
-  //send a packet to fpga to update its response port
-
-  sendRaw((uint8_t*)"0000", 4);
-
-}
-commFPGA::~commFPGA() {
-  //printf("%15s deleting job queue...\n", ip);
-  
-  for(uint_least32_t i=0; i<JOB_COUNT; i++) {
-    if(jobQueue[i] != NULL) {
-      jobFailedCb(this, jobQueue[i]);
-      jobsActive--;
-      delete jobQueue[i];
-      jobQueue[i] = NULL;
-    }
-  }
-}
-
-int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
-  int result = 0;
-
-  uint_least32_t byteIndex = 0;
-
-  while(byteIndex < bufLen) {
-    uint payloadLen = bufLen - byteIndex;
-    if(payloadLen > UDP_LEN/4*4)
-      payloadLen = UDP_LEN/4*4;
-
-    //printf("sending %d bytes at offset %d\n", payloadLen, byteIndex);
-
-    result = sendto(sock, &buf[byteIndex], payloadLen, 0, (sockaddr*)&addrDest, sizeof(addrDest));
-    if(result == -1) {
-      int err = errno;
-      std::cout << "error sending packet " << err << std::endl;
-      break;
-    } 
-    //usleep(50);
-    //printf("%d bytes sent\n", result);
-    //mark n * 4 bytes as sent
-    byteIndex += payloadLen;
-  }
-  return byteIndex;
-}
-
-int commFPGA::queueJob(jobResponse *job) {
-  jobLock.lock();
-  if(jobCount() >= JOB_COUNT-1)
-    return -1;
-  
-  do {
-    jobQueueIndex = (jobQueueIndex + 1) % JOB_COUNT;
-  } while(jobQueue[jobQueueIndex] != NULL);
-  
- /*
-  jobQueueIndex = (jobQueueIndex + 1) % JOB_COUNT;
-  if(jobQueue[jobQueueIndex] != NULL) {
-    #ifdef DEBUG_JOB_RESP
-      printf("job %08X jobQueue full, %6d active\n", *jobQueue[jobQueueIndex]->jobId, jobsActive);
-    #endif
-    jobFailedCb(this, jobQueue[jobQueueIndex]);
-    jobsActive--;
-    delete jobQueue[jobQueueIndex];
-  }
-  */
-  jobQueue[jobQueueIndex] = job;
-  jobQueueJobIds[jobQueueIndex] = *job->jobId;
-  jobsActive++;
-  jobLock.unlock();
-  return 0;
-}
-
-int commFPGA::sendFromBuffer() {
-  uint_least32_t avail = (sendBufferWriteIndex - sendBufferReadIndex) % MAX_JOB_LEN;
-  
-  if(avail <= 0)
-    return -1;
-    
-  uint_least32_t readEnd;
-
-  if(avail*4 > UDP_LEN)
-    readEnd = sendBufferReadIndex + UDP_LEN / 4;
-  else
-    readEnd = sendBufferReadIndex + avail;
-
-  if(readEnd >= MAX_JOB_LEN)
-    readEnd = MAX_JOB_LEN;
-
-  //printf("avail %5d read %5d write %5d len %5d\n", avail, sendBufferReadIndex, sendBufferWriteIndex, (readEnd - sendBufferReadIndex));
-
-  int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], (readEnd - sendBufferReadIndex) * 4);
-
-  
-  if(readEnd < MAX_JOB_LEN)
-    sendBufferReadIndex = readEnd;
-  else
-    sendBufferReadIndex = 0;
-
-  return rc;
-}
-
-int commFPGA::bufferJob(jobData *job, uint recvPayloadLength) {
-  uint_least32_t free = (sendBufferReadIndex - sendBufferWriteIndex) % MAX_JOB_LEN;
-  //printf("free %8d %8d %8d\n", free, sendBufferReadIndex, sendBufferWriteIndex);
-  if(free < job->wordCount && free != 0)
-    return -1;
-  
-  //*job->jobId = distribution(mersenne_generator);
-  *job->jobId = jobIdCounter++;
-  job->calcCRC();
-
-  jobResponse *response = new jobResponse(recvPayloadLength);
-  *response->moduleId = *job->moduleId;
-  *response->jobId = *job->jobId;
-  response->tag = job->tag;
-
-  if(queueJob(response) == -1) {
-    delete response;
-    return -1;
-  }
-
-  for(uint_least32_t i=0; i<job->wordCount; i++) {
-    sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(job->words[i]);
-  }
-  sendBufferWriteIndex = (sendBufferWriteIndex + job->wordCount) % MAX_JOB_LEN;
-  return job->wordCount;
-}
-
-int commFPGA::deleteOldJobs(int64_t micros) {
-  
-  Clock::time_point now = Clock::now();
-  uint_least32_t count = 0;
-
-  jobLock.lock();
-
-  for(uint_least32_t i=0; i<JOB_COUNT; i++) {
-    jobResponse *res = jobQueue[i];
-    if(res == NULL)
-      continue;
-    microseconds us = std::chrono::duration_cast<microseconds>(now - res->created);
-    if(us.count() > micros || us.count() < 0) {
-      #ifdef DEBUG_JOB_RESP
-        //printf("job %08X timed out.\n", *res->jobId);
-      #endif
-      jobFailedCb(this, res);
-      jobsActive--;
-      delete res;
-      jobQueue[i] = NULL;
-      count++;
-    }
-  }
-  jobLock.unlock();
-  #ifdef DEBUG_JOB_RESP
-    if(count > 0) {
-      printf("%4d jobs timed out\n", count);
-    }
-  #endif
-  return count;
-}
-
-uint_least32_t commFPGA::jobCount() {
-  return jobsActive;
-}
-
-void commFPGA::setSuccessCb(void (*cb) (commFPGA*, jobResponse*)) {
-  jobSuccessCb = cb;
-}
-void commFPGA::setFailedCb(void (*cb) (commFPGA*, jobResponse*)) {
-  jobFailedCb = cb;
-}
-
-std::random_device commFPGA::seed_generator;
-unsigned commFPGA::seed = seed_generator();
-std::uniform_int_distribution<uint32_t> commFPGA::distribution(0, UINT32_MAX);
-std::mt19937 commFPGA::mersenne_generator(commFPGA::seed);

+ 54 - 0
lib/mlfpga/src/worker.cpp

@@ -0,0 +1,54 @@
+#include "worker.hpp"
+
+Worker::Worker(std::vector<std::reference_wrapper<commFPGA>> &fpgas) {
+  fpgaVector = &fpgas;
+}
+Worker::~Worker() {
+  hasJobList.notify_all();
+}
+
+void Worker::start() {
+  result = std::async(std::launch::async, &Worker::threadMain, this);
+}
+
+int Worker::assignJobList(JobList &jobList) {
+  std::lock_guard<std::mutex> lk(currentJobList_m);
+  if(currentJobList != NULL)
+    return -1;
+  
+  currentJobList = &jobList;
+  hasJobList.notify_all();
+
+  return 0;
+}
+
+int Worker::threadMain() {
+  if(currentJobList == NULL)
+    return -1;
+
+  while(currentJobList->getPendingJobCount() > 0) {
+    Job *job = currentJobList->getNextJob();
+    if(job == NULL) {
+      break;
+    }
+    commFPGA *fpga = findAvailableFPGA();
+    if(fpga == NULL) {
+      continue;
+    }
+    fpga->assignJob(job);
+  }
+  return 0;
+}
+
+commFPGA* Worker::findAvailableFPGA() {
+  uint_least32_t minCnt = JOB_COUNT-1;
+  commFPGA *fpga = NULL;
+  for(std::vector<std::reference_wrapper<commFPGA>>::iterator it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
+    uint_least32_t cnt = it->get().jobCount();
+    if(cnt < minCnt) {
+      minCnt = cnt;
+      fpga = &(it->get());
+    }
+  }
+  return fpga;
+}

BIN
model.png


+ 0 - 56
src/asyncDummy.cpp

@@ -1,56 +0,0 @@
-#ifndef ASYNC_DUMMY_FPGA
-#define ASYNC_DUMMY_FPGA
-
-#include "asyncDummy.hpp"
-
-
-using namespace std::chrono;
-
-std::string fetchDataFromDB(std::string recvdData)
-{
-	// Make sure that function takes 5 seconds to complete
-	std::this_thread::sleep_for(seconds(5));
- 
-	//Do stuff like creating DB Connection and fetching Data
-	return "DB_" + recvdData;
-}
- 
-std::string fetchDataFromFile(std::string recvdData)
-{
-	// Make sure that function takes 5 seconds to complete
-	std::this_thread::sleep_for(seconds(5));
- 
-	//Do stuff like fetching Data File
-	return "File_" + recvdData;
-}
- 
-int main()
-{
-	// Get Start Time
-	system_clock::time_point start = system_clock::now();
- 
-	std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
- 
-	//Fetch Data from File
-	std::string fileData = fetchDataFromFile("Data");
- 
-	//Fetch Data from DB
-	// Will block till data is available in future<std::string> object.
-	std::string dbData = resultFromDB.get();
- 
-	// Get End Time
-	auto end = system_clock::now();
- 
-	auto diff = duration_cast < std::chrono::seconds > (end - start).count();
-	std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
- 
-	//Combine The Data
-	std::string data = dbData + " :: " + fileData;
- 
-	//Printing the combined Data
-	std::cout << "Data = " << data << std::endl;
- 
-	return 0;
-}
-
-#endif

+ 119 - 154
src/conv2D.cpp

@@ -1,178 +1,143 @@
-#ifndef CONV2D_FPGA
-#define CONV2D_FPGA
 
 #include "conv2D.hpp"
 
-volatile int instances = 0;
-volatile int inParallel = 0;
-std::mutex printMu;
-
-void Conv2DOp::delayThread(DoneCallback done) {
-  printMu.lock();
-  //printf("parallel: %2d instance: %2d '%s' %dms sleep\n", ++inParallel, instance, name().c_str(), delay);
-  printMu.unlock();
-  std::this_thread::sleep_for(milliseconds(delay));
-  printMu.lock();
-  //printf("parallel: %2d instance: %2d '%s' done\n", --inParallel, instance, name().c_str());
-  printMu.unlock();
-  done();
-}
-
-int width = 224;
-int kernel = 5;
-int border = kernel/2;
-int sizeWithBorder = width + 2*border;
-int pixels = sizeWithBorder * sizeWithBorder;
-int tagCounter = 0;
-
-void Conv2DOp::fpgaCall(const Tensor *input, const Tensor *kernel, Tensor *output, int sample, int channel, int filter) {
-    auto input_tensor = input->tensor<int32, 4>();
-    auto kernel_tensor = kernel->tensor<int32, 4>();
-    auto output_tensor = output->tensor<int32, 4>();
-    
-    jobData job(pixels);
-    *job.moduleId = moduleIds[conv2D_5x5_Module];
-    job.tag = tagCounter++;
-
-    for(int x=0; x<outputSize; x++) {
-      for(int y=0; y<outputSize; y++) {
-        job.payload[x*outputSize + y] = input_tensor(sample, x, y, channel);
-      }
-    }
+namespace tf_lib {
 
-    sendJob(&job, pixels);
+  volatile int instances = 0;
+  volatile int inParallel = 0;
+  std::mutex printMu;
 
-    printMu.lock();
-    /*
-    printf(" sample: %3d, channel: %3d, filter: %3d\n", sample, channel, filter);
-    
-    for(int x=0; x<outputSize; x++) {
-      for(int y=0; y<outputSize; y++) {
-        printf("%c", input_tensor(sample, x, y, channel) > 0 ? '#' : ' ');
-      }
-      std::cout << std::endl;
-    }
-    std::cout << std::endl;
-    */
-    printMu.unlock();
-}
+  Conv2DOp::Conv2DOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
+    instance = instances++;
+    OP_REQUIRES_OK(context, context->GetAttr("delay", &delay));
 
-Conv2DOp::Conv2DOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
-  instance = instances++;
-  OP_REQUIRES_OK(context, context->GetAttr("delay", &delay));
+  };
+
+  void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
+    // Input tensor is of the following dimensions:
+    // [ batch, in_rows, in_cols, in_depth ]
+    const Tensor& input = context->input(0);
 
-};
+    ///const int32 *p = input.flat<int32>().data();
 
-void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
-  // Input tensor is of the following dimensions:
-  // [ batch, in_rows, in_cols, in_depth ]
-  const Tensor& input = context->input(0);
+    // Input filter is of the following dimensions:
+    // [ filter_rows, filter_cols, in_depth, out_depth]
+    const Tensor& kernel = context->input(1);
 
-  ///const int32 *p = input.flat<int32>().data();
+    TensorShape kernel_shape = kernel.shape();
+    TensorShape input_shape = input.shape();
 
-  // Input filter is of the following dimensions:
-  // [ filter_rows, filter_cols, in_depth, out_depth]
-  const Tensor& kernel = context->input(1);
 
-  TensorShape kernel_shape = kernel.shape();
-  TensorShape input_shape = input.shape();
+    int batchSize = input_shape.dim_size(0);
+    int channels = input_shape.dim_size(3);
+    int filters = kernel_shape.dim_size(3);
 
+    TensorShape output_shape;
+    const int32 dims[] = {batchSize, outputSize, outputSize, channels * filters};
+    TensorShapeUtils::MakeShape(dims, 4, &output_shape);
 
-  int batchSize = input_shape.dim_size(0);
-  int channels = input_shape.dim_size(3);
-  int filters = kernel_shape.dim_size(3);
+    output_shape.set_dim(0, batchSize);
+    output_shape.set_dim(1, outputSize);
+    output_shape.set_dim(2, outputSize);
+    output_shape.set_dim(3, channels * filters);
 
-  TensorShape output_shape;
-  const int32 dims[] = {batchSize, outputSize, outputSize, channels * filters};
-  TensorShapeUtils::MakeShape(dims, 4, &output_shape);
+    //printMu.lock();
+    //std::cout << output_shape.DebugString() << std::endl;
+    //printMu.unlock();
 
-  output_shape.set_dim(0, batchSize);
-  output_shape.set_dim(1, outputSize);
-  output_shape.set_dim(2, outputSize);
-  output_shape.set_dim(3, channels * filters);
+    // Output tensor is of the following dimensions:
+    // [ in_batch, out_rows, out_cols, out_depth ]
+    Tensor* output = nullptr;
+    OP_REQUIRES_OK(context, context->allocate_output(0, output_shape, &output));
 
-  //printMu.lock();
-  //std::cout << output_shape.DebugString() << std::endl;
-  //printMu.unlock();
 
-  // Output tensor is of the following dimensions:
-  // [ in_batch, out_rows, out_cols, out_depth ]
-  Tensor* output = nullptr;
-  OP_REQUIRES_OK(context, context->allocate_output(0, output_shape, &output));
+    auto input_tensor = input.tensor<int32, 4>();
+    auto output_tensor = output->tensor<int32, 4>();
 
-  for(int sample=0; sample<batchSize; sample++) {
-    for(int channel=0; channel<channels; channel++) {
-      for(int filter=0; filter<filters; filter++) {
-        std::async(std::launch::async, &Conv2DOp::fpgaCall, this, &input, &kernel, output, sample, channel, filter);
-        
+    JobList jobs(Module::dummyModule, batchSize * channels * filters);
+
+    for(int sample=0; sample<batchSize; sample++) {
+      for(int channel=0; channel<channels; channel++) {
+        for(int filter=0; filter<filters; filter++) {
+          Job &job = jobs.getJob(sample * channels * filters + channel * filters + filter);
+          for(int x=0; x<outputSize; x++) {
+            for(int y=0; y<outputSize; y++) {
+              job.setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
+            }
+          }
+        }
       }
     }
-  }
-  std::async(std::launch::async, &Conv2DOp::delayThread, this, done);
-}
-
-
-static Status MatMulGradHelper(FunctionDef* g, const string& opname,
-                               const string& attr_adj_x,
-                               const string& attr_adj_y, const string& x0,
-                               bool ax0, const string& x1, bool ax1,
-                               const string& y0, bool ay0, const string& y1,
-                               bool ay1) {
-  // The final outputs are "dx" and "dy". If we're broadcasting compute
-  // intermediate nodes for now.
-  std::vector<FDH::Node> nodes = {
-      {{("dx")},
-       opname,
-       {x0, x1},
-       {{"T", "$T"}, {attr_adj_x, ax0}, {attr_adj_y, ax1}}},
-      {{("dy")},
-       opname,
-       {y0, y1},
-       {{"T", "$T"}, {attr_adj_x, ay0}, {attr_adj_y, ay1}}},
-  };
+    jobs.setDoneCallback([output_tensor, &jobs, done]{
+      output_tensor(0) = jobs.getJob(0).getResponsePayload(0);
+      done();
+    });
+
+    connectionManager.sendJobListAsync(jobs);
 
-  *g = FDH::Define(
-      // Arg defs
-      {"x: T", "y: T", "dz: T"},
-      // Ret val defs
-      {"dx: T", "dy: T"},
-      // Attr defs
-      {{"T: {half, float, double}"}},
-      // Nodes
-      nodes);
-  return Status::OK();
-}
-
-Status MatMulGrad(const AttrSlice& attrs, FunctionDef* g) {
-  const string opname = "MyMatMul";
-  const string attr_adj_x = "transpose_a";
-  const string attr_adj_y = "transpose_b";
-  DataType T;
-  TF_RETURN_IF_ERROR(GetNodeAttr(attrs, "T", &T));
-  if (T == DT_COMPLEX64 || T == DT_COMPLEX128) {
-    return errors::Unimplemented(
-        "MatMul gradient for complex is not supported yet.");
-  }
-  bool ta;
-  bool tb;
-  TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_x, &ta));
-  TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_y, &tb));
-
-  if (!ta && !tb) {
-    return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
-                            true, "x", true, "dz", false);
-  }
-  if (!ta && tb) {
-    return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
-                            false, "dz", true, "x", false);
   }
-  if (ta && !tb) {
-    return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", false, "dz",
-                            true, "x", false, "dz", false);
+
+
+  static Status MatMulGradHelper(FunctionDef* g, const string& opname,
+                                const string& attr_adj_x,
+                                const string& attr_adj_y, const string& x0,
+                                bool ax0, const string& x1, bool ax1,
+                                const string& y0, bool ay0, const string& y1,
+                                bool ay1) {
+    // The final outputs are "dx" and "dy". If we're broadcasting compute
+    // intermediate nodes for now.
+    std::vector<FDH::Node> nodes = {
+        {{("dx")},
+        opname,
+        {x0, x1},
+        {{"T", "$T"}, {attr_adj_x, ax0}, {attr_adj_y, ax1}}},
+        {{("dy")},
+        opname,
+        {y0, y1},
+        {{"T", "$T"}, {attr_adj_x, ay0}, {attr_adj_y, ay1}}},
+    };
+
+    *g = FDH::Define(
+        // Arg defs
+        {"x: T", "y: T", "dz: T"},
+        // Ret val defs
+        {"dx: T", "dy: T"},
+        // Attr defs
+        {{"T: {half, float, double}"}},
+        // Nodes
+        nodes);
+    return Status::OK();
   }
-  CHECK(ta && tb);
-  return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", true, "dz",
-                          true, "dz", true, "x", true);
-}
 
-#endif
+  Status MatMulGrad(const AttrSlice& attrs, FunctionDef* g) {
+    const string opname = "MyMatMul";
+    const string attr_adj_x = "transpose_a";
+    const string attr_adj_y = "transpose_b";
+    DataType T;
+    TF_RETURN_IF_ERROR(GetNodeAttr(attrs, "T", &T));
+    if (T == DT_COMPLEX64 || T == DT_COMPLEX128) {
+      return errors::Unimplemented(
+          "MatMul gradient for complex is not supported yet.");
+    }
+    bool ta;
+    bool tb;
+    TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_x, &ta));
+    TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_y, &tb));
+
+    if (!ta && !tb) {
+      return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
+                              true, "x", true, "dz", false);
+    }
+    if (!ta && tb) {
+      return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
+                              false, "dz", true, "x", false);
+    }
+    if (ta && !tb) {
+      return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", false, "dz",
+                              true, "x", false, "dz", false);
+    }
+    CHECK(ta && tb);
+    return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", true, "dz",
+                            true, "dz", true, "x", true);
+  }
+}
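
The four branches above encode the standard matrix-product gradient identities: for Z = X·Y with no transposes, dX = dZ·Yᵀ and dY = Xᵀ·dZ; the other branches are the same identities with the adjoint flags shuffled. A minimal sketch of how this gradient function would be attached to the op — REGISTER_OP_GRADIENT is the stock TensorFlow macro, but the exact registration site is an assumption, not part of this diff:

    // For Z = X * Y (ta == tb == false) the helper emits:
    //   dx = dz * y^T   -> MatMulGradHelper(..., "dz", false, "y", true, ...)
    //   dy = x^T * dz   -> MatMulGradHelper(..., "x",  true, "dz", false, ...)
    REGISTER_OP_GRADIENT("MyMatMul", MatMulGrad);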

+ 40 - 0
src/dummyOp.cpp

@@ -0,0 +1,40 @@
+
+#include "dummyOp.hpp"
+
+namespace tf_lib {
+  DummyOp::DummyOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
+
+  }
+
+  void DummyOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
+    // Input tensor is of the following dimensions:
+    // [ batch, in_rows, in_cols, in_depth ]
+    const Tensor& input = context->input(0);
+
+    // The op always produces dataLength words, so require at least that many.
+    OP_REQUIRES_ASYNC(
+        context, input.NumElements() >= dataLength,
+        errors::InvalidArgument("input needs at least 4 elements"), done);
+
+    TensorShape output_shape;
+    const int32 dims[] = {dataLength};
+    OP_REQUIRES_OK_ASYNC(
+        context, TensorShapeUtils::MakeShape(dims, 1, &output_shape), done);
+
+    Tensor* output = nullptr;
+    OP_REQUIRES_OK_ASYNC(
+        context, context->allocate_output(0, output_shape, &output), done);
+
+    auto input_tensor = input.tensor<int32, 1>();
+    auto output_tensor = output->tensor<int32, 1>();
+
+    // One job for the dummy module; send all dataLength payload words.
+    // (Assumes the module's job payload holds at least dataLength words.)
+    JobList jobs(Module::dummyModule, 1);
+    for (int i = 0; i < dataLength; i++)
+      jobs.getJob(0).setPayload(i, input_tensor(i));
+
+    // NOTE: `jobs` lives on the stack and is captured by reference; this
+    // assumes sendJobListAsync keeps the JobList alive until the callback runs.
+    jobs.setDoneCallback([output_tensor, &jobs, done, this]{
+      for (int i = 0; i < dataLength; i++)
+        output_tensor(i) = jobs.getJob(0).getResponsePayload(i);
+      done();
+    });
+
+    connectionManager.sendJobListAsync(jobs);
+  }
+}
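
DummyOp shows the round-trip pattern the reworked library expects: build a JobList for a module, fill its payload words, register a completion callback, then hand the list to the connection manager. A sketch of the interface it relies on, reconstructed from these call sites — the authoritative declarations live in lib/mlfpga/include/connectionManager.hpp and modules.hpp, so the signatures here are assumptions:

    #include <cstddef>
    #include <functional>

    enum class Module;   // defined in modules.hpp (mlfpga)
    class Job;           // defined in connectionManager.hpp

    // Assumed shape of the reworked API, inferred from the calls above.
    class JobList {
    public:
      JobList(Module mod, std::size_t jobCount);     // jobs for one FPGA module
      Job& getJob(std::size_t i);                    // per-job payload access
      void setDoneCallback(std::function<void()>);   // runs once all jobs returned
    };

    class ConnectionManager {
    public:
      void addFPGA(const char* host, int port);      // register one endpoint
      void sendJobListAsync(JobList& jobs);          // non-blocking dispatch
    };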

+ 137 - 113
src/entrypoint.cpp

@@ -1,127 +1,151 @@
-#ifndef ENTRY_FPGA
-#define ENTRY_FPGA
 
 #include "entrypoint.hpp"
 
-
-using namespace tensorflow;
-using namespace tensorflow::shape_inference;
-
-
-Status DimensionsFromShape(ShapeHandle shape, TensorFormat format,
-                           DimensionHandle* batch_dim,
-                           gtl::MutableArraySlice<DimensionHandle> spatial_dims,
-                           DimensionHandle* filter_dim,
-                           InferenceContext* context) {
-  const int32 rank = GetTensorDimsFromSpatialDims(spatial_dims.size(), format);
-  // Batch.
-  *batch_dim = context->Dim(shape, GetTensorBatchDimIndex(rank, format));
-  // Spatial.
-  for (int spatial_dim_index = 0; spatial_dim_index < spatial_dims.size();
-       ++spatial_dim_index) {
-    spatial_dims[spatial_dim_index] = context->Dim(
-        shape, GetTensorSpatialDimIndex(rank, format, spatial_dim_index));
-  }
-  // Channel.
-  *filter_dim = context->Dim(shape, GetTensorFeatureDimIndex(rank, format));
-  if (format == FORMAT_NCHW_VECT_C) {
-    TF_RETURN_IF_ERROR(context->Multiply(
-        *filter_dim,
-        context->Dim(shape, GetTensorInnerFeatureDimIndex(rank, format)),
-        filter_dim));
-  }
-  return Status::OK();
-}
-
-Status ShapeFromDimensions(DimensionHandle batch_dim,
-                           gtl::ArraySlice<DimensionHandle> spatial_dims,
-                           DimensionHandle filter_dim, TensorFormat format,
-                           InferenceContext* context, ShapeHandle* shape) {
-  const int32 rank = GetTensorDimsFromSpatialDims(spatial_dims.size(), format);
-  std::vector<DimensionHandle> out_dims(rank);
-
-  // Batch.
-  out_dims[tensorflow::GetTensorBatchDimIndex(rank, format)] = batch_dim;
-  // Spatial.
-  for (int spatial_dim_index = 0; spatial_dim_index < spatial_dims.size();
-       ++spatial_dim_index) {
-    out_dims[tensorflow::GetTensorSpatialDimIndex(
-        rank, format, spatial_dim_index)] = spatial_dims[spatial_dim_index];
-  }
-  // Channel.
-  if (format == tensorflow::FORMAT_NCHW_VECT_C) {
-    // When format is NCHW_VECT_C, factor the feature map count
-    // into the outer feature count and the inner feature count (=4).
-    TF_RETURN_IF_ERROR(context->Divide(
-        filter_dim, 4, /*evenly_divisible=*/true,
-        &out_dims[tensorflow::GetTensorFeatureDimIndex(rank, format)]));
-    out_dims[GetTensorInnerFeatureDimIndex(rank, format)] = context->MakeDim(4);
-  } else {
-    out_dims[tensorflow::GetTensorFeatureDimIndex(rank, format)] = filter_dim;
+namespace tf_lib {
+
+  using namespace tensorflow;
+  using namespace tensorflow::shape_inference;
+
+
+  Status DimensionsFromShape(ShapeHandle shape, TensorFormat format,
+                            DimensionHandle* batch_dim,
+                            gtl::MutableArraySlice<DimensionHandle> spatial_dims,
+                            DimensionHandle* filter_dim,
+                            InferenceContext* context) {
+    const int32 rank = GetTensorDimsFromSpatialDims(spatial_dims.size(), format);
+    // Batch.
+    *batch_dim = context->Dim(shape, GetTensorBatchDimIndex(rank, format));
+    // Spatial.
+    for (int spatial_dim_index = 0; spatial_dim_index < spatial_dims.size();
+        ++spatial_dim_index) {
+      spatial_dims[spatial_dim_index] = context->Dim(
+          shape, GetTensorSpatialDimIndex(rank, format, spatial_dim_index));
+    }
+    // Channel.
+    *filter_dim = context->Dim(shape, GetTensorFeatureDimIndex(rank, format));
+    if (format == FORMAT_NCHW_VECT_C) {
+      TF_RETURN_IF_ERROR(context->Multiply(
+          *filter_dim,
+          context->Dim(shape, GetTensorInnerFeatureDimIndex(rank, format)),
+          filter_dim));
+    }
+    return Status::OK();
   }
 
-  *shape = context->MakeShape(out_dims);
-  return tensorflow::Status::OK();
-}
+  Status ShapeFromDimensions(DimensionHandle batch_dim,
+                            gtl::ArraySlice<DimensionHandle> spatial_dims,
+                            DimensionHandle filter_dim, TensorFormat format,
+                            InferenceContext* context, ShapeHandle* shape) {
+    const int32 rank = GetTensorDimsFromSpatialDims(spatial_dims.size(), format);
+    std::vector<DimensionHandle> out_dims(rank);
+
+    // Batch.
+    out_dims[tensorflow::GetTensorBatchDimIndex(rank, format)] = batch_dim;
+    // Spatial.
+    for (int spatial_dim_index = 0; spatial_dim_index < spatial_dims.size();
+        ++spatial_dim_index) {
+      out_dims[tensorflow::GetTensorSpatialDimIndex(
+          rank, format, spatial_dim_index)] = spatial_dims[spatial_dim_index];
+    }
+    // Channel.
+    if (format == tensorflow::FORMAT_NCHW_VECT_C) {
+      // When format is NCHW_VECT_C, factor the feature map count
+      // into the outer feature count and the inner feature count (=4).
+      TF_RETURN_IF_ERROR(context->Divide(
+          filter_dim, 4, /*evenly_divisible=*/true,
+          &out_dims[tensorflow::GetTensorFeatureDimIndex(rank, format)]));
+      out_dims[GetTensorInnerFeatureDimIndex(rank, format)] = context->MakeDim(4);
+    } else {
+      out_dims[tensorflow::GetTensorFeatureDimIndex(rank, format)] = filter_dim;
+    }
+
+    *shape = context->MakeShape(out_dims);
+    return tensorflow::Status::OK();
+  }
 
-REGISTER_OP("MyConv2D")
+  REGISTER_OP("MyConv2D")
+      .Input("input: int32")
+      .Input("filter: int32")
+      .Attr("delay: int")
+      .Output("output: int32")
+      .SetShapeFn([](InferenceContext* c) {
+        //INPUT: NHWC
+        //KERNEL: HWIO
+        //OUTPUT: NHWC
+
+        constexpr int num_spatial_dims = 2;
+        TensorFormat data_format;
+        FormatFromString("NHWC", &data_format);
+        FilterTensorFormat filter_format;
+        FilterFormatFromString("HWIO", &filter_format);
+
+        ShapeHandle input_shape, filter_shape, output_shape;
+        TF_RETURN_IF_ERROR(c->WithRank(c->input(0), 4, &input_shape));
+        TF_RETURN_IF_ERROR(c->WithRank(c->input(1), 4, &filter_shape));
+
+        DimensionHandle batch_size_dim;
+        DimensionHandle input_depth_dim;
+        gtl::InlinedVector<DimensionHandle, 2> input_spatial_dims(2);
+        TF_RETURN_IF_ERROR(DimensionsFromShape(
+          input_shape, data_format, &batch_size_dim,
+          absl::MakeSpan(input_spatial_dims), &input_depth_dim, c));
+
+        DimensionHandle output_depth_dim = c->Dim(
+          filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'O'));
+        DimensionHandle filter_rows_dim = c->Dim(
+          filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'H'));
+        DimensionHandle filter_cols_dim = c->Dim(
+          filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'W'));
+        DimensionHandle filter_input_depth_dim = c->Dim(
+          filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'I'));
+
+        DimensionHandle output_rows, output_cols, output_channels;
+        c->Add(input_spatial_dims[0], 0, &output_rows);
+        c->Add(input_spatial_dims[1], 0, &output_cols);
+
+        c->Multiply(filter_input_depth_dim, output_depth_dim, &output_channels);
+
+        std::vector<DimensionHandle> out_dims(4);
+        out_dims[0] = batch_size_dim;
+        out_dims[1] = output_rows;
+        out_dims[2] = output_cols;
+        out_dims[3] = output_channels;
+
+        output_shape = c->MakeShape(out_dims);
+        c->set_output(0, output_shape);
+        return Status::OK();
+      });
+
+  REGISTER_KERNEL_BUILDER(Name("MyConv2D").Device(DEVICE_CPU), Conv2DOp);
+
+  REGISTER_OP("MyDummy")
     .Input("input: int32")
-    .Input("filter: int32")
-    .Attr("delay: int")
     .Output("output: int32")
-    .SetShapeFn([](InferenceContext* c) {
-      //INPUT: NHWC
-      //KERNEL: HWIO
-      //OUTPUT: NHWC
-
-      constexpr int num_spatial_dims = 2;
-      TensorFormat data_format;
-      FormatFromString("NHWC", &data_format);
-      FilterTensorFormat filter_format;
-      FilterFormatFromString("HWIO", &filter_format);
-
-      ShapeHandle input_shape, filter_shape, output_shape;
-      TF_RETURN_IF_ERROR(c->WithRank(c->input(0), 4, &input_shape));
-      TF_RETURN_IF_ERROR(c->WithRank(c->input(1), 4, &filter_shape));
-
-      DimensionHandle batch_size_dim;
-      DimensionHandle input_depth_dim;
-      gtl::InlinedVector<DimensionHandle, 2> input_spatial_dims(2);
-      TF_RETURN_IF_ERROR(DimensionsFromShape(
-        input_shape, data_format, &batch_size_dim,
-        absl::MakeSpan(input_spatial_dims), &input_depth_dim, c));
-
-      DimensionHandle output_depth_dim = c->Dim(
-        filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'O'));
-      DimensionHandle filter_rows_dim = c->Dim(
-        filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'H'));
-      DimensionHandle filter_cols_dim = c->Dim(
-        filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'W'));
-      DimensionHandle filter_input_depth_dim = c->Dim(
-        filter_shape, GetFilterDimIndex<num_spatial_dims>(filter_format, 'I'));
-
-      DimensionHandle output_rows, output_cols, output_channels;
-      c->Add(input_spatial_dims[0], 0, &output_rows);
-      c->Add(input_spatial_dims[1], 0, &output_cols);
-
-      c->Multiply(filter_input_depth_dim, output_depth_dim, &output_channels);
-
-      std::vector<DimensionHandle> out_dims(4);
-      out_dims[0] = batch_size_dim;
-      out_dims[1] = output_rows;
-      out_dims[2] = output_cols;
-      out_dims[3] = output_channels;
-
-      output_shape = c->MakeShape(out_dims);
-      c->set_output(0, output_shape);
+    .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) {
+      c->set_output(0, c->input(0));
       return Status::OK();
     });
 
-REGISTER_KERNEL_BUILDER(Name("MyConv2D").Device(DEVICE_CPU), Conv2DOp);
+  REGISTER_KERNEL_BUILDER(Name("MyDummy").Device(DEVICE_CPU), DummyOp);
 
-void __attribute__ ((constructor)) init(void) {
+  ConnectionManager connectionManager;
+
+  void __attribute__ ((constructor)) init(void) {
     printf("starting fpga server\n");
-    connection_init();
-}
+
+    // Five connections to a local test endpoint; the real FPGA addresses
+    // below are kept for when hardware is attached.
+    connectionManager.addFPGA("localhost", 1234);
+    connectionManager.addFPGA("localhost", 1234);
+    connectionManager.addFPGA("localhost", 1234);
+    connectionManager.addFPGA("localhost", 1234);
+    connectionManager.addFPGA("localhost", 1234);
+
+    /*
+    connectionManager.addFPGA("192.168.88.32", 1234);
+    connectionManager.addFPGA("192.168.88.33", 1234);
+    connectionManager.addFPGA("192.168.88.34", 1234);
+    connectionManager.addFPGA("192.168.88.35", 1234);
+    */
+  }
 
-#endif
+}
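
Because init carries __attribute__((constructor)), it runs the moment the shared object is loaded — i.e. when the Python side loads build/op_lib.so — so all five connections exist before any kernel can dispatch a job. Swapping between the localhost test endpoints and the real FPGA addresses currently means recompiling; a hypothetical alternative, not part of this commit, is to read the host list from the environment (MLFPGA_HOSTS and addFpgasFromEnv are illustrative names):

    #include <cstdlib>
    #include <sstream>
    #include <string>

    // Hypothetical: parse MLFPGA_HOSTS="host:port,host:port,..." so the
    // endpoint list is not compiled in.
    static void addFpgasFromEnv(ConnectionManager& cm) {
      const char* env = std::getenv("MLFPGA_HOSTS");
      if (!env) return;                        // keep the compiled-in defaults
      std::stringstream ss(env);
      std::string entry;
      while (std::getline(ss, entry, ',')) {   // one "host:port" per entry
        const auto colon = entry.find(':');
        if (colon == std::string::npos) continue;
        cm.addFPGA(entry.substr(0, colon).c_str(),
                   std::stoi(entry.substr(colon + 1)));
      }
    }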

+ 20 - 0
tests/op_test.py

@@ -0,0 +1,20 @@
+import tensorflow as tf
+
+import sys
+sys.path.append('..')
+from hostLib.layers.conv2D import Conv2D as Conv2DFPGA
+from hostLib import load_op
+
+
+class FPGALibTest(tf.test.TestCase):
+  def testDummyOp(self):
+    data = [0, 1, 2, 3]  # avoid shadowing the builtin `input`
+    with self.session():
+      # load_op_library exposes registered ops under snake_case names.
+      result = load_op.op_lib.my_dummy(input=data)
+      self.assertAllEqual(result, data)
+
+if __name__ == "__main__":
+  tf.test.main()
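
The test assumes an endpoint on localhost:1234 that echoes the dummy module's payload back unchanged (see the addFPGA calls in entrypoint.cpp); DummyOp sends and reads back all four payload words, so assertAllEqual compares the full echoed vector. Running python tests/op_test.py invokes tf.test.main() directly.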