Goのchannelについて

Go言語は並行処理を得意とする言語として知られています。特に関数を並行処理させるgoroutineと、goroutine間で安全にデータの送受信を可能にするchannelによって、効率的な並行処理を実現できます。

本記事では、古典的なアルゴリズムのひとつである、「エラトステネスのふるい」の実装を紹介します。このコードを通して、goroutinechannelの基本的な使い方を理解していきましょう。まずはgoroutinechannelについての基本を見ていきます。

想定読者

  • 初歩的なプログラミング経験がある人
  • Goの基礎構文がわかる人

goroutine

まずはgoroutineについてです。Goの仕様書には、以下のように記載されています。

A "go" statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same address space.

訳: 「go」ステートメントは、関数呼び出しを、同一アドレス空間内の独立した並行制御スレッド、すなわちゴルーチンとして実行開始します。 引用元: The Go Programming Language Specification - The Go Programming Language

つまり、goというキーワードを使うだけで、関数をgoroutineとして非同期に実行できるということです。ただし、注意が必要な点として、goroutineは並行に動作しますが、必ずしも並列に動作するとは限りません。並行(concurrency)と並列(parallelism)は異なる概念です。簡単に言うと、並行は複数のタスクを切り替えながら進めること、並列は複数のタスクを同時に進めることです。詳しくは、こちらの記事などを参考にしてください。

それでは実際に、goroutineの使い方と、その動作を確認してみましょう。

goroutineの使い方

package main

import (
    "fmt"
    "time"
) 

// goroutineの基本的な使い方と実行順序の非決定性を示す例
func main() {
    // 通常の関数呼び出しの場合
    fmt.Println("通常の関数実行:")
    for i := 0; i < 5; i++ {
        printNumber(i)
    }
    
    fmt.Println("\ngoroutineでの実行:")
    // goroutineでの並行実行
    for i := 0; i < 5; i++ {
        go printNumber(i) // 「go」キーワードを付けるだけで並行実行
    }
    
    // メイン処理が終了しないように少し待機
    time.Sleep(time.Millisecond * 100)
    
    fmt.Println("\n実行終了")
}

func printNumber(num int) { 
    fmt.Printf("%d ", num)
}

このコードを実行すると、以下のような結果になる場合があります。

通常の関数実行:
0 1 2 3 4 

goroutineでの実行:
0 2 1 4 3 

実行終了

なる場合があると曖昧な表現をしたのは、goroutineで呼び出された関数の実行順序が保証されないためです。これは並行処理の特性であり、OSのスケジューリング(どのgoroutineにCPU時間を割り当てるか)によって、実行のたびに順序が変わる可能性があるからです。

goroutineは、同一のアドレス空間で並行に動作します。そのため、複数のgoroutineが同時に同じデータ(メモリ上の変数など)にアクセスしようとすると、予期せぬ結果を引き起こす「データ競合(data race)」が発生する可能性があります。これを防ぐ仕組みの一つが、次に説明するchannelです。

channel

channelは、並行に実行されているgoroutine間で安全に値の送受信を行うための仕組み(型)です。チャネルを通じて送受信するデータの型は、chanキーワードの後ろに指定します。例えば、整数を送受信するチャネルは chan int、文字列を送受信するチャネルは chan string のように宣言します。

チャネルには、使い方によって主に以下の3つの種類があり、それぞれ異なる方法で宣言します。 チャネルの宣言方法には以下の3通りがあります。

  1. var ch chan T
    nilチャネルを宣言します。初期化されていないチャネルであり、送受信操作を行うと永久にブロックされます(デッドロックの原因になります)。主に、意図しないチャネル操作を防ぐために使われます。
  2. ch := make(chan T)
    バッファを持たない同期チャネルを宣言します。送信と受信が同時に行われる場合にのみ通信が成功します。
  3. ch := make(chan T, 2)
    バッファサイズを指定した非同期チャネルを宣言します。括弧内の数字(この例では2)がバッファサイズとなり、この数まで送信データを一時的に溜めておくことができます。

また、チャネルを使い終わったら、close(ch)を使って閉じることができます。これにより、送信側はこれ以上データを送れなくなります(送信しようとするとパニックが発生します)。受信側は、チャネルが閉じられた後も、バッファに残っているデータを全て受け取ることができます。バッファが空になった後に閉じられたチャネルから受信しようとすると、データの型のゼロ値と、チャネルが閉じていることを示すfalseが返されます(x, ok := <-chの形で受け取れます)。

ch := make(chan int, 3)
ch <- 1  // 送信
ch <- 2  // 送信
close(ch) // チャネルを閉じる

// ch <- 3  // ⚠エラー: 閉じられたチャネルには送信できない

x, ok := <-ch  // x=1, ok=true
y, ok := <-ch  // y=2, ok=true
z, ok := <-ch  // z=0(int型のゼロ値), ok=false(チャネルが閉じられ、値がない)

チャネルが閉じられると、for rangeループを使ってチャネルから値を順次受け取る場合、バッファ内のすべての値が処理された後にループが自動的に終了します。これは、チャネルを使った一般的なデータ処理パターンです。

for value := range ch {
    // valueを処理
}
// チャネルが閉じられるとループ終了

では、同期チャネルと非同期チャネルには、具体的にどのような特徴があるのでしょうか? 次からは、同期チャネルと非同期チャネルの違いについて解説していきます。

同期チャネルと非同期チャネルの特徴

まずは、同期チャネルと非同期チャネル、それぞれの動作の違いを整理します。

同期チャネル (Unbuffered Channel):
バッファを持たないチャネルです。送信操作は、対応する受信操作が始まるまでブロック(待機)されます。同様に、受信操作も、対応する送信操作が始まるまでブロックされます。つまり、送信と受信が文字通り「同期」して行われ、一対一の通信が保証されます。

非同期チャネル (Buffered Channel):
指定されたサイズのバッファを持つチャネルです。送信操作は、バッファに空きがある限り、受信側を待たずに完了します(ブロックされません)。バッファがいっぱいの場合は、空きができるまで送信操作はブロックされます。受信操作は、バッファにデータがあれば即座に完了します。バッファが空の場合は、データが送信されるまで受信操作はブロックされます。

同期チャネルの主な利点:

  • goroutine間の処理の完了を確実に待機できる(通信の成功が同期を意味する)
  • 送信側は受信側の準備ができるまで、受信側は送信側の準備ができるまで処理をブロックするため、処理のタイミングを制御しやすい
  • デッドロックの検出がしやすい(バッファがないため、送受信の不一致が即座に問題になる)

非同期チャネルの主な利点:

  • 送信側と受信側の処理速度に差がある場合、バッファがその差を吸収し、一時的な負荷変動に強いシステムを作りやすい
  • バッファに空きがある限り送信側は待機しないため、送信側の処理を効率化できる場合がある
  • 送信側が受信側を待たずに次の処理へ進める(バッファに空きがある限り)

では、実際の動作を見てみましょう。

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("=== 同期チャネルのデモ ===")
	demonstrateSyncChannel()
	
	fmt.Println("\n=== 非同期チャネルのデモ ===")
	demonstrateAsyncChannel()
}

// 同期チャネルのデモ
func demonstrateSyncChannel() {
	// バッファなしの同期チャネルを作成
	ch := make(chan string)
	
	// 送信処理を別のgoroutineで実行
	go func() {
		fmt.Println("送信者: 送信開始します")
		
		// 1つ目のメッセージを送信
		fmt.Println("送信者: 「メッセージ1」を送信します...")
		startTime := time.Now()
		ch <- "メッセージ1"
		fmt.Printf("送信者: 「メッセージ1」の送信完了!(経過時間: %v)\n", time.Since(startTime))
		
		// 2つ目のメッセージを送信
		fmt.Println("送信者: 「メッセージ2」を送信します...")
		ch <- "メッセージ2"
		fmt.Printf("送信者: 「メッセージ2」の送信完了!(経過時間: %v)\n", time.Since(startTime))
		
		// チャネルを閉じる
		close(ch)
	}()
	
	// メインgoroutineは少し待ってから受信を開始
	fmt.Println("受信者: 1秒後に受信を開始します")
	time.Sleep(1 * time.Second)
	
	// チャネルからメッセージを受信
	for message := range ch {
		fmt.Printf("受信者: 「%s」を受信しました\n", message)
		fmt.Println("受信者: 次のメッセージを受信する前に1秒待ちます")
		time.Sleep(1 * time.Second)
	}
	
	fmt.Println("=== 同期チャネルのデモ完了 ===")
}

// 非同期チャネルのデモ
func demonstrateAsyncChannel() {
	// バッファサイズ2の非同期チャネルを作成
	ch := make(chan string, 2)
	
	// 送信処理を別のgoroutineで実行
	go func() {
		fmt.Println("送信者: 送信開始します")
		
		// 1つ目のメッセージを送信
		fmt.Println("送信者: 「メッセージ1」を送信します...")
		startTime := time.Now()
		ch <- "メッセージ1"
		fmt.Printf("送信者: 「メッセージ1」の送信完了!(経過時間: %v)\n", time.Since(startTime))
		
		// 2つ目のメッセージを送信
		fmt.Println("送信者: 「メッセージ2」を送信します...")
		ch <- "メッセージ2"
		fmt.Printf("送信者: 「メッセージ2」の送信完了!(経過時間: %v)\n", time.Since(startTime))
		
		// 3つ目のメッセージを送信(バッファが満杯なので待機する)
		fmt.Println("送信者: 「メッセージ3」を送信します...")
		fmt.Println("送信者: バッファがいっぱいなので、受信されるまで待機します...")
		ch <- "メッセージ3"
		fmt.Printf("送信者: 「メッセージ3」の送信完了!(経過時間: %v)\n", time.Since(startTime))
		
		// チャネルを閉じる
		close(ch)
	}()
	
	// メインgoroutineは少し待ってから受信を開始
	fmt.Println("受信者: 3秒後に受信を開始します")
	time.Sleep(3 * time.Second)
	
	// チャネルからメッセージを受信
	for message := range ch {
		fmt.Printf("受信者: 「%s」を受信しました\n", message)
		fmt.Println("受信者: 次のメッセージを受信する前に1秒待ちます")
		time.Sleep(1 * time.Second)
	}
	
	fmt.Println("=== 非同期チャネルのデモ完了 ===")
}

このデモからわかるように、同期チャネルは送信と受信が厳密に同期するため、値の受け渡しを確認しながら進める場合に適しています。一方、非同期チャネルはバッファがあるため、送信側はバッファに空きがあれば待たずにデータを送ることができ、処理速度の差を吸収するのに有効です。どちらのチャネルを使うかは、並行する処理間の連携の性質によって適切に選択する必要があります。

それでは、「エラトステネスのふるい」のソースコードを見ていきましょう。

エラトステネスのふるい

以下は私が『Goでの並行処理を徹底解剖!』という書籍を読んでいる際に参考にしたGoのサンプルコードです。この書籍は無料にも関わらず大変参考になる内容ですので、ぜひ読んでみてください。

本にもある通り、このコードの出典はThe Go Programming Language Specification#An_example_packageで、「エラトステネスのふるい」というアルゴリズムを実装しています。

アルゴリズムの概要

エラトステネスのふるいは、特定の範囲内の素数を効率的に見つけるアルゴリズムです。基本的な手順は以下の通りです:

  1. 2から始めて、その数を素数として記録する
  2. その素数の倍数をすべて除外する
  3. 除外されていない次に小さい数を新しい素数として記録する
  4. 手順2と3を繰り返す

GoのChannelとgoroutineを使うことで、このアルゴリズムを並行処理として実装できます。

package main

import (
    "fmt"
)

// 2, 3, 4, 5...と自然数を送信するチャネルを作る
func generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i
	}
}

// srcチャネルから送られてくる値の中で、primeの倍数でない値だけをdstチャネルに送信する関数
func filter(src <-chan int, dst chan<- int, prime int) {
	for i := range src {
		if i%prime != 0 {
			dst <- i
		}
	}
}

// エラトステネスのふるいのアルゴリズム本体
func sieve() {
	ch := make(chan int)
	go generate(ch)
	for {
		prime := <-ch // ここから受け取るものは素数で確定
		fmt.Print(prime, "\n")

		// 素数と確定した数字の倍数は
		// もう送ってこないようなチャネルを新規作成→chに代入
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		ch = ch1
	}
}

func main() {
	sieve()
}

それではこれから、各関数の動作を詳しく見ていきます。

generate

generate(ch chan<- int)は、コメントにもある通り自然数を引数のチャネルに送信し続ける関数です。

func generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i
	}
}

この関数は、2から始まる整数を無限に生成し、引数で受け取ったチャネルchに送信し続けます。chan<- intという型指定は、「int型のデータを送信するためのチャネル」という意味です(送信専用チャネル)。これにより、この関数内ではchに対して送信操作 (ch <- i) のみが許可され、受信操作 (<-ch) を行うとコンパイルエラーになります。

filter

filter(src <-chan int, dst chan<- int, prime int)は、入力チャネルsrcから受け取った値の中で、引数primeで割り切れない値だけを、出力チャネルdstに送信し続ける関数です。

func filter(src <-chan int, dst chan<- int, prime int) {
	for i := range src {
		if i%prime != 0 {
			dst <- i
		}
	}
}

<-chan int受信専用チャネル、chan<- int送信専用チャネルを表します。このようにチャネルの方向を指定することで、関数内での誤った操作を防ぎ、コードの意図を明確にすることができます。for i := range srcという構文は、srcチャネルからデータが送信されてくる限りループを続け、チャネルが閉じられるとループが終了します。もしiprimeで割り切れなければ(剰余i % primeが0でなければ)、その値idstチャネルに送信します。

sieve

sieve()関数は、エラトステネスのふるいのアルゴリズムのメイン部分を実装しています。この関数が、generatefilterを組み合わせて素数を順に見つけていく役割を担います。

func sieve() {
	ch := make(chan int)
	go generate(ch)
	for {
		prime := <-ch // ここから受け取るものは素数で確定
		fmt.Print(prime, "\n")

		// 素数と確定した数字の倍数は
		// もう送ってこないようなチャネルを新規作成→chに代入
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		ch = ch1
	}
}

この関数の中でのチャネルchの役割は、ループが進むごとに変化していく点が重要です。最初の段階ではgenerateから全ての自然数を直接受け取りますが、素数が見つかるたびに、その素数の倍数を除外する新しいfilter goroutineと、その出力チャネルが作られ、変数chはその新しいチャネルを指すように更新されます。

ch変数の主な遷移は以下のようになります。

ループの回数 chから受け取る最初の値(素数) chが受け渡し始める値
1回目 2 2, 3, 4, 5, 6, 7, 8, 9, 10, ... (1以外の自然数すべて)
2回目 3 3, 5, 7, 9, 11, 13, ... (2の倍数以外)
3回目 5 5, 7, 11, 13, 17, 19, ... (2と3の倍数以外)
4回目以降 次に見つかる最小の素数 それまでに見つかった素数すべての倍数以外

具体的には以下の流れで動作します。

  1. 最初の素数(2)の発見とフィルタリング:
    最初にgenerateから送られてくる最初の値2を受け取ります。これが最初の素数となります。次に、2の倍数を除外するfilterを起動し、その出力チャネルを新しいchとします。
  2. 次の素数(3)の発見とフィルタリング:
    新しいchからは、2の倍数を除外された数が流れてきます(3, 5, 7, 9, 11, 13, ...)。この中で最も小さい数(3)が次の素数として確定します。そして、今度は3の倍数を除外するfilterを起動し、その出力チャネルをさらに新しいchとします。
  3. 繰り返し:
    このプロセスが繰り返されます。sieve関数は、現在のchから送られてくる数の中で最も小さいものを次の素数として見つけ、その素数の倍数を除外する新しいfilterのパイプラインを追加していきます。

このようにして、チャネルの連鎖が構築され、素数が順にフィルタリングされて見つけ出されるという仕組みです。

ここまで仕組みが分かったところで、実際にThe Go Playgroundで、コードを実行してみてください。

終わりに

ここでは、Goのgoroutinechannelを使った並行処理の一例として、エラトステネスのふるいを実装するコードを紹介しました。この例を通して、goroutineが関数をバックグラウンドで並行に動かし、channelがそれらgoroutine間で安全かつ効率的にデータをやり取りする基本的な仕組みを、少しでもご理解いただけたでしょうか。

この記事で紹介した基本的なgoroutinechannelの考え方を応用することで、より複雑な並行処理パターンを安全かつ効率的に実装することが可能です。Goの並行処理の世界は奥深く、他にもselectステートメントによる複数チャネルの待受や、syncパッケージによる高度な同期制御など、多くの有用な機能があります。ぜひ、これらの基本的な概念を足掛かりに、Goでの並行プログラミングをさらに探求してみてください。

私自身も、この記事をまとめることを通して、Goの並行処理について改めて整理し、理解を深める良い機会となりました。今後も、Goの魅力を引き出すような記事を書いていきたいと思いますので、ぜひご期待ください。