Python直接序列扩频仿真
基于Python实现的直接序列扩频仿真系统
一、背景
直接序列扩频通信系统(Direct-Sequence Spread Spectrum, DSSS)是目前应用最为广泛的系统。
在发送端,DSSS将发送序列利用伪随机序列扩展到一个很宽的频带上去,
在接收端又用相同的扩频序列进行解扩,恢复出原有信息。由于干扰信息
与伪随机序列不相关,扩频后能够使窄带干扰得到有效抑制,提高输出信噪比。
二、系统目标
本次仿真的DSSS系统目标是利用python实现100Hz扩频序列,100/7Hz的二进制比特信息 100Hz的7位双极性m序列,2000Hz的载波信号cos4000$\pi$t,BPSK调制以及高斯白噪声, 在接收端实现2000Hz的恢复载波,以及凯塞窗低通滤波器,对滤波后的信号进行抽样判决。
三、Python实现
发射端
M序列码产生
7位M序列码可以直接通过3个比特位的寄存器实现。具体实现可以查看别的博客。 7位M序列码可以通过对第一个比特位和最后一个比特位的模二相加实现。 笔者算得的一个7位M序列码为1101001,具体代码实现如下:
def mSequence(num: int = 3, pose: list[int] = [1, 3]) -> list[list]:
mSeqGen: list[int] = [1 if i == 0 else 0 for i in range(num)]
mSeq: list[int] = list(range(2**num-1))
for i in range(2**num-1):
mSeqGen_ = mSeqGen.copy()
for j in range(num-1):
mSeqGen[j+1] = mSeqGen_[j]
mSeqGen[0] = sum(
[mSeqGen_[k]if k+1 in pose else 0 for k in range(num)]) % 2
mSeq[i] = mSeqGen[0]
mSeqContinuos: list[Union[int, float]] = continuousFormat(
[1 if i == 0 else -1 for i in mSeq], 100)
mSeqContinuos_: list[int] = [int(i) for i in mSeqContinuos]
return [mSeq, mSeqContinuos_]
参数解释,输入参数num是m序列的阶数,因为要输出7位m序列,所以阶数默认是3。
pose参数是反馈的参数。因为3阶m序列信号的产生是第1位和第3位为反馈,所以是1,3。
连续采样
信号是连续的,但是计算机存储信息是离散的,所以笔者实现了一个对信号进行高频率离散采样的python函数。 因为调制频率是2000Hz,根据奈奎斯特采样定律,要想无失真从采样信号中恢复出来原始信号,采样频率至少需要4000Hz。 所以笔者采用5000Hz的采样频率的采样信号表达连续信号。 具体的python程序实现如下:
def continuousFormat(data: Union[list[Union[float, int]], list[float], list[int]], freq: Union[float, int], samplingFreq: Union[float, int] = 5000) -> list[Union[float, int]]:
listLen: int = math.ceil(len(data)*samplingFreq/freq)
continuousOut: list[float] = list(range(listLen))
pointer: int = 1
for i in range(listLen):
if i/samplingFreq > pointer/freq:
pointer += 1
continuousOut[i] = data[pointer-1]
else:
continuousOut[i] = data[pointer-1]
return continuousOut
这里data是输入的信号,freq是输入信号的频率。samlingFreq是采样的频率,默认是5000Hz。 输出是经过5000Hz采样后的信号。
输入信号产生
本节就通过python的随机数模块产生100个二进制信号,然后将0映射为1,将1映射为-1产生100/7Hz的NRZ信号。 具体python实现如下:
def generateData(length: int = 100, freq: Union[float, int] = 100/7, samFreq: Union[float, int] = 5000) -> list[list]:
data: list[int] = [random.randrange(0, 2) for _ in range(length)]
dataSignal: list[int] = [1 if i == 0 else -1 for i in data]
data_: list[Union[float, int]] = continuousFormat(
dataSignal, freq, samFreq)
dataOut: list[int] = [int(i)for i in data_]
return [data, dataOut]
输入参数length是要产生的数据长度,freq是产生输入信号的频率。samFreq是将信号连续采样的频率。
输出的data是原始的二进制信号,dataOut是输出的5000Hz采样的NRZ信号。
DSSS扩频
DSSS扩频就直接通过对M序列码和NRZ输入信号相乘即可得出DSSS调制信号。这里直接给出python实现。
def generateDSSSData(data: list[int], mSeq: list[int], samDataFreq: Union[float, int] = 100/7, samMSeqFreq: Union[float, int] = 100, samFreq: Union[float, int] = 5000) -> list[int]:
mSeqLastTime: float = len(mSeq)/samFreq
dataLastTime: float = len(data)/samFreq
mSeqSamping: list[int] = []
dataSam: list[int] = []
samTime: float = 0.5/samMSeqFreq
dataDSSS: list[int] = []
while samTime < mSeqLastTime:
y: float = dataSampling(mSeq, samTime)
mSeqSamping.append(1)if y > 0 else mSeqSamping.append(-1)
samTime += 1/samMSeqFreq
samTime = 0.5/samDataFreq
while samTime < dataLastTime:
y: float = dataSampling(data, samTime)
dataSam.append(1) if y > 0 else dataSam.append(-1)
samTime += 1/samDataFreq
for i in dataSam:
[dataDSSS.append(i*j)for j in mSeqSamping]
return [int(i) for i in continuousFormat(dataDSSS, samMSeqFreq)]
当然,笔者想了个笨方法。对输入NRZ信号进行了抽样判决,再作乘法。
抽样判决函数如下:
def dataSampling(data: list, samplingTime: Union[float, int], samFreq: Union[float, int] = 5000) -> float:
dataLastTime: float = len(data)/samFreq
if samplingTime > dataLastTime:
raise ValueError("采样时间大于信号持续时间。")
elif samplingTime < 0:
raise ValueError("采样时间不得为负数。")
else:
x: NDArray = np.arange(0, dataLastTime, 1/samFreq)
pointer: int = 0
flag: bool = False
for i in range(len(data)):
if samplingTime < x[i]:
pointer = i
flag = True
break
if not flag:
raise ValueError('取样失败,请检查取样时间')
y: float = data[pointer]-(x[pointer]-samplingTime) * \
(data[pointer]-data[pointer-1])*samFreq
return y
因为抽样是离散的,不一定会刚好抽样到数据,所以笔者采取的策略是局部线性插值决定采样值。
实际采样公式如下:
$$y=x_i-(x_i-t)\times (x_i-x_{i-1})\times f$$
其中y是采样值,$x_i$是第i个采样点的数值,t是采样时间,f是采样频率。
BPSK调制
BPSK调制只需要将信号直接和$\cos 4000\pi t$相乘即可。这里直接给出python实现:
def bpskModulator(data: list[int], freqMod: Union[float, int] = 2000, samFreq: Union[float, int] = 5000) -> list[float]:
time: NDArray[np.float128] = np.linspace(
0, len(data)/samFreq, num=len(data))
carrier: NDArray = np.cos(2*freqMod*np.pi*time)
return [i for i in carrier*data]
信道
加性高斯白噪声
笔者这里直接通过生成一个取值-1到1的长度和输入信号一样的随机数数列,通过乘上一个系数实现加性高斯白噪声信道。
当然首先需要得到信号x和噪声r的功率。公式如下:
$$P=\frac{1}{n} \sum^{n-1}_{i=0}{|x_i|}^2$$
python实现如下:
def countAverPower(signal: list[float]) -> float:
return sum([i**2 for i in signal])/len(signal)
计算出信号和生成的随机数的功率,可以通过以下的公式得出相应SNR的噪声的系数:
$$k=\sqrt{\frac{P_S}{P_N\times e^{\frac{SNR}{10}}}}$$
其中$P_S$是信号的功率,$P_N$是生成的随机数序列的功率,SNR是给定的信道信噪比。
加性高斯白噪声的python实现如下:
def awgn(signal: list[float], SNR: Union[int, float] = 10) -> list[float]:
signalAverPower: float = countAverPower(signal)
y: list[float] = [random.uniform(-1, 1)for _ in range(len(signal))]
yPower: float = countAverPower(y)
y_: list[float] = y.copy()
k: float = signalAverPower/(yPower*(10**(SNR/10)))
k = math.sqrt(k)
y = [i*k for i in y_]
return [y[i]+signal[i]for i in range(len(signal))]
接收端
BPSK解调器
这里直接将输入信号和$\cos 4000\pi t$相乘即可得到解调信号,当然还需要将信号送入低通滤波器。
以下是bpsk解调器的python实现,对于凯塞窗低通数字滤波器的实现可以通过阅读数字信号处理相关书籍了解。
def I0(x: Union[float, int], num: int = 100) -> float:
y: float = 0
for i in range(num):
if i == 0:
y += 1
else:
y += ((1/math.factorial(i))*((x/2)**i))**2
return y
def lowPF(beta: Union[float, int] = 4.538, N: int = 30) -> list[float]:
a: float = (N-1)/2
hn: list[float] = [math.sin(0.3*np.pi*(i-a))/(np.pi*(i-a))
for i in range(N)]
wn: list[float] = [0 for _ in range(N)]
for i in range(N):
x0: float = beta*math.sqrt(1-(1-(2*i/(N-1)))**2)
wn[i] = I0(x0)/I0(beta)
hn: list[float] = [hn[i]*wn[i]for i in range(N)]
return hn
def bpskDemodulate(signal: list[float], freqDemod: Union[float, int] = 2000, samFreq: Union[int, float] = 5000) -> list[float]:
time: NDArray[np.float128] = np.linspace(
0, len(signal)/samFreq, num=len(signal))
carrier: NDArray = np.cos(2*freqDemod*np.pi*time)
y: list[float] = [signal[i]*carrier[i]for i in range(len(signal))]
hn: list[float] = lowPF()
demodulateSignal = np.convolve(hn, y)
return [i for i in demodulateSignal]
DSSS解调
DSSS解调仅需将输入信号和7位m序列信号相乘即可得到DSSS解调信号。
python实现如下:
def demodDSSS(signal: list[float], mSeqSignal: list[int]) -> list[float]:
mSeqSig: list[int] = []
while len(signal) > len(mSeqSig):
mSeqSig.extend(mSeqSignal)
return [mSeqSig[i]*signal[i] for i in range(len(signal))]
抽样判决
先前已经实现了抽样判决的函数了,这里直接使用即可。 python代码如下:
def samData(signal: list[float], freq: Union[float, int] = 100/7, samFreq: Union[float, int] = 5000) -> list[int]:
samTime: float = 0.5/freq
signalLastTime: float = len(signal)/samFreq
data: list[int] = []
while samTime < signalLastTime:
y: float = dataSampling(signal, samTime)
data.append(0 if y > 0 else 1)
samTime += 1/freq
return data
四、完整代码
import math
from typing import Union
import numpy as np
import matplotlib.pyplot as plt
from numpy.typing import NDArray
import random
def samData(signal: list[float], freq: Union[float, int] = 100/7, samFreq: Union[float, int] = 5000) -> list[int]:
samTime: float = 0.5/freq
signalLastTime: float = len(signal)/samFreq
data: list[int] = []
while samTime < signalLastTime:
y: float = dataSampling(signal, samTime)
data.append(0 if y > 0 else 1)
samTime += 1/freq
return data
def demodDSSS(signal: list[float], mSeqSignal: list[int]) -> list[float]:
mSeqSig: list[int] = []
while len(signal) > len(mSeqSig):
mSeqSig.extend(mSeqSignal)
return [mSeqSig[i]*signal[i] for i in range(len(signal))]
def I0(x: Union[float, int], num: int = 100) -> float:
y: float = 0
for i in range(num):
if i == 0:
y += 1
else:
y += ((1/math.factorial(i))*((x/2)**i))**2
return y
def lowPF(beta: Union[float, int] = 4.538, N: int = 30) -> list[float]:
a: float = (N-1)/2
hn: list[float] = [math.sin(0.3*np.pi*(i-a))/(np.pi*(i-a))
for i in range(N)]
wn: list[float] = [0 for _ in range(N)]
for i in range(N):
x0: float = beta*math.sqrt(1-(1-(2*i/(N-1)))**2)
wn[i] = I0(x0)/I0(beta)
hn: list[float] = [hn[i]*wn[i]for i in range(N)]
return hn
def bpskDemodulate(signal: list[float], freqDemod: Union[float, int] = 2000, samFreq: Union[int, float] = 5000) -> list[float]:
time: NDArray[np.float128] = np.linspace(
0, len(signal)/samFreq, num=len(signal))
carrier: NDArray = np.cos(2*freqDemod*np.pi*time)
y: list[float] = [signal[i]*carrier[i]for i in range(len(signal))]
hn: list[float] = lowPF()
demodulateSignal = np.convolve(hn, y)
return [i for i in demodulateSignal]
def countAverPower(signal: list[float]) -> float:
return sum([i**2 for i in signal])/len(signal)
def awgn(signal: list[float], SNR: Union[int, float] = 10) -> list[float]:
signalAverPower: float = countAverPower(signal)
y: list[float] = [random.uniform(-1, 1)for _ in range(len(signal))]
yPower: float = countAverPower(y)
y_: list[float] = y.copy()
k: float = signalAverPower/(yPower*(10**(SNR/10)))
k = math.sqrt(k)
y = [i*k for i in y_]
return [y[i]+signal[i]for i in range(len(signal))]
def bpskModulator(data: list[int], freqMod: Union[float, int] = 2000, samFreq: Union[float, int] = 5000) -> list[float]:
time: NDArray[np.float128] = np.linspace(
0, len(data)/samFreq, num=len(data))
carrier: NDArray = np.cos(2*freqMod*np.pi*time)
return [i for i in carrier*data]
def dataSampling(data: list, samplingTime: Union[float, int], samFreq: Union[float, int] = 5000) -> float:
dataLastTime: float = len(data)/samFreq
if samplingTime > dataLastTime:
raise ValueError("采样时间大于信号持续时间。")
elif samplingTime < 0:
raise ValueError("采样时间不得为负数。")
else:
x: NDArray = np.arange(0, dataLastTime, 1/samFreq)
pointer: int = 0
flag: bool = False
for i in range(len(data)):
if samplingTime < x[i]:
pointer = i
flag = True
break
if not flag:
raise ValueError('取样失败,请检查取样时间')
y: float = data[pointer]-(x[pointer]-samplingTime) * \
(data[pointer]-data[pointer-1])*samFreq
return y
def generateDSSSData(data: list[int], mSeq: list[int], samDataFreq: Union[float, int] = 100/7, samMSeqFreq: Union[float, int] = 100, samFreq: Union[float, int] = 5000) -> list[int]:
mSeqLastTime: float = len(mSeq)/samFreq
dataLastTime: float = len(data)/samFreq
mSeqSamping: list[int] = []
dataSam: list[int] = []
samTime: float = 0.5/samMSeqFreq
dataDSSS: list[int] = []
while samTime < mSeqLastTime:
y: float = dataSampling(mSeq, samTime)
mSeqSamping.append(1)if y > 0 else mSeqSamping.append(-1)
samTime += 1/samMSeqFreq
samTime = 0.5/samDataFreq
while samTime < dataLastTime:
y: float = dataSampling(data, samTime)
dataSam.append(1) if y > 0 else dataSam.append(-1)
samTime += 1/samDataFreq
for i in dataSam:
[dataDSSS.append(i*j)for j in mSeqSamping]
return [int(i) for i in continuousFormat(dataDSSS, samMSeqFreq)]
def generateData(length: int = 100, freq: Union[float, int] = 100/7, samFreq: Union[float, int] = 5000) -> list[list]:
data: list[int] = [random.randrange(0, 2) for _ in range(length)]
dataSignal: list[int] = [1 if i == 0 else -1 for i in data]
data_: list[Union[float, int]] = continuousFormat(
dataSignal, freq, samFreq)
dataOut: list[int] = [int(i)for i in data_]
return [data, dataOut]
def continuousFormat(data: Union[list[Union[float, int]], list[float], list[int]], freq: Union[float, int], samplingFreq: Union[float, int] = 5000) -> list[Union[float, int]]:
listLen: int = math.ceil(len(data)*samplingFreq/freq)
continuousOut: list[float] = list(range(listLen))
pointer: int = 1
for i in range(listLen):
if i/samplingFreq > pointer/freq:
pointer += 1
continuousOut[i] = data[pointer-1]
else:
continuousOut[i] = data[pointer-1]
return continuousOut
def drawSpectrum(input: list, title: str, samFreq: Union[float, int] = 5000) -> None:
y: NDArray[np.complex128] = np.fft.fftshift(np.fft.fft(input))
yAbs: NDArray[np.float128] = np.abs(y)
yPha: NDArray[np.float128] = np.angle(y)
x: NDArray = np.fft.fftshift(np.fft.fftfreq(len(y)))
x: NDArray = np.linspace(-samFreq/2, samFreq/2, num=len(y))
plt.figure(figsize=(16, 9))
plt.suptitle(title)
plt.subplot(1, 2, 1)
plt.plot(x, yAbs)
plt.title('幅度谱')
plt.xlabel("频率")
plt.ylabel("幅度")
plt.subplot(1, 2, 2)
plt.plot(x, yPha)
plt.title("相位谱")
plt.xlabel("频率")
plt.ylabel("相位")
plt.savefig(f'{title}.png')if title is not None else None
plt.show()
return None
def drawData(input: list, title: Union[str, None] = None, freq: Union[int, float] = 5000) -> None:
x: NDArray[np.float128] = np.arange(0, len(input)/freq, 1/freq)
plt.figure(figsize=(16, 9))
plt.plot(x, input)
plt.title(title) if title is not None else None
plt.xlabel("时间/s")
plt.ylabel('幅度')
plt.savefig(f'{title}')if title is not None else None
plt.show()
return None
def mSequence(num: int = 3, pose: list[int] = [1, 3]) -> list[list]:
mSeqGen: list[int] = [1 if i == 0 else 0 for i in range(num)]
mSeq: list[int] = list(range(2**num-1))
for i in range(2**num-1):
mSeqGen_ = mSeqGen.copy()
for j in range(num-1):
mSeqGen[j+1] = mSeqGen_[j]
mSeqGen[0] = sum(
[mSeqGen_[k]if k+1 in pose else 0 for k in range(num)]) % 2
mSeq[i] = mSeqGen[0]
mSeqContinuos: list[Union[int, float]] = continuousFormat(
[1 if i == 0 else -1 for i in mSeq], 100)
mSeqContinuos_: list[int] = [int(i) for i in mSeqContinuos]
return [mSeq, mSeqContinuos_]
def main() -> None:
f = open('输出.txt', 'w')
plt.rcParams['font.family'] = 'WenQuanYi Micro Hei'
mSeq: list[int]
mSeqSingal: list[int]
data: list[int]
dataSignal: list[int]
snr: int = -10
[mSeq, mSeqSingal] = mSequence()
[data, dataSignal] = generateData()
f.write(f'm序列为:{mSeq}\n')
f.write(f'数据为:{data}\n')
f.write(f'信道信噪比为:{snr}dB\n')
drawSpectrum(mSeqSingal, '双极性7位m序列频谱')
drawData(dataSignal, '扩频前待发送二进制信息序列')
dataDSSS: list[int] = generateDSSSData(dataSignal, mSeqSingal)
drawData(dataDSSS, '扩频后待发送序列码')
bpskSignal: list[float] = bpskModulator(dataDSSS)
bpskSignalNoSpread: list[float] = bpskModulator(dataSignal)
drawData(bpskSignalNoSpread, '扩频前BPSK信号时域波形')
drawData(bpskSignal, '扩频后BPSK信号时域波形')
drawSpectrum(bpskSignalNoSpread, '扩频前调制信号频谱图')
drawSpectrum(bpskSignal, '扩频后调制信号频谱图')
yOut: list[float] = awgn(bpskSignal, snr)
drawData(yOut, '信道输出')
bpskdemoOutput: list[float] = bpskDemodulate(yOut)
demodDSSSData = demodDSSS(bpskdemoOutput, mSeqSingal)
dataOut: list[int] = samData(demodDSSSData)
ber: float = sum([1 if data[i] != dataOut[i]
else 0 for i in range(len(data))])/len(data)
f.write(f'判决出的信号为:{dataOut}\n')
f.write(f"误码率为:{round(ber*100,3)}%")
f.close()
return None
if __name__ == '__main__':
main()
Reference
[1]刘顺兰,吴杰编著.数字信号处理 第4版[M].西安:西安电子科技大学出版社,2021
[2]陈树新,于龙强,李勇军.通信原理[M].北京:清华大学出版社,2020
[3]高伟东,啜钢,刘倩,杜海清编著.移动通信原理 第3版[M].北京:电子工业出版社,2022