/// modbus with back end
module modbus.facade;

import modbus.backend;
import modbus.protocol;

public import std.datetime : Duration, dur, hnsecs, usecs, nsecs, msecs, seconds;

import modbus.connection.tcp;
import std.socket;

import modbus.connection.rtu;

public import serialport;
public import std.socket : InternetAddress, Internet6Address;

/// Modbus master with RTU backend
class ModbusRTUMaster : ModbusMaster
{
protected:
    /// Connection wrapper around the serial port handed to the constructor
    SerialPortConnection spcom;

public:

    /++ Wraps `sp` in a `SerialPortConnection` and uses it with a new `RTU` backend.

        Params:
            sp = serial port used for the exchange
            sr = rules passed to the `RTU` backend (may be null)
     +/
    this(SerialPort sp, SpecRules sr=null)
    {
        spcom = new SerialPortConnection(sp);
        super(new RTU(sr), spcom);
    }

    /// Serial port held by the underlying connection
    inout(SerialPort) port() inout @property { return spcom.port; }
}

/// Modbus slave with RTU backend
class ModbusRTUSlave : ModbusSlave
{
protected:
    /// Connection wrapper around the serial port handed to the constructor
    SerialPortConnection spcom;

public:

    /++ Wraps `sp` in a `SerialPortConnection` and uses it with a new `RTU` backend.

        Params:
            mdl = slave model that is handed to the base `ModbusSlave`
            sp = serial port used for the exchange
            sr = rules passed to the `RTU` backend (may be null)
            mf = message finder handed to the base `ModbusSlave` (may be null)
     +/
    this(ModbusSlaveModel mdl, SerialPort sp, SpecRules sr=null, MessageFinder mf=null)
    {
        spcom = new SerialPortConnection(sp);
        // Forward `mf` to the base class: previously the argument was accepted
        // but dropped, so a caller-supplied finder never took effect.
        super(mdl, new RTU(sr), spcom, mf);
    }

    /// Serial port held by the underlying connection
    inout(SerialPort) port() inout @property { return spcom.port; }
}

/// Modbus master with TCP backend based on TcpSocket from std.socket
class ModbusTCPMaster : ModbusMaster
{
protected:
    /// Master side TCP connection created in the constructor
    MasterTcpConnection mtc;

public:
    /++ Creates a `MasterTcpConnection` for `addr` and uses it with a new `TCP` backend.

        Params:
            addr = address passed to the connection
            sf = sleep function passed to the connection (may be null)
            sr = rules passed to the `TCP` backend (may be null)
     +/
    this(Address addr, void delegate(Duration) sf=null, SpecRules sr=null)
    {
        mtc = new MasterTcpConnection(addr, sf);
        super(new TCP(sr), mtc);
    }

    /// Socket held by the underlying connection
    inout(Socket) socket() inout @property { return mtc.socket; }

    /// Closes the underlying connection
    void halt() { mtc.close(); }
}

/++ Modbus slave server with TCP backend.

    Listens on a non-blocking `TcpSocket`; every accepted connection gets its
    own `ModbusSlave` that runs inside a fiber and is driven from `iterate`.
 +/
class ModbusTCPSlaveServer
{
protected:

    import core.thread : Fiber;
    import modbus.msleep : msleep;

    ModbusSlave.MessageFinder messageFinder;
    ModbusSlaveModel model;
    TCP be;
    TcpSocket serv;
    SocketSet ss;

    /// User supplied sleep function, `msleep` is used when it is null
    void delegate(Duration) sleepFunc;

    /// Sleeps through `sleepFunc` if it is set, through `msleep` otherwise
    void sleep(Duration d)
    {
        if (sleepFunc !is null) sleepFunc(d);
        else msleep(d);
    }

    /// Zero length sleep
    void yield() { this.sleep(Duration.zero); }

    /// Fiber that endlessly iterates one slave bound to one accepted connection
    static class MBS : Fiber
    {
        ModbusSlave mb;
        SlaveTcpConnection con;

        this(ModbusSlave mb, SlaveTcpConnection con)
        {
            this.mb = mb;
            this.con = con;
            super(&run);
        }

        void run() { while (true) mb.iterate; }
    }

    /// Slaves of currently tracked connections
    MBS[] slaves;
    /// Upper bound for `slaves.length`, further connections are rejected
    size_t maxConCount;

public:

    /// Same as the full constructor with `reuseAddr=false`, queue length 16 and at most 128 connections
    this(ModbusSlaveModel mdl, Address addr, void delegate(Duration) sf,
            SpecRules sr=null, ModbusSlave.MessageFinder mf=null)
    { this(mdl, addr, 16, 128, sf, sr, mf); }

    /// Same as the full constructor with `reuseAddr=false`
    this(ModbusSlaveModel mdl, Address addr, int acceptConQueueLen=16,
            size_t maxConCount=128, void delegate(Duration) sf=null, SpecRules sr=null,
            ModbusSlave.MessageFinder mf=null)
    { this(mdl, addr, false, acceptConQueueLen, maxConCount, sf, sr, mf); }

    /++ Creates the non-blocking listening socket, binds it to `addr` and starts listening.

        Params:
            mdl = model shared by the slaves of all connections
            addr = address to bind the listening socket to
            reuseAddr = set `SocketOption.REUSEADDR` on the listening socket
            acceptConQueueLen = value passed to `listen`
            maxConCount = maximum count of simultaneously tracked connections
            sf = sleep function, also passed to every accepted connection (may be null)
            sr = rules passed to the `TCP` backend (may be null)
            mf = message finder passed to every created `ModbusSlave` (may be null)
     +/
    this(ModbusSlaveModel mdl, Address addr, bool reuseAddr, int acceptConQueueLen=16,
            size_t maxConCount=128, void delegate(Duration) sf=null, SpecRules sr=null,
            ModbusSlave.MessageFinder mf=null)
    {
        model = mdl;
        be = new TCP(sr);
        sleepFunc = sf;
        messageFinder = mf;

        serv = new TcpSocket(addr.addressFamily);
        serv.blocking = false;
        if (reuseAddr)
            serv.setOption(SocketOptionLevel.SOCKET,
                           SocketOption.REUSEADDR, 1);
        serv.bind(addr);
        serv.listen(acceptConQueueLen);
        this.maxConCount = maxConCount;

        ss = new SocketSet;
    }

    /++ One pass of the server: drops slaves whose connection is not alive,
        calls the fiber of every remaining slave once and then accepts
        pending connections.
     +/
    void iterate()
    {
        ss.reset();
        ss.add(serv);

        if (slaves.any!(a=>!a.con.isAlive))
        {
            debug version (unittest) auto oldCount = slaves.length;
            import std.range : enumerate;
            // Compact alive slaves to the front of the array in place,
            // `last` stays -1 when nothing is alive so the length becomes 0.
            ptrdiff_t last=-1;
            foreach (i, s; enumerate(slaves.filter!(a=>a.con.isAlive)))
            {
                slaves[i] = s;
                last = i;
            }
            slaves.length = last+1;
            debug version (unittest) testPrintf!("reduce slaves: %d -> %d")(oldCount, slaves.length);
        }

        foreach (sl; slaves)
        {
            try
            {
                // CloseTcpConnection only closes the connection without logging,
                // any other Exception is logged by the outer handler.
                try sl.call;
                catch (CloseTcpConnection)
                    sl.con.close();
            }
            catch (Exception e)
            {
                error("modbus tcp exception: ", e.msg);
                sl.con.close();
            }

            this.yield();
        }

        while (Socket.select(ss, null, null, Duration.zero) && ss.isSet(serv))
        {
            auto s = serv.accept;
            debug version (unittest) testPrintf!("slaves: %d [max %d]")(slaves.length, maxConCount);
            if (slaves.length >= maxConCount)
            {
                // Limit reached: reject this connection and finish the pass.
                // NOTE(review): further pending connections wait for the next call.
                s.shutdown(SocketShutdown.BOTH);
                s.close();
                return;
            }
            auto con = new SlaveTcpConnection(s, sleepFunc);
            slaves ~= new MBS(new ModbusSlave(model, be, con, messageFinder), con);
            this.yield();
        }
    }

    /// Listening socket of the server
    inout(Socket) socket() inout @property { return serv; }

    /// Closes all tracked connections, then shuts down and closes the listening socket
    void halt()
    {
        foreach (sl; slaves) sl.con.close();
        serv.shutdown(SocketShutdown.BOTH);
        serv.close();
    }
}

version (unittest):

import modbus.ut;

// Size of the test device data counted in 16 bit registers
enum dataRegCnt = TestModbusSlaveDevice.Data.sizeof/2;

// Parameters passed to every test thread
struct TInfo
{
    ulong mbn;       // device number used in requests
    string[2] dev;   // serial port names: [0] for the master, [1] for the slave
    string mode;     // serial port mode string
    string addr;     // address of the TCP server
    ushort port;     // port of the TCP server
    Duration worktime; // how long the masters keep sending requests
}

// Failure description sent from a test thread to its owner
struct Exc
{
    string msg;
    string file;
    size_t line;
}

// Builds an Exc, by default with the file and line of the call site
Exc exc(string msg, string file=__FILE__, size_t line=__LINE__)
{ return Exc(msg, file, line); }

unittest
{
    mixin(mainTestMix);

    auto cp = getPlatformComPipe(BUFFER_SIZE);

    if (cp is null)
    {
        stderr.writeln(" platform doesn't support real test");
        return;
    }

    stderr.writefln(" port source `%s`\n", cp.command);
    // NOTE(review): a failed open is only reported, the test goes on anyway
    try cp.open();
    catch (Exception e) stderr.writeln(" can't open com pipe: ", e.msg);
    scope (exit) cp.close();
    stderr.writefln(" pipe ports: %s <=> %s", cp.ports[0], cp.ports[1]);

    auto tInfo = TInfo(42, cp.ports, "8N1", "127.0.0.1", cast(ushort)uniform(8110, 8120), 5.seconds);

    ut!(multiTreadFacadeTest, "multiThread facade test (slave as slave)")(tInfo, true);
    // the second run uses the next port number
    tInfo.port++;
    ut!(multiTreadFacadeTest, "multiThread facade test (slave as sniffer)")(tInfo, false);
}

// Spawns the slave thread and the master threads, then waits for one message
// per spawned thread; a received Exc is rethrown as an Exception.
void multiTreadFacadeTest(TInfo tInfo, bool ss)
{
    size_t n;
    spawnLinked(&sFnc, tInfo, ss); n++;
    spawnLinked(&mFnc, tInfo); n++;
    spawnLinked(&mTcpFnc, tInfo, true); n++;

    version (linux)
    {
        spawnLinked(&mTcpFnc, tInfo, false); n++;
    }

    foreach (i; 0 .. n)
        receive(
            (LinkTerminated lt) { },
            (Exc e) { throw new Exception(e.msg); }
        );

}

// Slave thread: serves one RTU slave and one TCP slave server until
// `info.worktime + 500.msecs` has passed. When `ss` is false both are given
// a SnifferMessageFinder. Any Throwable is reported to the owner as Exc.
void sFnc(TInfo info, bool ss)
{
    try
    {
        auto mslp = delegate (Duration d) @nogc { msleep(d); };

        auto mdl = new MultiDevModbusSlaveModel;
        mdl.devices ~= new TestModbusSlaveDevice(info.mbn);

        auto mdlnode = new NodeModbusSlaveModel;
        mdlnode.models ~= mdl;

        auto sp = new SerialPortFR(info.dev[1], info.mode, mslp);
        scope (exit) sp.close();
        auto ia = new InternetAddress(info.addr, info.port);

        auto rtumbs = new ModbusRTUSlave(mdlnode, sp, null,
                ss ? null : new ModbusSlave.SnifferMessageFinder);
        auto tcpmbs = new ModbusTCPSlaveServer(mdl, ia, 16, 16, mslp, null,
                ss ? null : new ModbusSlave.SnifferMessageFinder);
        scope (exit) tcpmbs.halt();

        const sw = StopWatch(AutoStart.yes);
        while (sw.peek < info.worktime + 500.msecs)
        {
            rtumbs.iterate();
            tcpmbs.iterate();
            mslp(1.msecs);
        }
    }
    catch (Throwable e)
        send(ownerTid, exc(e.msg, e.file, e.line));
}

// Master thread: reads the same random register range through RTU and TCP
// and reports "fail check" to the owner when both reads returned data that differs.
void mFnc(TInfo info)
{
    auto mslp = delegate (Duration d) @nogc { msleep(d); };

    auto sp = new SerialPortFR(info.dev[0], info.mode, mslp);
    scope (exit) sp.close();
    auto ia = new InternetAddress(info.addr, info.port);

    auto rtumbm = new ModbusRTUMaster(sp);
    auto tcpmbm = new ModbusTCPMaster(ia, mslp);
    rtumbm.connection.readTimeout = 1.seconds;
    tcpmbm.connection.readTimeout = 1.seconds;
    scope (exit) tcpmbm.halt();

    const sw = StopWatch(AutoStart.yes);
    try
    {
        while (sw.peek < info.worktime)
        {
            // random start register and count
            auto s = cast(ushort)uniform(0, dataRegCnt-2);
            auto c = cast(ushort)uniform(1, dataRegCnt-s);

            const(ushort)[] rtu_vals, tcp_vals;

            // a failed read is only printed and leaves the array at its init value
            try rtu_vals = rtumbm.readInputRegisters(info.mbn, s, c);
            catch (Exception e) testPrint("RTU read throws: " ~ e.msg);

            try tcp_vals = tcpmbm.readInputRegisters(info.mbn, s, c);
            catch (Exception e) testPrint("TCP read throws: " ~ e.msg);

            // compare only when neither array is left at its init value
            if (rtu_vals != rtu_vals.init && tcp_vals != tcp_vals.init)
                if (!equal(rtu_vals, tcp_vals))
                    send(ownerTid, exc("fail check"));

            msleep(1.msecs);
        }
    }
    catch (Throwable e)
    {
        testPrint(__FUNCTION__~ " "~ e.msg);
        assert(0);
    }
}

// TCP master thread: until `info.worktime` has passed creates a new master,
// reads all registers and, only when `needHalt` is set, halts the master.
void mTcpFnc(TInfo info, bool needHalt=false)
{
    auto mslp = delegate (Duration d) @nogc { msleep(d); };

    auto ia = new InternetAddress(info.addr, info.port);

    const sw = StopWatch(AutoStart.yes);
    while (sw.peek < info.worktime)
    {
        try
        {
            auto tcpmbm = new ModbusTCPMaster(ia, mslp);
            tcpmbm.connection.readTimeout = 1.seconds;
            auto vals = tcpmbm.readInputRegisters(info.mbn, 0, dataRegCnt);
            msleep(10.msecs);
            if (needHalt) tcpmbm.halt();
        }
        catch (Throwable e)
        {
            testPrintf!(__FUNCTION__~ " needHalt %s, msg: %s")(needHalt, e.msg);
            assert(0);
        }
        msleep(10.msecs);
    }
}