前置き
前回の記事からの続きです。以下のページにある内容をPowershellの Invoke-RestMethod コマンドレットで実施していきます。
https://qiita.com/math1101/items/311277789868ebd07835
MAP再作成
例によって例のごとく、以下のコードをPowershellのプロンプトへ順次張り付けていきます。# ■作業ディレクトリー再設定
$WorkDir = "C:\Elastic\data"
Set-Location $WorkDir
# ■Elasticsearch 登録先情報
$Hostname = "localhost"
$PortNumber = 9200
$IndexName = "air"
$TypeName = "ppm"
$BaseURL = "http://${Hostname}:${PortNumber}"
$RequestURL = $BaseURL + $APIURL
# ■MAP定義更新
# ○既存Indexの削除
Function Delete-Index () {
$APIURL = "/${IndexName}"
$RequestURL = $BaseURL + $APIURL
$ReqestType = "DELETE"
Invoke-RestMethod -Uri $RequestURL -Method $ReqestType
Write-Output $("`t Delete-Index() : " + $(Get-Date) )
}
Delete-Index
# ○MAP定義登録
Function Create-TypeItem ( [string]$JsonFilePath ) {
$APIURL = "/${IndexName}"
$TargetURL = $BaseURL + $APIURL
$ReqestType = "PUT"
$JsonData = Get-Content $JsonFilePath
# 文字化けするのでUTF8を強制
$PostParam = [System.Text.Encoding]::UTF8.GetBytes($JsonData)
Invoke-RestMethod -Uri $TargetURL -Body $PostParam -ContentType 'application/json' -Method $ReqestType
Write-Output $("`t Create-TypeItem( ${JsonFilePath} ) : " + $(Get-Date) )
}
$JsonFilePath = Join-Path $WorkDir "map.json"
Create-TypeItem -JsonFilePath $JsonFilePath
# ■現在のMAPを取得
$MapFile = Join-Path $WorkDir "newmap.json"
$APIURL = "/${IndexName}/${TypeName}/_mapping"
$RequestURL = $BaseURL + $APIURL
$Result = Invoke-RestMethod -Uri $RequestURL
ConvertTo-JSON $Result.air -Depth 10 | Out-File -Encoding Default -FilePath $MapFile
# ■件数チェック
$APIURL = "/${IndexName}/${TypeName}/_count"
$RequestURL = $BaseURL + $APIURL
$( Invoke-RestMethod -Uri $RequestURL ).count
新しく作成したnewmap.jsonを確認すると、TimeStampのデータ型が無事dateになっていることが確認できます。あとはデータを投入するだけで完了です。実は先に挙げた元記事で、一番変更したかったのもここです。
元の内容は、CSVをBULK用のJSONファイルに変換しており、その変換処理だけで数時間掛かるので、気軽に試せなかったんですね。
でも、前回の記事にて示したように、CSVから直接BULK登録が可能なので、その工程を省いてしまえばいいのです。
この処理は、再利用性を考慮してスクリプトファイルとして保存しましょう。
※Powershellの実行ポリシーを変更していないと動かないのでまだの人は Set-ExecutionPolicy でググってください。
以下の内容を「UpdateESFromCsv.ps1」というファイル名で、フォルダー「C:\Elastic\data」に保存します。
<#
■Elasticsearch へのBulk投入
#>
####################################################################### 引数定義
[CmdletBinding()]
Param(
$Hostname = "localhost"
,$PortNumber = 9200
,$IndexName = "air"
,$TypeName = "ppm"
,$CsvFile = ".\target.csv"
)
##################################################################### 初期化処理
# ■ ログ採取 //////////////////////////////////////////////
# Set-PSDebug -Trace 2
# ログファイル名設定
$ScriptPath = Split-Path -Parent $MyInvocation.MyCommand.Path # スクリプトパス指定
$LogFileName = $MyInvocation.MyCommand.Name.substring(0,$MyInvocation.MyCommand.Name.length - 4) + $(Get-Date).ToString("yyyyMMddHHmm") + "_" + (Get-ChildItem env:computername).value + ".log"
$LogFilePath = Join-Path $ScriptPath $LogFileName
# ログ採取開始
Start-Transcript $LogFilePath
# ログ出力の折り返し防止設定
$BufferSize = $(Get-Host).UI.RawUI.BufferSize
$BufferSize.Width = 512 ; $BufferSize.Height = 512
$(Get-Host).UI.RawUI.BufferSize = $BufferSize
# ■ 既定値入力 ////////////////////////////////////////////
$BulkMax = 5000 # Bulkの実行件数
$BaseURL = "http://${Hostname}:${PortNumber}"
$BulkCommand = @{ "index" = @{ "_index" = $IndexName ;"_type" = $TypeName } } | ConvertTo-Json -Compress
$Counter = 0 # 初期化
$BulkData = New-Object System.Collections.ArrayList # 新規配列
####################################################################### 関数定義
Function BulkCreate-DocsItemFromJson ( [array]$BulkData ) {
$TimeoutSec = 600 # タイムアウト時間を秒で指定(一括取り込みに時間が掛かる)
$APIURL = "/${IndexName}/${TypeName}/_bulk"
$RequestURL = $BaseURL + $APIURL
$ReqestType = "POST"
$JsonData = $( $BulkData -join "`n" ) + "`n"
$PostParam = [System.Text.Encoding]::UTF8.GetBytes($JsonData) # 日本語が文字化けするのでUTF8を強制
$Result = Invoke-RestMethod -Uri $RequestURL -Body $PostParam -ContentType 'application/json' -Method $ReqestType -TimeoutSec $TimeoutSec
Write-Output $("`t BulkCreate-DocsItemFromJson() : " + $(Get-Date) + " <= ${CsvFile}")
}
##################################################################### メイン処理
# ■メイン処理開始 /////////////////////////////////////////
Write-Output $("■メイン処理開始 : " + $(Get-Date) )
$CsvData = Import-Csv -Path $CsvFile -Encoding Default | Where-Object { $_.'測定項目名称' -like "*${TypeName}*"}
ForEach ($CsvRecord in $CsvData) {
If ($Counter -gt $BulkMax) { # 規定件数を超えたらBULK実行
BulkCreate-DocsItemFromJson $BulkData
$Counter = 0 # 初期化
$BulkData = New-Object System.Collections.ArrayList # 新規配列
}
$Counter = $Counter + 1
$TimeStamp = $CsvRecord."年" + $CsvRecord."月" + $CsvRecord."日" + "-" + $CsvRecord."時"
$CsvRecord | Add-Member -NotePropertyName "TimeStamp" -NotePropertyValue $TimeStamp
$JsonData = $CsvRecord | ConvertTo-Json -Compress
$BulkData.Add($BulkCommand) > $Null
$BulkData.Add($JsonData) > $Null
}
BulkCreate-DocsItemFromJson $BulkData
# 終了処理 ////////////////////////////////////////////////
Write-Output $("■スクリプト終了 : " + $(Get-Date) )
Stop-Transcript
前回の記事で5件だけ登録した処理を、スクリプトファイルに変更し、一定件数毎にBULK実行するように改定したものになります。5000件であれば、10以内にはBULK実行するのではないでしょうか。
余り応答がないと不安になるでしょうし、配列に突っ込むためのメモリーも無尽蔵ではないので上限値を設けています。5000でメモリーが足りないという方は$BulkMaxを1000なりに値を変更してください。
そして、以下のコードを実行すれば順次CSVファイルを読み取ってElasticsearchへ登録してくれます。
# ■作業ディレクトリー再設定
$WorkDir = "C:\Elastic\data"
Set-Location $WorkDir
# ■ファイル確認
Get-Item ".\UpdateESFromCsv.ps1"
# ■スクリプト実行
$CsvList = Get-ChildItem $WorkDir | Where-Object {$_.Extension -eq ".csv" }
ForEach ($CsvFile in $CsvList) {
.\UpdateESFromCsv.ps1 -CsvFile $CsvFile.FullName
}
# ■投入結果確認
$Hostname = "localhost"
$PortNumber = 9200
$IndexName = "air"
$TypeName = "ppm"
$BaseURL = "http://${Hostname}:${PortNumber}"
$APIURL = "/${IndexName}/${TypeName}/_search"
$RequestURL = $BaseURL + $APIURL
$Result = Invoke-RestMethod -Uri $RequestURL
ConvertTo-JSON $Result -Depth 10
# ■件数チェック
$APIURL = "/${IndexName}/${TypeName}/_count"
$RequestURL = $BaseURL + $APIURL
$( Invoke-RestMethod -Uri $RequestURL ).count
ここの内容は全CSVファイルを先のスクリプトで順次処理しているだけですので、詳細な説明は割愛します。
結果が126040件になってれば同じ結果です。うまくいかなかった人は頑張って原因を探してください。
三回に渡ってお送りしましたが、件の記事をForkした箇所は以上となります。お付き合いいただきありがとうございました。
なお、Kibanaの内容はWebUIですので、Windows関係ないし、変更点もありません。
元記事を参照して試してみてください。
0 件のコメント:
コメントを投稿