/// Modbus facade: masters and slaves bound to a concrete backend (RTU or TCP) and connection
2 module modbus.facade;
3 
4 import modbus.backend;
5 import modbus.protocol;
6 
7 public import std.datetime : Duration, dur, hnsecs, usecs, nsecs, msecs, seconds;
8 
9 import modbus.connection.tcp;
10 import std.socket;
11 
12 import modbus.connection.rtu;
13 
14 public import serialport;
15 public import std.socket : InternetAddress, Internet6Address;
16 
/// Modbus master that works through a serial port with the RTU backend
class ModbusRTUMaster : ModbusMaster
{
protected:
    /// Connection wrapping the serial port given to the constructor
    SerialPortConnection spcom;

public:

    /**
        Wraps `sp` into a `SerialPortConnection` and hands it, together with
        a new `RTU` backend built from `sr`, to the base constructor.

        Params:
            sp = serial port used by the connection
            sr = rules forwarded to the `RTU` backend
     */
    this(SerialPort sp, SpecRules sr=null)
    {
        // the field is filled before the base constructor runs
        this.spcom = new SerialPortConnection(sp);
        auto backend = new RTU(sr);
        super(backend, this.spcom);
    }

    /// Serial port held by the underlying connection
    @property inout(SerialPort) port() inout
    {
        return this.spcom.port;
    }
}
35 
/// Modbus slave with RTU backend
class ModbusRTUSlave : ModbusSlave
{
protected:
    /// Connection wrapping the serial port given to the constructor
    SerialPortConnection spcom;

public:

    /**
        Wraps `sp` into a `SerialPortConnection` and passes it, together with
        a new `RTU` backend built from `sr`, to the base constructor.

        Params:
            mdl = slave model passed to the base `ModbusSlave`
            sp = serial port used by the connection
            sr = rules forwarded to the `RTU` backend
            mf = message finder forwarded to the base `ModbusSlave`
     */
    this(ModbusSlaveModel mdl, SerialPort sp, SpecRules sr=null, MessageFinder mf=null)
    {
        spcom = new SerialPortConnection(sp);
        // `mf` must be forwarded, otherwise a caller-supplied finder
        // (e.g. a sniffer finder) never reaches the slave
        super(mdl, new RTU(sr), spcom, mf);
    }

    /// Serial port held by the underlying connection
    inout(SerialPort) port() inout @property { return spcom.port; }
}
54 
/// Modbus master with TCP backend; the connection is based on TcpSocket from std.socket
class ModbusTCPMaster : ModbusMaster
{
protected:
    /// Connection created by the constructor
    MasterTcpConnection mtc;

public:
    /**
        Creates a `MasterTcpConnection` from `addr` and `sf` and hands it,
        together with a new `TCP` backend built from `sr`, to the base
        constructor.

        Params:
            addr = address passed to the connection
            sf = sleep function passed to the connection
            sr = rules forwarded to the `TCP` backend
     */
    this(Address addr, void delegate(Duration) sf=null, SpecRules sr=null)
    {
        // the field is filled before the base constructor runs
        this.mtc = new MasterTcpConnection(addr, sf);
        auto backend = new TCP(sr);
        super(backend, this.mtc);
    }

    /// Socket of the underlying connection
    @property inout(Socket) socket() inout
    {
        return this.mtc.socket;
    }

    /// Closes the underlying connection
    void halt()
    {
        this.mtc.close();
    }
}
75 
/// Modbus TCP slave server: listens on a non-blocking `TcpSocket` and serves
/// every accepted connection with its own `ModbusSlave` running in a fiber
class ModbusTCPSlaveServer
{
protected:

    import core.thread : Fiber;
    import modbus.msleep : msleep;

    /// Message finder passed to every `ModbusSlave` created for a connection
    ModbusSlave.MessageFinder messageFinder;
    /// Slave model shared by all connections
    ModbusSlaveModel model;
    /// TCP backend shared by all connections
    TCP be;
    /// Listening socket
    TcpSocket serv;
    /// Socket set used to poll `serv` for pending connections
    SocketSet ss;

    /// Optional user sleep function; `msleep` is used when it is `null`
    void delegate(Duration) sleepFunc;

    /// Sleeps for `d` through `sleepFunc` when it is set, through `msleep` otherwise
    void sleep(Duration d)
    {
        if (sleepFunc !is null) sleepFunc(d);
        else msleep(d);
    }

    /// Zero-length `sleep`
    void yield() { this.sleep(Duration.zero); }

    /// Fiber that endlessly iterates one `ModbusSlave` bound to one connection
    static class MBS : Fiber
    {
        /// Slave iterated by the fiber
        ModbusSlave mb;
        /// Connection used by `mb`; kept to check and close it from the server
        SlaveTcpConnection con;

        ///
        this(ModbusSlave mb, SlaveTcpConnection con)
        {
            this.mb = mb;
            this.con = con;
            super(&run);
        }

        /// Fiber body
        void run() { while (true) mb.iterate; }
    }

    /// Fibers of the currently served connections
    MBS[] slaves;
    /// Connections accepted while `slaves.length >= maxConCount` are closed immediately
    size_t maxConCount;

public:

    /**
        Same as the full constructor with `reuseAddr = false`,
        `acceptConQueueLen = 16` and `maxConCount = 128`.
     */
    this(ModbusSlaveModel mdl, Address addr, void delegate(Duration) sf,
         SpecRules sr=null, ModbusSlave.MessageFinder mf=null)
    { this(mdl, addr, 16, 128, sf, sr, mf); }

    /// Same as the full constructor with `reuseAddr = false`
    this(ModbusSlaveModel mdl, Address addr, int acceptConQueueLen=16,
         size_t maxConCount=128, void delegate(Duration) sf=null, SpecRules sr=null,
         ModbusSlave.MessageFinder mf=null)
    { this(mdl, addr, false, acceptConQueueLen, maxConCount, sf, sr, mf); }

    /**
        Creates the non-blocking listening socket, binds it to `addr`
        and starts listening.

        Params:
            mdl = slave model used for every connection
            addr = address the listening socket is bound to
            reuseAddr = set `SO_REUSEADDR` on the listening socket before binding
            acceptConQueueLen = backlog passed to `listen`
            maxConCount = limit of simultaneously served connections
            sf = sleep function, also passed to every `SlaveTcpConnection`
            sr = rules forwarded to the `TCP` backend
            mf = message finder passed to every created `ModbusSlave`
     */
    this(ModbusSlaveModel mdl, Address addr, bool reuseAddr, int acceptConQueueLen=16,
         size_t maxConCount=128, void delegate(Duration) sf=null, SpecRules sr=null,
         ModbusSlave.MessageFinder mf=null)
    {
        model = mdl;
        be = new TCP(sr);
        sleepFunc = sf;
        messageFinder = mf;

        serv = new TcpSocket(addr.addressFamily);
        serv.blocking = false;
        // the option has to be set before `bind` to take effect
        if (reuseAddr)
            serv.setOption(SocketOptionLevel.SOCKET,
                           SocketOption.REUSEADDR, 1);
        serv.bind(addr);
        serv.listen(acceptConQueueLen);
        this.maxConCount = maxConCount;

        ss = new SocketSet;
    }

    /**
        One step of the server: drops slaves with dead connections, resumes
        the fiber of every remaining slave and accepts pending connections.
     */
    void iterate()
    {
        ss.reset();
        ss.add(serv);

        if (slaves.any!(a=>!a.con.isAlive))
        {
            debug version (unittest) auto oldCount = slaves.length;
            import std.range : enumerate;
            // compact in place: alive slaves are moved to the front,
            // `last` stays -1 when no slave is alive
            ptrdiff_t last=-1;
            foreach (i, s; enumerate(slaves.filter!(a=>a.con.isAlive)))
            {
                slaves[i] = s;
                last = i;
            }
            slaves.length = last+1;
            debug version (unittest) testPrintf!("reduce slaves: %d -> %d")(oldCount, slaves.length);
        }

        foreach (sl; slaves)
        {
            try
            {
                // resume the slave fiber
                try sl.call;
                catch (CloseTcpConnection)
                    sl.con.close();
            }
            catch (Exception e)
            {
                error("modbus tcp exception: ", e.msg);
                sl.con.close();
            }

            this.yield();
        }

        // `Socket.select` returns -1 on interruption and leaves the sets
        // untouched, so only a positive result says that `serv` is readable;
        // treating -1 as success would call `accept` with nothing pending
        while (Socket.select(ss, null, null, Duration.zero) > 0 && ss.isSet(serv))
        {
            auto s = serv.accept;
            debug version (unittest) testPrintf!("slaves: %d [max %d]")(slaves.length, maxConCount);
            if (slaves.length >= maxConCount)
            {
                // limit reached: refuse the connection and stop accepting in this step
                s.shutdown(SocketShutdown.BOTH);
                s.close();
                return;
            }
            auto con = new SlaveTcpConnection(s, sleepFunc);
            slaves ~= new MBS(new ModbusSlave(model, be, con, messageFinder), con);
            this.yield();
        }
    }

    /// Listening socket
    inout(Socket) socket() inout @property { return serv; }

    /// Closes every slave connection, then shuts down and closes the listening socket
    void halt()
    {
        foreach (sl; slaves) sl.con.close();
        serv.shutdown(SocketShutdown.BOTH);
        serv.close();
    }
}
217 
218 version (unittest):
219 
220 import modbus.ut;
221 
/// Size of `TestModbusSlaveDevice.Data` in 2-byte units; used by the tests
/// below as the number of registers available for reading
enum dataRegCnt = TestModbusSlaveDevice.Data.sizeof/2;
223 
/// Parameters shared by the threads of the facade test
struct TInfo
{
    /// Device number: given to `TestModbusSlaveDevice` and used as the target of master reads
    ulong mbn;
    /// Serial port names: `dev[0]` is opened by the master, `dev[1]` by the slave
    string[2] dev;
    /// Serial port mode string passed to `SerialPortFR`
    string mode;
    /// Host part of the TCP address
    string addr;
    /// Port part of the TCP address
    ushort port;
    /// How long the master loops run; the slave loop runs 500 ms longer
    Duration worktime;
}
233 
/// Plain description of a failure that test threads send to their owner
struct Exc
{
    /// Failure message
    string msg;
    /// Source file of the failure
    string file;
    /// Source line of the failure
    size_t line;
}

/// Builds an `Exc`; `file` and `line` default to the location of the call
Exc exc(string msg, string file=__FILE__, size_t line=__LINE__)
{
    Exc result;
    result.msg = msg;
    result.file = file;
    result.line = line;
    return result;
}
243 
// Facade test over a real pair of connected serial ports; it is skipped when
// the platform provides no com pipe or the pipe can't be opened
unittest
{
    mixin(mainTestMix);

    auto cp = getPlatformComPipe(BUFFER_SIZE);

    if (cp is null)
    {
        stderr.writeln(" platform doesn't support real test");
        return;
    }

    stderr.writefln(" port source `%s`\n", cp.command);
    try cp.open();
    catch (Exception e)
    {
        // without an opened pipe there are no ports to run the test on
        stderr.writeln(" can't open com pipe: ", e.msg);
        return;
    }
    scope (exit) cp.close();
    stderr.writefln(" pipe ports: %s <=> %s", cp.ports[0], cp.ports[1]);

    // the TCP port is picked randomly from [8110, 8120)
    auto tInfo = TInfo(42, cp.ports, "8N1", "127.0.0.1", cast(ushort)uniform(8110, 8120), 5.seconds);

    ut!(multiTreadFacadeTest, "multiThread facade test (slave as slave)")(tInfo, true);
    // the second run listens on the next port
    tInfo.port++;
    ut!(multiTreadFacadeTest, "multiThread facade test (slave as sniffer)")(tInfo, false);
}
268 
/// Starts the slave thread and the master threads as linked threads and waits
/// for one message per started thread; an `Exc` message is rethrown as `Exception`
void multiTreadFacadeTest(TInfo tInfo, bool ss)
{
    size_t workers = 0;

    spawnLinked(&sFnc, tInfo, ss);
    ++workers;
    spawnLinked(&mFnc, tInfo);
    ++workers;
    spawnLinked(&mTcpFnc, tInfo, true);
    ++workers;

    version (linux)
    {
        // extra TCP master that never halts its connections
        spawnLinked(&mTcpFnc, tInfo, false);
        ++workers;
    }

    for (size_t k = 0; k < workers; ++k)
    {
        receive(
            (LinkTerminated finished) { },
            (Exc failure) { throw new Exception(failure.msg); }
        );
    }
}
288 
/// Slave thread: serves one RTU slave and one TCP slave server until
/// `info.worktime + 500 ms` has elapsed; any `Throwable` is reported to the
/// owner thread as `Exc`. When `ss` is false both slaves get a sniffer finder.
void sFnc(TInfo info, bool ss)
{
    try
    {
        auto pause = delegate (Duration d) @nogc { msleep(d); };

        auto devModel = new MultiDevModbusSlaveModel;
        devModel.devices ~= new TestModbusSlaveDevice(info.mbn);

        auto nodeModel = new NodeModbusSlaveModel;
        nodeModel.models ~= devModel;

        auto serial = new SerialPortFR(info.dev[1], info.mode, pause);
        scope (exit) serial.close();
        auto address = new InternetAddress(info.addr, info.port);

        // each slave gets its own finder instance
        auto rtuFinder = ss ? null : new ModbusSlave.SnifferMessageFinder;
        auto rtuSlave = new ModbusRTUSlave(nodeModel, serial, null, rtuFinder);

        auto tcpFinder = ss ? null : new ModbusSlave.SnifferMessageFinder;
        auto tcpServer = new ModbusTCPSlaveServer(devModel, address, 16, 16, pause,
                                                  null, tcpFinder);
        scope (exit) tcpServer.halt();

        const timer = StopWatch(AutoStart.yes);
        while (timer.peek < info.worktime + 500.msecs)
        {
            rtuSlave.iterate();
            tcpServer.iterate();
            pause(1.msecs);
        }
    }
    catch (Throwable problem)
    {
        send(ownerTid, exc(problem.msg, problem.file, problem.line));
    }
}
322 
/// Master thread: for `info.worktime` reads the same random register range
/// over RTU and over TCP and reports "fail check" to the owner thread when
/// both reads returned data and the data differ
void mFnc(TInfo info)
{
    auto pause = delegate (Duration d) @nogc { msleep(d); };

    auto serial = new SerialPortFR(info.dev[0], info.mode, pause);
    scope (exit) serial.close();
    auto address = new InternetAddress(info.addr, info.port);

    auto rtuMaster = new ModbusRTUMaster(serial);
    auto tcpMaster = new ModbusTCPMaster(address, pause);
    rtuMaster.connection.readTimeout = 1.seconds;
    tcpMaster.connection.readTimeout = 1.seconds;
    scope (exit) tcpMaster.halt();

    const timer = StopWatch(AutoStart.yes);
    try
    {
        while (timer.peek < info.worktime)
        {
            // random start register and register count
            auto start = cast(ushort)uniform(0, dataRegCnt-2);
            auto count = cast(ushort)uniform(1, dataRegCnt-start);

            const(ushort)[] rtu_vals;
            const(ushort)[] tcp_vals;

            // a failed read is only printed, the values stay empty
            try
            {
                rtu_vals = rtuMaster.readInputRegisters(info.mbn, start, count);
            }
            catch (Exception e)
            {
                testPrint("RTU read throws: " ~ e.msg);
            }

            try
            {
                tcp_vals = tcpMaster.readInputRegisters(info.mbn, start, count);
            }
            catch (Exception e)
            {
                testPrint("TCP read throws: " ~ e.msg);
            }

            // compare only when both reads produced data
            if (rtu_vals != rtu_vals.init && tcp_vals != tcp_vals.init
                && !equal(rtu_vals, tcp_vals))
            {
                send(ownerTid, exc("fail check"));
            }

            msleep(1.msecs);
        }
    }
    catch (Throwable problem)
    {
        testPrint(__FUNCTION__~ " "~ problem.msg);
        assert(0);
    }
}
366 
/// TCP master thread: for `info.worktime` repeatedly creates a new
/// `ModbusTCPMaster`, reads all `dataRegCnt` registers and, when `needHalt`
/// is set, halts the master afterwards
void mTcpFnc(TInfo info, bool needHalt=false)
{
    auto pause = delegate (Duration d) @nogc { msleep(d); };

    auto address = new InternetAddress(info.addr, info.port);

    const timer = StopWatch(AutoStart.yes);
    while (timer.peek < info.worktime)
    {
        try
        {
            // a new master (and connection) on every pass
            auto master = new ModbusTCPMaster(address, pause);
            master.connection.readTimeout = 1.seconds;
            auto readBack = master.readInputRegisters(info.mbn, 0, dataRegCnt);
            msleep(10.msecs);
            if (needHalt)
            {
                master.halt();
            }
        }
        catch (Throwable problem)
        {
            testPrintf!(__FUNCTION__~ " needHalt %s, msg: %s")(needHalt, problem.msg);
            assert(0);
        }
        msleep(10.msecs);
    }
}