1 /// 2 module modbus.connection.base; 3 4 public import serialport.exception; 5 6 static import serialport.base; 7 8 public import std.datetime : Duration; 9 10 /// Connection 11 interface Connection 12 { 13 @property 14 { 15 /// 16 Duration readTimeout(); 17 /// 18 Duration writeTimeout(); 19 /// 20 void readTimeout(Duration); 21 /// 22 void writeTimeout(Duration); 23 } 24 25 alias CanRead = serialport.base.SerialPort.CanRead; 26 27 /++ Write data to connection 28 29 Returns: 30 writed data length 31 +/ 32 void write(const(void)[] data); 33 34 /++ Read data from connection 35 36 Params: 37 buffer = preallocated buffer for reading 38 39 Returns: 40 slice of buffer with readed data 41 +/ 42 void[] read(void[] buffer, CanRead cr=CanRead.allOrNothing); 43 44 /// 45 void reconnect(); 46 } 47 48 /// 49 abstract class AbstractConnection : Connection 50 { 51 import std.datetime : msecs; 52 protected: 53 Duration _rtm = 10.msecs, _wtm = 10.msecs; 54 55 public: 56 57 @property override 58 { 59 Duration readTimeout() { return _rtm; } 60 Duration writeTimeout() { return _wtm; } 61 void readTimeout(Duration d) { _rtm = d; } 62 void writeTimeout(Duration d) { _wtm = d; } 63 } 64 65 abstract void write(const(void)[] data); 66 abstract void[] read(void[] buffer, CanRead cr=CanRead.allOrNothing); 67 abstract void reconnect(); 68 } 69 70 version (unittest) 71 { 72 import std.exception : assertThrown, assertNotThrown, enforce; 73 import std.stdio; 74 import core.thread; 75 import std.datetime.stopwatch; 76 import std.array; 77 import std.format; 78 } 79 80 import modbus.cbuffer; 81 82 /++ Circle buffer, for fibers only 83 +/ 84 class VirtualConnection : AbstractConnection 85 { 86 import std.datetime.stopwatch; 87 import core.thread : Fiber; 88 import serialport.exception : TimeoutException; 89 import std.exception : enforce; 90 import std.algorithm : min; 91 import std.stdio; 92 93 string name; 94 95 CBufferCls rx, tx; 96 97 this(CBufferCls rx, CBufferCls tx, string name) 98 { 99 this.name = name; 100 this.rx = rx; 101 this.tx = tx; 102 } 103 104 override: 105 106 void write(const(void)[] data) 107 { 108 auto sw = StopWatch(AutoStart.yes); 109 auto fb = enforce(Fiber.getThis, "must run in fiber"); 110 111 auto udat = cast(ubyte[])data; 112 113 foreach (i; 0 .. data.length) 114 { 115 while (tx.full) 116 { 117 fb.yield(); 118 if (sw.peek > _wtm) 119 throwTimeoutException(name, "write timeout"); 120 } 121 tx.put(udat[i]); 122 } 123 } 124 125 void[] read(void[] ext, CanRead cr=CanRead.allOrNothing) 126 { 127 auto sw = StopWatch(AutoStart.yes); 128 auto fb = enforce(Fiber.getThis, "must run in fiber"); 129 130 auto uret = cast(ubyte[])ext; 131 132 foreach (i; 0 .. uret.length) 133 { 134 while (rx.empty) 135 { 136 if (sw.peek > _rtm) 137 { 138 if (cr == CanRead.allOrNothing) 139 throwTimeoutException(name, "read timeout"); 140 else if (cr == CanRead.anyNonZero) 141 { 142 if (i != 0) return ext[0..i]; 143 throwTimeoutException(name, "read timeout"); 144 } 145 else return ext[0..i]; 146 } 147 fb.yield(); 148 } 149 uret[i] = rx.front; 150 rx.popFront; 151 } 152 return ext[]; 153 } 154 155 void reconnect() 156 { 157 rx.clear(); 158 tx.clear(); 159 } 160 } 161 162 unittest 163 { 164 auto cb = new CBufferCls(40); 165 166 auto c = new VirtualConnection(cb, cb, "test"); 167 168 void fnc() 169 { 170 enum data = "1qazxsw23edcv"; 171 void[128] tmp = void; 172 foreach (i; 0 .. 2000) 173 { 174 c.write(data); 175 auto r = cast(string)c.read(tmp[0..data.length]).idup; 176 assert(data == r); 177 } 178 } 179 180 auto f = new Fiber(&fnc); 181 182 auto sw = StopWatch(AutoStart.yes); 183 while (f.state != Fiber.State.TERM) 184 { 185 f.call; 186 Thread.sleep(1.msecs); 187 } 188 } 189 190 /// 191 VirtualConnection[2] virtualPipeConnection(size_t bufSize, string prefix) 192 { 193 auto a = new CBufferCls(bufSize); 194 auto b = new CBufferCls(bufSize); 195 return [new VirtualConnection(a, b, prefix ~ "A"), 196 new VirtualConnection(b, a, prefix ~ "B")]; 197 } 198 199 unittest 200 { 201 enum data = "1qazxsw23edcv"; 202 void[data.length] buf = void; 203 204 auto cc = virtualPipeConnection(77, "test"); 205 206 void fncA() 207 { 208 foreach (i; 0 .. 200) 209 { 210 cc[0].write(data); 211 Fiber.yield(); 212 } 213 } 214 215 void fncB() 216 { 217 foreach (i; 0 .. 100) 218 { 219 auto r = cast(string)cc[1].read(buf).idup; 220 assert(data == r); 221 Fiber.yield(); 222 } 223 224 foreach (i; 0 .. 100) 225 { 226 enum k = 5; 227 auto r = cast(string)cc[1].read(buf[0..k]).idup; 228 assert(data[0..k] == r); 229 r = cast(string)cc[1].read(buf[k..data.length]).idup; 230 assert(data[k..$] == r); 231 Fiber.yield(); 232 } 233 } 234 235 auto f1 = new Fiber(&fncA); 236 auto f2 = new Fiber(&fncB); 237 238 auto sw = StopWatch(AutoStart.yes); 239 240 while (f1.state != Fiber.State.TERM && 241 f2.state != Fiber.State.TERM && 242 sw.peek < 500.msecs) 243 { 244 f1.call; 245 f2.call; 246 Thread.sleep(1.msecs); 247 } 248 249 assertThrown( (){ foreach (i; 0 .. 100) cc[0].write(data); }()); 250 assertThrown( (){ foreach (i; 0 .. 100) cc[1].read(buf); }()); 251 }