/* * OrbServerImpl.cc * * Created on: Aug 11, 2010 * Author: lamikr */ #include #include "ServerEventThread.hh" #include "OrbServerImpl.hh" #include "ClientServerCommon.hh" #include "BusMessageInternal.hh" using namespace std; using namespace plpbus; using namespace plpbus_orb; static omni_mutex server_thread_mutex; static omni_condition server_thread_shutdown_signal(&server_thread_mutex); OrbServerImpl::OrbServerImpl() { _orb = NULL; _poa = NULL; _server_thread_count = 0; _shutdown_pending = 0; _listener = NULL; } OrbServerImpl::~OrbServerImpl() { log_info("OrbServerImpl destroyed.\n"); } void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param, const char* msg_req) { char *msg_rsp; msg_rsp = NULL; if (CORBA::is_nil(response_listener_param) == false) { if (_listener != NULL) { log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req); _listener->request_received(msg_req, &msg_rsp); response_listener_param->receive_response_message(msg_rsp); } else { log_error("send_message_and_request_response() error, server callback == NULL\n"); } } else { log_error("invalid callback object received.\n"); } } char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) { char *msg_rsp; char *ret_val; err_flg = 0; msg_rsp = NULL; if (_listener != NULL) { log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param); _listener->request_received(msg_req_param, &msg_rsp); } else { log_error("send_message_and_wait_response() error, server callback == NULL\n"); msg_rsp = strdup(msg_req_param); err_flg = -1; } ret_val = CORBA::string_dup(msg_rsp); return ret_val; } void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param, const ::DataItemSequence& msg_req_param) { BusMessage *msg_req; BusMessage *msg_rsp; if (CORBA::is_nil(response_listener_param) == false) { if (_listener != NULL) { msg_req = new BusMessageInternal(msg_req_param); log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n"); log_debug("msg_req:\n"); msg_req->printout(); msg_rsp = NULL; _listener->request_received(msg_req, &msg_rsp); log_debug("msg_rsp length: %d\n", msg_rsp->_dataItemSeq.length()); msg_rsp->printout(); response_listener_param->receive_response_dataitem_sequence(msg_rsp->_dataItemSeq); } else { log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n"); } } } CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param, DataItemSequence_out rsp_seq_param) { BusMessage *msg_req; BusMessage *msg_rsp; msg_req = new BusMessageInternal(req_seq_param); msg_rsp = NULL; _listener->request_received(msg_req, &msg_rsp); //rsp_seq_param = new DataItemSequence_out(msg_rsp._dataItemSeq); rsp_seq_param._data = &(msg_rsp->_dataItemSeq); return 0; } void OrbServerImpl::add_event_listener(OrbClient_ptr client_param, const char *msg, CORBA::UShort period_secs) { ServerEventThread *srvr_thread; if (CORBA::is_nil(client_param) == false) { log_debug("add_event_listener()\n"); server_thread_mutex.lock(); _server_thread_count++; server_thread_mutex.unlock(); srvr_thread = new ServerEventThread(client_param, msg, period_secs, this); srvr_thread->start(); } else { cerr << "Failed to add event listener, listener NULL.\n"; } } void OrbServerImpl::shutdown() { omni_mutex_lock sync(server_thread_mutex); if (is_shutdown_pending() == 0) { cout << "shutdown request received!" << endl; // Tell the servers to exit, and wait for them to do so. _shutdown_pending = 1; while(_server_thread_count > 0) { server_thread_shutdown_signal.wait(); } // Shutdown the ORB (but do not wait for completion). This also // causes the main thread to unblock from CORBA::ORB::run(). _orb->shutdown(0); } } int OrbServerImpl::init() { int argc; char **argv; int retVal; argc = -1; argv = NULL; retVal = 0; _orb = CORBA::ORB_init(argc, argv); if (_orb != NULL) { _poa = create_poa(_orb); retVal = 0; } else { cout << "init() failed" << endl; } return retVal; } int OrbServerImpl::launch(const char *server_name) { CORBA::Object_var server_ref; PortableServer::ObjectId_var server_id; bool ok_flg; int ret_val; CosNaming::NamingContext_var naming_context; ret_val = -1; POA_plpbus_orb::OrbServer_tie server_impl(this); server_id = _poa->activate_object(&server_impl); server_ref = server_impl._this(); //this->_remove_ref(); naming_context = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS); if (naming_context != NULL) { ok_flg = bind_naming_context_and_service(naming_context, server_ref, server_name, CONST_CONTEXT_KIND__PLPBUS); if (ok_flg == true) { cout << "Registered to naming service: " << server_name << endl; _orb->run(); ret_val = 0; } else { cout << "Failed to register to naming service: " << server_name << endl; } } return ret_val; } PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) { PortableServer::POA_var ret_val; CORBA::Object_var poa_obj; CORBA::PolicyList policy_list; CORBA::Any policyVal; PortableServer::POAManager_var poa_man; PortableServer::POA_var rootpoa; ret_val = NULL; poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME); if (poa_obj != NULL) { rootpoa = PortableServer::POA::_narrow(poa_obj); if (rootpoa != NULL) { poa_man = rootpoa->the_POAManager(); if (poa_man != NULL) { poa_man->activate(); // bidirectional policy policy_list.length(1); policyVal <<= BiDirPolicy::BOTH; policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal); ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME, poa_man, policy_list); } } } else { cerr << "Failed to create RootPOA." << endl; } return ret_val; } CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param, const char *service_kind_param) { CosNaming::NamingContext_var ret_val; CosNaming::NamingContext_var ns_context; CORBA::Object_var ns_obj; CORBA::Object_var service_obj; CosNaming::Name context_data; ret_val = NULL; try { // get nameservice reference ns_obj = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME); // get nameservice context ns_context = CosNaming::NamingContext::_narrow(ns_obj); if (CORBA::is_nil(ns_context) == false) { context_data.length(1); context_data[0].id = service_name_param; context_data[0].kind = service_kind_param; try { service_obj = ns_context->resolve(context_data); if (CORBA::is_nil(service_obj)) { // not found, try to bind the new context to name service ret_val = ns_context->bind_new_context(context_data); if (CORBA::is_nil(ret_val) ) { cerr << "Failed to create new context to name service for " << service_name_param << "." << endl; } } else { ret_val = CosNaming::NamingContext::_narrow(service_obj); if (CORBA::is_nil(ret_val) ) { cerr << "Failed to get existing context from name service for " << service_name_param << ", narrowing failed." << endl; } } } catch(CosNaming::NamingContext::AlreadyBound& ex) { cerr << "Could not get context from nameservice for " << service_name_param << ". Context with same name already existed."<< endl; } } } catch (CORBA::ORB::InvalidName&) { // This should not happen! cerr << "Could not get context from name service for " << service_name_param << ", name service does not exist." << endl; } catch(CORBA::TRANSIENT& ex) { cerr << "Could not get context from name service for " << service_name_param << ", verify that name service is running. " << service_name_param << endl; } catch (CORBA::NO_RESOURCES&) { cerr << "Could not get context from name service for " << service_name_param << ". Name service is not running or has configuration problem." << endl; } catch(CORBA::SystemException& ex) { cerr << "Could not get context from name service for " << service_name_param << ", could not determine reason." << endl; } return ret_val; } bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param, CORBA::Object_ptr service_ref_param, const char *service_name_param, const char *service_kind_param) { bool retVal; CosNaming::Name context_data; retVal = false; try { context_data.length(1); context_data[0].id = service_name_param; context_data[0].kind = service_kind_param; try { service_context_param->bind(context_data, service_ref_param); retVal = true; } catch(CosNaming::NamingContext::AlreadyBound& ex) { /** * service existed already for the naming context with similar description. * Replace the existing one with a new one. */ cout << "service " << service_name_param << " existed, replacing it." << endl; service_context_param->rebind(context_data, service_ref_param); retVal = true; } } catch (CosNaming::NamingContext::InvalidName&) { cerr << "Could not register service to name server, invalid service name." << endl; } catch (CosNaming::NamingContext::NotFound&) { cerr << "Could not register service to name server, service object reference is invalid." << endl; } catch (CosNaming::NamingContext::CannotProceed&) { // This should not happen! cerr << "Could not register service to name server, unknown error." << endl; } catch(CORBA::SystemException& ex) { cerr << "Could not register service to name server, unknown error." << endl; } return retVal; } int OrbServerImpl::add_server_listener(IServerListener *listener_param) { int retVal; retVal = 0; cout << "register_request_received_callback() started" << endl; _listener = listener_param; cout << "register_callback() done" << endl; return retVal; } int OrbServerImpl::is_shutdown_pending() { return _shutdown_pending; } void OrbServerImpl::server_thread_closed() { bool send_signal = false; server_thread_mutex.lock(); _server_thread_count--; if (_server_thread_count == 0) { send_signal = true; } server_thread_mutex.unlock(); if (send_signal == true) { server_thread_shutdown_signal.signal(); } }