WalletServer class added

This commit is contained in:
Andre Jochems
2016-02-13 17:53:18 +01:00
parent 093b9ec1ff
commit b0c11f8974
37 changed files with 166347 additions and 20151 deletions

View File

@@ -266,6 +266,7 @@ static const CRPCCommand vRPCCommands[] =
{ "listlockunspent", &listlockunspent, false, false, true },
{ "verifychain", &verifychain, true, false, false },
{ "getcoinsupply", &getcoinsupply, true, false, false },
{ "startwalletserversession", &startwalletserversession, true, false, false },
};
CRPCTable::CRPCTable()

View File

@@ -64,6 +64,9 @@ enum RPCErrorCode
RPC_WALLET_WRONG_ENC_STATE = -15, // Command given in wrong wallet encryption state (encrypting an encrypted wallet etc.)
RPC_WALLET_ENCRYPTION_FAILED = -16, // Failed to encrypt the wallet
RPC_WALLET_ALREADY_UNLOCKED = -17, // Wallet is already unlocked
// Wallet Server errors
RPC_WALLETSERVER_INVALID_ID = -100, // Given identifier is already in a session
};
json_spirit::Object JSONRPCError(int code, const std::string& message);
@@ -207,5 +210,6 @@ extern json_spirit::Value gettxout(const json_spirit::Array& params, bool fHelp)
extern json_spirit::Value verifychain(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getcoinsupply(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value startwalletserversession(const json_spirit::Array& params, bool fHelp);
#endif

View File

@@ -10,6 +10,7 @@
#include "init.h"
#include "util.h"
#include "ui_interface.h"
#include "walletserver.h"
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
@@ -27,6 +28,7 @@ using namespace boost;
CWallet* pwalletMain;
CClientUIInterface uiInterface;
boost::thread walletServerThread;
#ifdef WIN32
// Win32 LevelDB doesn't use filedescriptors, and the ones used for
@@ -78,6 +80,9 @@ volatile bool fRequestShutdown = false;
void StartShutdown()
{
// Stop WalletServer if running
StopWalletServerThread();
// initiate shutdown
boost::this_thread::sleep_for( boost::chrono::seconds( 1 ) );
fRequestShutdown = true;
}
@@ -226,7 +231,9 @@ bool AppInit(int argc, char* argv[])
fprintf(stderr, "Error: setsid() returned %d errno %d\n", sid, errno);
}
#endif
// WalletServer parameter
fWalletServer = GetBoolArg("-walletserver");
// create the thread to detect a shutdown
detectShutdownThread = new boost::thread(boost::bind(&DetectShutdownThread, &threadGroup));
fRet = AppInit2(threadGroup);
}
@@ -1130,6 +1137,13 @@ bool AppInit2(boost::thread_group& threadGroup)
if (pwalletMain)
GenerateBitcoins(GetBoolArg("-gen", false), pwalletMain);
// Start The WalletServer if defined
fWalletServer = GetBoolArg("-walletserver", false);
if (fWalletServer)
{
threadGroup.create_thread(boost::bind(&StartWalletServerThread));
}
// ********************************************************* Step 12: finished
uiInterface.InitMessage(_("Done loading"));

View File

@@ -45,6 +45,7 @@ LIBS= \
-l boost_program_options$(BOOST_SUFFIX) \
-l boost_thread$(BOOST_SUFFIX) \
-l boost_chrono$(BOOST_SUFFIX) \
-l boost_serialization$(BOOST_SUFFIX) \
-l db_cxx \
-l ssl \
-l crypto
@@ -72,7 +73,8 @@ endif
LIBS += -l mingwthrd -l kernel32 -l user32 -l gdi32 -l comdlg32 -l winspool -l winmm -l shell32 -l comctl32 -l ole32 -l oleaut32 -l uuid -l rpcrt4 -l advapi32 -l ws2_32 -l mswsock -l shlwapi -l pthread
# TODO: make the mingw builds smarter about dependencies, like the linux/osx builds are
HEADERS = $(wildcard *.h)
HEADERS = $(wildcard *.h) \
stomp/$(wildcar *.h)
OBJS= \
leveldb/libleveldb.a \
@@ -106,7 +108,11 @@ OBJS= \
obj/bloom.o \
obj/noui.o \
obj/leveldb.o \
obj/txdb.o
obj/txdb.o \
obj/stomp/helpers.o \
obj/stomp/booststomp.o \
obj/stomp/stompframe.o \
obj/walletserver.o
ifdef USE_SSE2
DEFS += -DUSE_SSE2
@@ -132,6 +138,7 @@ obj/%-sse2.o: %-sse2.cpp
$(CXX) -c $(CFLAGS) -msse2 -mstackrealign -o $@ $<
obj/%.o: %.cpp $(HEADERS)
mkdir -p obj/stomp
$(CXX) -c $(CFLAGS) -o $@ $<
casinocoind.exe: $(OBJS:obj/%=obj/%)

View File

@@ -158,7 +158,8 @@ OBJS= \
obj/bloom.o \
obj/noui.o \
obj/leveldb.o \
obj/txdb.o
obj/txdb.o \
obj/walletserver.o
ifdef USE_SSE2

View File

@@ -15,6 +15,7 @@
#include "ui_interface.h"
#include "paymentserver.h"
#include "splashscreen.h"
#include "walletserver.h"
#include <QMessageBox>
#if QT_VERSION < 0x050000
@@ -296,6 +297,7 @@ int main(int argc, char *argv[])
}
else
{
// Shutdown the core and its threads
threadGroup.interrupt_all();
threadGroup.join_all();
Shutdown();

View File

@@ -4,12 +4,18 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <boost/assign/list_of.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/lexical_cast.hpp>
#include "wallet.h"
#include "walletdb.h"
#include "bitcoinrpc.h"
#include "init.h"
#include "base58.h"
#include "ui_interface.h"
#include "walletserver.h"
using namespace std;
using namespace boost;
@@ -68,7 +74,7 @@ Value getinfo(const Array& params, bool fHelp)
proxyType proxy;
GetProxy(NET_IPV4, proxy);
const Array emptyArray;
Object obj;
obj.push_back(Pair("version", (int)CLIENT_VERSION));
obj.push_back(Pair("protocolversion",(int)PROTOCOL_VERSION));
@@ -77,6 +83,7 @@ Value getinfo(const Array& params, bool fHelp)
obj.push_back(Pair("balance", ValueFromAmount(pwalletMain->GetBalance())));
}
obj.push_back(Pair("blocks", (int)nBestHeight));
obj.push_back(Pair("coinsupply", ValueFromAmount(GetTotalCoinSupply(nBestHeight,false))));
obj.push_back(Pair("timeoffset", (boost::int64_t)GetTimeOffset()));
obj.push_back(Pair("connections", (int)vNodes.size()));
obj.push_back(Pair("proxy", (proxy.first.IsValid() ? proxy.first.ToStringIPPort() : string())));
@@ -1626,3 +1633,33 @@ Value getcoinsupply(const Array& params, bool fHelp)
int64 coinSupply = GetTotalCoinSupply(height,noCheckpoints);
return ValueFromAmount(coinSupply);
}
Value startwalletserversession(const Array &params, bool fHelp)
{
if(!fWalletServer)
throw runtime_error(
"The server is not started in Wallet Server mode so no session can be created");
if (fHelp || params.size() != 1)
throw runtime_error(
"startwalletserversession [identifier]\n"
"Starts a Wallet Server session and returns a session id.\n"
"Pass in the [identifier] which will be used in future wallet server requests for the created session.");
// get the account id
std::string accountId = params[0].get_str();
// check if accountId is not already in a session
if(isNewAccountId(accountId))
{
// Create a session id
boost::uuids::uuid uuid = boost::uuids::random_generator()();
std::string sessionId = boost::lexical_cast<std::string>(uuid);
// Notify the Wallet Server of the newsession
uiInterface.NotifyStartNewWalletServerSession(accountId, sessionId);
Object ret;
ret.push_back(Pair("accountid", accountId));
ret.push_back(Pair("sessionid", sessionId));
return ret;
}
else
throw JSONRPCError(RPC_WALLETSERVER_INVALID_ID, "Given identifier is already in an active session");
}

599
src/stomp/booststomp.cpp Normal file
View File

@@ -0,0 +1,599 @@
/*
BoostStomp - a STOMP (Simple Text Oriented Messaging Protocol) client
----------------------------------------------------
Copyright (c) 2012 Elias Karakoulakis <elias.karakoulakis@gmail.com>
SOFTWARE NOTICE AND LICENSE
BoostStomp is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
BoostStomp is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with BoostStomp. If not, see <http://www.gnu.org/licenses/>.
for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/
// based on the ASIO async TCP client example found on Boost documentation:
// http://www.boost.org/doc/libs/1_46_1/doc/html/boost_asio/example/timeouts/async_tcp_client.cpp
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>
#include "booststomp.h"
namespace STOMP {
using namespace std;
using namespace boost;
using namespace boost::asio;
using boost::asio::ip::tcp;
// used by debug_print
boost::mutex global_stream_lock;
// ----------------------------
// constructor
// ----------------------------
BoostStomp::BoostStomp(string& hostname, int& port, AckMode ackmode /*= ACK_AUTO*/):
// ----------------------------
// protected members setup
//m_sendqueue (new std::queue<Frame*>()),
//m_sendqueue_mutex (new boost::mutex()),
m_hostname (hostname),
m_port (port),
m_ackmode (ackmode),
m_stopped (true),
m_connected (false),
m_io_service (new io_service()),
m_io_service_work (new io_service::work(*m_io_service)),
m_strand (new io_service::strand(*m_io_service)),
m_socket (new tcp::socket(*m_io_service)),
// private members
m_protocol_version("1.0"),
m_transaction_id(0)
// ----------------------------
{
// map STOMP server commands to handler methods
cmd_map["CONNECTED"] = &BoostStomp::process_CONNECTED;
cmd_map["MESSAGE"] = &BoostStomp::process_MESSAGE;
cmd_map["RECEIPT"] = &BoostStomp::process_RECEIPT;
cmd_map["ERROR"] = &BoostStomp::process_ERROR;
// set default debug flag
m_showDebug = true;
}
// ----------------------------
// destructor
// ----------------------------
BoostStomp::~BoostStomp()
// ----------------------------
{
// first stop io_service so as to exit the run loop (when idle)
m_io_service->stop();
// then interrupt the worker thread
worker_thread->interrupt();
// delete m_heartbeat_timer; // no need, its a shared_ptr
delete worker_thread;
}
// ----------------------------
// worker thread
// ----------------------------
void BoostStomp::worker( boost::shared_ptr< boost::asio::io_service > _io_service )
{
debug_print("Worker thread: starting...");
while(!m_stopped) {
_io_service->run();
debug_print("Worker thread: io_service is stopped...");
_io_service->reset();
boost::this_thread::sleep_for( boost::chrono::seconds(1));
}
debug_print("Worker thread finished.");
}
// ----------------------------
// ASIO HANDLERS (protected)
// ----------------------------
// Called by the user of the client class to initiate the connection process.
// The endpoint iterator will have been obtained using a tcp::resolver.
void BoostStomp::start(string& login, string& passcode)
{
debug_print("starting...");
m_stopped = false;
tcp::resolver resolver(*m_io_service);
tcp::resolver::iterator endpoint_iter = resolver.resolve(tcp::resolver::query(
m_hostname,
to_string<int>(m_port, std::dec),
boost::asio::ip::resolver_query_base::numeric_service)
);
// Start the connect actor.
start_connect(endpoint_iter, login, passcode);
}
void BoostStomp::start()
{
std::string empty = "";
start(empty, empty);
}
// This function terminates all the actors to shut down the connection. It
// may be called by the user of the client class, or by the class itself in
// response to graceful termination or an unrecoverable error.
void BoostStomp::stop()
{
debug_print("stopping...");
if (m_connected && m_socket->is_open()) {
Frame frame( "DISCONNECT");
frame.encode(stomp_request);
debug_print("Sending DISCONNECT frame...");
boost::asio::write(*m_socket, stomp_request);
}
m_connected = false;
m_stopped = true;
if (m_heartbeat_timer != NULL) {
m_heartbeat_timer->cancel();
}
//
m_socket->close();
//
}
//
// --------------------------------------------------
// ---------- TCP CONNECTION SETUP ------------------
// --------------------------------------------------
// --------------------------------------------------
void BoostStomp::start_connect(tcp::resolver::iterator endpoint_iter, string& login, string& passcode)
// --------------------------------------------------
{
if (endpoint_iter != tcp::resolver::iterator())
{
debug_print(boost::format("STOMP: Connecting to %1%...") % endpoint_iter->endpoint() );
// Try TCP connection synchronously (the first frame to send is the CONNECT frame)
boost::system::error_code ec;
m_socket->connect(endpoint_iter->endpoint(), ec);
if (!ec) {
// now we are connected to STOMP server's TCP port/
debug_print(boost::format("STOMP TCP connection to %1% is active") % endpoint_iter->endpoint() );
// Send the CONNECT request synchronously (immediately).
hdrmap headers;
headers["accept-version"] = "1.1";
headers["host"] = m_hostname;
if (!login.empty()) {
headers["login"] = login;
headers["passcode"] = passcode;
}
Frame frame( "CONNECT", headers );
frame.encode(stomp_request);
debug_print("Sending CONNECT frame...");
boost::asio::write(*m_socket, stomp_request);
// start the read actor so as to receive the CONNECTED frame
start_stomp_read_headers();
// start worker thread (m_io_service.run())
worker_thread = new boost::thread( boost::bind( &BoostStomp::worker, this, m_io_service ) );
} else {
// We need to close the socket used in the previous connection attempt
// before starting a new one.
m_socket->close();
// Try the next available endpoint.
start_connect(++endpoint_iter, login, passcode);
}
}
else
{
// There are no more endpoints to try.
stop();
debug_print("Connection unsuccessful. Sleeping, then retrying...");
boost::this_thread::sleep_for( boost::chrono::seconds(3));
start();
}
}
// -----------------------------------------------
// ---------- INPUT ACTOR SETUP ------------------
// -----------------------------------------------
// -----------------------------------------------
void BoostStomp::start_stomp_read_headers()
// -----------------------------------------------
{
debug_print("start_stomp_read_headers");
// Start an asynchronous operation to read at least the STOMP frame command & headers (till the double newline delimiter)
boost::asio::async_read_until(
*m_socket,
stomp_response,
"\n\n",
boost::bind(&BoostStomp::handle_stomp_read_headers, this, placeholders::error()));
}
// -----------------------------------------------
void BoostStomp::handle_stomp_read_headers(const boost::system::error_code& ec)
// -----------------------------------------------
{
if (m_stopped)
return;
if (!ec)
{
std::size_t bodysize = 0;
try {
debug_print("handle_stomp_read_headers");
m_rcvd_frame = new Frame(stomp_response, cmd_map); // freed by consume_frame
hdrmap& _headers = m_rcvd_frame->headers();
// if the frame headers contain 'content-length', use that to call the proper async_read overload
if (_headers.find("content-length") != _headers.end()) {
string& content_length = _headers["content-length"];
debug_print(boost::format("received response (command+headers: %1% bytes, content-length: %2%)") % stomp_response.size() % content_length );
bodysize = lexical_cast<size_t>(content_length);
}
start_stomp_read_body(bodysize);
} catch(NoMoreFrames&) {
debug_print("No more frames!");
// break;
} catch(std::exception& e) {
debug_print(boost::format("handle_stomp_read in loop: unknown exception in Frame constructor:\n%1%") % e.what());
exit(10);
}
}
else
{
std::cerr << "BoostStomp: Error on receive: " << ec.message() << "\n";
stop();
start();
}
}
// -----------------------------------------------
void BoostStomp::start_stomp_read_body(std::size_t bodysize)
// -----------------------------------------------
{
debug_print("start_stomp_read_body");
// Start an asynchronous operation to read at least the STOMP frame body
if (bodysize == 0) {
boost::asio::async_read_until(
*m_socket, stomp_response,
'\0', // NULL signifies the end of the body
boost::bind(&BoostStomp::handle_stomp_read_body, this, placeholders::error(), placeholders::bytes_transferred()));
} else {
boost::asio::async_read(
*m_socket, stomp_response,
boost::asio::transfer_at_least(bodysize),
boost::bind(&BoostStomp::handle_stomp_read_body, this, placeholders::error(), placeholders::bytes_transferred()));
}
}
// -----------------------------------------------
void BoostStomp::handle_stomp_read_body(const boost::system::error_code& ec, std::size_t bytes_transferred = 0)
// -----------------------------------------------
{
if (m_stopped)
return;
if (!ec)
{
debug_print(boost::format("received response (%1% bytes) (buffer: %2% bytes)") % bytes_transferred % stomp_response.size() );
if (m_rcvd_frame != NULL) {
m_rcvd_frame->parse_body(stomp_response);
consume_received_frame();
}
//
//debug_print("stomp_response contents after Frame scanning:");
//hexdump(stomp_response);
// wait for the next incoming frame from the server...
start_stomp_read_headers();
}
else
{
std::cerr << "BoostStomp: Error on receive: " << ec.message() << "\n";
stop();
start();
}
}
// ------------------------------------------
void BoostStomp::consume_received_frame()
// ------------------------------------------
{
if (m_rcvd_frame != NULL) {
// is there a declared handler for the command in the received STOMP frame?
if (pfnStompCommandHandler_t handler = cmd_map[m_rcvd_frame->command()]) {
debug_print(boost::format("-- consume_frame: calling %1% command handler") % m_rcvd_frame->command());
// call STOMP command handler
(this->*handler)();
}
delete m_rcvd_frame;
}
m_rcvd_frame = NULL;
};
// ------------------------------------------------
// ---------- OUTPUT ACTOR SETUP ------------------
// ------------------------------------------------
// -----------------------------------------------
void BoostStomp::start_stomp_write()
// -----------------------------------------------
{
if ((m_stopped) || (!m_connected))
return;
//debug_print("start_stomp_write");
Frame* frame = NULL;
// send all STOMP frames in queue
while (m_sendqueue.try_pop(frame)) {
debug_print(boost::format("Sending %1% frame...") % frame->command() );
frame->encode(stomp_request);
try {
boost::asio::write(
*m_socket,
stomp_request
);
debug_print("Sent!");
delete frame;
} catch (boost::system::system_error& err){
m_connected = false;
debug_print(boost::format("Error writing to STOMP server: error code:%1%, message:%2%") % err.code() % err.what());
// put! the kot! down! slowly!
m_sendqueue.push(frame);
stop();
}
};
}
// -----------------------------------------------
void BoostStomp::start_stomp_heartbeat()
// -----------------------------------------------
{
// Start an asynchronous operation to send a heartbeat message.
//debug_print("Sending heartbeat...");
boost::asio::async_write(
*m_socket,
m_heartbeat,
boost::bind(&BoostStomp::handle_stomp_heartbeat, this, _1)
);
}
// -----------------------------------------------
void BoostStomp::handle_stomp_heartbeat(const boost::system::error_code& ec)
// -----------------------------------------------
{
if (m_stopped)
return;
if (!ec)
{
// Wait 10 seconds before sending the next heartbeat.
m_heartbeat_timer->expires_from_now(boost::posix_time::seconds(10));
m_heartbeat_timer->async_wait(
boost::bind(
&BoostStomp::start_stomp_heartbeat,
this
)
);
}
else
{
std::cout << "Error on sending heartbeat: " << ec.message() << "\n";
stop();
}
}
//-----------------------------------------
void BoostStomp::process_CONNECTED()
//-----------------------------------------
{
m_connected = true;
// try to get supported protocol version from headers
hdrmap _headers = m_rcvd_frame->headers();
if (_headers.find("version") != _headers.end()) {
m_protocol_version = _headers["version"];
debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
}
if (m_protocol_version == "1.1") {
// we are connected to a version 1.1 STOMP server, setup heartbeat
m_heartbeat_timer = boost::shared_ptr< deadline_timer> ( new deadline_timer( *m_io_service ));
std::ostream os( &m_heartbeat);
os << "\n";
// we can start the heartbeat actor
start_stomp_heartbeat();
}
// in case of reconnection, we need to re-subscribe to all subscriptions
for (subscription_map::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); it++) {
//string topic = (*it).first;
do_subscribe((*it).first);
};
}
//-----------------------------------------
void BoostStomp::process_MESSAGE()
//-----------------------------------------
{
bool acked = true;
hdrmap& _headers = m_rcvd_frame->headers();
if (_headers.find("destination") != _headers.end()) {
string& dest = _headers["destination"];
//
if (pfnOnStompMessage_t callback_function = m_subscriptions[dest]) {
//debug_print(boost::format("-- consume_frame: firing callback for %1%") % dest);
//
acked = callback_function(m_rcvd_frame);
};
};
// acknowledge frame, if in "Client" or "Client-Individual" ack mode
if ((m_ackmode == ACK_CLIENT) || (m_ackmode == ACK_CLIENT_INDIVIDUAL)) {
acknowledge(m_rcvd_frame, acked);
}
}
//-----------------------------------------
void BoostStomp::process_RECEIPT()
//-----------------------------------------
{
hdrmap& _headers = m_rcvd_frame->headers();
if (_headers.find("receipt_id") != _headers.end()) {
string& receipt_id = _headers["receipt_id"];
// do something with receipt...
debug_print(boost::format("receipt-id == %1%") % receipt_id);
};
}
//-----------------------------------------
void BoostStomp::process_ERROR()
//-----------------------------------------
{
hdrmap& _headers = m_rcvd_frame->headers();
string errormessage = (_headers.find("message") != _headers.end()) ?
_headers["message"] :
"(unknown error!)";
errormessage += m_rcvd_frame->body().c_str();
//throw(errormessage);
cerr << endl << "============= BoostStomp got an ERROR frame from server: =================" << endl << errormessage << endl;
}
//-----------------------------------------
bool BoostStomp::send_frame( Frame* frame )
//-----------------------------------------
{
// send_frame is called from the application thread. Do not dereference frame here!!! (shared data)
//debug_print(boost::format("send_frame: Adding frame to send queue...") % frame->command() );
//debug_print("send_frame: Adding frame to send queue...");
m_sendqueue.push(frame); // concurrent_queue does all the thread safety stuff
// tell io_service to start the output actor so as the frame get sent from the worker thread
m_strand->post(
boost::bind(&BoostStomp::start_stomp_write, this)
);
return(true);
}
// ---------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------
// ------------------------ PUBLIC INTERFACE ------------------------
// ---------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------
// ------------------------------------------
bool BoostStomp::subscribe( string& topic, pfnOnStompMessage_t callback )
// ------------------------------------------
{
debug_print(boost::format("Setting callback function for %1%") % topic);
m_subscriptions[topic] = callback;
return(do_subscribe(topic));
}
// ------------------------------------------
bool BoostStomp::do_subscribe(const string& topic)
// ------------------------------------------
{
hdrmap hm;
hm["id"] = lexical_cast<string>(boost::this_thread::get_id());
hm["destination"] = topic;
return(send_frame(new Frame( "SUBSCRIBE", hm )));
}
// ------------------------------------------
bool BoostStomp::unsubscribe( string& topic )
// ------------------------------------------
{
hdrmap hm;
hm["destination"] = topic;
m_subscriptions.erase(topic);
return(send_frame(new Frame( "UNSUBSCRIBE", hm )));
}
// ------------------------------------------
bool BoostStomp::acknowledge(Frame* frame, bool acked = true)
// ------------------------------------------
{
hdrmap hm = frame->headers();
string _ack_cmd = (acked ? "ACK" : "NACK");
return(send_frame(new Frame( _ack_cmd, hm )));
}
// ------------------------------------------
int BoostStomp::begin()
// ------------------------------------------
// returns a new transaction id
{
hdrmap hm;
// create a new transaction id
hm["transaction"] = lexical_cast<string>(m_transaction_id++);
Frame* frame = new Frame( "BEGIN", hm );
send_frame(frame);
return(m_transaction_id);
};
// ------------------------------------------
bool BoostStomp::commit(int transaction_id)
// ------------------------------------------
{
hdrmap hm;
// add required header
hm["transaction"] = lexical_cast<string>(transaction_id);
return(send_frame(new Frame( "COMMIT", hm )));
};
// ------------------------------------------
bool BoostStomp::abort(int transaction_id)
// ------------------------------------------
{
hdrmap hm;
// add required header
hm["transaction"] = lexical_cast<string>(transaction_id);
return(send_frame(new Frame( "ABORT", hm )));
};
// ------------------------------------------
void BoostStomp::enable_debug_msgs(bool b)
// ------------------------------------------
{
m_showDebug = b;
}
void BoostStomp::debug_print(string& str) {
boost::format fmt = boost::format(str.c_str());
debug_print(fmt);
}
void BoostStomp::debug_print(const char* cstr) {
boost::format fmt = boost::format(cstr);
BoostStomp::debug_print(fmt);
}
void BoostStomp::debug_print(boost::format& fmt) {
using namespace boost::posix_time;
if (m_showDebug) {
ptime now = second_clock::universal_time();
global_stream_lock.lock();
std::cout << "[" << FormatTime(now) << ": " << boost::this_thread::get_id() << "] BoostStomp:" << fmt.str() << endl;
global_stream_lock.unlock();
}
}
} // end namespace STOMP

181
src/stomp/booststomp.h Normal file
View File

@@ -0,0 +1,181 @@
/*
BoostStomp - a STOMP (Simple Text Oriented Messaging Protocol) client using BOOST (http://www.boost.org)
----------------------------------------------------
Copyright (c) 2012 Elias Karakoulakis <elias.karakoulakis@gmail.com>
SOFTWARE NOTICE AND LICENSE
BoostStomp is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
Thrift4OZW is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with BoostStomp. If not, see <http://www.gnu.org/licenses/>.
for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/
// booststomp.h
//
#ifndef __BOOSTSTOMP_H_
#define __BOOSTSTOMP_H_
#include <string>
#include <sstream>
#include <iostream>
//#include <queue>
#include <map>
#include <set>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "stompframe.h"
#include "helpers.h"
namespace STOMP {
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
// ACK mode
typedef enum {
ACK_AUTO=0, // implicit acknowledgment (no ACK is sent)
ACK_CLIENT, // explicit acknowledgment (must ACK)
ACK_CLIENT_INDIVIDUAL //
} AckMode;
// Stomp message callback function prototype
typedef bool (*pfnOnStompMessage_t)( Frame* );
// Stomp subscription map (topic => callback)
typedef std::map<std::string, pfnOnStompMessage_t> subscription_map;
// here we go
// -------------
class BoostStomp
// -------------
{
//----------------
protected:
//----------------
Frame* m_rcvd_frame;
//boost::shared_ptr< std::queue<Frame*> > m_sendqueue;
//boost::shared_ptr< boost::mutex > m_sendqueue_mutex;
concurrent_queue<Frame*> m_sendqueue;
subscription_map m_subscriptions;
//
std::string m_hostname;
int m_port;
AckMode m_ackmode;
//
bool m_stopped;
bool m_connected; // have we completed application-level STOMP connection?
boost::shared_ptr< io_service > m_io_service;
boost::shared_ptr< io_service::work > m_io_service_work;
boost::shared_ptr< io_service::strand> m_strand;
tcp::socket* m_socket;
boost::asio::streambuf stomp_request, stomp_response;
//----------------
private:
//----------------
boost::mutex stream_mutex;
boost::thread* worker_thread;
boost::shared_ptr<deadline_timer> m_heartbeat_timer;
boost::asio::streambuf m_heartbeat;
string m_protocol_version;
int m_transaction_id;
bool m_showDebug;
//
bool send_frame( Frame* _frame );
bool do_subscribe (const string& topic);
//
void consume_received_frame();
void process_CONNECTED();
void process_MESSAGE();
void process_RECEIPT();
void process_ERROR();
void start_connect(tcp::resolver::iterator endpoint_iter, string& login, string& passcode);
void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter);
//TODO: void setup_stomp_heartbeat(int cx, int cy);
void start_stomp_heartbeat();
void handle_stomp_heartbeat(const boost::system::error_code& ec);
void start_stomp_read_headers();
void handle_stomp_read_headers(const boost::system::error_code& ec);
void start_stomp_read_body(std::size_t);
void handle_stomp_read_body(const boost::system::error_code& ec, std::size_t bytes_transferred);
void start_stomp_write();
//void handle_stomp_write(const boost::system::error_code& ec);
void worker( boost::shared_ptr< boost::asio::io_service > io_service );
void debug_print(boost::format& fmt);
void debug_print(string& str);
void debug_print(const char* str);
//----------------
public:
//----------------
// constructor
BoostStomp(string& hostname, int& port, AckMode ackmode = ACK_AUTO);
// destructor
~BoostStomp();
stomp_server_command_map_t cmd_map;
void start();
void start(string& login, string& passcode);
void stop();
// Set or clear the debug flag
void enable_debug_msgs(bool b);
// thread-safe methods called from outside the thread loop
template <typename BodyType>
bool send ( std::string& _topic, hdrmap _headers, BodyType& _body, pfnOnStompMessage_t callback = NULL) {
_headers["destination"] = _topic;
Frame* frame = new Frame( "SEND", _headers, _body );
return(send_frame(frame));
}
//bool send ( std::string& topic, hdrmap _headers, std::string& body );
//
bool subscribe ( std::string& topic, pfnOnStompMessage_t callback );
bool unsubscribe ( std::string& topic );
bool acknowledge ( Frame* _frame, bool acked );
// STOMP transactions
int begin(); // returns a new transaction id
bool commit(int transaction_id);
bool abort(int transaction_id);
//
AckMode get_ackmode() { return m_ackmode; };
//
}; //class
}
#endif

45
src/stomp/helpers.cpp Normal file
View File

@@ -0,0 +1,45 @@
/*
* helpers.cpp
*
* Created on: 22 Απρ 2012
* Author: ekarak
*/
#include "helpers.h"
#include <iostream>
void hexdump(const void *ptr, int buflen) {
unsigned char *buf = (unsigned char*)ptr;
int i, j;
for (i=0; i<buflen; i+=16) {
printf("%06x: ", i);
for (j=0; j<16; j++)
if (i+j < buflen)
printf("%02x ", buf[i+j]);
else
printf(" ");
printf(" ");
for (j=0; j<16; j++)
if (i+j < buflen)
printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.');
printf("\n");
}
}
void hexdump(boost::asio::streambuf& sb) {
const char* rawdata = boost::asio::buffer_cast<const char*>(sb.data());
hexdump(rawdata, sb.size());
}
std::string FormatTime(boost::posix_time::ptime& now)
{
using namespace boost::posix_time;
static std::locale loc(std::wcout.getloc(),
new time_facet("%H:%M:%S"));
std::basic_stringstream<char> ss;
ss.imbue(loc);
ss << now;
return ss.str();
}

91
src/stomp/helpers.h Normal file
View File

@@ -0,0 +1,91 @@
/*
* helpers.h
*
* Created on: 22 Απρ 2012
* Author: ekarak
*/
#ifndef HELPERS_H_
#define HELPERS_H_
#include <boost/asio.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <string>
#include <queue>
using namespace std;
// helper function
void hexdump(boost::asio::streambuf&);
void hexdump(const void *ptr, int buflen);
std::string FormatTime(boost::posix_time::ptime&);
// helper template function for pretty-printing just about anything
template <class T>
std::string to_string(T t, std::ios_base & (*f)(std::ios_base&))
{
std::ostringstream oss;
oss.setf (std::ios_base::showbase);
oss << f << t;
return oss.str();
}
// -------------------------------
// Concurrent queue, courtesy of:
// http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
// -------------------------------
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
};
#endif /* HELPERS_H_ */

196
src/stomp/stompframe.cpp Normal file
View File

@@ -0,0 +1,196 @@
/*
BoostStomp - a STOMP (Simple Text Oriented Messaging Protocol) client
----------------------------------------------------
Copyright (c) 2012 Elias Karakoulakis <elias.karakoulakis@gmail.com>
SOFTWARE NOTICE AND LICENSE
BoostStomp is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
BoostStomp is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with BoostStomp. If not, see <http://www.gnu.org/licenses/>.
for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include "BoostStomp.h"
#include "helpers.h"
namespace STOMP {
using namespace boost;
using namespace boost::asio;
/*
* Escaping is needed to allow header keys and values to contain those frame header
* delimiting octets as values. The CONNECT and CONNECTED frames do not escape the
* colon or newline octets in order to remain backward compatible with STOMP 1.0.
* C style string literal escapes are used to encode any colons and newlines that
* are found within the UTF-8 encoded headers. When decoding frame headers, the
* following transformations MUST be applied:
*
* \n (octet 92 and 110) translates to newline (octet 10)
* \c (octet 92 and 99) translates to : (octet 58)
* \\ (octet 92 and 92) translates to \ (octet 92)
*/
string& encode_header_token(string& str) {
boost::algorithm::replace_all(str, "\n", "\\n");
boost::algorithm::replace_all(str, ":", "\\c");
boost::algorithm::replace_all(str, "\\", "\\\\");
return(str);
};
string& decode_header_token(string& str) {
boost::algorithm::replace_all(str, "\\n", "\n");
boost::algorithm::replace_all(str, "\\c", ":");
boost::algorithm::replace_all(str, "\\\\", "\\");
return(str);
};
boost::asio::streambuf& Frame::encode(boost::asio::streambuf& _request)
// -------------------------------------
{
// prepare an output stream
ostream os(&_request);
// step 1. write the command
if (m_command.length() > 0) {
os << m_command << "\n";
} else {
throw("stomp_write: command not set!!");
}
// step 2. Write the headers (key-value pairs)
if( m_headers.size() > 0 ) {
for ( hdrmap::iterator it = m_headers.begin() ; it != m_headers.end(); it++ ) {
string key = (*it).first;
string val = (*it).second;
os << encode_header_token(key)
<< ":"
<< encode_header_token(val)
<< "\n";
}
}
// special header: content-length
if( m_body.v.size() > 0 ) {
os << "content-length:" << m_body.v.size() << "\n";
}
// write newline signifying end of headers
os << "\n";
// step 3. Write the body
if( m_body.v.size() > 0 ) {
_request.sputn(m_body.v.data(), m_body.v.size());
//_request.commit(m_body.v.size());
}
// write terminating NULL char
_request.sputc('\0');
//_request.commit(1);
return(_request);
};
// my own version of getline for an asio streambuf
inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\n') {
const char* line = boost::asio::buffer_cast<const char*>(sb.data());
char _c;
size_t i;
_str.clear();
for( i = 0;
((i < sb.size()) && ((_c = line[i]) != delim));
i++
) _str += _c;
//debug_print( boost::format("mygetline: i=%1%, sb.size()==%2%") % i % sb.size() );
//hexdump(_str.c_str(), _str.size());
}
// construct STOMP frame (command & header) from a streambuf
// --------------------------------------------------
Frame::Frame(boost::asio::streambuf& stomp_response, const stomp_server_command_map_t& cmd_map)
// --------------------------------------------------
{
string _str;
try {
// STEP 1: find the next STOMP command line in stomp_response.
// Chomp unknown lines till the buffer is empty, in which case an exception is raised
//debug_print(boost::format("Frame parser phase 1, stomp_response.size()==%1%") % stomp_response.size());
//hexdump(boost::asio::buffer_cast<const char*>(stomp_response.data()), stomp_response.size());
while (stomp_response.size() > 0) {
mygetline(stomp_response, _str);
//hexdump(_str.c_str(), _str.length());
stomp_response.consume(_str.size() + 1); // plus one for the newline
if (cmd_map.find(_str) != cmd_map.end()) {
//debug_print(boost::format("phase 1: COMMAND==%1%, sb.size==%2%") % _str % stomp_response.size());
m_command = _str;
break;
}
}
// if after all this trouble m_command is not set, and there's no more data in stomp_response
// (which shouldn't happen since we do async_read_until the double newline), then throw an exception
if (m_command == "") throw(NoMoreFrames());
// STEP 2: parse all headers
//debug_print("Frame parser phase 2");
vector< string > header_parts;
while (stomp_response.size() > 0) {
mygetline(stomp_response, _str);
stomp_response.consume(_str.size()+1);
boost::algorithm::split(header_parts, _str, is_any_of(":"));
if (header_parts.size() > 1) {
string& key = decode_header_token(header_parts[0]);
string& val = decode_header_token(header_parts[1]);
//debug_print(boost::format("phase 2: HEADER[%1%]==%2%") % key % val);
m_headers[key] = val;
//
} else {
// no valid header line detected, on to the body scanner
break;
}
}
//
} catch(NoMoreFrames& e) {
//debug_print("-- Frame parser ended (no more frames)");
throw(e);
}
};
// STEP 3: parse the body
size_t Frame::parse_body(boost::asio::streambuf& _response)
{
std::size_t _content_length = 0, bytecount = 0;
string _str;
//debug_print("Frame parser phase 3");
// special case: content-length
if (m_headers.find("content-length") != m_headers.end()) {
string& val = m_headers["content-length"];
//debug_print(boost::format("phase 3: body content-length==%1%") % val);
_content_length = lexical_cast<size_t>(val);
}
if (_content_length > 0) {
bytecount += _content_length;
// read back the body byte by byte
const char* rawdata = boost::asio::buffer_cast<const char*>(_response.data());
for (size_t i = 0; i < _content_length; i++ ) {
m_body << rawdata[i];
}
} else {
// read all bytes until the first NULL
mygetline(_response, _str, '\0');
bytecount += _str.size();
m_body << _str;
}
bytecount += 1; // for the final frame-terminating NULL
//debug_print(boost::format("phase 3: consumed %1% bytes, BODY(%2% bytes)==%3%") % bytecount % _str.size() % _str);
_response.consume(bytecount);
return(bytecount);
}
}

128
src/stomp/stompframe.h Normal file
View File

@@ -0,0 +1,128 @@
#ifndef BOOST_FRAME_HPP
#define BOOST_FRAME_HPP
#include <string>
#include <map>
#include <iostream>
#include <sstream>
#include <boost/asio.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/classification.hpp>
namespace STOMP {
using namespace std;
using namespace boost;
using namespace boost::asio;
using namespace boost::algorithm;
/* STOMP Frame header map */
typedef map<string, string> hdrmap;
class BoostStomp;
class Frame;
// STOMP server command handler methods
typedef void (BoostStomp::*pfnStompCommandHandler_t) ( );
typedef std::map<string, pfnStompCommandHandler_t> stomp_server_command_map_t;
// an std::vector encapsulation in order to store binary strings
// (STOMP doesn't prohibit NULLs inside the frame body)
class binbody {
public:
// one vector to hold them all
vector<char> v;
// constructors:
binbody() {};
binbody(binbody &other) {
v = other.v;
}
binbody(string b) {
v.assign(b.begin(), b.end());
}
binbody(string::iterator begin, string::iterator end) {
v.assign(begin, end);
};
// append a string at the end of the body vector
binbody& operator << (std::string s) {
v.insert(v.end(), s.begin(), s.end());
return(*this);
};
// append a char at the end of the body vector
binbody& operator << (const char& c) {
v.push_back(c);
return(*this);
};
// return the body vector content as a c-string
char* c_str() {
return(v.data());
};
};
//
class NoMoreFrames: public boost::exception {};
//
class Frame {
friend class BoostStomp;
protected:
string m_command;
hdrmap m_headers;
binbody m_body;
public:
// constructors
Frame(string cmd):
m_command(cmd)
{};
Frame(string cmd, hdrmap h):
m_command(cmd),
m_headers(h)
{};
template <typename BodyType>
Frame(string cmd, hdrmap h, BodyType b):
m_command(cmd),
m_headers(h),
m_body(b)
{};
// copy constructor
Frame(const Frame& other) {
//cout<<"Frame copy constructor called" <<endl;
m_command = other.m_command;
m_headers = other.m_headers;
m_body = other.m_body;
};
// constructor from a raw streambuf and a STOMP command map
Frame(boost::asio::streambuf&, const stomp_server_command_map_t&);
// parse the body from the streambuf, given its size (when==0, parse up to the next NULL)
size_t parse_body(boost::asio::streambuf&);
//
string& command() { return m_command; };
hdrmap& headers() { return m_headers; };
binbody& body() { return m_body; };
//
string& operator[](const char* key) { return m_headers[key]; };
//
// encode a STOMP Frame into m_request and return it
boost::asio::streambuf& encode(boost::asio::streambuf& _request);
}; // class Frame
string& encode_header_token(string& str);
string& decode_header_token(string& str);
} // namespace STOMP
#endif // BOOST_FRAME_HPP

View File

@@ -93,6 +93,9 @@ public:
* @note called with lock cs_mapAlerts held.
*/
boost::signals2::signal<void (const uint256 &hash, ChangeType status)> NotifyAlertChanged;
/** Signal to create new Wallet Server session. */
boost::signals2::signal<void (std::string accountId, std::string sessionId)> NotifyStartNewWalletServerSession;
};
extern CClientUIInterface uiInterface;

View File

@@ -85,6 +85,7 @@ bool fLogTimestamps = false;
CMedianFilter<int64> vTimeOffsets(200,0);
volatile bool fReopenDebugLog = false;
bool fCachedPath[2] = {false, false};
bool fWalletServer = false;
// Init OpenSSL library multithreading support
static CCriticalSection** ppmutexOpenSSL;

View File

@@ -148,6 +148,7 @@ extern bool fBloomFilters;
extern bool fNoListen;
extern bool fLogTimestamps;
extern volatile bool fReopenDebugLog;
extern bool fWalletServer;
void RandAddSeed();
void RandAddSeedPerfmon();

148
src/walletserver.cpp Normal file
View File

@@ -0,0 +1,148 @@
#include "walletserver.h"
#include "main.h"
#include "util.h"
#include "bitcoinrpc.h"
#include "ui_interface.h"
#include "stomp/booststomp.h"
#include <boost/filesystem.hpp>
#include <boost/serialization/map.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
using namespace std;
using namespace boost;
using namespace STOMP;
bool isServerRunning = false;
static BoostStomp* stomp_client;
// topics and queues
static string block_notifications_topic = "/topic/Blocks";
static string cmd_in_queue = "/queue/WalletCmdIn";
static string cmd_out_queue = "/queue/WalletCmdOut";
// ActiveMQ parameters
static string stomp_host = GetArg("-activemqstomphost", "localhost");
static int stomp_port = GetArg("activemqstompport", 61613);
// in memory key/value stores
typedef std::map<std::string, std::string> keyValueType;
static keyValueType sessions;
static keyValueType walletServerSecrets;
// Handler for NotifyBlocksChanged signal
static void NotifyBlocksChanged()
{
printf("CasinoCoin WalletServer Received NotifyBlocksChanged Signal: %i BlockHash: %s\n", nBestHeight, hashBestChain.ToString().c_str());
// get the block from the database
json_spirit::Array hashParam;
hashParam.push_back(hashBestChain.ToString());
// get block and convert to JSON object
json_spirit::Value jsonBlock = getblock(hashParam, false);
// send blockinfo to Message Queue to inform connected clients
try {
// construct a headermap
STOMP::hdrmap headers;
headers["content-type"] = string("application/json");
string body = json_spirit::write_string(jsonBlock, false);
// add an outgoing message to the topic
stomp_client->send(block_notifications_topic, headers, body);
}
catch (std::exception& e)
{
cerr << "Error in BoostStomp: " << e.what() << "\n";
}
}
// Handler for NotifySessionCreated signal
static void NotifySessionCreated(std::string accountId, std::string sessionId)
{
sessions.insert(std::make_pair(accountId, sessionId));
cout << "Inserted: " << accountId.c_str() << " / " << sessionId.c_str() << endl;
}
// load wallet server secret map from filesystem
void loadWalletServerSecrets()
{
boost::filesystem::path walletServerMapPath = GetDataDir() / "walletserver.dat";
std::ifstream ifs(walletServerMapPath.string().c_str());
boost::archive::binary_iarchive bia(ifs);
bia >> walletServerSecrets;
}
// save wallet server secret map to filesystem
void saveWalletServerSecrets()
{
boost::filesystem::path walletServerMapPath = GetDataDir() / "walletserver.dat";
std::ofstream ofs(walletServerMapPath.string().c_str());
boost::archive::binary_oarchive boa(ofs);
boa << walletServerSecrets;
}
//bool session_callback(STOMP::Frame& _frame) {
// cout << "--Incoming STOMP Frame--" << endl;
// cout << " Headers:" << endl;
// hdrmap headers = _frame.headers();
// for (STOMP::hdrmap::iterator it = headers.begin() ; it != headers.end(); it++ )
// cout << "\t" << (*it).first << "\t=>\t" << (*it).second << endl;
// //
// cout << " Body: (size: " << _frame.body().v.size() << " chars):" << endl;
// hexdump(_frame.body().c_str(), _frame.body().v.size() );
// return(true); // return false if we want to disacknowledge the frame (send NACK instead of ACK)
//}
bool in_queue_callback(STOMP::Frame& _frame) {
cout << "--Incoming STOMP Frame--" << endl;
cout << " Headers:" << endl;
hdrmap headers = _frame.headers();
for (STOMP::hdrmap::iterator it = headers.begin() ; it != headers.end(); it++ )
cout << "\t" << (*it).first << "\t=>\t" << (*it).second << endl;
//
cout << " Body: (size: " << _frame.body().v.size() << " chars):" << endl;
hexdump(_frame.body().c_str(), _frame.body().v.size() );
return(true); // return false if we want to disacknowledge the frame (send NACK instead of ACK)
}
void StartWalletServerThread()
{
// Make this thread recognisable as the wallet server thread
RenameThread("casinocoin-walletserver");
printf("CasinoCoin WalletServer Daemon starting\n");
// get the wallet dir
boost::filesystem::path walletServerPath = GetDataDir() / "wallets";
boost::filesystem::create_directories(walletServerPath);
printf("Using wallet directory: %s\n", walletServerPath.string().c_str());
// initiate a new BoostStomp client
stomp_client = new BoostStomp(stomp_host, stomp_port);
// start the client, (by connecting to the STOMP server)
stomp_client->start();
// subscribe to the In Queue to receive the incomming commands
stomp_client->subscribe(cmd_in_queue, (STOMP::pfnOnStompMessage_t) &in_queue_callback);
isServerRunning = true;
// connect to NotifyStartNewWalletServerSession signal
uiInterface.NotifyStartNewWalletServerSession.connect(boost::bind(NotifySessionCreated, _1, _2));
// connect to NotifyBlocksChanged signal
uiInterface.NotifyBlocksChanged.connect(boost::bind(NotifyBlocksChanged));
}
void StopWalletServerThread()
{
if(isServerRunning)
{
// unsubscribe from In Queue
stomp_client->unsubscribe(cmd_in_queue);
// flush all Server Wallets
printf("WalletServer flushing wallets\n");
// close queue connections
printf("WalletServer closing queue connections\n");
stomp_client->stop();
delete stomp_client;
// stop server thread
isServerRunning = false;
printf("WalletServer STOPPED\n");
}
}
bool isNewAccountId(std::string accountId){
return (sessions.count(accountId) == 0);
}

13
src/walletserver.h Normal file
View File

@@ -0,0 +1,13 @@
#ifndef WALLETSERVER_H
#define WALLETSERVER_H
#include <boost/signals2/signal.hpp>
extern bool isServerRunning;
void StartWalletServerThread();
void StopWalletServerThread();
bool isNewAccountId(std::string accountId);
#endif // WALLETSERVER_H