多线程采集数据处理怎样同步才能达到不漏采数据同时处理效率最优?

解释1:你可以把数据放到队列,每次采集到数据就放入队列,这样后面采集的,就是新增到队列,而分析线程等就是到队列中一个个读取。

解释2:

仅供参考:

C/C++ code
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
//循环向a函数每次发送200个字节长度(这个是固定的)的buffer,
//a函数中需要将循环传进来的buffer,组成240字节(也是固定的)的新buffer进行处理,
//在处理的时候每次从新buffer中取两个字节打印
#ifdef _MSC_VER
     #pragma warning(disable:4996)
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _MSC_VER
     #include <windows.h>
     #include <process.h>
     #include <io.h>
     #define  MYVOID              void
     #define  vsnprintf          _vsnprintf
#else
     #include <unistd.h>
     #include <sys/ time .h>
     #include <pthread.h>
     #define  CRITICAL_SECTION   pthread_mutex_t
     #define  MYVOID              void  *
#endif
//Log{
#define MAXLOGSIZE 20000000
#define MAXLINSIZE 16000
#include <time.h>
#include <sys/timeb.h>
#include <stdarg.h>
char  logfilename1[]= "MyLog1.log" ;
char  logfilename2[]= "MyLog2.log" ;
static  char  logstr[MAXLINSIZE+1];
char  datestr[16];
char  timestr[16];
char  mss[4];
CRITICAL_SECTION cs_log;
FILE  *flog;
#ifdef _MSC_VER
void  Lock(CRITICAL_SECTION *l) {
     EnterCriticalSection(l);
}
void  Unlock(CRITICAL_SECTION *l) {
     LeaveCriticalSection(l);
}
void  sleep_ms( int  ms) {
     Sleep(ms);
}
#else
void  Lock(CRITICAL_SECTION *l) {
     pthread_mutex_lock(l);
}
void  Unlock(CRITICAL_SECTION *l) {
     pthread_mutex_unlock(l);
}
void  sleep_ms( int  ms) {
     usleep(ms*1000);
}
#endif
void  LogV( const  char  *pszFmt, va_list  argp) {
     struct  tm  *now;
     struct  timeb tb;
 
     if  (NULL==pszFmt||0==pszFmt[0])  return ;
     vsnprintf(logstr,MAXLINSIZE,pszFmt,argp);
     ftime(&tb);
     now= localtime (&tb. time );
     sprintf (datestr, "%04d-%02d-%02d" ,now->tm_year+1900,now->tm_mon+1,now->tm_mday);
     sprintf (timestr, "%02d:%02d:%02d" ,now->tm_hour     ,now->tm_min  ,now->tm_sec );
     sprintf (mss, "%03d" ,tb.millitm);
     printf ( "%s %s.%s %s" ,datestr,timestr,mss,logstr);
     flog= fopen (logfilename1, "a" );
     if  (NULL!=flog) {
         fprintf (flog, "%s %s.%s %s" ,datestr,timestr,mss,logstr);
         if  ( ftell (flog)>MAXLOGSIZE) {
             fclose (flog);
             if  ( rename (logfilename1,logfilename2)) {
                 remove (logfilename2);
                 rename (logfilename1,logfilename2);
             }
         else  {
             fclose (flog);
         }
     }
}
void  Log( const  char  *pszFmt,...) {
     va_list  argp;
 
     Lock(&cs_log);
     va_start (argp,pszFmt);
     LogV(pszFmt,argp);
     va_end (argp);
     Unlock(&cs_log);
}
//Log}
#define ASIZE    200
#define BSIZE    240
#define CSIZE      2
char  Abuf[ASIZE];
char  Cbuf[CSIZE];
CRITICAL_SECTION cs_HEX;
CRITICAL_SECTION cs_BBB;
struct  FIFO_BUFFER {
     int   head;
     int   tail;
     int   size;
     char  data[BSIZE];
} BBB;
int  No_Loop=0;
void  HexDump( int  cn, char  *buf, int  len) {
     int  i,j,k;
     char  binstr[80];
 
     Lock(&cs_HEX);
     for  (i=0;i<len;i++) {
         if  (0==(i%16)) {
             sprintf (binstr, "%03d %04x -" ,cn,i);
             sprintf (binstr, "%s %02x" ,binstr,(unsigned  char )buf[i]);
         else  if  (15==(i%16)) {
             sprintf (binstr, "%s %02x" ,binstr,(unsigned  char )buf[i]);
             sprintf (binstr, "%s  " ,binstr);
             for  (j=i-15;j<=i;j++) {
                 sprintf (binstr, "%s%c" ,binstr,( '!' <buf[j]&&buf[j]<= '~' )?buf[j]: '.' );
             }
             Log( "%s\n" ,binstr);
         else  {
             sprintf (binstr, "%s %02x" ,binstr,(unsigned  char )buf[i]);
         }
     }
     if  (0!=(i%16)) {
         k=16-(i%16);
         for  (j=0;j<k;j++) {
             sprintf (binstr, "%s   " ,binstr);
         }
         sprintf (binstr, "%s  " ,binstr);
         k=16-k;
         for  (j=i-k;j<i;j++) {
             sprintf (binstr, "%s%c" ,binstr,( '!' <buf[j]&&buf[j]<= '~' )?buf[j]: '.' );
         }
         Log( "%s\n" ,binstr);
     }
     Unlock(&cs_HEX);
}
int  GetFromRBuf( int  cn,CRITICAL_SECTION *cs, struct  FIFO_BUFFER *fbuf, char  *buf, int  len) {
     int  lent,len1,len2;
 
     lent=0;
     Lock(cs);
     if  (fbuf->size>=len) {
         lent=len;
         if  (fbuf->head+lent>BSIZE) {
             len1=BSIZE-fbuf->head;
             memcpy (buf     ,fbuf->data+fbuf->head,len1);
             len2=lent-len1;
             memcpy (buf+len1,fbuf->data           ,len2);
             fbuf->head=len2;
         else  {
             memcpy (buf     ,fbuf->data+fbuf->head,lent);
             fbuf->head+=lent;
         }
         fbuf->size-=lent;
     }
     Unlock(cs);
     return  lent;
}
MYVOID thdB( void  *pcn) {
     char         *recv_buf;
     int           recv_nbytes;
     int           cn;
     int           wc;
     int           pb;
 
     cn=( int )pcn;
     Log( "%03d thdB              thread begin...\n" ,cn);
     while  (1) {
         sleep_ms(10);
         recv_buf=( char  *)Cbuf;
         recv_nbytes=CSIZE;
         wc=0;
         while  (1) {
             pb=GetFromRBuf(cn,&cs_BBB,&BBB,recv_buf,recv_nbytes);
             if  (pb) {
                 Log( "%03d recv %d bytes\n" ,cn,pb);
                 HexDump(cn,recv_buf,pb);
                 sleep_ms(1);
             else  {
                 sleep_ms(1000);
             }
             if  (No_Loop)  break ; //
             wc++;
             if  (wc>3600) Log( "%03d %d==wc>3600!\n" ,cn,wc);
         }
         if  (No_Loop)  break ; //
     }
#ifndef _MSC_VER
     pthread_exit(NULL);
#endif
}
int  PutToRBuf( int  cn,CRITICAL_SECTION *cs, struct  FIFO_BUFFER *fbuf, char  *buf, int  len) {
     int  lent,len1,len2;
 
     Lock(cs);
     lent=len;
     if  (fbuf->size+lent>BSIZE) {
         lent=BSIZE-fbuf->size;
     }
     if  (fbuf->tail+lent>BSIZE) {
         len1=BSIZE-fbuf->tail;
         memcpy (fbuf->data+fbuf->tail,buf     ,len1);
         len2=lent-len1;
         memcpy (fbuf->data           ,buf+len1,len2);
         fbuf->tail=len2;
     else  {
         memcpy (fbuf->data+fbuf->tail,buf     ,lent);
         fbuf->tail+=lent;
     }
     fbuf->size+=lent;
     Unlock(cs);
     return  lent;
}
MYVOID thdA( void  *pcn) {
     char         *send_buf;
     int           send_nbytes;
     int           cn;
     int           wc;
     int            a;
     int           pa;
 
     cn=( int )pcn;
     Log( "%03d thdA              thread begin...\n" ,cn);
     a=0;
     while  (1) {
         sleep_ms(100);
         memset (Abuf,a,ASIZE);
         a=(a+1)%256;
         if  (16==a) {No_Loop=1; break ;} //去掉这句可以让程序一直循环直到按Ctrl+C或Ctrl+Break或当前目录下存在文件No_Loop
         send_buf=( char  *)Abuf;
         send_nbytes=ASIZE;
         Log( "%03d sending %d bytes\n" ,cn,send_nbytes);
         HexDump(cn,send_buf,send_nbytes);
         wc=0;
         while  (1) {
             pa=PutToRBuf(cn,&cs_BBB,&BBB,send_buf,send_nbytes);
             Log( "%03d sent %d bytes\n" ,cn,pa);
             HexDump(cn,send_buf,pa);
             send_buf+=pa;
             send_nbytes-=pa;
             if  (send_nbytes<=0)  break ; //
             sleep_ms(1000);
             if  (No_Loop)  break ; //
             wc++;
             if  (wc>3600) Log( "%03d %d==wc>3600!\n" ,cn,wc);
         }
         if  (No_Loop)  break ; //
     }
#ifndef _MSC_VER
     pthread_exit(NULL);
#endif
}
int  main() {
#ifdef _MSC_VER
     InitializeCriticalSection(&cs_log);
     InitializeCriticalSection(&cs_HEX);
     InitializeCriticalSection(&cs_BBB);
#else
     pthread_t threads[2];
     int  threadsN;
     int  rc;
     pthread_mutex_init(&cs_log,NULL);
     pthread_mutex_init(&cs_HEX,NULL);
     pthread_mutex_init(&cs_BBB,NULL);
#endif
     Log( "Start===========================================================\n" );
 
     BBB.head=0;
     BBB.tail=0;
     BBB.size=0;
 
#ifdef _MSC_VER
     _beginthread(( void (__cdecl *)( void  *))thdA,0,( void  *)1);
     _beginthread(( void (__cdecl *)( void  *))thdB,0,( void  *)2);
#else
     threadsN=0;
     rc=pthread_create(&(threads[threadsN++]),NULL,thdA,( void  *)1); if  (rc) Log( "%d=pthread_create %d error!\n" ,rc,threadsN-1);
     rc=pthread_create(&(threads[threadsN++]),NULL,thdB,( void  *)2); if  (rc) Log( "%d=pthread_create %d error!\n" ,rc,threadsN-1);
#endif
 
     if  (!access( "No_Loop" ,0)) {
         remove ( "No_Loop" );
         if  (!access( "No_Loop" ,0)) {
             No_Loop=1;
         }
     }
     while  (1) {
         sleep_ms(1000);
         if  (No_Loop)  break ; //
         if  (!access( "No_Loop" ,0)) {
             No_Loop=1;
         }
     }
     sleep_ms(3000);
     Log( "End=============================================================\n" );
#ifdef _MSC_VER
     DeleteCriticalSection(&cs_BBB);
     DeleteCriticalSection(&cs_HEX);
     DeleteCriticalSection(&cs_log);
#else
     pthread_mutex_destroy(&cs_BBB);
     pthread_mutex_destroy(&cs_HEX);
     pthread_mutex_destroy(&cs_log);
#endif
     return  0;
}

任何收发两端速度不一致的通讯,都需要在它们之间使用一个足够大的FIFO缓冲区。
对任何FIFO缓冲区的使用,都需要仔细考虑接收端接收时超时无数据和发送端发送时FIFO缓冲区已满这两种情况下该如何做。
这些概念都在这段经典代码中有所体现。
这段经典代码还包括以下必须考虑的因素:
◆跨Windows和Linux平台
◆多线程锁
◆多线程日志
◆日志文件占用的磁盘空间可控
◆日志中的时间包括毫秒
◆传输的数据对应的每个字节到底是几
◆如何退出多线程程序
◆……

转:https://bbs.csdn.net/topics/392217131?page=1

猜你喜欢

转载自blog.csdn.net/eric_e/article/details/80585663