Communicating between Python and .NET Core with Boost Interprocess

To see if I could, I put together a cross-communication library for .NET Core and Python applications using Boost.Interprocess, Boost.Python, and Boost.Signals2. The goal was simple: expose the same interface for cross-communication to both C# and Python. The approach was to take the Boost.Interprocess condition example and adapt it so that it could be exposed to the different languages.

Shared Definitions

First I need to create the objects to make the interface. There are four files making up these objects:

  • shm_remove.hpp – just a lifecycle object to clear the shared buffer when it is destructed
  • TraceQueue.hpp – The shared memory object
  • SharedMemoryConsumer.hpp – The subscriber to the shared memory data
  • SharedMemoryProducer.hpp – The publisher for the shared memory data


template<size_t BufferSize>
class SharedMemoryConsumer
{
public:
explicit SharedMemoryConsumer(const std::string& memory_name)
: name_length_(memory_name.size()),
shm_(boost::interprocess::open_or_create,memory_name.c_str(),boost::interprocess::read_write)
{
shm_.truncate(sizeof(TraceQueue<BufferSize>));
region_ = boost::interprocess::mapped_region(shm_,boost::interprocess::read_write);
//Get the address of the mapped region
void * addr = region_.get_address();
//Obtain a pointer to the shared structure
data_ = static_cast<TraceQueue<BufferSize>*>(addr);
if (data_->name_length != name_length_)
{
data_ = new (addr)TraceQueue<BufferSize>;
data_->name_length = name_length_;
}
}
~SharedMemoryConsumer()
{
keep_listening_ = false;
background_listener_.join();
}
void start()
{
keep_listening_ = true;
background_listener_ = std::thread([=](){
do {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(data_->mutex);
if (!data_->message_in) {
data_->cond_empty.wait(lock);
}
sig_(data_->message);
//Notify the other process that the buffer is empty
data_->message_in = false;
data_->cond_full.notify_one();
} while (!data_->closing_stream && keep_listening_);
});
}
void set_callable(const std::function<void(const InterprocessMessage<BufferSize>& memory_block)>& callback)
{
conn_ = sig_.connect([=](const InterprocessMessage<BufferSize>& message)
{
callback(message);
});
}
void wait()
{
background_listener_.join();
}
private:
int name_length_;
boost::signals2::signal<void(const InterprocessMessage<BufferSize>&)> sig_;
boost::signals2::connection conn_;
boost::interprocess::shared_memory_object shm_;
boost::interprocess::mapped_region region_;
std::thread background_listener_;
bool keep_listening_;
TraceQueue<BufferSize> * data_;
};


/// Publisher end of a single-slot shared-memory channel.
///
/// Maps a TraceQueue<BufferSize> stored in a named Boost.Interprocess shared
/// memory object and copies messages into its slot. The shared memory object
/// is removed (through the shm_remove member) when the producer is destroyed.
///
/// @tparam BufferSize Capacity, in bytes, of the message slot.
template <size_t BufferSize>
class SharedMemoryProducer
{
public:
    /// Opens (or creates) the shared memory object @p memory_name, sizes it to
    /// hold one TraceQueue<BufferSize> and maps it read/write.
    ///
    /// The queue is constructed in place whenever its name_length stamp does
    /// not equal the length of @p memory_name; otherwise the existing queue
    /// is reused as-is.
    explicit SharedMemoryProducer(const std::string& memory_name)
        : name_length_(static_cast<int>(memory_name.size()))
        , remove_guard_(memory_name)
        , shm_(boost::interprocess::open_or_create, memory_name.c_str(), boost::interprocess::read_write)
    {
        shm_.truncate(sizeof(TraceQueue<BufferSize>));
        region_ = boost::interprocess::mapped_region(shm_, boost::interprocess::read_write);
        // Get the address of the mapped region
        void* addr = region_.get_address();
        // Construct the shared structure in memory
        data_ = static_cast<TraceQueue<BufferSize>*>(addr);
        if (data_->name_length != name_length_)
        {
            data_ = new (addr) TraceQueue<BufferSize>;
            data_->name_length = name_length_;
        }
    }

    /// Clears the "constructed" stamp and flags the stream as closing.
    ///
    /// NOTE(review): the flags are written without taking the queue mutex and
    /// no condition is signalled, so a consumer blocked waiting for a message
    /// is not woken by this.
    ~SharedMemoryProducer()
    {
        data_->name_length = 0;
        data_->closing_stream = true;
    }

    /// Publishes @p buffer, blocking while the previous message is unread.
    ///
    /// @param buffer Bytes to publish; at most BufferSize bytes.
    /// @throws std::invalid_argument when @p buffer is larger than BufferSize.
    void write_data(const std::string& buffer)
    {
        if (buffer.size() > BufferSize)
        {
            throw std::invalid_argument("Message size is larger than buffer size");
        }
        boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(data_->mutex);
        // Predicate loop: a spurious wake-up must not let us overwrite a
        // message the consumer has not taken yet.
        while (data_->message_in)
        {
            data_->cond_full.wait(lock);
        }
        memcpy(data_->message.message, buffer.data(), buffer.size());
        data_->message.message_length = buffer.size();
        // Mark message buffer as full before signalling, so the state is
        // complete by the time any waiter is woken.
        data_->message_in = true;
        // Notify to the other process that there is a message
        data_->cond_empty.notify_one();
    }

private:
    int name_length_;
    shm_remove remove_guard_;
    boost::interprocess::shared_memory_object shm_;
    boost::interprocess::mapped_region region_;
    TraceQueue<BufferSize>* data_ = nullptr;
};


#include <boost/interprocess/shared_memory_object.hpp>
/// Lifetime guard that removes a named shared memory object when destroyed.
///
/// Copying is disabled: a copy would remove the shared memory object as soon
/// as it went out of scope, while the original guard's owner is still using it.
struct shm_remove
{
public:
    /// Remembers @p shm_name. Nothing is removed here, so an object that
    /// already exists under this name is left in place.
    explicit shm_remove(const std::string& shm_name)
        : shm_name_(shm_name)
    {
        // boost::interprocess::shared_memory_object::remove(shm_name.c_str());
    }

    /// Removes the shared memory object with the remembered name.
    ~shm_remove() { boost::interprocess::shared_memory_object::remove(shm_name_.c_str()); }

    shm_remove(const shm_remove&) = delete;
    shm_remove& operator=(const shm_remove&) = delete;

private:
    std::string shm_name_;
};

view raw

shm_remove.hpp

hosted with ❤ by GitHub


/// Fixed-capacity message slot: a byte buffer plus the number of bytes in use.
///
/// @tparam MESSAGE_SIZE Capacity of the buffer in bytes.
template <size_t MESSAGE_SIZE>
struct InterprocessMessage
{
    /// Payload bytes; only the first message_length bytes are meaningful.
    char message[MESSAGE_SIZE];
    /// Number of valid bytes in message. Starts at zero so a freshly
    /// constructed slot reads as an empty message instead of garbage.
    size_t message_length = 0;

    /// Returns the payload as a std::string.
    ///
    /// The length is clamped to MESSAGE_SIZE so that an out-of-range
    /// message_length can never cause a read past the end of the buffer.
    std::string to_string() const
    {
        const size_t usable = message_length < MESSAGE_SIZE ? message_length : MESSAGE_SIZE;
        return std::string(message, usable);
    }
};
/// Single-slot queue that is placed inside the shared memory segment.
///
/// The member order below is the in-memory layout seen by every process that
/// maps the segment.
///
/// @tparam MESSAGE_SIZE Capacity, in bytes, of the message slot.
template <size_t MESSAGE_SIZE>
struct TraceQueue
{
    /// Starts with an empty slot. name_length is deliberately left untouched
    /// here; whoever constructs the queue stamps it afterwards.
    TraceQueue() : message_in{false} {}

    /// Serialises access to the queue.
    boost::interprocess::interprocess_mutex mutex;

    /// Waited on while the slot is empty.
    boost::interprocess::interprocess_condition cond_empty;

    /// Waited on while the slot is full.
    boost::interprocess::interprocess_condition cond_full;

    /// The slot itself.
    InterprocessMessage<MESSAGE_SIZE> message;

    /// True while the slot holds a message.
    bool message_in;

    /// Set to announce that the stream is shutting down.
    bool closing_stream{false};

    /// Stamp used to check whether the queue has already been created.
    int name_length;
};

view raw

TraceQueue.hpp

hosted with ❤ by GitHub

These objects comprise the core interface of the shared memory provider. Now, the memory providers need to be exposed to multiple languages. There are different ways to do this, and I decided to do it by hand. I should point out that SWIG is my usual approach to this task; in this instance, however, it seemed easy enough to do by hand.

Boost Python

To expose the code to Python, I needed to create a few classes that expose the interface definitions through Boost.Python. The two files are:

  • PythonSharedMemoryConsumer.hpp – The python interface for the SharedMemoryConsumer
  • PythonModule.cpp  – The file that exposes the module to python


// Python module definition: registers both consumer flavours (set_callable /
// start / wait) and both producer flavours (write_data). Every class is
// constructed from a single string and is exposed as non-copyable.
BOOST_PYTHON_MODULE(ScryUnlimitedCommunicator)
{
    Py_Initialize();

    using SerializedConsumer = SeralizedPythonSharedMemoryConsumer;
    using ImageConsumer = ImagePythonSharedMemoryConsumer;
    using SerializedProducer = SeralizedSharedMemoryProducer;
    using ImageProducer = ImageSharedMemoryProducer;

    // Consumers
    class_<SerializedConsumer, boost::noncopyable> serialized_consumer(
        "SeralizedSharedMemoryConsumer", init<std::string>());
    serialized_consumer.def("set_callable", &SerializedConsumer::set_callable);
    serialized_consumer.def("start", &SerializedConsumer::start);
    serialized_consumer.def("wait", &SerializedConsumer::wait);

    class_<ImageConsumer, boost::noncopyable> image_consumer(
        "ImageSharedMemoryConsumer", init<std::string>());
    image_consumer.def("set_callable", &ImageConsumer::set_callable);
    image_consumer.def("start", &ImageConsumer::start);
    image_consumer.def("wait", &ImageConsumer::wait);

    // Producers
    class_<SerializedProducer, boost::noncopyable> serialized_producer(
        "SeralizedSharedMemoryProducer", init<std::string>());
    serialized_producer.def("write_data", &SerializedProducer::write_data);

    class_<ImageProducer, boost::noncopyable> image_producer(
        "ImageSharedMemoryProducer", init<std::string>());
    image_producer.def("write_data", &ImageProducer::write_data);
}


template <size_t BufferSize>
class PythonSharedMemoryConsumer
{
public:
explicit PythonSharedMemoryConsumer(const std::string& memory_name)
: sm_(memory_name) {}
void set_callable(boost::python::object callback)
{
if (!PyCallable_Check(callback.ptr()))
{
PyErr_SetString(PyExc_TypeError, "Callback must be a callable object");
boost::python::throw_error_already_set();
}
sm_.set_callable([=](const InterprocessMessage<BufferSize>& buffer)
{
try
{
callback(boost::python::object(
boost::python::handle<>(
PyBytes_FromStringAndSize(buffer.message,buffer.message_length))));
//callback(buffer.to_string());
}
catch (boost::python::error_already_set)
{
PyErr_Print();
}
});
}
void start()
{
sm_.start();
}
void wait()
{
sm_.wait();
}
private:
ScryUnlimited::SharedMemory::SharedMemoryConsumer<BufferSize> sm_;
};

Together, these two files expose the shared memory objects to Python, and they can be used in a Python script by just importing the shared library.

.NET Core

With the Python portion complete, I needed to expose the shared memory objects to C#. This is easy enough to do by hand if you expose the classes so that they can be used through P/Invoke. To accomplish this, I only needed three files:

  • NetCoreSharedMemoryProducer.hpp – The .NET Core version of the publisher
  • NetCoreSharedMemoryConsumer.hpp – The .NET Core version of the consumer
  • NetCoreModule.cpp – The source file exposing the interfaces for PInvoke

 


/// Creates an image consumer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteImageNetCoreSharedMemoryConsumer.
extern "C" ImageNetCoreSharedMemoryConsumer* GetImageConsumer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new ImageNetCoreSharedMemoryConsumer(std::string(name));
}

/// Destroys a consumer returned by GetImageConsumer. A null pointer is ignored.
extern "C" void DeleteImageNetCoreSharedMemoryConsumer(ImageNetCoreSharedMemoryConsumer* consumer)
{
    delete consumer;
}

/// Registers @p callback to receive (data pointer, byte count) for each
/// message. Does nothing when either argument is null.
extern "C" void ImageConsumerSetCallback(ImageNetCoreSharedMemoryConsumer* consumer,
                                         void(*callback)(const unsigned char *,size_t))
{
    if (consumer == nullptr || callback == nullptr)
    {
        return;
    }
    consumer->set_callback(callback);
}

/// Starts the consumer's background listener. A null consumer is ignored.
extern "C" void StartImageConsumer(ImageNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->start();
}

/// Blocks until the consumer's background listener exits. A null consumer is
/// ignored.
extern "C" void ImageWait(ImageNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->wait();
}
/// Creates a serialized-data consumer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteSeralizedNetCoreSharedMemoryConsumer.
extern "C" SeralizedNetCoreSharedMemoryConsumer * GetSerializedConsumer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new SeralizedNetCoreSharedMemoryConsumer(name);
}

/// Destroys a consumer returned by GetSerializedConsumer. A null pointer is
/// ignored.
extern "C" void DeleteSeralizedNetCoreSharedMemoryConsumer(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    delete consumer;
}

/// Registers @p callback to receive (data pointer, byte count) for each
/// message. Does nothing when either argument is null.
extern "C" void SeralizedConsumerSetCallback(SeralizedNetCoreSharedMemoryConsumer* consumer,
                                             void(*callback)(const unsigned char *,size_t))
{
    if (consumer == nullptr || callback == nullptr)
    {
        return;
    }
    consumer->set_callback(callback);
}

/// Starts the consumer's background listener. A null consumer is ignored.
extern "C" void StartSeralizedConsumer(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->start();
}

/// Blocks until the consumer's background listener exits. A null consumer is
/// ignored.
extern "C" void SeralizedWait(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->wait();
}
/// Creates an image producer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteImageProducer.
extern "C" ImageNetCoreSharedMemoryProducer * GetImageProducer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new ImageNetCoreSharedMemoryProducer(name);
}

/// Destroys a producer returned by GetImageProducer. A null pointer is ignored.
extern "C" void DeleteImageProducer(ImageNetCoreSharedMemoryProducer * producer)
{
    delete producer;
}

/// Publishes @p size bytes starting at @p data_ptr.
///
/// Does nothing when @p producer is null, or when @p data_ptr is null while
/// @p size is non-zero. The producer's write_data throws
/// std::invalid_argument for a message larger than its buffer; that exception
/// is not caught here.
extern "C" void SendImageData(ImageNetCoreSharedMemoryProducer * producer,void * data_ptr,size_t size)
{
    if (producer == nullptr || (data_ptr == nullptr && size != 0))
    {
        return;
    }
    const std::string payload = (data_ptr == nullptr)
        ? std::string()
        : std::string(static_cast<const char*>(data_ptr), size);
    producer->write_data(payload);
}
/// Creates a serialized-data producer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteSeralizedProducer.
extern "C" SeralizedNetCoreSharedMemoryProducer * GetSeralizedProducer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new SeralizedNetCoreSharedMemoryProducer(name);
}

/// Destroys a producer returned by GetSeralizedProducer. A null pointer is
/// ignored.
extern "C" void DeleteSeralizedProducer(SeralizedNetCoreSharedMemoryProducer * producer)
{
    delete producer;
}

/// Publishes @p size bytes starting at @p data_ptr.
///
/// Does nothing when @p producer is null, or when @p data_ptr is null while
/// @p size is non-zero. The producer's write_data throws
/// std::invalid_argument for a message larger than its buffer; that exception
/// is not caught here.
extern "C" void SendSeralizedData(SeralizedNetCoreSharedMemoryProducer * producer,void * data_ptr,size_t size)
{
    if (producer == nullptr || (data_ptr == nullptr && size != 0))
    {
        return;
    }
    const std::string payload = (data_ptr == nullptr)
        ? std::string()
        : std::string(static_cast<const char*>(data_ptr), size);
    producer->write_data(payload);
}


/// Adapter that lets a plain C function pointer (as supplied through
/// P/Invoke) receive messages from a SharedMemoryConsumer.
///
/// @tparam BufferSize Capacity, in bytes, of the message slot.
template <size_t BufferSize>
class NetCoreSharedMemoryConsumer
{
public:
    /// Creates the wrapped consumer for the shared memory object @p name.
    explicit NetCoreSharedMemoryConsumer(const std::string& name)
        : consumer_(name)
    {
    }

    /// Registers @p callback_method; it is called with a pointer to the
    /// message bytes and the message length for every message received.
    void set_callback(void (*callback_method)(const unsigned char *,size_t))
    {
        auto forward_to_native = [callback_method](const InterprocessMessage<BufferSize>& message)
        {
            const unsigned char* bytes = reinterpret_cast<const unsigned char *>(message.message);
            callback_method(bytes, message.message_length);
        };
        consumer_.set_callable(forward_to_native);
    }

    /// Starts the wrapped consumer's background listener.
    void start() { consumer_.start(); }

    /// Blocks until the wrapped consumer's background listener exits.
    void wait() { consumer_.wait(); }

private:
    SharedMemoryConsumer<BufferSize> consumer_;
};


/// Thin wrapper that forwards writes to a SharedMemoryProducer.
///
/// @tparam BufferSize Capacity, in bytes, of the message slot.
template <size_t BufferSize>
class NetCoreSharedMemoryProducer
{
public:
    /// Creates the wrapped producer for the shared memory object @p name.
    explicit NetCoreSharedMemoryProducer(const std::string& name)
        : producer_(name)
    {
    }

    /// Publishes @p data through the wrapped producer.
    void write_data(const std::string& data) { producer_.write_data(data); }

private:
    SharedMemoryProducer<BufferSize> producer_;
};


/// Creates an image consumer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteImageNetCoreSharedMemoryConsumer.
extern "C" ImageNetCoreSharedMemoryConsumer* GetImageConsumer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new ImageNetCoreSharedMemoryConsumer(std::string(name));
}

/// Destroys a consumer returned by GetImageConsumer. A null pointer is ignored.
extern "C" void DeleteImageNetCoreSharedMemoryConsumer(ImageNetCoreSharedMemoryConsumer* consumer)
{
    delete consumer;
}

/// Registers @p callback to receive (data pointer, byte count) for each
/// message. Does nothing when either argument is null.
extern "C" void ImageConsumerSetCallback(ImageNetCoreSharedMemoryConsumer* consumer,
                                         void(*callback)(const unsigned char *,size_t))
{
    if (consumer == nullptr || callback == nullptr)
    {
        return;
    }
    consumer->set_callback(callback);
}

/// Starts the consumer's background listener. A null consumer is ignored.
extern "C" void StartImageConsumer(ImageNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->start();
}

/// Blocks until the consumer's background listener exits. A null consumer is
/// ignored.
extern "C" void ImageWait(ImageNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->wait();
}
/// Creates a serialized-data consumer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteSeralizedNetCoreSharedMemoryConsumer.
extern "C" SeralizedNetCoreSharedMemoryConsumer * GetSerializedConsumer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new SeralizedNetCoreSharedMemoryConsumer(name);
}

/// Destroys a consumer returned by GetSerializedConsumer. A null pointer is
/// ignored.
extern "C" void DeleteSeralizedNetCoreSharedMemoryConsumer(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    delete consumer;
}

/// Registers @p callback to receive (data pointer, byte count) for each
/// message. Does nothing when either argument is null.
extern "C" void SeralizedConsumerSetCallback(SeralizedNetCoreSharedMemoryConsumer* consumer,
                                             void(*callback)(const unsigned char *,size_t))
{
    if (consumer == nullptr || callback == nullptr)
    {
        return;
    }
    consumer->set_callback(callback);
}

/// Starts the consumer's background listener. A null consumer is ignored.
extern "C" void StartSeralizedConsumer(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->start();
}

/// Blocks until the consumer's background listener exits. A null consumer is
/// ignored.
extern "C" void SeralizedWait(SeralizedNetCoreSharedMemoryConsumer* consumer)
{
    if (consumer == nullptr)
    {
        return;
    }
    consumer->wait();
}
/// Creates an image producer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteImageProducer.
extern "C" ImageNetCoreSharedMemoryProducer * GetImageProducer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new ImageNetCoreSharedMemoryProducer(name);
}

/// Destroys a producer returned by GetImageProducer. A null pointer is ignored.
extern "C" void DeleteImageProducer(ImageNetCoreSharedMemoryProducer * producer)
{
    delete producer;
}

/// Publishes @p size bytes starting at @p data_ptr.
///
/// Does nothing when @p producer is null, or when @p data_ptr is null while
/// @p size is non-zero. The producer's write_data throws
/// std::invalid_argument for a message larger than its buffer; that exception
/// is not caught here.
extern "C" void SendImageData(ImageNetCoreSharedMemoryProducer * producer,void * data_ptr,size_t size)
{
    if (producer == nullptr || (data_ptr == nullptr && size != 0))
    {
        return;
    }
    const std::string payload = (data_ptr == nullptr)
        ? std::string()
        : std::string(static_cast<const char*>(data_ptr), size);
    producer->write_data(payload);
}
/// Creates a serialized-data producer for the shared memory object @p name.
/// Returns nullptr when @p name is null (building a std::string from a null
/// pointer is undefined behaviour). Release the result with
/// DeleteSeralizedProducer.
extern "C" SeralizedNetCoreSharedMemoryProducer * GetSeralizedProducer(const char * name)
{
    if (name == nullptr)
    {
        return nullptr;
    }
    return new SeralizedNetCoreSharedMemoryProducer(name);
}

/// Destroys a producer returned by GetSeralizedProducer. A null pointer is
/// ignored.
extern "C" void DeleteSeralizedProducer(SeralizedNetCoreSharedMemoryProducer * producer)
{
    delete producer;
}

/// Publishes @p size bytes starting at @p data_ptr.
///
/// Does nothing when @p producer is null, or when @p data_ptr is null while
/// @p size is non-zero. The producer's write_data throws
/// std::invalid_argument for a message larger than its buffer; that exception
/// is not caught here.
extern "C" void SendSeralizedData(SeralizedNetCoreSharedMemoryProducer * producer,void * data_ptr,size_t size)
{
    if (producer == nullptr || (data_ptr == nullptr && size != 0))
    {
        return;
    }
    const std::string payload = (data_ptr == nullptr)
        ? std::string()
        : std::string(static_cast<const char*>(data_ptr), size);
    producer->write_data(payload);
}


/// Adapter that lets a plain C function pointer (as supplied through
/// P/Invoke) receive messages from a SharedMemoryConsumer.
///
/// @tparam BufferSize Capacity, in bytes, of the message slot.
template <size_t BufferSize>
class NetCoreSharedMemoryConsumer
{
public:
    /// Creates the wrapped consumer for the shared memory object @p name.
    explicit NetCoreSharedMemoryConsumer(const std::string& name)
        : consumer_(name)
    {
    }

    /// Registers @p callback_method; it is called with a pointer to the
    /// message bytes and the message length for every message received.
    void set_callback(void (*callback_method)(const unsigned char *,size_t))
    {
        auto forward_to_native = [callback_method](const InterprocessMessage<BufferSize>& message)
        {
            const unsigned char* bytes = reinterpret_cast<const unsigned char *>(message.message);
            callback_method(bytes, message.message_length);
        };
        consumer_.set_callable(forward_to_native);
    }

    /// Starts the wrapped consumer's background listener.
    void start() { consumer_.start(); }

    /// Blocks until the wrapped consumer's background listener exits.
    void wait() { consumer_.wait(); }

private:
    SharedMemoryConsumer<BufferSize> consumer_;
};


/// Thin wrapper that forwards writes to a SharedMemoryProducer.
///
/// @tparam BufferSize Capacity, in bytes, of the message slot.
template <size_t BufferSize>
class NetCoreSharedMemoryProducer
{
public:
    /// Creates the wrapped producer for the shared memory object @p name.
    explicit NetCoreSharedMemoryProducer(const std::string& name)
        : producer_(name)
    {
    }

    /// Publishes @p data through the wrapped producer.
    void write_data(const std::string& data) { producer_.write_data(data); }

private:
    SharedMemoryProducer<BufferSize> producer_;
};

Now we need to call that code from C# using P/Invoke interop:


/// <summary>
/// Managed wrapper around the native image consumer exported by
/// ScryUnlimitedCommunicator.so.
/// </summary>
/// <remarks>
/// Dispose the instance to release the native consumer deterministically; the
/// finalizer is only a fallback. Exceptions thrown by a handler propagate into
/// the native listener thread, so handlers should not throw.
/// </remarks>
class ImageSharedMemoryConsumer
    : ISharedMemoryConsumer, IDisposable
{
    // Mirrors the native signature void(*)(const unsigned char*, size_t).
    // size_t is pointer-sized, hence UIntPtr rather than int.
    [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
    private delegate void NativeMessageCallback(IntPtr data, UIntPtr length);

    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern IntPtr GetImageConsumer(string name);
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void DeleteImageNetCoreSharedMemoryConsumer(IntPtr instance);
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void ImageConsumerSetCallback(IntPtr instance, IntPtr callback);
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void StartImageConsumer(IntPtr instance);
    // The image consumer must be waited on with ImageWait; SeralizedWait
    // expects a pointer to a different native type.
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void ImageWait(IntPtr instance);

    private readonly IntPtr _nativePointer;

    // Every delegate handed to native code is kept here. The native side keeps
    // calling all registered callbacks, and a delegate that gets garbage
    // collected leaves it with a dangling function pointer.
    private readonly System.Collections.Generic.List<NativeMessageCallback> _callbacks =
        new System.Collections.Generic.List<NativeMessageCallback>();

    private bool _disposed;

    /// <summary>Creates a native consumer for the shared memory object <paramref name="name"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The native library returned a null consumer.</exception>
    public ImageSharedMemoryConsumer(string name)
    {
        if (name == null)
        {
            throw new ArgumentNullException(nameof(name));
        }
        _nativePointer = GetImageConsumer(name);
        if (_nativePointer == IntPtr.Zero)
        {
            throw new InvalidOperationException("Native image consumer could not be created.");
        }
    }

    ~ImageSharedMemoryConsumer()
    {
        ReleaseNative();
    }

    /// <summary>Deletes the native consumer. Safe to call more than once.</summary>
    public void Dispose()
    {
        ReleaseNative();
        GC.SuppressFinalize(this);
    }

    public long Size()
    {
        return 3000000;
    }

    /// <summary>
    /// Registers <paramref name="handler"/>; it receives a managed copy of
    /// every message delivered by the native consumer.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void SetCallback(HandleMessage handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }
        ThrowIfDisposed();

        void Callback(IntPtr data, UIntPtr length)
        {
            int count = (int)length.ToUInt64();
            byte[] managedArray = new byte[count];
            Marshal.Copy(data, managedArray, 0, count);
            handler(managedArray);
        }

        NativeMessageCallback callback = Callback;
        // Root the delegate before native code can see the function pointer.
        _callbacks.Add(callback);
        ImageConsumerSetCallback(_nativePointer, Marshal.GetFunctionPointerForDelegate(callback));
        GC.KeepAlive(this);
    }

    /// <summary>Starts the native background listener.</summary>
    public void Start()
    {
        ThrowIfDisposed();
        StartImageConsumer(_nativePointer);
        GC.KeepAlive(this);
    }

    /// <summary>Blocks until the native background listener exits.</summary>
    public void Wait()
    {
        ThrowIfDisposed();
        ImageWait(_nativePointer);
        // Keeps the finalizer from deleting the native consumer while the
        // blocking native call above is still running.
        GC.KeepAlive(this);
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ImageSharedMemoryConsumer));
        }
    }

    private void ReleaseNative()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;
        if (_nativePointer != IntPtr.Zero)
        {
            DeleteImageNetCoreSharedMemoryConsumer(_nativePointer);
        }
    }
}


/// <summary>
/// Managed wrapper around the native image producer exported by
/// ScryUnlimitedCommunicator.so.
/// </summary>
/// <remarks>
/// Dispose the instance to release the native producer deterministically; the
/// finalizer is only a fallback.
/// </remarks>
class ImageSharedMemoryProducer
    : ISharedMemoryProducer, IDisposable
{
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern IntPtr GetImageProducer(string name);
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void DeleteImageProducer(IntPtr handle);
    // The native length parameter is size_t, which is pointer-sized; UIntPtr
    // matches it on both 32-bit and 64-bit processes (long does not).
    [DllImport("ScryUnlimitedCommunicator.so")]
    private static extern void SendImageData(IntPtr handle, byte[] data, UIntPtr length);

    private readonly IntPtr _nativeHandle;
    private bool _disposed;

    /// <summary>Creates a native producer for the shared memory object <paramref name="name"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The native library returned a null producer.</exception>
    public ImageSharedMemoryProducer(string name)
    {
        if (name == null)
        {
            throw new ArgumentNullException(nameof(name));
        }
        _nativeHandle = GetImageProducer(name);
        if (_nativeHandle == IntPtr.Zero)
        {
            throw new InvalidOperationException("Native image producer could not be created.");
        }
    }

    ~ImageSharedMemoryProducer()
    {
        ReleaseNative();
    }

    /// <summary>Deletes the native producer. Safe to call more than once.</summary>
    public void Dispose()
    {
        ReleaseNative();
        GC.SuppressFinalize(this);
    }

    public long Size()
    {
        return 3000000;
    }

    /// <summary>Publishes <paramref name="message"/> through the native producer.</summary>
    /// <remarks>
    /// NOTE(review): the native producer rejects messages larger than its
    /// buffer by throwing a C++ exception, which cannot be caught here —
    /// presumably Size() is that limit; confirm and validate before sending.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The producer has been disposed.</exception>
    public void Write(byte[] message)
    {
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(ImageSharedMemoryProducer));
        }
        SendImageData(_nativeHandle, message, new UIntPtr((ulong)message.LongLength));
        // Keeps the finalizer from deleting the native producer while the
        // native call above is still running.
        GC.KeepAlive(this);
    }

    private void ReleaseNative()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;
        if (_nativeHandle != IntPtr.Zero)
        {
            DeleteImageProducer(_nativeHandle);
        }
    }
}


using System;
/// <summary>Callback that receives one message as a managed byte array.</summary>
public delegate void HandleMessage(byte[] message);
/// <summary>
/// Shape of the callback whose function pointer is handed to native code: a
/// pointer to the message bytes and the number of bytes.
/// </summary>
// NOTE(review): the native callback type declares the length as size_t, while
// this delegate declares it as int — confirm this is intended on 64-bit targets.
internal delegate void InternalHandleMessage(IntPtr array, int length);
/// <summary>Receiving side of a shared memory channel.</summary>
public interface ISharedMemoryConsumer
{
/// <summary>Presumably the maximum message size in bytes — TODO confirm.</summary>
long Size();
/// <summary>Registers the handler that receives incoming messages.</summary>
void SetCallback(HandleMessage handler);
/// <summary>Starts receiving messages.</summary>
void Start();
/// <summary>Blocks until receiving has finished.</summary>
void Wait();
}


/// <summary>Sending side of a shared memory channel.</summary>
public interface ISharedMemoryProducer
{
/// <summary>Presumably the maximum message size in bytes — TODO confirm.</summary>
long Size();
/// <summary>Publishes one message.</summary>
void Write(byte[] message);
}