API ストリーミングと stream_to_file | Python
- Moku:Lab
- Moku:Go
-
Moku:Pro
Moku:Proに関するよくある質問 Moku:Pro波形発生器 Moku:Proタイム&周波数アナライザ Moku:Proロジックアナライザ/パターンジェネレーター Moku:Proレレーザーロックボックス Moku:Proロックインアンプ Moku:Proスペクトラムアナライザ Moku:Proデータロガー Moku:Pro任意波形発生器 Moku:Proマルチ機器モード Moku:Pro位相計 Moku:Pro FIRフィルタービルダー Moku:Pro PIDコントローラー Moku:Proオシロスコープ Moku:Pro周波数応答アナライザ Moku:Proデジタルフィルターボックス
- Python API
- MATLAB API
- 任意波形発生器
- データロガー
- デジタルフィルターボックス
- FIR フィルタ ビルダー
- 周波数応答アナライザー
- レーザーロックボックス
- ロックインアンプ
- オシロスコープ
- 位相計
- PIDコントローラー
- スペクトラムアナライザー
- 時間と周波数アナライザー
- 波形発生器
- ロジックアナライザ/パターンジェネレーター
- マルチ機器モード
- Mokuクラウドコンパイル
- Mokuに関するよくある質問
- LabVIEW API
ストリーミングオプション
APIの詳細については、 データロガー | API リファレンス
ストリーミングstream_to_file Python のデータストリーミングの例
stream_to_file メソッド
これは、データをログに記録したいが、mokucli を使用して外部で変換したくない場合に便利です。このユースケースについては、 API ログ ファイルのダウンロードと変換 | Python を参照してください。これにより、ダウンロード中にファイルが変換され、ストリーミング セッションが完了すると分析できるようになります。
# moku example: Datalogger Stream to File
# (c) 2025 Liquid Pty. Ltd.
import os
import csv
import time
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
from moku.instruments import Datalogger
# Connect to your Moku
i = Datalogger('192.168.###.###', force_connect=False)
try:
# Set-up the Datalogger
i.set_samplerate(100)
i.set_acquisition_mode(mode='Precision')
i.generate_waveform(channel=1, type='Sine', amplitude=1, frequency=10e3)
# Start a streaming session and stream to a file, include the file type in the
# file name; csv, hdf5, npy, or mat
i.start_streaming(duration=10)
now = datetime.now().strftime("%Y%m%d_%H%M%S")
file_name = f'MokuDataStreamingData_{now}.csv'
i.stream_to_file(file_name)
print(f"Streaming to file: {file_name}")
print(i.get_stream_status()['status'])
# Track progress percentage of the data streaming session
status = 'RUNNING'
while status == 'RUNNING':
time.sleep(0.5)
progress = i.get_stream_status()
status = progress['status']
# It may take a few moments for the file to appear in the system as accessible
time.sleep(5)
channels = 2
print(file_name)
except Exception as e:
i.relinquish_ownership()
raise e
else:
# Close the connection to the Moku device
i.relinquish_ownership()
# Check for the file, then open the file and create data dictionary
assert os.path.isfile(file_name), "Streaming failed, no file received"
data = {'time':[], 'ch1':[]}
with open(file_name, 'r') as f:
load = csv.reader(f)
file = np.array([[float(row[0])] + [float(row[i]) for i in range(1, channels+1)] for row in load if row[0][0] != '%'])
f.close()
data['time'] = file[:,0]
for i in range(1, channels+1):
data[f'ch{i}'] = file[:,i]
# Display the streamed data
keys = data.keys()
for key in keys:
print(f"'{key}': {str(data[key][:3])[:-1]} ... {data[key][-1]}]")
# Plot the streamed data
fig = plt.figure(figsize=(10, 3))
for i in range(1, channels+1):
plt.plot(data['time'], data[f'ch{i}'], label=f'Channel {i}')
plt.xlim([data['time'][0], data['time'][-1]])
plt.title("Streamed to File Data")
plt.grid(True)
plt.xlabel("Time (s)")
plt.ylabel("Voltage (V)")
plt.legend(loc=0)
plt.show()
出力例:
Streaming to file: MokuDataStreamingData_20250116_165246.csv
RUNNING
MokuDataStreamingData_20250116_165246.csv
'time': [0. 0.01 0.02 ... 9.99]
'ch1': [0.00117748 0.00105914 0.00114431 ... 0.001142182695]
'ch2': [0.01485419 0.01477556 0.01476971 ... 0.01479722506]

他のファイルタイプへのストリーミングの場合:
file_name = f'MokuDataStreamingData_{now}.npy'
i.stream_to_file(file_name)
file_name = f'MokuDataStreamingData_{now}.hdf5'
i.stream_to_file(file_name)
file_name = f'MokuDataStreamingData_{now}.mat'
i.stream_to_file(file_name)
これらのファイル タイプからのデータを分析するには、この記事を参照してください。
ストリーミング方式
この方法は、ストリーミング セッションが完了するのを待つのではなく、リアルタイムでデータを分析する場合に便利です。
# moku example: Datalogger Streaming
# (c) 2025 Liquid Pty. Ltd.
import os
import time
from moku.instruments import Datalogger
import matplotlib.pyplot as plt
# Connect to your Moku
i = Datalogger('192.168.###.###', force_connect=False)
try:
# Set-up the Datalogger
i.set_samplerate(100)
i.set_acquisition_mode(mode='Precision')
i.generate_waveform(channel=1, type='Sine', amplitude=1, frequency=10e3)
# stream the data for 10 seconds..
i.start_streaming(duration=10)
# Set up the plotting parameters
plt.ion()
plt.show()
plt.grid(visible=True)
plt.ylim([-1, 1])
plt.title("Streamed Data")
plt.xlabel("Time (s)")
plt.ylabel("Voltage (V)")
line1, = plt.plot([])
line2, = plt.plot([])
plt.legend(['Channel 1', 'Channel 2'], loc=1)
# Configure labels for axes
ax = plt.gca()
# This loops continuously updates the plot with new data
while True:
# get the chunk of streamed data
data = i.get_stream_data()
if data:
plt.xlim([data['time'][0], data['time'][-1]])
# Update the plot
line1.set_ydata(data['ch1'])
line1.set_xdata(data['time'])
line2.set_ydata(data['ch2'])
line2.set_xdata(data['time'])
plt.pause(0.001)
except Exception as e:
i.stop_streaming()
print(e)
finally:
i.relinquish_ownership()
.gif)