4 * Created on: Aug 11, 2010
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
16 using namespace plpbus;
17 using namespace plpbus_orb;
// File-scope lock used by add_event_listener(), shutdown() and
// server_thread_closed() in this file; every visible access to
// _server_thread_count that changes or polls it happens while it is held.
19 static omni_mutex server_thread_mutex;
// Condition variable bound to server_thread_mutex. server_thread_closed()
// signals it and shutdown() waits on it.
20 static omni_condition server_thread_shutdown_signal(&server_thread_mutex);
// Constructor. The visible statements reset the event-thread counter and the
// shutdown flag to 0; any other initialization lies outside this excerpt.
22 OrbServerImpl::OrbServerImpl()
26 _server_thread_count = 0;
27 _shutdown_pending = 0;
// Destructor. Its body is not visible in this excerpt.
31 OrbServerImpl::~OrbServerImpl()
// Handles a string request and delivers the reply through a client callback.
//
// response_listener_param: client object that receives the reply; it must be
//                          a non-nil reference, otherwise only an error is
//                          logged.
// The remaining parameter(s) and the declaration of msg_rsp are outside this
// excerpt; the body reads the request text through the name msg_req.
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
42 if (CORBA::is_nil(response_listener_param) == false) {
43 if (_listener != NULL) {
44 log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
// The registered listener fills msg_rsp through the pointer passed here.
45 _listener->request_received(msg_req, &msg_rsp);
46 log_debug("send_message_and_request_response(), request_received\n");
// Push the reply back to the caller's callback object.
// NOTE(review): ownership/release of msg_rsp is not visible here - confirm.
47 response_listener_param->receive_response_message(msg_rsp);
// No listener registered: nothing is sent to the client, only logged.
50 log_error("send_message_and_request_response() error, server callback == NULL\n");
// Nil callback reference: the request is dropped with an error log.
54 log_error("invalid callback object received.\n");
// Handles a string request and returns the reply string to the caller.
//
// msg_req_param: request text forwarded to the registered listener.
// err_flg:       error flag reference; no write to it is visible in this
//                excerpt.
// Returns a string produced with CORBA::string_dup(): the listener's reply
// when a listener is registered, otherwise a copy of the request itself.
58 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
64 if (_listener != NULL) {
65 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
// The listener fills msg_rsp through the pointer passed here.
66 _listener->request_received(msg_req_param, &msg_rsp);
67 //ret_val = CORBA::string_dup(msg_rsp);
// NOTE(review): release of the original msg_rsp buffer is not visible - confirm.
68 ret_val = CORBA::string_dup(msg_rsp);
71 log_error("send_message_and_wait_response() error, server callback == NULL\n");
// Without a listener the request text is echoed back as the reply.
72 ret_val = CORBA::string_dup(msg_req_param);
// Handles a data-item sequence request and delivers the reply sequence
// through a client callback.
//
// response_listener_param: client object that receives the reply; must be a
//                          non-nil reference for anything to happen.
// msg_req_param:           request sequence, wrapped into a
//                          BusMessageInternal before it is handed to the
//                          listener.
78 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
79 const ::DataItemSequence& msg_req_param) {
82 DataItemSequence *seq;
85 if (CORBA::is_nil(response_listener_param) == false) {
86 if (_listener != NULL) {
// NOTE(review): msg_req and msg_rsp are heap-allocated below; no matching
// delete is visible in this excerpt - confirm they are released.
87 msg_req = new BusMessageInternal(msg_req_param);
88 log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
89 log_debug("msg_req:\n");
// The reply message is created with the type read from the request.
93 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
94 _listener->request_received(msg_req, msg_rsp);
// The reply's internal sequence pointer is cast to the CORBA sequence type
// so it can be sent to the client.
95 seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
96 log_debug("msg_rsp length: %ld\n", seq->length());
98 response_listener_param->receive_response_dataitem_sequence(*seq);
103 log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
// Handles a data-item sequence request and returns the reply sequence
// through an out parameter.
//
// req_seq_param: request sequence, wrapped into a BusMessageInternal.
// rsp_seq_param: out parameter; its _data pointer is set to the reply
//                sequence (or to a fresh empty sequence).
// The returned CORBA::Long is assigned outside this excerpt.
108 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
109 DataItemSequence_out rsp_seq_param) {
112 DataItemSequence *seq;
// NOTE(review): release of msg_req / msg_rsp is not visible here - confirm.
117 msg_req = new BusMessageInternal(req_seq_param);
118 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
// NOTE(review): unlike the other request handlers in this file, no _listener
// null check is visible before this call - confirm one exists in the omitted
// lines.
119 _listener->request_received(msg_req, msg_rsp);
// NOTE(review): this test comes after msg_rsp has already been passed to the
// listener above, and a plain new-expression does not yield NULL.
120 if (msg_rsp != NULL) {
121 seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
// Fallback: hand back an empty sequence.
124 seq = new DataItemSequence();
// The sequence pointer is stored directly into the out parameter.
// NOTE(review): presumably the out parameter takes ownership - confirm,
// especially when seq points into msg_rsp.
126 rsp_seq_param._data = seq;
// Starts a ServerEventThread for the given client.
//
// client_param: client object handed to the new thread; must be non-nil,
//               otherwise only an error is logged.
// period_secs:  period value forwarded to the thread constructor.
// A further parameter (used below as msg) is declared outside this excerpt.
131 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
133 CORBA::UShort period_secs)
135 ServerEventThread *srvr_thread;
137 if (CORBA::is_nil(client_param) == false) {
138 log_debug("add_event_listener()\n");
// The live-thread counter is incremented under the file-scope mutex before
// the thread is created; server_thread_closed() performs the decrement.
139 server_thread_mutex.lock();
140 _server_thread_count++;
141 server_thread_mutex.unlock();
// The thread receives a pointer to this server object.
// NOTE(review): no delete of srvr_thread is visible; presumably the thread
// object cleans itself up on exit - confirm.
142 srvr_thread = new ServerEventThread(client_param, msg, period_secs, this);
143 srvr_thread->start();
146 log_error("Failed to add event listener, listener NULL.\n");
// Requests server shutdown: marks shutdown as pending, waits until the
// event-thread counter has dropped to 0, then proceeds to the ORB shutdown
// step (the ORB call itself is outside this excerpt).
150 void OrbServerImpl::shutdown()
// Scoped lock object on the file-scope mutex; the counter is polled below
// while it is held.
152 omni_mutex_lock sync(server_thread_mutex);
// Only the first call acts; later calls see the flag already set.
153 if (is_shutdown_pending() == 0) {
154 log_debug("shutdown request received!\n");
155 // Tell the servers to exit, and wait for them to do so.
156 _shutdown_pending = 1;
157 while(_server_thread_count > 0) {
// NOTE(review): omnithread documents omni_condition::timedwait() as taking
// an ABSOLUTE time; (5, 0) would then already be in the past and the call
// would time out immediately, turning this loop into a busy-wait. A relative
// 5 s wait presumably needs omni_thread::get_time() first - confirm.
// The return value (signalled vs. timed out) is ignored, so the log line
// below is printed on timeouts as well.
158 server_thread_shutdown_signal.timedwait(5, 0);
159 log_debug("signal received\n");
161 // Shutdown the ORB (but do not wait for completion). This also
162 // causes the main thread to unblock from CORBA::ORB::run().
163 log_debug("calling orb shutdown\n");
165 log_debug("orb shutdown called\n");
// Initializes the ORB and creates the POA used by this server.
// The int return value and the argc/argv variables passed to ORB_init are
// set in lines outside this excerpt.
169 int OrbServerImpl::init() {
177 log_debug("started\n");
178 _orb = CORBA::ORB_init(argc, argv);
// create_poa() yields a nil reference on failure, which is what the check
// below tests for.
180 _poa = create_poa(_orb);
181 if (CORBA::is_nil(_poa) == false) {
183 log_info("init success\n");
186 log_info("init failed\n");
190 log_error("init() failed\n");
// Activates this object as a CORBA servant in _poa and registers it with the
// naming service.
//
// server_name: used in the log messages; the arguments of the bind call that
//              carry the name are outside this excerpt.
// The int return value is assigned outside this excerpt.
195 int OrbServerImpl::launch(const char *server_name)
197 CORBA::Object_var server_ref;
198 PortableServer::ObjectId_var server_id;
201 CosNaming::NamingContext_var naming_cntx;
// Tie servant constructed on the stack around this object.
204 POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
205 server_id = _poa->activate_object(&server_impl);
206 server_ref = server_impl._this();
207 //this->_remove_ref();
208 naming_cntx = get_naming_service_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
// NOTE(review): the rest of this file tests references with CORBA::is_nil();
// this comparison against NULL is the one exception.
209 if (naming_cntx != NULL) {
210 ok_flg = bind_naming_context_and_service(naming_cntx,
213 CONST_CONTEXT_KIND__PLPBUS);
214 if (ok_flg == true) {
215 log_debug("Registered to naming service: %s\n", server_name);
// NOTE(review): the "run stopped" message suggests a blocking run call sits
// between these two log lines in the omitted source - confirm.
217 log_debug("run stopped: %s\n", server_name);
221 log_error("Failed to register to naming service: %s\n", server_name);
// Creates a POA with a bidirectional policy as a child of the root POA.
//
// orb: ORB used to resolve the root POA and to create the policy object.
// Returns ret_val, the result of create_POA(); the failure branch below
// tests it with CORBA::is_nil().
// CORBA and omniORB exceptions are caught and logged.
227 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
228 PortableServer::POA_var ret_val;
229 CORBA::Object_var poa_obj;
230 CORBA::PolicyList policy_list;
231 CORBA::Any policyVal;
232 PortableServer::POAManager_var poa_man;
233 PortableServer::POA_var rootpoa;
236 log_debug("started\n");
238 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
239 log_debug("received initial reference to %s\n", CONST_ROOT_POA_NAME);
240 rootpoa = PortableServer::POA::_narrow(poa_obj);
241 poa_man = rootpoa->the_POAManager();
243 // bidirectional policy
// A single policy entry is created with the value BiDirPolicy::BOTH.
244 policy_list.length(1);
245 policyVal <<= BiDirPolicy::BOTH;
246 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
// The new POA is created on rootpoa; the remaining create_POA arguments are
// outside this excerpt.
247 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
// Note: the messages below print CONST_ROOT_POA_NAME although the POA
// created above is named CONST_ROOT_POA_BIDIR_POLICY_NAME.
250 if (CORBA::is_nil(ret_val) == false) {
251 log_info("Created root poa %s\n", CONST_ROOT_POA_NAME);
254 log_error("Failed to create RootPOA %s\n", CONST_ROOT_POA_NAME);
// Handlers run from the more specific SystemException to the general
// CORBA::Exception, followed by omniORB's own fatal exception type.
257 catch(CORBA::SystemException& ex) {
258 log_error("Failed to create RootPOA, received system exception CORBA::%s\n", ex._name());
260 catch(CORBA::Exception& ex) {
261 log_error("Failed to create RootPOA, received exception CORBA::%s\n", ex._name());
263 catch(omniORB::fatalException& ex) {
264 log_error("Failed to create RootPOA, received fatal exception CORBA::%s\n", ex.errmsg());
// Looks up a naming context under the name service and creates it when it
// does not exist yet.
//
// service_name_param: id part of the context name.
// service_kind_param: kind part of the context name.
// Returns ret_val: the resolved and narrowed context, or the newly bound
// one. On the failure paths an error is logged; ret_val is not assigned in
// any visible failure path.
269 CosNaming::NamingContext_var OrbServerImpl::get_naming_service_context(const char *service_name_param,
270 const char *service_kind_param)
272 CosNaming::NamingContext_var ret_val;
273 CosNaming::NamingContext_var ns_cntx;
274 CORBA::Object_var ns_obj;
275 CORBA::Object_var service_obj;
276 CosNaming::Name cntx_dta;
280 // get nameservice reference
281 log_debug("started\n");
282 ns_obj = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
283 if (CORBA::is_nil(ns_obj) == false) {
284 // get naming service context
285 ns_cntx = CosNaming::NamingContext::_narrow(ns_obj);
286 if (CORBA::is_nil(ns_cntx) == false) {
// Build the one-component name to look up.
// NOTE(review): presumably cntx_dta.length(1) is set in an omitted line
// before element 0 is written - confirm.
288 cntx_dta[0].id = CORBA::string_dup(service_name_param);
289 cntx_dta[0].kind = CORBA::string_dup(service_kind_param);
291 service_obj = ns_cntx->resolve(cntx_dta);
// NotFound is the expected outcome when the context does not exist yet; it
// is logged (at error level) and handled by the nil check that follows.
293 catch(CosNaming::NamingContext::NotFound& ex) {
294 log_error("Trying to create new naming service context %s.\n", service_name_param);
297 if (CORBA::is_nil(service_obj)) {
298 // not found, try to bind new context from naming service
299 ret_val = ns_cntx->bind_new_context(cntx_dta);
300 if (CORBA::is_nil(ret_val) ) {
301 log_error("Failed to create new context to name service for %s.\n", service_name_param);
// The name resolved to an existing object: narrow it to a naming context.
305 ret_val = CosNaming::NamingContext::_narrow(service_obj);
306 if (CORBA::is_nil(ret_val) ) {
307 log_error("Failed to get naming service context for %s, narrowing failed for resolved service context.\n", service_name_param);
311 catch(CosNaming::NamingContext::AlreadyBound& ex) {
312 log_error("Failed to get naming service context for %s. Context with same name already exist.\n", service_name_param);
316 log_error("Failed to get naming service context for %s. Could not narrow the name service.\n", service_name_param);
318 //CORBA::release(ns_obj);
321 log_error("Failed to get naming service context for %s. Could not get reference to name service.\n", service_name_param);
// The remaining handlers only log.
324 catch (CORBA::ORB::InvalidName&) {
325 // This should not happen!
326 log_error("Failed to get naming service context for %s, name service does not exist.\n", service_name_param);
328 catch(CORBA::TRANSIENT& ex) {
329 log_error("Failed to get naming service context for %s, verify that name service is running.\n", service_name_param);
331 catch (CORBA::NO_RESOURCES&) {
332 log_error("Failed to get context from name service for %s, Naming service is not running or has configuration problem.\n", service_name_param);
333 log_error("Check-list:\n");
334 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
335 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file.\n");
336 log_error("Verify that InitRef line is defined in /etc/omniORB.cfg file.\n");
338 catch(CORBA::SystemException& ex) {
339 log_error("Failed to get naming service context for %s, system error.\n", service_name_param);
// Binds a service object reference under a naming context, replacing an
// existing binding of the same name.
//
// srv_cntx_param: naming context in which the binding is made.
// srv_ref_param:  object reference to bind.
// srv_name_param: id part of the binding name.
// srv_kind_param: kind part of the binding name.
// The bool return value is assigned outside this excerpt.
345 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var srv_cntx_param,
346 CORBA::Object_ptr srv_ref_param,
347 const char *srv_name_param,
348 const char *srv_kind_param)
351 CosNaming::Name cntx_dta;
// NOTE(review): presumably cntx_dta.length(1) is set in an omitted line
// before element 0 is written - confirm.
356 cntx_dta[0].id = CORBA::string_dup(srv_name_param);
357 cntx_dta[0].kind = CORBA::string_dup(srv_kind_param);
359 srv_cntx_param->bind(cntx_dta, srv_ref_param);
362 catch(CosNaming::NamingContext::AlreadyBound& ex) {
364 * service existed already for the naming context with similar description.
365 * Replace the existing one with a new one.
367 log_warning("service %s exist, replacing it\n", srv_name_param);
368 srv_cntx_param->rebind(cntx_dta, srv_ref_param);
// The remaining handlers only log.
372 catch (CosNaming::NamingContext::InvalidName&) {
373 log_error("Could not register service to name server, invalid service name.\n");
375 catch (CosNaming::NamingContext::NotFound&) {
376 log_error("Could not register service to name server, service object reference is invalid.\n");
378 catch (CosNaming::NamingContext::CannotProceed&) {
379 // This should not happen!
380 log_error("Could not register service to name server, unknown error.\n");
382 catch(CORBA::SystemException& ex) {
383 log_error("Could not register service to name server, unknown error.\n");
// Registers the listener that the request handlers in this file call via
// _listener->request_received().
//
// listener_param: stored as-is in _listener, replacing whatever was stored
//                 before; no null check is visible.
// The int return value is assigned outside this excerpt.
388 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
393 log_debug("started\n");
394 _listener = listener_param;
// Returns the shutdown flag: 0 after construction, 1 once shutdown() has
// started. The flag is read here without taking any lock.
398 int OrbServerImpl::is_shutdown_pending() {
399 return _shutdown_pending;
// Decrements the live event-thread counter and signals the shutdown
// condition variable that shutdown() waits on.
// NOTE(review): presumably called by a ServerEventThread when it exits -
// confirm against that class.
402 void OrbServerImpl::server_thread_closed() {
405 log_debug("started\n");
// The counter is logged and decremented under the file-scope mutex.
407 server_thread_mutex.lock();
408 log_debug("server_thread_count: %d\n", _server_thread_count);
409 _server_thread_count--;
// NOTE(review): the body of this branch is outside the excerpt; presumably
// it sets send_signal to true - confirm.
410 if (_server_thread_count == 0) {
// The mutex is released before the condition variable is signalled.
413 server_thread_mutex.unlock();
414 if (send_signal == true) {
415 log_debug("sending signal\n");
416 server_thread_shutdown_signal.signal();
417 log_debug("signal send\n");