4 * Created on: Aug 11, 2010
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
16 using namespace plpbus;
17 using namespace plpbus_orb;
// File-scope synchronization for the event-thread bookkeeping in this file.
// The mutex is locked around updates of _server_thread_count, and the
// condition variable (constructed on that same mutex) is the one shutdown()
// waits on and server_thread_closed() signals.
19 static omni_mutex server_thread_mutex;
20 static omni_condition server_thread_shutdown_signal(&server_thread_mutex);
// Constructor: starts with zero active event threads and no shutdown pending.
22 OrbServerImpl::OrbServerImpl()
26 _server_thread_count = 0;
27 _shutdown_pending = 0;
// Destructor: only logs that the object has been destroyed.
31 OrbServerImpl::~OrbServerImpl()
33 log_info("OrbServerImpl destroyed.\n");
// Handles a string request whose answer is delivered through a client
// callback object instead of a return value.
//
// response_listener_param: client-side object that receives the response.
// msg_req:                 request text handed to the registered listener.
//
// The request is only processed when the callback reference is non-nil AND a
// server listener has been registered; either failure is just logged.
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
42 if (CORBA::is_nil(response_listener_param) == false) {
43 if (_listener != NULL) {
44 log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
// The listener fills msg_rsp through the out-pointer.
// NOTE(review): who releases msg_rsp afterwards is not evident here - confirm.
45 _listener->request_received(msg_req, &msg_rsp);
46 log_debug("send_message_and_request_response(), request_received\n");
// Push the listener's answer back to the calling client.
47 response_listener_param->receive_response_message(msg_rsp);
50 log_error("send_message_and_request_response() error, server callback == NULL\n");
54 log_error("invalid callback object received.\n");
// Handles a string request synchronously: the response is produced by the
// registered listener and handed back as a CORBA-duplicated string.
//
// msg_req_param: request text.
// err_flg:       error flag passed by reference (how it is set is not
//                evident from the statements shown here).
58 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
64 if (_listener != NULL) {
65 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
66 _listener->request_received(msg_req_param, &msg_rsp);
69 log_error("send_message_and_wait_response() error, server callback == NULL\n");
// Without a listener the request text itself is echoed back as the response.
70 msg_rsp = strdup(msg_req_param);
// string_dup makes a separate CORBA-owned copy for the return value.
// NOTE(review): msg_rsp (strdup'ed above, or supplied by the listener) must
// still be released after this copy - confirm that it is freed.
73 ret_val = CORBA::string_dup(msg_rsp);
// Handles a DataItemSequence request whose answer is delivered through a
// client callback object.
//
// response_listener_param: client-side object that receives the response
//                          sequence.
// msg_req_param:           request payload as a raw DataItemSequence.
77 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
78 const ::DataItemSequence& msg_req_param) {
81 DataItemSequence *seq;
84 if (CORBA::is_nil(response_listener_param) == false) {
85 if (_listener != NULL) {
// Wrap the incoming sequence in a message object for the listener.
// NOTE(review): msg_req and msg_rsp are allocated with new - confirm both
// are deleted once the response has been sent.
86 msg_req = new BusMessageInternal(msg_req_param);
87 log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
88 log_debug("msg_req:\n");
// The response message is created with the same type as the request.
92 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
93 _listener->request_received(msg_req, msg_rsp);
// Reach into the response message for its underlying sequence.
94 seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
// NOTE(review): length() presumably returns CORBA::ULong - confirm that %ld
// is the matching format specifier on the target platforms.
95 log_debug("msg_rsp length: %ld\n", seq->length());
97 response_listener_param->receive_response_dataitem_sequence(*seq);
102 log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
// Handles a DataItemSequence request synchronously; the response sequence is
// returned through the CORBA out parameter.
//
// req_seq_param: request payload.
// rsp_seq_param: out parameter that receives the response sequence pointer.
107 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
108 DataItemSequence_out rsp_seq_param) {
111 DataItemSequence *seq;
// Wrap the request and create a response message of the same type.
116 msg_req = new BusMessageInternal(req_seq_param);
117 msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
118 _listener->request_received(msg_req, msg_rsp);
// NOTE(review): msg_rsp was assigned from a plain new expression two
// statements earlier and has already been passed to the listener, so this
// test looks like it can never fail - confirm the intent.
119 if (msg_rsp != NULL) {
120 seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
// Fallback: hand back an empty sequence.
123 seq = new DataItemSequence();
// NOTE(review): in the non-fallback case seq aliases the sequence held by
// msg_rsp - confirm ownership is really transferred to the out parameter and
// that msg_rsp does not also free it.
125 rsp_seq_param._data = seq;
// Registers a client for periodic events by starting a dedicated
// ServerEventThread for it.
//
// client_param: client object the thread is created for; must be non-nil.
// period_secs:  period value forwarded to the thread constructor.
130 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
132 CORBA::UShort period_secs)
134 ServerEventThread *srvr_thread;
136 if (CORBA::is_nil(client_param) == false) {
137 log_debug("add_event_listener()\n");
// Count the thread before it is started, under the shared mutex, so that
// shutdown() sees it as active.
138 server_thread_mutex.lock();
139 _server_thread_count++;
140 server_thread_mutex.unlock();
// The thread gets a pointer back to this object; presumably it calls
// server_thread_closed() when it finishes - confirm in ServerEventThread.
141 srvr_thread = new ServerEventThread(client_param, msg, period_secs, this);
142 srvr_thread->start();
145 log_error("Failed to add event listener, listener NULL.\n");
// Requests server shutdown: raises the shutdown-pending flag, waits until all
// event threads have reported completion, and then proceeds to the ORB
// shutdown step. A second call while shutdown is already pending does nothing.
149 void OrbServerImpl::shutdown()
// Scoped lock; the condition wait below operates on this same mutex.
151 omni_mutex_lock sync(server_thread_mutex);
152 if (is_shutdown_pending() == 0) {
153 log_debug("shutdown request received!\n");
154 // Tell the servers to exit, and wait for them to do so.
155 _shutdown_pending = 1;
// Re-check the count after every wake-up, whether signalled or timed out.
156 while(_server_thread_count > 0) {
// NOTE(review): omni_condition::timedwait() is documented as taking an
// ABSOLUTE time (secs, nanosecs). Passing (5, 0) would then expire at once
// and turn this loop into a busy spin. If a 5 second relative wait is
// intended, the absolute time should come from omni_thread::get_time() -
// confirm against the omnithread documentation.
157 server_thread_shutdown_signal.timedwait(5, 0);
// Logged after every return from timedwait, including timeouts.
158 log_debug("signal received\n");
160 // Shutdown the ORB (but do not wait for completion). This also
161 // causes the main thread to unblock from CORBA::ORB::run().
162 log_debug("calling orb shutdown\n");
164 log_debug("orb shutdown called\n");
// Initializes the ORB and creates the POA this server uses; logs an error on
// failure. The meaning of the int return value is not evident from the
// statements shown here.
168 int OrbServerImpl::init() {
176 _orb = CORBA::ORB_init(argc, argv);
// create_poa() builds the POA from the freshly initialized ORB.
178 _poa = create_poa(_orb);
182 log_error("init() failed\n");
// Activates this object in the POA and registers it with the naming service.
//
// server_name: name used in the registration log messages.
187 int OrbServerImpl::launch(const char *server_name)
189 CORBA::Object_var server_ref;
190 PortableServer::ObjectId_var server_id;
193 CosNaming::NamingContext_var naming_context;
// The tie template adapts this plain C++ object to the generated servant
// interface; the tie object itself is a local of this function.
196 POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
197 server_id = _poa->activate_object(&server_impl);
198 server_ref = server_impl._this();
199 //this->_remove_ref();
// Look up (or create) the PLPBUS naming context, then bind the service in it.
200 naming_context = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
201 if (naming_context != NULL) {
202 ok_flg = bind_naming_context_and_service(naming_context,
205 CONST_CONTEXT_KIND__PLPBUS);
206 if (ok_flg == true) {
207 log_debug("Registered to naming service: %s\n", server_name);
212 log_error("Failed to register to naming service: %s\n", server_name);
// Creates a child POA of the root POA that carries a bidirectional policy.
//
// orb: ORB used to resolve the root POA and to create the policy object.
// Returns the POA created by create_POA(); ret_val is only assigned on that
// path.
218 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
219 PortableServer::POA_var ret_val;
220 CORBA::Object_var poa_obj;
221 CORBA::PolicyList policy_list;
222 CORBA::Any policyVal;
223 PortableServer::POAManager_var poa_man;
224 PortableServer::POA_var rootpoa;
// Resolve the root POA and narrow the generic reference to the POA type.
227 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
228 if (poa_obj != NULL) {
229 rootpoa = PortableServer::POA::_narrow(poa_obj);
230 if (rootpoa != NULL) {
231 poa_man = rootpoa->the_POAManager();
232 if (poa_man != NULL) {
234 // bidirectional policy
// A one-element policy list holding BiDirPolicy::BOTH.
235 policy_list.length(1);
236 policyVal <<= BiDirPolicy::BOTH;
237 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
238 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
245 log_error("Failed to create RootPOA.\n");
// Obtains the naming context identified by (service_name_param,
// service_kind_param) from the CORBA name service, creating and binding a new
// context when none is found.
//
// service_name_param: id field of the context name.
// service_kind_param: kind field of the context name.
// Returns the context in ret_val; the CORBA exceptions handled below are
// logged rather than propagated.
250 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
251 const char *service_kind_param)
253 CosNaming::NamingContext_var ret_val;
254 CosNaming::NamingContext_var ns_context;
255 CORBA::Object_var ns_obj;
256 CORBA::Object_var service_obj;
257 CosNaming::Name context_data;
261 // get nameservice reference
262 log_debug("get_service_naming_context() 1\n");
263 ns_obj = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
264 log_debug("get_service_naming_context() 2\n");
265 // get name service context
266 ns_context = CosNaming::NamingContext::_narrow(ns_obj);
267 log_debug("get_service_naming_context() 3\n");
268 if (CORBA::is_nil(ns_context) == false) {
// Single-component name: (id, kind) taken from the parameters.
269 context_data.length(1);
270 context_data[0].id = service_name_param;
271 context_data[0].kind = service_kind_param;
272 log_debug("get_service_naming_context() 4\n");
// First try to look up an existing context under that name.
274 service_obj = ns_context->resolve(context_data);
// NotFound is logged; the nil check on service_obj below selects the
// create path.
276 catch(CosNaming::NamingContext::NotFound& ex) {
277 log_error("Failed to get context from name service for %s. Context does not yet exist, but will try to create.\n", service_name_param);
280 log_debug("get_service_naming_context() 5\n");
281 if (CORBA::is_nil(service_obj)) {
282 // not found, try to bind the new context to name service
283 log_debug("get_service_naming_context() 6\n");
284 ret_val = ns_context->bind_new_context(context_data);
285 log_debug("get_service_naming_context() 7\n");
286 if (CORBA::is_nil(ret_val) ) {
287 log_error("Failed to create new context to name service for %s.\n", service_name_param);
// Context already exists: narrow the resolved object to a NamingContext.
291 log_debug("get_service_naming_context() 8\n");
292 ret_val = CosNaming::NamingContext::_narrow(service_obj);
293 log_debug("get_service_naming_context() 9\n");
294 if (CORBA::is_nil(ret_val) ) {
295 log_error("Failed to get existing context from name service for %s, narrowing failed.\n", service_name_param);
// The handlers below only log; each message names the failure it covers.
299 catch(CosNaming::NamingContext::AlreadyBound& ex) {
300 log_error("Failed to get context from name service for %s. Context with same name already exist.\n", service_name_param);
304 catch (CORBA::ORB::InvalidName&) {
305 // This should not happen!
306 log_error("Failed to get context from name service for %s, name service does not exist.\n", service_name_param);
308 catch(CORBA::TRANSIENT& ex) {
309 log_error("Failed to get context from name service for %s, verify that name service is running.\n", service_name_param);
// NO_RESOURCES additionally prints a configuration check-list for the user.
311 catch (CORBA::NO_RESOURCES&) {
312 log_error("Failed to get context from name service for %s, Name service is not running or has configuration problem.\n", service_name_param);
313 log_error("Check-list:\n");
314 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
315 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file\n");
316 log_error("Verify that InitRef line is defined in omniORB.cfg file.\n");
318 catch(CORBA::SystemException& ex) {
319 log_error("Failed to get context from name service for %s, system error.\n", service_name_param);
321 log_debug("get_service_naming_context() done\n");
// Binds a service object reference under (service_name_param,
// service_kind_param) in the given naming context, replacing an existing
// binding of the same name.
//
// service_context_param: naming context to bind into.
// service_ref_param:     object reference to register.
// service_name_param:    id field of the binding name.
// service_kind_param:    kind field of the binding name.
// The CORBA exceptions handled below are logged rather than propagated.
325 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
326 CORBA::Object_ptr service_ref_param,
327 const char *service_name_param,
328 const char *service_kind_param)
331 CosNaming::Name context_data;
// Single-component name: (id, kind) taken from the parameters.
335 context_data.length(1);
336 context_data[0].id = service_name_param;
337 context_data[0].kind = service_kind_param;
339 service_context_param->bind(context_data, service_ref_param);
342 catch(CosNaming::NamingContext::AlreadyBound& ex) {
344 * service existed already for the naming context with similar description.
345 * Replace the existing one with a new one.
347 log_warning("service %s exist, replacing it\n", service_name_param);
348 service_context_param->rebind(context_data, service_ref_param);
352 catch (CosNaming::NamingContext::InvalidName&) {
353 log_error("Could not register service to name server, invalid service name.\n");
355 catch (CosNaming::NamingContext::NotFound&) {
356 log_error("Could not register service to name server, service object reference is invalid.\n");
358 catch (CosNaming::NamingContext::CannotProceed&) {
359 // This should not happen!
360 log_error("Could not register service to name server, unknown error.\n");
362 catch(CORBA::SystemException& ex) {
363 log_error("Could not register service to name server, unknown error.\n");
// Registers the listener that receives incoming requests; the pointer is
// stored as-is and replaces any previously stored listener.
//
// listener_param: listener to store in _listener.
368 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
// NOTE(review): the log text names register_request_received_callback(),
// which differs from this function's name - possibly a leftover from a rename.
373 log_debug("register_request_received_callback() started\n");
374 _listener = listener_param;
// Returns the shutdown-pending flag: 0 after construction, 1 once shutdown()
// has begun. No locking is performed in this function itself.
378 int OrbServerImpl::is_shutdown_pending() {
379 return _shutdown_pending;
// Bookkeeping for a finished event thread: decrements the active-thread count
// and signals the shutdown condition when signalling has been requested.
382 void OrbServerImpl::server_thread_closed() {
385 log_debug("server_thread_closed() started\n");
// The count is only modified while holding the shared mutex.
387 server_thread_mutex.lock();
// Logs the value BEFORE the decrement.
388 log_debug("server_thread_closed(), server_thread_count: %d\n", _server_thread_count);
389 _server_thread_count--;
// Last thread gone: presumably this is where send_signal is set - confirm.
390 if (_server_thread_count == 0) {
393 server_thread_mutex.unlock();
// The signal is sent after the mutex has been released.
394 if (send_signal == true) {
395 log_debug("server_thread_closed(), sending signal\n");
396 server_thread_shutdown_signal.signal();
397 log_debug("server_thread_closed(), signal send\n");