4 * Created on: Aug 11, 2010
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
16 using namespace plpbus;
17 using namespace plpbus_orb;
// File-scope mutex. In this file it is locked around every update of
// _server_thread_count (add_event_listener(), server_thread_closed()) and is
// held by shutdown() while it waits for the event threads to finish.
19 static omni_mutex server_thread_mutex;
// Condition variable bound to server_thread_mutex. shutdown() waits on it and
// server_thread_closed() signals it when the thread count reaches zero.
20 static omni_condition server_thread_shutdown_signal(&server_thread_mutex);
// Constructor: the server starts with no counted event threads and with no
// shutdown request pending.
22 OrbServerImpl::OrbServerImpl()
// Counter incremented in add_event_listener() and decremented in
// server_thread_closed().
26 _server_thread_count = 0;
// Flag returned by is_shutdown_pending(); shutdown() sets it to 1.
27 _shutdown_pending = 0;
// Destructor of OrbServerImpl.
31 OrbServerImpl::~OrbServerImpl()
// Handles a text request and delivers the reply through a client callback
// object rather than through a return value.
//
// response_listener_param: client reference whose receive_response_message()
//                          is called with the response produced by the
//                          registered server listener.
//
// The request is processed only when the client reference is non-nil and a
// server listener (_listener) has been registered; the two log_error() calls
// below report the cases where one of those preconditions does not hold.
37 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
// Proceed only when the caller supplied a usable callback reference.
43 if (CORBA::is_nil(response_listener_param) == false) {
44 if (_listener != NULL) {
45 log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
// The registered listener fills msg_rsp for the given request.
46 _listener->request_received(msg_req, &msg_rsp);
47 log_debug("send_message_and_request_response(), request_received\n");
// Hand the response back to the client through its callback object.
48 response_listener_param->receive_response_message(msg_rsp);
51 log_error("send_message_and_request_response() error, server callback == NULL\n");
55 log_error("invalid callback object received.\n");
// Handles a text request synchronously and prepares the response string.
//
// msg_req_param: request text handed to the registered server listener.
// err_flg:       CORBA::Long passed by reference; how it is set is not visible
//                in this part of the function.
//
// ret_val receives a CORBA::string_dup() copy of the listener's response.
// When no listener is registered an error is logged and ret_val is a copy of
// the request text instead.
// NOTE(review): ret_val is presumably the value returned to the caller; the
// return statement is not visible here - confirm.
59 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
65 if (_listener != NULL) {
66 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
// The listener fills msg_rsp for the given request.
67 _listener->request_received(msg_req_param, &msg_rsp);
68 //ret_val = CORBA::string_dup(msg_rsp);
// Duplicate the response with the CORBA string allocator.
69 ret_val = CORBA::string_dup(msg_rsp);
72 log_error("send_message_and_wait_response() error, server callback == NULL\n");
// No listener available: the request text itself is duplicated as the result.
73 ret_val = CORBA::string_dup(msg_req_param);
// Handles a DataItemSequence request and delivers the reply through a client
// callback object.
//
// response_listener_param: client reference whose
//                          receive_response_dataitem_sequence() is called with
//                          the response sequence.
// msg_req_param:           request data; wrapped into a BusMessageInternal
//                          before it is passed to the server listener.
//
// The request is processed only when the client reference is non-nil and a
// server listener (_listener) has been registered.
// NOTE(review): msg_req and msg_rsp are allocated with new; no matching delete
// is visible in this excerpt - confirm that both are released.
79 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
80 const ::DataItemSequence& msg_req_param) {
83 DataItemSequence *seq;
86 if (CORBA::is_nil(response_listener_param) == false) {
87 if (_listener != NULL) {
// Wrap the incoming sequence so the listener receives a message object.
88 msg_req = new BusMessageInternal(msg_req_param);
89 log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
90 log_debug("msg_req:\n");
// The response message is created with the type read from the request.
94 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
95 _listener->request_received(msg_req, msg_rsp);
// The response payload is taken from the message's _dataItemSeq member.
96 seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
97 log_debug("msg_rsp length: %lu\n", (long unsigned int)seq->length());
// Send the response sequence to the client through its callback object.
99 response_listener_param->receive_response_dataitem_sequence(*seq);
104 log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
// Free function taking a DataItemSequence by const reference. The visible part
// of its body only declares a local DataItemSequence.
// NOTE(review): the remainder of this function is not visible here - confirm
// what it does with new_seq and whether the helper is still needed.
109 void copy( const DataItemSequence& orig_seq )
111 DataItemSequence new_seq;
// Handles a DataItemSequence request synchronously.
//
// req_seq_param: request data; wrapped into a BusMessageInternal.
// rsp_seq_param: out parameter; its _data member is set to a newly allocated
//                copy of the response message's _dataItemSeq.
//
// NOTE(review): unlike the other send_* methods in this file, no
// "_listener != NULL" check is visible before _listener is dereferenced -
// confirm whether one exists on the lines not shown here.
// NOTE(review): msg_req and msg_rsp are allocated with new; no matching delete
// is visible in this excerpt - confirm that both are released.
116 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
117 DataItemSequence_out rsp_seq_param) {
122 msg_req = new BusMessageInternal(req_seq_param);
// The response message is created with the type read from the request.
123 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
124 _listener->request_received(msg_req, msg_rsp);
// Copy the response sequence into the out parameter.
125 rsp_seq_param._data = new DataItemSequence(*(msg_rsp->_dataItemSeq));
// Registers a client as an event listener by starting a ServerEventThread for
// it.
//
// client_param: client reference handed to the new thread; must not be nil,
//               otherwise only an error is logged.
// period_secs:  passed on to the ServerEventThread constructor.
//               NOTE(review): presumably the event period in seconds - confirm
//               against ServerEventThread.
132 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
134 CORBA::UShort period_secs)
136 ServerEventThread *srvr_thread;
138 if (CORBA::is_nil(client_param) == false) {
139 log_debug("add_event_listener()\n");
// Count the new thread under the file-scope mutex; server_thread_closed()
// performs the matching decrement.
140 server_thread_mutex.lock();
141 _server_thread_count++;
142 server_thread_mutex.unlock();
// The thread is given a pointer to this server object.
// NOTE(review): "msg" is presumably a parameter declared on a line not visible
// here - confirm.
143 srvr_thread = new ServerEventThread(client_param, msg, period_secs, this);
144 srvr_thread->start();
147 log_error("Failed to add event listener, listener NULL.\n");
// Handles a shutdown request: marks shutdown as pending, waits until the
// count of server event threads has dropped to zero and then logs around the
// ORB shutdown step.
//
// A request that arrives while a shutdown is already pending is ignored,
// because the work is done only when is_shutdown_pending() returns 0.
151 void OrbServerImpl::shutdown()
// Lock object constructed on server_thread_mutex; the condition variable
// waited on below is bound to the same mutex.
153 omni_mutex_lock sync(server_thread_mutex);
154 if (is_shutdown_pending() == 0) {
155 log_debug("shutdown request received!\n");
156 // Tell the servers to exit, and wait for them to do so.
157 _shutdown_pending = 1;
// Wait until server_thread_closed() has brought the counter down to zero.
158 while(_server_thread_count > 0) {
// NOTE(review): omnithread documents omni_condition::timedwait() as taking an
// ABSOLUTE time (secs, nanosecs). (5, 0) looks like it was meant as a relative
// 5 second timeout; if so the call returns immediately and this loop spins.
// Consider computing the deadline with omni_thread::get_time() - confirm.
159 server_thread_shutdown_signal.timedwait(5, 0);
160 log_debug("signal received\n");
162 // Shutdown the ORB (but do not wait for completion). This also
163 // causes the main thread to unblock from CORBA::ORB::run().
164 log_debug("calling orb shutdown\n");
166 log_debug("orb shutdown called\n");
// Initialises the ORB and the POA used by this server.
//
// Stores the result of CORBA::ORB_init() in _orb and the POA returned by
// create_poa() in _poa, then logs whether a non-nil POA was obtained.
// NOTE(review): the int return value and the origin of argc/argv are on lines
// not visible here - confirm the success/failure codes.
170 int OrbServerImpl::init() {
178 log_debug("started\n");
179 _orb = CORBA::ORB_init(argc, argv);
// Create the POA that launch() later activates the servant in.
181 _poa = create_poa(_orb);
182 if (CORBA::is_nil(_poa) == false) {
184 log_info("init success\n");
187 log_info("init failed\n");
191 log_error("init() failed\n");
// Activates this object as a CORBA servant and registers it in the naming
// service.
//
// server_name: name used in the log messages below.
//              NOTE(review): presumably also the name the service is bound
//              under; the corresponding argument line is not visible - confirm.
//
// NOTE(review): the "run stopped" message suggests a blocking ORB run call on
// a line not visible here - confirm.
196 int OrbServerImpl::launch(const char *server_name)
198 CORBA::Object_var server_ref;
199 PortableServer::ObjectId_var server_id;
202 CosNaming::NamingContext_var naming_cntx;
// Tie servant constructed from this object's pointer; it is activated in _poa.
205 POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
206 server_id = _poa->activate_object(&server_impl);
// Object reference that is handed to the naming service below.
207 server_ref = server_impl._this();
208 //this->_remove_ref();
// Look up the naming context identified by the PLPBUS name/kind constants.
209 naming_cntx = get_naming_service_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
// NOTE(review): other nil checks in this file use CORBA::is_nil(); this one
// compares the _var against NULL - confirm the two are equivalent here.
210 if (naming_cntx != NULL) {
211 ok_flg = bind_naming_context_and_service(naming_cntx,
214 CONST_CONTEXT_KIND__PLPBUS);
215 if (ok_flg == true) {
216 log_debug("Registered to naming service: %s\n", server_name);
218 log_debug("run stopped: %s\n", server_name);
222 log_error("Failed to register to naming service: %s\n", server_name);
// Creates a POA with a bidirectional policy underneath the root POA.
//
// orb: ORB used to resolve the root POA and to create the policy object.
//
// ret_val holds the POA returned by create_POA(). CORBA system exceptions,
// other CORBA exceptions and omniORB fatal exceptions are caught and logged.
// NOTE(review): the return statement is not visible here; ret_val is
// presumably what is returned - confirm.
228 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
229 PortableServer::POA_var ret_val;
230 CORBA::Object_var poa_obj;
231 CORBA::PolicyList policy_list;
232 CORBA::Any policyVal;
233 PortableServer::POAManager_var poa_man;
234 PortableServer::POA_var rootpoa;
237 log_debug("started\n");
// Obtain the root POA and its manager from the ORB.
239 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
240 log_debug("received initial reference to %s\n", CONST_ROOT_POA_NAME);
241 rootpoa = PortableServer::POA::_narrow(poa_obj);
242 poa_man = rootpoa->the_POAManager();
244 // bidirectional policy
// Single-entry policy list carrying BiDirPolicy::BOTH.
245 policy_list.length(1);
246 policyVal <<= BiDirPolicy::BOTH;
247 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
// Create the new POA under the root POA; the remaining create_POA() arguments
// are on lines not visible here.
248 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
251 if (CORBA::is_nil(ret_val) == false) {
// NOTE(review): the messages below print CONST_ROOT_POA_NAME although the POA
// just created is named CONST_ROOT_POA_BIDIR_POLICY_NAME - confirm intent.
252 log_info("Created root poa %s\n", CONST_ROOT_POA_NAME);
255 log_error("Failed to create RootPOA %s\n", CONST_ROOT_POA_NAME);
258 catch(CORBA::SystemException& ex) {
259 log_error("Failed to create RootPOA, received system exception CORBA::%s\n", ex._name());
261 catch(CORBA::Exception& ex) {
262 log_error("Failed to create RootPOA, received exception CORBA::%s\n", ex._name());
264 catch(omniORB::fatalException& ex) {
265 log_error("Failed to create RootPOA, received fatal exception CORBA::%s\n", ex.errmsg());
// Looks up a naming context in the CORBA naming service and creates it when
// it cannot be resolved.
//
// service_name_param: id field of the name to resolve / bind.
// service_kind_param: kind field of the name to resolve / bind.
//
// ret_val is set either to a newly bound context (bind_new_context) or to the
// narrowed result of resolve(). All failure paths visible here only log an
// error.
// NOTE(review): the return statement is not visible here; ret_val is
// presumably returned and left nil on failure - confirm.
270 CosNaming::NamingContext_var OrbServerImpl::get_naming_service_context(const char *service_name_param,
271 const char *service_kind_param)
273 CosNaming::NamingContext_var ret_val;
274 CosNaming::NamingContext_var ns_cntx;
275 CORBA::Object_var ns_obj;
276 CORBA::Object_var service_obj;
277 CosNaming::Name cntx_dta;
281 // get nameservice reference
282 //log_debug("started\n");
283 ns_obj = _orb->resolve_initial_references("NameService");
284 if (CORBA::is_nil(ns_obj) == false) {
285 // get naming service context
286 ns_cntx = CosNaming::NamingContext::_narrow(ns_obj.in());
287 if (CORBA::is_nil(ns_cntx.in()) == false) {
// Fill the first name component with the requested id and kind.
289 cntx_dta[0].id = CORBA::string_dup(service_name_param);
290 cntx_dta[0].kind = CORBA::string_dup(service_kind_param);
// First try to resolve an existing entry with this name.
292 service_obj = ns_cntx->resolve(cntx_dta);
294 catch(CosNaming::NamingContext::NotFound& ex) {
// Nothing is bound under this name yet; service_obj is still nil in that case,
// which selects the bind_new_context() path below.
295 log_error("Trying to create new naming service context %s.\n", service_name_param);
298 if (CORBA::is_nil(service_obj)) {
299 // not found, try to bind new context from naming service
300 ret_val = ns_cntx->bind_new_context(cntx_dta);
301 if (CORBA::is_nil(ret_val) ) {
302 log_error("Failed to create new context to name service for %s.\n", service_name_param);
// An entry was resolved: narrow it to a naming context.
306 ret_val = CosNaming::NamingContext::_narrow(service_obj);
307 if (CORBA::is_nil(ret_val) ) {
308 log_error("Failed to get naming service context for %s, narrowing failed for resolved service context.\n", service_name_param);
312 catch(CosNaming::NamingContext::AlreadyBound& ex) {
313 log_error("Failed to get naming service context for %s. Context with same name already exist.\n", service_name_param);
315 //CORBA::release(ns_cntx);
318 log_error("Failed to get naming service context for %s. Could not narrow the name service.\n", service_name_param);
320 //CORBA::release(ns_obj);
323 log_error("Failed to get naming service context for %s. Could not get reference to name service.\n", service_name_param);
326 catch (CORBA::ORB::InvalidName&) {
327 // This should not happen!
328 log_error("Failed to get naming service context for %s, name service does not exist.\n", service_name_param);
330 catch(CORBA::TRANSIENT& ex) {
331 log_error("Failed to get naming service context for %s, verify that name service is running.\n", service_name_param);
333 catch (CORBA::NO_RESOURCES&) {
// Besides the error itself, a configuration check-list is logged for the user.
334 log_error("Failed to get context from name service for %s, Naming service is not running or has configuration problem.\n", service_name_param);
335 log_error("Check-list:\n");
336 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
337 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file.\n");
338 log_error("Verify that InitRef line is defined in /etc/omniORB.cfg file.\n");
340 catch(CORBA::SystemException& ex) {
341 log_error("Failed to get naming service context for %s, system error.\n", service_name_param);
343 //log_debug("done\n");
// Binds a service object reference into the given naming context, replacing
// an existing binding with the same name.
//
// srv_cntx_param: naming context the service is bound into.
// srv_ref_param:  object reference to bind.
// srv_name_param: id field of the binding name.
// srv_kind_param: kind field of the binding name.
//
// InvalidName, NotFound, CannotProceed and CORBA system exceptions are caught
// and logged.
// NOTE(review): the bool result is set on lines not visible here; launch()
// treats true as success - confirm which paths return true.
347 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var srv_cntx_param,
348 CORBA::Object_ptr srv_ref_param,
349 const char *srv_name_param,
350 const char *srv_kind_param)
353 CosNaming::Name cntx_dta;
// Fill the first name component with the service id and kind.
358 cntx_dta[0].id = CORBA::string_dup(srv_name_param);
359 cntx_dta[0].kind = CORBA::string_dup(srv_kind_param);
361 srv_cntx_param->bind(cntx_dta, srv_ref_param);
364 catch(CosNaming::NamingContext::AlreadyBound& ex) {
366 * service existed already for the naming context with similar description.
367 * Replace the existing one with a new one.
369 log_warning("service %s exist, replacing it\n", srv_name_param);
// rebind() overwrites the binding that made bind() fail.
370 srv_cntx_param->rebind(cntx_dta, srv_ref_param);
374 catch (CosNaming::NamingContext::InvalidName&) {
375 log_error("Could not register service to name server, invalid service name.\n");
377 catch (CosNaming::NamingContext::NotFound&) {
378 log_error("Could not register service to name server, service object reference is invalid.\n");
380 catch (CosNaming::NamingContext::CannotProceed&) {
381 // This should not happen!
382 log_error("Could not register service to name server, unknown error.\n");
384 catch(CORBA::SystemException& ex) {
385 log_error("Could not register service to name server, unknown error.\n");
// Registers the listener that the send_* request handlers forward incoming
// requests to.
//
// listener_param: stored in _listener as-is; any previously stored pointer is
//                 overwritten.
// NOTE(review): the int return value is set on lines not visible here -
// confirm its meaning.
390 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
395 log_debug("started\n");
396 _listener = listener_param;
// Returns the shutdown flag: 0 after construction, 1 once shutdown() has
// accepted a shutdown request.
400 int OrbServerImpl::is_shutdown_pending() {
401 return _shutdown_pending;
// Records that one server event thread has finished and signals the condition
// variable that shutdown() waits on.
//
// NOTE(review): presumably called by each ServerEventThread when it exits,
// using the server pointer passed to it in add_event_listener() - confirm.
404 void OrbServerImpl::server_thread_closed() {
407 log_debug("started\n");
// Decrement the counter under the same mutex add_event_listener() uses for
// the increment.
409 server_thread_mutex.lock();
410 log_debug("server_thread_count: %d\n", _server_thread_count);
411 _server_thread_count--;
// NOTE(review): send_signal is presumably set to true inside this branch, on a
// line not visible here - confirm.
412 if (_server_thread_count == 0) {
415 server_thread_mutex.unlock();
// The condition is signalled only after the mutex has been released.
416 if (send_signal == true) {
417 log_debug("sending signal\n");
418 server_thread_shutdown_signal.signal();
419 log_debug("signal send\n");