This is Unofficial EPICS BASE Doxygen Site
transportRegistry.cpp
Go to the documentation of this file.
1 
7 #define epicsExportSharedSymbols
8 #include <pv/transportRegistry.h>
9 #include <pv/logger.h>
10 
11 namespace pvd = epics::pvData;
12 
13 namespace epics {
14 namespace pvAccess {
15 
16 
17 bool TransportRegistry::Key::operator<(const Key& o) const
18 {
19  if(addr.sa.sa_family<o.addr.sa.sa_family)
20  return true;
21  if(addr.sa.sa_family>o.addr.sa.sa_family)
22  return false;
23  if(addr.ia.sin_addr.s_addr<o.addr.ia.sin_addr.s_addr)
24  return true;
25  if(addr.ia.sin_addr.s_addr>o.addr.ia.sin_addr.s_addr)
26  return false;
27  if(addr.ia.sin_port<o.addr.ia.sin_port)
28  return true;
29  if(addr.ia.sin_port>o.addr.ia.sin_port)
30  return false;
31  if(prio<o.prio)
32  return true;
33  return false;
34 }
35 
37  const osiSockAddr& address,
38  pvd::int16 prio)
39  :owner(owner)
40  ,key(address, prio)
41 {
42  {
43  pvd::Lock G(owner->_mutex);
44 
45  std::tr1::shared_ptr<pvd::Mutex>& lock = owner->locks[key]; // fetch or alloc
46  if(!lock)
47  lock.reset(new pvd::Mutex());
48 
49  mutex = lock;
50  }
51 
52  mutex->lock();
53 }
54 
56 {
57  mutex->unlock();
58 
59  pvd::Lock G(owner->_mutex);
60 
61  assert(mutex.use_count()>=2);
62 
63  if(mutex.use_count()==2) {
64  // no other concurrent connect(), so can drop this lock
65  owner->locks.erase(key);
66  }
67 
68  assert(mutex.use_count()==1);
69 }
70 
72 {
73  pvd::Lock G(_mutex);
74  if(!transports.empty())
75  LOG(logLevelWarn, "TransportRegistry destroyed while not empty");
76 }
77 
78 Transport::shared_pointer TransportRegistry::get(const osiSockAddr& address, epics::pvData::int16 prio)
79 {
80  const Key key(address, prio);
81 
82  pvd::Lock G(_mutex);
83 
84  transports_t::iterator it(transports.find(key));
85  if(it!=transports.end()) {
86  return it->second;
87  }
88  return Transport::shared_pointer();
89 }
90 
91 void TransportRegistry::install(const Transport::shared_pointer& ptr)
92 {
93  const Key key(ptr->getRemoteAddress(), ptr->getPriority());
94 
95  pvd::Lock G(_mutex);
96 
97  std::pair<transports_t::iterator, bool> itpair(transports.insert(std::make_pair(key, ptr)));
98  if(!itpair.second)
99  THROW_EXCEPTION2(std::logic_error, "Refuse to insert dup");
100 }
101 
102 Transport::shared_pointer TransportRegistry::remove(Transport::shared_pointer const & transport)
103 {
104  assert(!!transport);
105  const Key key(transport->getRemoteAddress(), transport->getPriority());
106  Transport::shared_pointer ret;
107 
108  pvd::Lock guard(_mutex);
109  transports_t::iterator it(transports.find(key));
110  if(it!=transports.end()) {
111  ret.swap(it->second);
112  transports.erase(it);
113  }
114  return ret;
115 }
116 
117 #define LEAK_CHECK(PTR, NAME) if((PTR) && !(PTR).unique()) { std::cerr<<"Leaking Transport " NAME " use_count="<<(PTR).use_count()<<"\n"<<show_referrers(PTR, false);}
118 
120 {
121  transports_t temp;
122  {
123  pvd::Lock guard(_mutex);
124  transports.swap(temp);
125  }
126 
127  if(temp.empty())
128  return;
129 
130  LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
131 
132  for(transports_t::iterator it(temp.begin()), end(temp.end());
133  it != end; ++it)
134  {
135  it->second->close();
136  }
137 
138  for(transports_t::iterator it(temp.begin()), end(temp.end());
139  it != end; ++it)
140  {
141  const Transport::shared_pointer& transport = it->second;
142  transport->waitJoin();
143  LEAK_CHECK(transport, "tcp transport")
144  if(!transport.unique()) {
145  LOG(logLevelError, "Closed transport %s still has use_count=%u",
146  transport->getRemoteName().c_str(),
147  (unsigned)transport.use_count());
148  }
149  }
150 }
151 
153 {
154  pvd::Lock guard(_mutex);
155  return transports.size();
156 }
157 
158 void TransportRegistry::toArray(transportVector_t & transportArray, const osiSockAddr *dest)
159 {
160  pvd::Lock guard(_mutex);
161 
162  transportArray.reserve(transportArray.size() + transports.size());
163 
164  for(transports_t::const_iterator it(transports.begin()), end(transports.end());
165  it != end; ++it)
166  {
167  const Key& key = it->first;
168  const Transport::shared_pointer& tr = it->second;
169 
170  if(!dest || sockAddrAreIdentical(dest, &key.addr))
171  transportArray.push_back(tr);
172  }
173 }
174 
175 }
176 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsMutexId lock
Definition: osiClockTime.c:37
#define THROW_EXCEPTION2(TYPE, MSG)
Transport::shared_pointer remove(Transport::shared_pointer const &transport)
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
A lock for multithreading.
Definition: lock.h:36
void toArray(transportVector_t &transportArray, const osiSockAddr *dest=0)
#define LOG(level, format,...)
Definition: logger.h:48
pvData
Definition: monitor.h:428
Transport::shared_pointer get(const osiSockAddr &address, epics::pvData::int16 prio)
bool operator<(shared_ptr< T > const &a, shared_ptr< U > const &b) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:778
std::vector< Transport::shared_pointer > transportVector_t
#define LEAK_CHECK(PTR, NAME)
int16_t int16
Definition: pvType.h:79
epicsMutex Mutex
Definition: lock.h:28
Reservation(TransportRegistry *owner, const osiSockAddr &address, epics::pvData::int16 prio)
void install(const Transport::shared_pointer &ptr)
int epicsStdCall sockAddrAreIdentical(const osiSockAddr *plhs, const osiSockAddr *prhs)
Definition: osiSock.c:35