Sfoglia il codice sorgente

Demo instance parallelism

subDesTagesMitExtraKaese 5 anni fa
parent
commit
c768881f4c
8 ha cambiato i file con 104 aggiunte e 33 eliminazioni
  1. BIN
      build/op_lib.so
  2. 13 12
      examples/train.py
  3. 3 3
      layers/conv2D.py
  4. 56 0
      src/asyncDummy.cpp
  5. 6 0
      src/asyncDummy.hpp
  6. 15 17
      src/conv2D.cpp
  7. 10 1
      src/conv2D.hpp
  8. 1 0
      src/entrypoint.hpp

BIN
build/op_lib.so


+ 13 - 12
examples/train.py

@@ -27,10 +27,10 @@ x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
 x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
 input_shape = (img_rows, img_cols, 1)
 
-x_train = x_train.astype('float32')
-x_test = x_test.astype('float32')
-x_train /= 255
-x_test /= 255
+x_train = x_train.astype('int32')
+x_test = x_test.astype('int32')
+#x_train /= 255
+#x_test /= 255
 print('x_train shape:', x_train.shape)
 print(x_train.shape[0], 'train samples')
 print(x_test.shape[0], 'test samples')
@@ -39,13 +39,13 @@ print(x_test.shape[0], 'test samples')
 y_train = to_categorical(y_train, num_classes)
 y_test = to_categorical(y_test, num_classes)
 
-a = layers.Input(shape=(28, 28, 1))
-b = Conv2DFPGA(2)(a)
-c = Conv2DFPGA(2)(a)
-d = Conv2DFPGA(2)(a)
-e = Conv2DFPGA(2)(a)
+a = layers.Input(dtype=tf.int32, shape=(28, 28, 1))
+b = Conv2DFPGA(32)(a)
+c = Conv2DFPGA(32)(a)
+d = Conv2DFPGA(2)(b)
+e = Conv2DFPGA(2)(c)
 
-x = layers.Add()([b,c,d,e])
+x = layers.Add()([d,e])
 y = layers.Flatten()(x)
 z = layers.Dense(num_classes, activation='softmax')(y)
 
@@ -63,7 +63,9 @@ model.add(Dense(num_classes, activation='softmax'))
 model.compile(loss=keras.losses.categorical_crossentropy,
               optimizer=keras.optimizers.Adadelta(),
               metrics=['accuracy'])
-              
+
+plot_model(model, to_file='model.png', expand_nested=True, show_shapes=True)
+
 model.fit(x_train, y_train,
           batch_size=batch_size,
           epochs=epochs,
@@ -74,4 +76,3 @@ score = model.evaluate(x_test, y_test, verbose=0)
 print('Test loss:', score[0])
 print('Test accuracy:', score[1])
 
-plot_model(model, to_file='model.png', expand_nested=True, show_shapes=True)

+ 3 - 3
layers/conv2D.py

@@ -37,7 +37,7 @@ class Conv2D(layers.Layer):
 
     #out = tf.Tensor(tf.int32, shape=inputs.shape)
 
-    ch_inputs = tf.unstack(tf.dtypes.cast(inputs, dtype=tf.int32), axis=3)
+    ch_inputs = tf.unstack(inputs, axis=3)#tf.dtypes.cast(inputs, dtype=tf.int32), axis=3)
     ch_kernel = tf.unstack(tf.dtypes.cast(self.kernel, dtype=tf.int32), axis=2)
 
     ch_outputs = [None] * len(ch_inputs)
@@ -47,8 +47,8 @@ class Conv2D(layers.Layer):
       ch_outputs[ch] = [None] * self.filters
       kernel_2d = tf.unstack(ch_kernel[ch], axis=2)
       for f in range(len(kernel_2d)):
-        ch_outputs[ch][f] = load_op.op_lib.MyConv2D(input=ch_inputs[ch], filter=kernel_2d[f])
+        ch_outputs[ch][f] = load_op.op_lib.MyConv2D(input=ch_inputs[ch], filter=kernel_2d[f], delay=(f+1)*100)
       
       ch_outputs[ch] = tf.stack(ch_outputs[ch], axis=2)
     outs = tf.stack(ch_outputs, axis=2)
-    return tf.dtypes.cast(outs, dtype=tf.float32)
+    return outs #tf.dtypes.cast(outs, dtype=tf.float32)

+ 56 - 0
src/asyncDummy.cpp

@@ -0,0 +1,56 @@
+#ifndef ASYNC_DUMMY_FPGA
+#define ASYNC_DUMMY_FPGA
+
+#include "asyncDummy.hpp"
+
+
+using namespace std::chrono;
+
+std::string fetchDataFromDB(std::string recvdData)
+{
+	// Make sure that function takes 5 seconds to complete
+	std::this_thread::sleep_for(seconds(5));
+ 
+	//Do stuff like creating DB Connection and fetching Data
+	return "DB_" + recvdData;
+}
+ 
+std::string fetchDataFromFile(std::string recvdData)
+{
+	// Make sure that function takes 5 seconds to complete
+	std::this_thread::sleep_for(seconds(5));
+ 
+	//Do stuff like fetching Data File
+	return "File_" + recvdData;
+}
+ 
+int main()
+{
+	// Get Start Time
+	system_clock::time_point start = system_clock::now();
+ 
+	std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
+ 
+	//Fetch Data from File
+	std::string fileData = fetchDataFromFile("Data");
+ 
+	//Fetch Data from DB
+	// Will block till data is available in future<std::string> object.
+	std::string dbData = resultFromDB.get();
+ 
+	// Get End Time
+	auto end = system_clock::now();
+ 
+	auto diff = duration_cast < std::chrono::seconds > (end - start).count();
+	std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
+ 
+	//Combine The Data
+	std::string data = dbData + " :: " + fileData;
+ 
+	//Printing the combined Data
+	std::cout << "Data = " << data << std::endl;
+ 
+	return 0;
+}
+
+#endif

+ 6 - 0
src/asyncDummy.hpp

@@ -0,0 +1,6 @@
+
+#include <iostream>
+#include <string>
+#include <chrono>
+#include <thread>
+#include <future>

+ 15 - 17
src/conv2D.cpp

@@ -4,19 +4,24 @@
 #include "conv2D.hpp"
 
 volatile int instances = 0;
-pthread_t tDelay;
-pthread_attr_t attr;
-typedef void (*fptr)();
-void *delayThread(void *ref) {
-  sleep(1);
-  fptr done = reinterpret_cast<fptr>(ref);
-  printf("cb!\n");
+volatile int inParallel = 0;
+std::mutex mu;
+
+void delayThread(int ins, const char *name, int delay, std::function<void ()> done) {
+  mu.lock();
+  printf("parallel: %2d instance: %2d '%s' %dms sleep\n", ++inParallel, ins, name, delay);
+  mu.unlock();
+  std::this_thread::sleep_for(milliseconds(delay));
+  mu.lock();
+  printf("parallel: %2d instance: %2d '%s' done\n", --inParallel, ins, name);
+  mu.unlock();
   done();
-  return 0;
 }
 
 Conv2DOp::Conv2DOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
   instance = instances++;
+  OP_REQUIRES_OK(context, context->GetAttr("delay", &delay));
+
 };
 
 void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
@@ -29,14 +34,6 @@ void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
   const Tensor& filter = context->input(1);
   TensorShape filterShape = filter.shape();
 
-
-  printf("\ninstance: %d shape: ", instance);
-  for(int i=0; i<filterShape.dims(); i++) {
-    printf(" %lld", filter.shape().dim_size(i));
-  }
-  printf("\n");
-  sleep(1);
-
   TensorShape out_shape = input.shape();
 
   // Output tensor is of the following dimensions:
@@ -44,8 +41,9 @@ void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
   Tensor* output = nullptr;
   OP_REQUIRES_OK(context, context->allocate_output(0, out_shape, &output));
 
-  pthread_create(&tDelay, &attr, delayThread, static_cast<void*>(&done));
+  context->cancellation_manager();
 
+  std::async(std::launch::async, delayThread, instance, name().c_str(), delay, done);
   
 }
 

+ 10 - 1
src/conv2D.hpp

@@ -2,9 +2,15 @@
 #include "tensorflow/core/framework/function.h"
 #include <stdlib.h>
 
-#include <pthread.h>
+#include <iostream>
+#include <string>
+#include <chrono>
+#include <thread>
+#include <future>
+#include <mutex>
 
 using namespace tensorflow;
+using namespace std::chrono;
 typedef FunctionDefHelper FDH;
 
 
@@ -15,6 +21,9 @@ class Conv2DOp : public AsyncOpKernel {
     void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
 
   private:
+
     int instance = -1;
+    int delay = 1000;
+
   //TF_DISALLOW_COPY_AND_ASSIGN(Conv2DOp);
 };

+ 1 - 0
src/entrypoint.hpp

@@ -12,6 +12,7 @@
 REGISTER_OP("MyConv2D")
     .Input("input: int32")
     .Input("filter: int32")
+    .Attr("delay: int")
     .Output("output: int32")
     .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) {
       c->set_output(0, c->input(0));