1 ///
2 module modbus.connection.base;
3 
4 public import serialport.exception;
5 
6 static import serialport.base;
7 
8 public import std.datetime : Duration;
9 
/// Byte-oriented connection with separately configurable read and write timeouts.
interface Connection
{
    /// Current read timeout.
    @property Duration readTimeout();

    /// Current write timeout.
    @property Duration writeTimeout();

    /// Set the read timeout.
    @property void readTimeout(Duration);

    /// Set the write timeout.
    @property void writeTimeout(Duration);

    /// Read-completion policy, aliased from `serialport.base.SerialPort.CanRead`.
    alias CanRead = serialport.base.SerialPort.CanRead;

    /++ Write data to the connection.

        Params:
            data = bytes to send
     +/
    void write(const(void)[] data);

    /++ Read data from the connection.

        Params:
            buffer = preallocated buffer that receives the data
            cr = read-completion policy, `CanRead.allOrNothing` by default

        Returns:
            slice of `buffer` holding the data that was read
     +/
    void[] read(void[] buffer, CanRead cr=CanRead.allOrNothing);

    /// Re-establish the connection.
    void reconnect();
}
47 
/// Partial `Connection` implementation: stores both timeouts and leaves
/// the actual I/O (`write`, `read`, `reconnect`) to subclasses.
abstract class AbstractConnection : Connection
{
    import std.datetime : msecs;

protected:
    /// Read timeout; starts at 10 ms.
    Duration _rtm = 10.msecs;
    /// Write timeout; starts at 10 ms.
    Duration _wtm = 10.msecs;

public:

    /// Returns the stored read timeout.
    override @property Duration readTimeout()
    {
        return _rtm;
    }

    /// Returns the stored write timeout.
    override @property Duration writeTimeout()
    {
        return _wtm;
    }

    /// Stores `d` as the new read timeout.
    override @property void readTimeout(Duration d)
    {
        _rtm = d;
    }

    /// Stores `d` as the new write timeout.
    override @property void writeTimeout(Duration d)
    {
        _wtm = d;
    }

    /// Must be provided by the concrete connection.
    abstract void write(const(void)[] data);

    /// Must be provided by the concrete connection.
    abstract void[] read(void[] buffer, CanRead cr=CanRead.allOrNothing);

    /// Must be provided by the concrete connection.
    abstract void reconnect();
}
69 
70 version (unittest)
71 {
72     import std.exception : assertThrown, assertNotThrown, enforce;
73     import std.stdio;
74     import core.thread;
75     import std.datetime.stopwatch;
76     import std.array;
77     import std.format;
78 }
79 
80 import modbus.cbuffer;
81 
/++ In-memory connection built on two circular buffers.

    Incoming bytes are taken from `rx`, outgoing bytes are put into `tx`.
    `write` and `read` wait by yielding the current fiber, so both must be
    called from inside a running `Fiber`.
 +/
class VirtualConnection : AbstractConnection
{
    import std.datetime.stopwatch;
    import core.thread : Fiber;
    import serialport.exception : TimeoutException;
    import std.exception : enforce;
    import std.algorithm : min;
    import std.stdio;

    /// Label handed to `throwTimeoutException` when a timeout is reported.
    string name;

    /// `rx` is the buffer `read` consumes, `tx` is the buffer `write` fills.
    CBufferCls rx, tx;

    /++ Params:
            rx = buffer to read from
            tx = buffer to write to
            name = label used in timeout reports
     +/
    this(CBufferCls rx, CBufferCls tx, string name)
    {
        this.name = name;
        this.rx = rx;
        this.tx = tx;
    }

override:

    /++ Put every byte of `data` into `tx`, one at a time.

        While `tx` is full the current fiber is yielded; after each resume
        the time elapsed since the call started is compared with the write
        timeout and a timeout is reported through `throwTimeoutException`
        once it is exceeded. Bytes already put into `tx` stay there.

        Params:
            data = bytes to send

        Throws:
            the exception raised by `enforce` when called outside a fiber
     +/
    void write(const(void)[] data)
    {
        auto sw = StopWatch(AutoStart.yes);
        enforce(Fiber.getThis, "must run in fiber");

        // View the payload as bytes while keeping its const qualifier.
        foreach (b; cast(const(ubyte)[])data)
        {
            while (tx.full)
            {
                // Let the consumer run first, then check the deadline.
                Fiber.yield();
                if (sw.peek > _wtm)
                    throwTimeoutException(name, "write timeout");
            }
            tx.put(b);
        }
    }

    /++ Fill `ext` with bytes taken from `rx`.

        While `rx` is empty the current fiber is yielded until the read
        timeout (measured from the start of the call) is exceeded. What
        happens then depends on `cr`:
        $(UL
            $(LI `CanRead.allOrNothing`: a timeout is reported)
            $(LI `CanRead.anyNonZero`: the bytes read so far are returned,
                 or a timeout is reported if there are none)
            $(LI any other value: the bytes read so far are returned,
                 possibly an empty slice)
        )
        Timeouts are reported through `throwTimeoutException`.

        Params:
            ext = preallocated buffer that receives the data
            cr = policy applied when the timeout expires

        Returns:
            `ext[]` when the buffer was filled completely, otherwise the
            leading part of `ext` that was filled

        Throws:
            the exception raised by `enforce` when called outside a fiber
     +/
    void[] read(void[] ext, CanRead cr=CanRead.allOrNothing)
    {
        auto sw = StopWatch(AutoStart.yes);
        enforce(Fiber.getThis, "must run in fiber");

        auto uret = cast(ubyte[])ext;

        foreach (i; 0 .. uret.length)
        {
            while (rx.empty)
            {
                // Deadline is checked before giving way to the producer.
                if (sw.peek > _rtm)
                {
                    if (cr == CanRead.allOrNothing)
                        throwTimeoutException(name, "read timeout");
                    else if (cr == CanRead.anyNonZero)
                    {
                        if (i != 0) return ext[0..i];
                        throwTimeoutException(name, "read timeout");
                    }
                    else return ext[0..i];
                }
                Fiber.yield();
            }
            uret[i] = rx.front;
            rx.popFront;
        }
        return ext[];
    }

    /// Clear both buffers.
    void reconnect()
    {
        rx.clear();
        tx.clear();
    }
}
161 
// Loopback check: one buffer serves as both rx and tx, so everything
// written must be read back unchanged.
unittest
{
    auto cb = new CBufferCls(40);

    auto c = new VirtualConnection(cb, cb, "test");

    void fnc()
    {
        enum data = "1qazxsw23edcv";
        void[128] tmp = void;
        foreach (i; 0 .. 2000)
        {
            c.write(data);
            auto r = cast(string)c.read(tmp[0..data.length]).idup;
            assert(data == r);
        }
    }

    // write/read enforce that they run inside a fiber.
    auto f = new Fiber(&fnc);

    // Keep resuming the fiber until its body has finished.
    while (f.state != Fiber.State.TERM)
    {
        f.call;
        Thread.sleep(1.msecs);
    }
}
189 
/++ Create two cross-wired virtual connections.

    Each connection writes into the buffer the other one reads from.

    Params:
        bufSize = size passed to each of the two `CBufferCls` buffers
        prefix = name prefix; the connections are named `prefix ~ "A"`
                 and `prefix ~ "B"`

    Returns:
        the pair of connections, "A" at index 0 and "B" at index 1
 +/
VirtualConnection[2] virtualPipeConnection(size_t bufSize, string prefix)
{
    auto inboxA = new CBufferCls(bufSize);
    auto inboxB = new CBufferCls(bufSize);

    VirtualConnection[2] pipe;
    pipe[0] = new VirtualConnection(inboxA, inboxB, prefix ~ "A");
    pipe[1] = new VirtualConnection(inboxB, inboxA, prefix ~ "B");
    return pipe;
}
198 
// Pipe check: one fiber writes through side A while another reads through
// side B, first whole messages, then each message in two pieces.
unittest
{
    enum data = "1qazxsw23edcv";
    void[data.length] buf = void;

    auto cc = virtualPipeConnection(77, "test");

    // Read into `dst` through side B and return an immutable copy as text.
    string recv(void[] dst)
    {
        return cast(string)cc[1].read(dst).idup;
    }

    // Producer: 200 messages, giving way after each one.
    void fncA()
    {
        size_t sent = 0;
        while (sent < 200)
        {
            cc[0].write(data);
            Fiber.yield();
            ++sent;
        }
    }

    // Consumer: 100 whole messages, then 100 messages split at byte 5.
    void fncB()
    {
        for (size_t n = 0; n < 100; ++n)
        {
            auto whole = recv(buf);
            assert(data == whole);
            Fiber.yield();
        }

        enum k = 5;
        for (size_t n = 0; n < 100; ++n)
        {
            auto part = recv(buf[0..k]);
            assert(data[0..k] == part);
            part = recv(buf[k..data.length]);
            assert(data[k..$] == part);
            Fiber.yield();
        }
    }

    auto f1 = new Fiber(&fncA);
    auto f2 = new Fiber(&fncB);

    auto sw = StopWatch(AutoStart.yes);

    // Alternate the fibers until either one ends or 500 ms have elapsed.
    for (;;)
    {
        const anyDone = f1.state == Fiber.State.TERM
                     || f2.state == Fiber.State.TERM;
        if (anyDone || sw.peek >= 500.msecs)
            break;

        f1.call;
        f2.call;
        Thread.sleep(1.msecs);
    }

    // Outside of a fiber both operations are expected to throw.
    void writeOutsideFiber()
    {
        foreach (i; 0 .. 100)
            cc[0].write(data);
    }

    void readOutsideFiber()
    {
        foreach (i; 0 .. 100)
            cc[1].read(buf);
    }

    assertThrown(writeOutsideFiber());
    assertThrown(readOutsideFiber());
}