这是我们实习的一部分任务,由于全部是在内网上做的,拿不出来我就重新打一遍好了。
总共分为这么几步。1.阅读索引文件。2.阅读数据文件。3将文件反序列化。4.将文件写入kafka
1 阅读索引文件。
阅读二进制索引文件前,由于我的文件是小端方式,字节流存放,所以中间需要将数值大于127(会表现为负数以补码形式存放)的字节先变为正数。
public transform
{
public static int getpos(int a)
int l;
int[] tc=new int[7];
int b;
b=Math.abs(a);//取绝对值
b-=1;
for(int j=6;j>=0;j--)
{
tc[j]=b%2;
b=b/2;
}
for(int k=0;k<7;k++)
{
if(tc[k]==0){tc[k]=1;}
else if(tc[k]==1;){tc[k]=0;}//C语言中~方法也可以取反 不知道在java中是否适用
}
l=tc[6]+tc[5]*2+tc[4]*2*2+tc[3]*2*2*2+tc[2]*2*2*2*2+tc[1]*2*2*2*2*2+tc[0]*2*2*2*2*2*2+128;
return l;
}
然后读取索引文件
import java.io.FileInputStream;
import java.io.IOException;
import java.io.DataInputStream;
public static long getindex(int a)//返回第a个索引所代表的的值
{
long[] t1=new long[10000];
int [] tb=new int[8];
long g;
try{
FileInputStream f=new FileInputStream("文件路径");
DataInputStream df=new DataInputStream(f);
int i=0,j=0;
int k=0;
for(j=0;j<80000;j++)
if(j%8==0)
for(i=0;i<8;i++)
{
tb[i]=df.readByte();
if(tb[i]<0){tb[i]=getpos(tb[i]);}
}
t1[k]=tb[0]+tb[1]*16*16+tb[2]*16*16*16*16+tb[3]*16*16*16*16*16*16
k++;
}
f.close();
catch(IOException e)
{
System.out.println("a mistake has taken place"+e);
e.printStackTrace();}
g=t1[a];
return g;
然后根据索引文件读取数据文件就行了
import java.io.FileInputStream;
import java.io.IOException;
import java.io.DataInputStream;
public class readdata{
public static void main(String arg[]s)
byte[] b1=new byte[100];
long iv[]=new long[100];
int h;
int i=.0;
int k;//第k个消息
int p=0;//
long c =0;
for(int j=0;j<100;j++)
{
iv[j]=readindex.getindex(j);
}
try{
FileInputStream f=new FileInputStreaam("文件路径”);
DataInputStream df=new DataInputStream(f);
for(i=0;k<100;k++)
{
c=iv[k]-iv[k-1]
for(i=0;i<c;i++)
{
b1[i]=df.readByte();
System.out.print(b1[i]);
}
System.out.println();
for(i=0;i<40;i++)
{b1[i]=0;}
f.close();
}catch(IOException e)
{
System.out.println("mistake"+e);
e.printStackTrace();
}
}
}
这样就可以将每包的数据读出来了,不过有一些局限性,读取少量数据没有问题,还有就是没有读第一包数据。
下面就是将文件反序列化并写入kafka了。下面这部分是用的公司的反序列化工具然后用生产者进程写入kafka。
我就不细将了。